Spark RDDs provide set operations such as union, intersection, cartesian, and subtract.


scala> val rdd1 = sc.parallelize(List("Spark","Scala"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24


scala> val rdd2 = sc.parallelize(List("Akka","Scala"))

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[51] at parallelize at <console>:24




Union


scala> rdd1.union(rdd2).collect()

res11: Array[String] = Array(Spark, Scala, Akka, Scala)

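Note that union does not remove duplicates, which is why "Scala" appears twice above. A minimal sketch chaining distinct() to deduplicate (element order in the result may vary):

scala> rdd1.union(rdd2).distinct().collect()
// expected: Array(Spark, Scala, Akka), order may vary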


Intersection


scala> rdd1.intersection(rdd2).collect()

res12: Array[String] = Array(Scala)

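Unlike union, intersection removes duplicates from its result and requires a shuffle. It also has an overload that sets the number of result partitions explicitly; a sketch using the same rdd1 and rdd2:

scala> rdd1.intersection(rdd2, 4).collect()
// expected: Array(Scala)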


Cartesian product


scala> rdd1.cartesian(rdd2).collect()

res13: Array[(String, String)] = Array((Spark,Akka), (Spark,Scala), (Scala,Akka), (Scala,Scala))

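cartesian produces every (a, b) pair, so the result has |rdd1| × |rdd2| elements and can grow very large. A quick sketch confirming the size:

scala> rdd1.cartesian(rdd2).count()
// expected: 4 (2 elements × 2 elements)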


Set difference (A - B)


scala> rdd1.subtract(rdd2).collect()

res14: Array[String] = Array(Spark)


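subtract is not symmetric; swapping the operands gives B - A instead. A sketch with the same RDDs:

scala> rdd2.subtract(rdd1).collect()
// expected: Array(Akka)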



The join function, called on RDDs of (K, V) and (K, W) pairs, creates a new RDD of (K, (V, W)) pairs.



scala> val hash1 = sc.parallelize( Seq(("1", "A"), ("2", "B"), ("3", "C"), ("1","D")))

hash1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[64] at parallelize at <console>:24


scala> val hash2 = sc.parallelize( Seq(("1", "W"), ("2", "X"), ("3", "Y"), ("2","Z")))

hash2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[65] at parallelize at <console>:24




scala> hash1.join(hash2).collect()

res15: Array[(String, (String, String))] = Array((1,(A,W)), (1,(D,W)), (2,(B,X)), (2,(B,Z)), (3,(C,Y)))

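join above is an inner join; Spark also provides leftOuterJoin, rightOuterJoin, and fullOuterJoin, which wrap the possibly-missing side in an Option. A minimal sketch (hash3 is a hypothetical RDD introduced here for illustration):

scala> val hash3 = sc.parallelize(Seq(("1", "W"), ("4", "Z")))
scala> hash1.leftOuterJoin(hash3).collect()
// expected (order may vary):
// Array((1,(A,Some(W))), (1,(D,Some(W))), (2,(B,None)), (3,(C,None)))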



The cogroup function, called on RDDs of (K, V) and (K, W) pairs, returns an RDD of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) pairs, grouping all values for each key from both RDDs.



scala> hash1.cogroup(hash2).collect()

res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(A, D),CompactBuffer(W))), (2,(CompactBuffer(B),CompactBuffer(X, Z))), (3,(CompactBuffer(C),CompactBuffer(Y))))

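cogroup is the building block behind the join family: for example, an inner join can be reconstructed from it by pairing every value from the left Iterable with every value from the right. A minimal sketch using flatMapValues:

scala> hash1.cogroup(hash2).flatMapValues { case (vs, ws) => for (v <- vs; w <- ws) yield (v, w) }.collect()
// expected: the same pairs as hash1.join(hash2) above, order may vary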