sparkContext에 mapValues와 reduceByKey 예시를 설명한다.



코드로 간단히 설명하면 다음과 같다. 


val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65)))


val mapped = inputrdd.mapValues(x => 1);

mapped.collect.foreach(println)


val reduced = mapped.reduceByKey(_ + _)

reduced.collect.foreach(println)


mapValues는 map의 값을 1로 변경한다.

reduceByKey는 key의 값으로 키의 값이 동일한 개수를 얻는다 .


<결과>


inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[275] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[276] at mapValues at <console>:58
(arth,1)
(arth,1)
(samuel,1)
(jack,1)
reduced: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[277] at reduceByKey at <console>:60
(arth,2)
(samuel,1)
(jack,1)



아래 코드에 대한 설명한다 .

mapValues를 활용해 튜플을 추가하면, map의 값에 tuple을 추가한다.

reduceKey를 활용해서 키의 값을 근거로 값과 개수를 튜플로 추가할 수 있다.

reduceKey를 활용해서 얻은 결과값을 map을 이용해 평균 값을 구한다.

val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65))) val mapped = inputrdd.mapValues(x => (x, 1)); mapped.collect.foreach(println)
val reduced = mapped.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) reduced.collect.foreach(println) val average = reduced.map { x => val temp = x._2 val total = temp._1 val count = temp._2 (x._1, total / count) } average.collect.foreach(println)


<결과>
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[281] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[282] at mapValues at <console>:58
(arth,(10,1))
(arth,(20,1))
(samuel,(60,1))
(jack,(65,1))
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[283] at reduceByKey at <console>:60
(arth,(30,2))
(samuel,(60,1))
(jack,(65,1))
average: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[284] at map at <console>:62
(arth,15)
(samuel,60)
(jack,65)



Posted by '김용환'
,