[spark] join example
This is an example of joins in Spark.
The concept is the same as a join in SQL. Using a collection and case classes, two data sets can be combined into one.
scala> val person = sc.parallelize(Array((1, "samuel"), (2, "jackson"), (3, "redis"))).toDF("number", "name")
person: org.apache.spark.sql.DataFrame = [number: int, name: string]
scala> val address = sc.parallelize(Array(("samuel", "seoul"), ("jackson", "new york"), ("juno", "iceland"))).toDF("name", "address")
address: org.apache.spark.sql.DataFrame = [name: string, address: string]
scala> person.show
+------+-------+
|number|   name|
+------+-------+
|     1| samuel|
|     2|jackson|
|     3|  redis|
+------+-------+
scala> person.join(address, "name").show
+-------+------+--------+
|   name|number| address|
+-------+------+--------+
|jackson|     2|new york|
| samuel|     1|   seoul|
+-------+------+--------+
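Note that join defaults to an inner join, so rows without a match on both sides are dropped: redis has no address row and juno has no person row, which is why neither appears above. Passing an explicit join type keeps unmatched rows. A minimal sketch, not part of the original run:
person.join(address, Seq("name"), "left_outer").show()
// redis would be kept with a null address;
// "full_outer" would additionally keep juno with a null number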
Case classes can also be used.
scala> val sqlContext = new SQLContext(sc)
warning: there was one deprecation warning; re-run with -deprecation for details
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2418ffcc
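The deprecation warning appears because SparkSession has been the preferred entry point since Spark 2.0 (in spark-shell it is already available as spark, and the SQLContext line above assumes import org.apache.spark.sql.SQLContext). A standalone application would obtain one roughly as follows; the appName value here is hypothetical:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("join-example")  // hypothetical application name
  .getOrCreate()
val sqlContext = spark.sqlContext  // equivalent to the SQLContext used below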
scala> case class Person(number: Int, name: String)
defined class Person
scala> case class Address(name: String, address: String)
defined class Address
scala> val person = sqlContext.createDataFrame(Person(1, "samuel") :: Person(2, "jackson") :: Person(3, "redis") :: Nil).as("person_dataframe")
person: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int, name: string]
scala> val address = sqlContext.createDataFrame(Address("samuel", "seoul") :: Address("jackson", "new york") :: Address("juno", "iceland") :: Nil).as("address_dataframe")
address: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, address: string]
scala> val joined_dataframe = person.join(address, col("person_dataframe.name") === col("address_dataframe.name"), "inner")
joined_dataframe: org.apache.spark.sql.DataFrame = [number: int, name: string ... 2 more fields]
The join can also be written as below.
scala> val joined_dataframe = person.join(address, $"person_dataframe.name" === $"address_dataframe.name", "inner")
joined_dataframe: org.apache.spark.sql.DataFrame = [number: int, name: string ... 2 more fields]
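Both variants rely on imports that spark-shell performs automatically; a standalone application needs them explicitly:
import org.apache.spark.sql.functions.col  // provides col("...")
import sqlContext.implicits._              // provides the $"..." interpolator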
scala> joined_dataframe.show
+------+-------+-------+--------+
|number|   name|   name| address|
+------+-------+-------+--------+
|     1| samuel| samuel|   seoul|
|     2|jackson|jackson|new york|
+------+-------+-------+--------+
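As shown, the result keeps both name columns, one from each aliased DataFrame. The duplicate can be removed by selecting columns through the aliases. A minimal sketch:
joined_dataframe
  .select(col("person_dataframe.number"),
          col("person_dataframe.name"),
          col("address_dataframe.address"))
  .show()
// prints a single name column alongside number and address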
If any of the fields can contain null column data, using Option would be a good approach.
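For example, declaring a field as Option[...] maps it to a nullable column, and None values show up as null. A minimal sketch; the OptAddress class and its rows are hypothetical:
case class OptAddress(name: String, address: Option[String])

val optAddress = sqlContext.createDataFrame(
  OptAddress("samuel", Some("seoul")) ::
  OptAddress("redis", None) :: Nil)
optAddress.show()
// the redis row would print its address as null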