spark context에 집합 오퍼레이션이 있다.
scala> val rdd1 = sc.parallelize(List("Spark","Scala"))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
scala> val rdd2 = sc.parallelize(List("Akka","Scala"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[51] at parallelize at <console>:24
합집합
scala> rdd1.union(rdd2).collect()
res11: Array[String] = Array(Spark, Scala, Akka, Scala)
교집합.
scala> rdd1.intersection(rdd2).collect()
res12: Array[String] = Array(Scala)
카테시안
scala> rdd1.cartesian(rdd2).collect()
res13: Array[(String, String)] = Array((Spark,Akka), (Spark,Scala), (Scala,Akka), (Scala,Scala))
차집합(A-B)
scala> rdd1.subtract(rdd2).collect()
res14: Array[String] = Array(Spark)
join 함수는(K, V)와 (K, W)를 호출해 (K, (V, W))인 새로운 RDD를 생성한다.
scala> val hash1 = sc.parallelize( Seq(("1", "A"), ("2", "B"), ("3", "C"), ("1","D")))
hash1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[64] at parallelize at <console>:24
scala> val hash2 = sc.parallelize( Seq(("1", "W"), ("2", "X"), ("3", "Y"), ("2","Z")))
hash2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[65] at parallelize at <console>:24
scala> hash1.join(hash2).collect()
res15: Array[(String, (String, String))] = Array((1,(A,W)), (1,(D,W)), (2,(B,X)), (2,(B,Z)), (3,(C,Y)))
cogroup 함수는 (K, V)를 (K, Iterable<V>)로 변환한다.
scala> hash1.cogroup(hash2).collect()
res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(A, D),CompactBuffer(W))), (2,(CompactBuffer(B),CompactBuffer(X, Z))), (3,(CompactBuffer(C),CompactBuffer(Y))))
'scala' 카테고리의 다른 글
[spark] dataframe의 partitionby 사용시 hadoop 디렉토리 구조 (0) | 2017.03.15 |
---|---|
[spark] kafka stream을 append처리 (0) | 2017.03.15 |
[spark] sbt 빌드시 - not found: org.jboss.interceptor#jboss-interceptor-api;1.1 에러 해결 (0) | 2017.03.15 |
[spark] RDD 테스트 - word count 예제 (0) | 2017.03.15 |
[spark] spark 2.0, 2.1 사용시 주의사항 - java.util.NoSuchElementException: None.get (0) | 2017.03.14 |