[spark] join example



This is an example of joins in Spark.


It is the same concept as a join in SQL: using collections and case classes, you can combine data sets into one.





scala>     val person = sc.parallelize(Array((1, "samuel"), (2, "jackson"), (3, "redis"))).toDF("number", "name")

person: org.apache.spark.sql.DataFrame = [number: int, name: string]


scala> val address = sc.parallelize(Array(("samuel", "seoul"), ("jackson", "new york"), ("juno", "iceland"))).toDF("name", "address")

address: org.apache.spark.sql.DataFrame = [name: string, address: string]


scala> person.show
+------+-------+
|number|   name|
+------+-------+
|     1| samuel|
|     2|jackson|
|     3|  redis|
+------+-------+


scala> person.join(address, "name").show
+-------+------+--------+
|   name|number| address|
+-------+------+--------+
|jackson|     2|new york|
| samuel|     1|   seoul|
+-------+------+--------+
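As noted above, this is conceptually the same as a SQL join. For reference, a minimal sketch of the equivalent query through temp views (assuming the Spark 2.x shell, where spark is the SparkSession):

// Register the DataFrames as temp views so they can be queried with SQL.
person.createOrReplaceTempView("person")
address.createOrReplaceTempView("address")

// The same inner join expressed in SQL.
spark.sql("SELECT p.number, p.name, a.address FROM person p JOIN address a ON p.name = a.name").show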





However, passing one of the other join types (e.g., left_outer) together with the plain "name" string raises an error.

scala> person.join(address, "name", "left_outer").show
<console>:29: error: overloaded method value join with alternatives:
  (right: org.apache.spark.sql.Dataset[_],joinExprs: org.apache.spark.sql.Column,joinType: String)org.apache.spark.sql.DataFrame <and>
  (right: org.apache.spark.sql.Dataset[_],usingColumns: Seq[String],joinType: String)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.DataFrame, String, String)
       person.join(address, "name", "left_outer").show
                    ^



"name" 대신 Seq("name")을 사용한다.

 
scala> person.join(address, Seq("name"), "inner").show
+-------+------+--------+
|   name|number| address|
+-------+------+--------+
|jackson|     2|new york|
| samuel|     1|   seoul|
+-------+------+--------+



scala> person.join(address, Seq("name"), "left_outer").show
+-------+------+--------+
|   name|number| address|
+-------+------+--------+
|jackson|     2|new york|
| samuel|     1|   seoul|
|  redis|     3|    null|
+-------+------+--------+
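The other join types follow the same pattern. For example, a full_outer join should also keep the unmatched address row ("juno"), with null in the number column; a sketch, not from the original session:

person.join(address, Seq("name"), "full_outer").show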



You can also use case classes.



scala>     val sqlContext = new SQLContext(sc)

warning: there was one deprecation warning; re-run with -deprecation for details

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2418ffcc


scala>     case class Person(number: Int, name: String)

defined class Person


scala>     case class Address(name: String, address: String)

defined class Address


scala>     val person = sqlContext.createDataFrame(Person(1, "samuel") :: Person(2, "jackson") :: Person(3, "redis") :: Nil).as("person_dataframe")

person: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [number: int, name: string]


scala>     val address = sqlContext.createDataFrame(Address("samuel", "seoul") :: Address("jackson", "new york") :: Address("juno", "iceland") :: Nil).as("address_dataframe")

address: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, address: string]
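Incidentally, new SQLContext(sc) is deprecated in Spark 2.x (hence the warning above); the same DataFrames can be built through the spark session that spark-shell provides. A minimal sketch:

// Build the same aliased DataFrames via the SparkSession (spark) of spark-shell 2.x.
import spark.implicits._

val person = Seq(Person(1, "samuel"), Person(2, "jackson"), Person(3, "redis")).toDF().as("person_dataframe")
val address = Seq(Address("samuel", "seoul"), Address("jackson", "new york"), Address("juno", "iceland")).toDF().as("address_dataframe")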


scala> val joined_dataframe = person.join(address, col("person_dataframe.name") === col("address_dataframe.name"), "inner")

joined_dataframe: org.apache.spark.sql.DataFrame = [number: int, name: string ... 2 more fields]

You can also write it as below, using the $ syntax for columns:

scala> val joined_dataframe = person.join(address, $"person_dataframe.name" === $"address_dataframe.name", "inner")

joined_dataframe: org.apache.spark.sql.DataFrame = [number: int, name: string ... 2 more fields]



scala> joined_dataframe.show
+------+-------+-------+--------+
|number|   name|   name| address|
+------+-------+-------+--------+
|     1| samuel| samuel|   seoul|
|     2|jackson|jackson|new york|
+------+-------+-------+--------+
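Joining on an expression keeps both name columns, unlike the Seq("name") form above, which deduplicates the join key. One way to drop the duplicate is to reference the column through the right-hand Dataset; a sketch, not from the original session:

// Drop the name column that came from the address side.
joined_dataframe.drop(address("name")).show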







If some of the fields can hold null column data, using Option would be a good approach.
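A minimal sketch of that idea, building on the case-class example above (PersonAddress is a hypothetical class, not from the original post):

// spark-shell already imports spark.implicits._, which provides the encoder for as[...].
import spark.implicits._

// address is Option[String], so nulls from the left_outer join become None.
case class PersonAddress(name: String, number: Int, address: Option[String])

val typed = person.join(address, Seq("name"), "left_outer").as[PersonAddress]
typed.show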
