[spark] RDD join example

scala 2017. 3. 29. 23:14


This is an example of joining two RDDs.


Create the Member and Department RDDs.


scala> case class Member(id: Int, name: String, dept: Int)


scala> val members = sc.parallelize(List(Member(1,"john", 100), Member(2,"samuel", 200), Member(3,"ethan", 200)))


scala> case class Department(id: Int, name: String)


scala> val departments = sc.parallelize(List(Department(100, "server"), Department(200, "client")))




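Calling join on them directly fails with an error, because join is only defined for RDDs of key-value pairs and RDD[Member] is not one.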


scala> members.join(departments)

<console>:34: error: value join is not a member of org.apache.spark.rdd.RDD[Member]

       members.join(departments)
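
The error can be read as follows: join comes from PairRDDFunctions, which Spark attaches through an implicit conversion only to RDDs whose elements are (key, value) tuples. The sketch below is a minimal illustration, not part of the original session. The app name and the local master are assumptions, and SparkContext.getOrCreate is used so that an existing shell context is reused.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Member(id: Int, name: String, dept: Int)

// Assumption: reuse the shell's context if there is one, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("pair-rdd-sketch"))

val members: RDD[Member] =
  sc.parallelize(List(Member(1, "john", 100), Member(2, "samuel", 200), Member(3, "ethan", 200)))

// Mapping each element to a (key, value) tuple yields an RDD[(Int, Member)].
// Only this shape picks up join, leftOuterJoin, keys and the other pair operations.
val membersByDept: RDD[(Int, Member)] = members.map(m => (m.dept, m))

// keys is one of the pair-only operations; collect() brings the result to the driver.
println(membersByDept.keys.collect().sorted.mkString(", "))
```

Any transformation that produces tuples would do here; map is used only because it makes the key choice explicit.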



First, group each RDD by the value to join on, which turns it into an RDD of (key, values) pairs that can be joined. The result is a ShuffledRDD.


scala> val groupedMembers = members.groupBy(x => x.dept)

groupedMembers: org.apache.spark.rdd.RDD[(Int, Iterable[Member])] = ShuffledRDD[128] at groupBy at <console>:29



scala> groupedMembers.foreach(println)

(100,CompactBuffer(Member(1,john,100)))

(200,CompactBuffer(Member(2,samuel,200), Member(3,ethan,200)))



scala> val groupedDepartments = departments.groupBy(x => x.id)

groupedDepartments: org.apache.spark.rdd.RDD[(Int, Iterable[Department])] = ShuffledRDD[130] at groupBy at <console>:29


scala> groupedDepartments.foreach(println)

(200,CompactBuffer(Department(200,client)))

(100,CompactBuffer(Department(100,server)))





Now join them. The output shows that the join succeeds.



scala> groupedMembers.join(groupedDepartments).foreach(println)

(200,(CompactBuffer(Member(2,samuel,200), Member(3,ethan,200)),CompactBuffer(Department(200,client))))

(100,(CompactBuffer(Member(1,john,100)),CompactBuffer(Department(100,server))))
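
As an alternative to grouping first, a key can be attached to each element with keyBy, which leaves one row per matching (Member, Department) pair instead of two CompactBuffers. This is a sketch under the same data as above, not the post's own method. The app name and the local master are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

case class Member(id: Int, name: String, dept: Int)
case class Department(id: Int, name: String)

// Assumption: reuse the shell's context if there is one, otherwise start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("keyby-join-sketch"))

val members =
  sc.parallelize(List(Member(1, "john", 100), Member(2, "samuel", 200), Member(3, "ethan", 200)))
val departments =
  sc.parallelize(List(Department(100, "server"), Department(200, "client")))

// keyBy pairs every element with a key without grouping:
// RDD[(Int, Member)] on the left, RDD[(Int, Department)] on the right.
val joined = members.keyBy(_.dept).join(departments.keyBy(_.id))

// Each element is (deptId, (Member, Department)). Collect to the driver and
// sort by member id so that the printed order is deterministic.
joined.collect().sortBy { case (_, (m, _)) => m.id }.foreach(println)
```

The same pair RDDs also accept leftOuterJoin, rightOuterJoin and fullOuterJoin when unmatched keys should be kept.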





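Looking at the API, DataFrame also provides join, with overloads that take a join expression, a join type, or one or more column names, so a variety of joins can be tried.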



DataFrame join(DataFrame right)
Cartesian join with another DataFrame.
DataFrame join(DataFrame right, Column joinExprs)
Inner join with another DataFrame, using the given join expression.
DataFrame join(DataFrame right, Column joinExprs, java.lang.String joinType)
Join with another DataFrame, using the given join expression.
DataFrame join(DataFrame right, scala.collection.Seq<java.lang.String> usingColumns)
Inner equi-join with another DataFrame using the given columns.
DataFrame join(DataFrame right, java.lang.String usingColumn)
Inner equi-join with another DataFrame using the given column.
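
The join(right, joinExprs, joinType) overload listed above might be used roughly as follows. This sketch assumes Spark 2.x or later, where a DataFrame is obtained through SparkSession. The app name, the local master and the column aliases are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

case class Member(id: Int, name: String, dept: Int)
case class Department(id: Int, name: String)

// Assumption: Spark 2.x+; getOrCreate reuses the shell's session if one exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("df-join-sketch")
  .getOrCreate()
import spark.implicits._

val memberDF = Seq(Member(1, "john", 100), Member(2, "samuel", 200), Member(3, "ethan", 200)).toDF()
val deptDF = Seq(Department(100, "server"), Department(200, "client")).toDF()

// Explicit join expression plus join type; "inner" could be swapped for
// "left_outer", "right_outer" and so on.
val joined = memberDF
  .join(deptDF, memberDF("dept") === deptDF("id"), "inner")
  .select(memberDF("name").as("member"), deptDF("name").as("department"))

joined.show()
```

Because both sides have columns called id and name, the columns are referenced through their parent DataFrame to avoid ambiguity.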



Posted by '김용환'