(SparkContext 코드에는 다양한 rdd 생성 방식을 지원한다. 그 중 공부하는 내용을 살펴본다)


전통적인 spark의 rdd 생성 방식은 parallelize 메소드를 사용하는 것이었다.


scala> val rdd1 = sc.parallelize(List(1,2,3))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24



spark2에서 rdd 생성할 수 있는 makeRDD라는 메소드가 추가되었다.


scala> val rdd2 = sc.makeRDD(List(1,2,3))

rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24



게다가 파티셔닝도 되는 메소드이다.



scala> val rdd3 = sc.makeRDD(List((1, List("a", "b", "c")), (2, List("d", "e", "f"))))

rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24



scala> rdd3.preferredLocations(rdd3.partitions(1))

res6: Seq[String] = List(d, e, f)


scala> rdd3.preferredLocations(rdd3.partitions(0))

res7: Seq[String] = List(a, b, c)




api를 살펴보면, 다음과 같다. parallelize 함수를 내부적으로 쓴다.


  /** Distribute a local Scala collection to form an RDD.
   *
   * This method is identical to `parallelize`.
   * @param seq Scala collection to distribute
   * @param numSlices number of partitions to divide the collection into
   * @return RDD representing distributed collection
   */
  def makeRDD[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
  }

  /**
   * Distribute a local Scala collection to form an RDD, with one or more
   * location preferences (hostnames of Spark nodes) for each object.
   * Create a new partition for each collection item.
   * @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
   * @return RDD representing data partitioned according to location preferences
   */
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
    assertNotStopped()
    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
  }



Posted by '김용환'
,