RDD에 partitonBy 메소드를 호출하면서 Partitioner를 정할 수 있다. 

기본 Partitioner(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/Partitioner.html)로는 HashPartitioner, RangePartitioner가 존재한다. 



우선 HashPartitioner를 사용한다. 파티셔닝을 해쉬로 퍼트릴 수 있기 때문에 유용하다. 



먼저 5개의 파티션으로 RDD를 생성했다가 Partitioning을 3개의 HashPartitioner를 사용하는 예제이다. 




scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)), 5)

pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24


scala> pairs.partitioner

res1: Option[org.apache.spark.Partitioner] = None


scala> import org.apache.spark.HashPartitioner

import org.apache.spark.HashPartitioner


scala> val partitioned = pairs.partitionBy(new HashPartitioner(3)).persist()

partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:27


scala> partitioned.collect

res2: Array[(Int, Int)] = Array((2,2), (1,1), (3,3))


scala> pairs.partitions.length

res7: Int = 5


scala> partitioned.partitions.length

res8: Int = 3


scala> pairs.partitions

res5: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@6ba, org.apache.spark.rdd.ParallelCollectionPartition@6bb, org.apache.spark.rdd.ParallelCollectionPartition@6bc, org.apache.spark.rdd.ParallelCollectionPartition@6bd, org.apache.spark.rdd.ParallelCollectionPartition@6be)


scala> partitioned.partitions

res6: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1, org.apache.spark.rdd.ShuffledRDDPartition@2)



persist()는 shuffle을 이미 되도록 해놓기 때문에 성능상 이점을 가진다. 실무에서 사용할 때 유용한 팁이다. 



참고로 RDD.toDebugString() 메소드가 존재하는데 shuffle RDD인지 아닌지를 파악할 때 도움이 된다. 



scala> partitioned.toDebugString

res11: String =

(3) ShuffledRDD[8] at partitionBy at <console>:27 [Memory Deserialized 1x Replicated]

 |       CachedPartitions: 3; MemorySize: 192.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B

 +-(5) ParallelCollectionRDD[7] at parallelize at <console>:24 [Memory Deserialized 1x Replicated]


scala> pairs.toDebugString

res13: String = (5) ParallelCollectionRDD[7] at parallelize at <console>:24 []






다음은 RangePartitioner 예제이다. 내용은 비슷해보인다.



scala> import org.apache.spark.RangePartitioner

import org.apache.spark.RangePartitioner


scala> new RangePartitioner(3, pairs)

res9: org.apache.spark.RangePartitioner[Int,Int] = org.apache.spark.RangePartitioner@7d2d


scala> val rangePartitioned = pairs.partitionBy(new RangePartitioner(3, pairs)).persist()

rangePartitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[8] at partitionBy at <console>:28


scala> rangePartitioned.collect

res10: Array[(Int, Int)] = Array((1,1), (2,2), (3,3))


scala> rangePartitioned.partitions.length

res11: Int = 3



RangePartitioner API(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/RangePartitioner.html)를 살펴보면, ordering와 정렬순서(오름차순/내림차순)으로 할 수 있는 형태가 있다. HashPartitioner와 크게 다른 내용이라 할 수 있을 듯 싶다. 

소스 : https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala

public RangePartitioner(int partitions,
                RDD<? extends scala.Product2<K,V>> rdd,
                boolean ascending,
                scala.math.Ordering<K> evidence$1,
                scala.reflect.ClassTag<K> evidence$2)





'scala' 카테고리의 다른 글

[scala] Product 이해하기  (0) 2017.08.10
[spark] [펌질] wide dependecy, narrow dependency  (0) 2017.08.08
[spark2] cache()와 persist()의 차이  (0) 2017.08.01
[scala] scalatest에서 Exception 처리  (0) 2017.07.27
[scala] scalablitz  (0) 2017.07.27
Posted by '김용환'
,