foreach 와 foreachPartition의 차이
foreach는 간단히 collection을 리스트로 출력한다.
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
list.foreach(x => println(x))
foreachPartition는 RDD를 partition처리한다.
partition 개수는 foreachPartition 전에 미리 지정되야 한다. 하지만 리턴 값이 필요없을 때 사용된다.
val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)
b.foreachPartition(x => println(x.reduce(_ + _)))
리턴 값이 필요할 때는 mapPartition 또는 mapPartitionsWithIndex을 사용한다.
val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)
b.foreachPartition(x => println(x.reduce(_ + _)))
val mapped = b.mapPartitionsWithIndex {
(index, iterator) => {
println("aaaa -> " + index)
val myList = iterator.toList
myList.map(x => x + " -> " + index).iterator
}
}
mapped.collect()
res41: Array[String] = Array(1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0, 6 -> 1, 7 -> 1, 8 -> 1, 9 -> 1, 10 -> 1)
데이터 프레임도 사용할 수 있다.
dataframes.mapPartitions(_ grouped 10).foreach { batch =>
val accountWithSep =
batch.map {
case Row(accountId: Int) => accountId.toString()
}.mkString(",")
'scala' 카테고리의 다른 글
[scala] Array.transpose 예시 (0) | 2017.02.17 |
---|---|
[spark1.6] rdd를 dataframe으로 만드는 방법 (0) | 2017.02.14 |
[zepplin] 여러 spark context 사용하기 (0) | 2017.02.14 |
scala에서 uuid 생성하는 방법 (0) | 2017.02.09 |
[scala] spark에서 partition 줄이기 - repartition, coalesce (0) | 2017.02.08 |