foreach 와 foreachPartition의 차이


foreach는 간단히 collection을 리스트로 출력한다.


val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

list.foreach(x => println(x))




foreachPartition는 RDD를 partition처리한다. 

partition 개수는 foreachPartition 전에 미리 지정되야 한다. 하지만 리턴 값이 필요없을 때 사용된다.


val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)

b.foreachPartition(x => println(x.reduce(_ + _))) 





리턴 값이 필요할 때는 mapPartition 또는 mapPartitionsWithIndex을 사용한다. 


val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)

b.foreachPartition(x => println(x.reduce(_ + _)))

val mapped = b.mapPartitionsWithIndex {

    (index, iterator) => {

        println("aaaa -> " + index)

        val myList = iterator.toList

        myList.map(x => x + " -> " + index).iterator

    }

}


mapped.collect()


res41: Array[String] = Array(1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0, 6 -> 1, 7 -> 1, 8 -> 1, 9 -> 1, 10 -> 1)



데이터 프레임도 사용할 수 있다.


       dataframes.mapPartitions(_ grouped 10).foreach { batch =>

       val accountWithSep =

        batch.map {

        case Row(accountId: Int) => accountId.toString()

      }.mkString(",")

    




Posted by '김용환'
,