(The SparkContext code supports several ways to create an RDD; here we look at a few of them.)
The traditional way to create an RDD in Spark is to use the parallelize method.
scala> val rdd1 = sc.parallelize(List(1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
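parallelize also accepts an optional numSlices argument that sets the number of partitions (it defaults to sc.defaultParallelism). A minimal spark-shell sketch; the variable name is illustrative and the expected results are shown as comments since RDD ids vary per session:
scala> val rdd1b = sc.parallelize(List(1, 2, 3), 2)   // request 2 partitions explicitly
scala> rdd1b.getNumPartitions                         // returns 2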
SparkContext also provides a method called makeRDD for creating RDDs.
scala> val rdd2 = sc.makeRDD(List(1,2,3))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
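When no partition count is given, makeRDD, like parallelize, falls back to sc.defaultParallelism. A quick check in spark-shell (the actual value depends on the master configuration, e.g. the number of local cores):
scala> sc.defaultParallelism      // e.g. the number of cores when running local[*]
scala> rdd2.getNumPartitions      // same value as above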
In addition, makeRDD has an overload that takes a location preference (a list of host names) for each element and creates one partition per element. In the example below, the strings "a" through "f" merely stand in for host names, which is why preferredLocations returns them:
scala> val rdd3 = sc.makeRDD(List((1, List("a", "b", "c")), (2, List("d", "e", "f"))))
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24
scala> rdd3.preferredLocations(rdd3.partitions(1))
res6: Seq[String] = List(d, e, f)
scala> rdd3.preferredLocations(rdd3.partitions(0))
res7: Seq[String] = List(a, b, c)
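Note that only the first element of each tuple becomes data in rdd3; the Seq[String] part is kept purely as a location hint, with one partition per tuple. A quick way to confirm this (expected results shown as comments, since res numbers and RDD ids vary per session):
scala> rdd3.collect               // Array(1, 2): the string lists are not part of the data
scala> rdd3.getNumPartitions      // 2: one partition per (element, preferences) pair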
Looking at the SparkContext source, the first overload of makeRDD simply calls parallelize internally:
/** Distribute a local Scala collection to form an RDD.
 *
 * This method is identical to `parallelize`.
 * @param seq Scala collection to distribute
 * @param numSlices number of partitions to divide the collection into
 * @return RDD representing distributed collection
 */
def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
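So for the common Seq-plus-numSlices case the two methods are interchangeable; a sketch under that assumption (variable names are illustrative, partition contents shown as comments):
scala> val viaParallelize = sc.parallelize(1 to 5, 3)
scala> val viaMakeRDD = sc.makeRDD(1 to 5, 3)
scala> viaParallelize.glom().collect().map(_.toList)   // e.g. List(1), List(2, 3), List(4, 5)
scala> viaMakeRDD.glom().collect().map(_.toList)       // same partition layout as above
The second overload below is where makeRDD diverges from parallelize: it records a preferred location for each partition.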
/**
 * Distribute a local Scala collection to form an RDD, with one or more
 * location preferences (hostnames of Spark nodes) for each object.
 * Create a new partition for each collection item.
 * @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
 * @return RDD representing data partitioned according to location preferences
 */
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
  assertNotStopped()
  val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
  new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
}
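To make the zipWithIndex line concrete, here is what indexToPrefs evaluates to for the rdd3 example above; this is plain Scala replaying the same expression, with the result shown as a comment:
scala> val seq = List((1, List("a", "b", "c")), (2, List("d", "e", "f")))
scala> seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
// Map(0 -> List(a, b, c), 1 -> List(d, e, f)): partition index -> preferred hosts
This map is then handed to ParallelCollectionRDD, which is why rdd3.preferredLocations(rdd3.partitions(0)) returned List(a, b, c) earlier.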