[spark] Set operations - union, intersection, cartesian, subtract, join, cogroup examples
Spark RDDs provide set-style operations. First, create two sample RDDs:
scala> val rdd1 = sc.parallelize(List("Spark","Scala"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List("Akka","Scala"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[51] at parallelize at <console>:24
Union
scala> rdd1.union(rdd2).collect()
res11: Array[String] = Array(Spark, Scala, Akka, Scala)
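Note that union keeps duplicates: "Scala" appears twice because it is in both RDDs. If distinct elements are wanted, chaining distinct() removes them; a minimal sketch (distinct shuffles, so element order may vary):
scala> rdd1.union(rdd2).distinct().collect()
// expected (order may vary): Array(Spark, Scala, Akka)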
Intersection
scala> rdd1.intersection(rdd2).collect()
res12: Array[String] = Array(Scala)
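Unlike union, intersection removes duplicates from the result even when an input contains them (it performs a shuffle internally). A quick check with a throwaway RDD, assuming the same rdd2:
scala> sc.parallelize(List("Scala", "Scala")).intersection(rdd2).collect()
// expected: Array(Scala) - the duplicate is removed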
Cartesian product
scala> rdd1.cartesian(rdd2).collect()
res13: Array[(String, String)] = Array((Spark,Akka), (Spark,Scala), (Scala,Akka), (Scala,Scala))
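cartesian pairs every element of rdd1 with every element of rdd2, so the result has rdd1.count() * rdd2.count() elements (2 * 2 = 4 above); on large RDDs this blows up quickly. A quick sanity check:
scala> rdd1.cartesian(rdd2).count()
// expected: 4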
Set difference (A - B)
scala> rdd1.subtract(rdd2).collect()
res14: Array[String] = Array(Spark)
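subtract is not symmetric; swapping the operands computes B - A:
scala> rdd2.subtract(rdd1).collect()
// expected: Array(Akka)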
The join function, called on an RDD of (K, V) pairs with an RDD of (K, W) pairs, produces a new RDD of (K, (V, W)) pairs, pairing every value for each key that appears in both RDDs.
scala> val hash1 = sc.parallelize( Seq(("1", "A"), ("2", "B"), ("3", "C"), ("1","D")))
hash1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[64] at parallelize at <console>:24
scala> val hash2 = sc.parallelize( Seq(("1", "W"), ("2", "X"), ("3", "Y"), ("2","Z")))
hash2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> hash1.join(hash2).collect()
res15: Array[(String, (String, String))] = Array((1,(A,W)), (1,(D,W)), (2,(B,X)), (2,(B,Z)), (3,(C,Y)))
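join is an inner join, so a key present in only one RDD is dropped from the result. Spark also provides leftOuterJoin, rightOuterJoin, and fullOuterJoin, which keep such keys and wrap the possibly missing side in Option. A sketch using a new example RDD (hash3 is illustrative, not from the data above):
scala> val hash3 = sc.parallelize(Seq(("1", "A"), ("4", "E")))
scala> hash3.leftOuterJoin(hash2).collect()
// expected (order may vary): Array((1,(A,Some(W))), (4,(E,None)))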
The cogroup function groups two RDDs of (K, V) and (K, W) pairs by key, producing (K, (Iterable[V], Iterable[W])).
scala> hash1.cogroup(hash2).collect()
res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(A, D),CompactBuffer(W))), (2,(CompactBuffer(B),CompactBuffer(X, Z))), (3,(CompactBuffer(C),CompactBuffer(Y))))
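Because cogroup keeps every key from both RDDs (a missing side is just an empty Iterable), it is a handy building block for custom joins and aggregations. A minimal sketch counting the values per key on each side:
scala> hash1.cogroup(hash2).mapValues { case (vs, ws) => (vs.size, ws.size) }.collect()
// expected (order may vary): Array((1,(2,1)), (2,(1,2)), (3,(1,1)))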