[spark] RDD join example
An example of joining two RDDs.
First, create the Member and Department RDDs.
scala> case class Member(id: Int, name: String, dept: Int)
scala> val members = sc.parallelize(List(Member(1,"john", 100), Member(2,"samuel", 200), Member(3,"ethan", 200)))
scala> case class Department(id: Int, name: String)
scala> val departments = sc.parallelize(List(Department(100, "server"), Department(200, "client")))
Calling join on them directly raises an error.
scala> members.join(departments)
<console>:34: error: value join is not a member of org.apache.spark.rdd.RDD[Member]
members.join(departments)
join is defined only for key-value RDDs, so first group each RDD by the field to join on. The result is a ShuffledRDD of (key, Iterable) pairs.
scala> val groupedMembers = members.groupBy(x => x.dept)
groupedMembers: org.apache.spark.rdd.RDD[(Int, Iterable[Member])] = ShuffledRDD[128] at groupBy at <console>:29
scala> groupedMembers.foreach(println)
(100,CompactBuffer(Member(1,john,100)))
(200,CompactBuffer(Member(2,samuel,200), Member(3,ethan,200)))
scala> val groupedDepartments = departments.groupBy(x => x.id)
groupedDepartments: org.apache.spark.rdd.RDD[(Int, Iterable[Department])] = ShuffledRDD[130] at groupBy at <console>:29
scala> groupedDepartments.foreach(println)
(200,CompactBuffer(Department(200,client)))
(100,CompactBuffer(Department(100,server)))
Now join them. The two RDDs are joined as expected.
scala> groupedMembers.join(groupedDepartments).foreach(println)
(200,(CompactBuffer(Member(2,samuel,200), Member(3,ethan,200)),CompactBuffer(Department(200,client))))
(100,(CompactBuffer(Member(1,john,100)),CompactBuffer(Department(100,server))))
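As a side note, the nested CompactBuffers can be flattened into plain (Member, Department) pairs with flatMap, and the grouping step can even be skipped by keying each RDD with keyBy, since join is available on any RDD of key-value pairs. A minimal sketch (the joined variable name is just for illustration):
scala> // flatten the grouped join result into (Member, Department) pairs
scala> groupedMembers.join(groupedDepartments).flatMap { case (_, (ms, ds)) => for (m <- ms; d <- ds) yield (m, d) }.foreach(println)
scala> // alternative: key each record by the join field and join without grouping
scala> val joined = members.keyBy(_.dept).join(departments.keyBy(_.id))
scala> joined.foreach(println)
The keyed variant yields an RDD[(Int, (Member, Department))] with one element per matching pair, e.g. (100,(Member(1,john,100),Department(100,server))); the print order may vary across partitions.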