[spark2] partitonBy, HashPartitioner, RangePartitioner 예제
RDD에 partitonBy 메소드를 호출하면서 Partitioner를 정할 수 있다.
기본 Partitioner(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/Partitioner.html)로는 HashPartitioner, RangePartitioner가 존재한다.
우선 HashPartitioner를 사용한다. 파티셔닝을 해쉬로 퍼트릴 수 있기 때문에 유용하다.
먼저 5개의 파티션으로 RDD를 생성했다가 Partitioning을 3개의 HashPartitioner를 사용하는 예제이다.
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)), 5)
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> pairs.partitioner
res1: Option[org.apache.spark.Partitioner] = None
scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner
scala> val partitioned = pairs.partitionBy(new HashPartitioner(3)).persist()
partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[3] at partitionBy at <console>:27
scala> partitioned.collect
res2: Array[(Int, Int)] = Array((2,2), (1,1), (3,3))
scala> pairs.partitions.length
res7: Int = 5
scala> partitioned.partitions.length
res8: Int = 3
scala> pairs.partitions
res5: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@6ba, org.apache.spark.rdd.ParallelCollectionPartition@6bb, org.apache.spark.rdd.ParallelCollectionPartition@6bc, org.apache.spark.rdd.ParallelCollectionPartition@6bd, org.apache.spark.rdd.ParallelCollectionPartition@6be)
scala> partitioned.partitions
res6: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1, org.apache.spark.rdd.ShuffledRDDPartition@2)
persist()는 shuffle을 이미 되도록 해놓기 때문에 성능상 이점을 가진다. 실무에서 사용할 때 유용한 팁이다.
참고로 RDD.toDebugString() 메소드가 존재하는데 shuffle RDD인지 아닌지를 파악할 때 도움이 된다.
scala> partitioned.toDebugString
res11: String =
(3) ShuffledRDD[8] at partitionBy at <console>:27 [Memory Deserialized 1x Replicated]
| CachedPartitions: 3; MemorySize: 192.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
+-(5) ParallelCollectionRDD[7] at parallelize at <console>:24 [Memory Deserialized 1x Replicated]
scala> pairs.toDebugString
res13: String = (5) ParallelCollectionRDD[7] at parallelize at <console>:24 []
다음은 RangePartitioner 예제이다. 내용은 비슷해보인다.
scala> import org.apache.spark.RangePartitioner
import org.apache.spark.RangePartitioner
scala> new RangePartitioner(3, pairs)
res9: org.apache.spark.RangePartitioner[Int,Int] = org.apache.spark.RangePartitioner@7d2d
scala> val rangePartitioned = pairs.partitionBy(new RangePartitioner(3, pairs)).persist()
rangePartitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[8] at partitionBy at <console>:28
scala> rangePartitioned.collect
res10: Array[(Int, Int)] = Array((1,1), (2,2), (3,3))
scala> rangePartitioned.partitions.length
res11: Int = 3
RangePartitioner API(https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/RangePartitioner.html)를 살펴보면, ordering와 정렬순서(오름차순/내림차순)으로 할 수 있는 형태가 있다. HashPartitioner와 크게 다른 내용이라 할 수 있을 듯 싶다.
소스 : https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala
public RangePartitioner(int partitions, RDD<? extends scala.Product2<K,V>> rdd, boolean ascending, scala.math.Ordering<K> evidence$1, scala.reflect.ClassTag<K> evidence$2)