sparkContext에 mapValues와 reduceByKey 예시를 설명한다.
코드로 간단히 설명하면 다음과 같다.
val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65)))
val mapped = inputrdd.mapValues(x => 1);
mapped.collect.foreach(println)
val reduced = mapped.reduceByKey(_ + _)
reduced.collect.foreach(println)
mapValues는 map의 값을 1로 변경한다.
reduceByKey는 key의 값으로 키의 값이 동일한 개수를 얻는다 .
<결과>
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[275] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[276] at mapValues at <console>:58
(arth,1)
(arth,1)
(samuel,1)
(jack,1)
reduced: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[277] at reduceByKey at <console>:60
(arth,2)
(samuel,1)
(jack,1)
아래 코드에 대한 설명한다 .
mapValues를 활용해 튜플을 추가하면, map의 값에 tuple을 추가한다.
reduceKey를 활용해서 키의 값을 근거로 값과 개수를 튜플로 추가할 수 있다.
reduceKey를 활용해서 얻은 결과값을 map을 이용해 평균 값을 구한다.
val inputrdd = sc.parallelize(Seq(("arth",10), ("arth", 20), ("samuel", 60), ("jack", 65)))
val mapped = inputrdd.mapValues(x => (x, 1));
mapped.collect.foreach(println)
val reduced = mapped.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
reduced.collect.foreach(println)
val average = reduced.map { x =>
val temp = x._2
val total = temp._1
val count = temp._2
(x._1, total / count)
}
average.collect.foreach(println)
<결과>
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[281] at parallelize at <console>:56
mapped: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[282] at mapValues at <console>:58
(arth,(10,1))
(arth,(20,1))
(samuel,(60,1))
(jack,(65,1))
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[283] at reduceByKey at <console>:60
(arth,(30,2))
(samuel,(60,1))
(jack,(65,1))
average: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[284] at map at <console>:62
(arth,15)
(samuel,60)
(jack,65)
'scala' 카테고리의 다른 글
[scala] Future 1 (0) | 2016.11.22 |
---|---|
[zeppelin] spark의 변수 공유하기 (0) | 2016.11.18 |
[scala] List.map(function) 예시 (0) | 2016.11.08 |
[zeppelin] zeppelin으로 spark 연동 시 팁 (또는 주의 사항) (0) | 2016.11.07 |
[scala] sys.env 환경변수 확인 예시 (0) | 2016.11.07 |