spark에서 2개의 컬럼을 가진 테이블을 생성한다.


scala> val df = sc.parallelize(Seq((1, "samuel"), (2, "jack"), (3, "jonathan"))).toDF("id", "name")

df: org.apache.spark.sql.DataFrame = [id: int, column: string]



제대로 생성되었는지 확인하기 위해 df("")을 사용하면 안된다. Column 객체만 얻기 때문이다.


scala> df("name")

res11: org.apache.spark.sql.Column = name



show와 select를 사용하면 제대로 저장되었는지 확인할 수 있다.


scala> df.show

+---+--------+

| id|    name|

+---+--------+

|  1|  samuel|

|  2|    jack|

|  3|jonathan|

+---+--------+


scala> df.select("id").show

+---+

| id|

+---+

|  1|

|  2|

|  3|

+---+



scala> df.select("name").show

+--------+

|    name|

+--------+

|  samuel|

|    jack|

|jonathan|

+--------+


데이터 프레임의 메타 정보를 알려면 columns, dtypes, printSchema를 사용한다.


scala> df.columns
res42: Array[String] = Array(id, name)

scala> df.dtypes
res43: Array[(String, String)] = Array((id,IntegerType), (name,StringType))


scala> df.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)


그리고 filter,where을 사용해 dataframe의 조건절로 사용할 수 있다. 

scala> df.filter(df("id").equalTo("1")).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+


scala> df.filter("id > 2").show
+---+--------+
| id|    name|
+---+--------+
|  3|jonathan|
+---+--------+



scala> df.filter(df("name").equalTo("samuel")).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+



scala> df.where("id=1").show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+


또는 col을 사용해 깔끔하게 사용할 수 도 있다. 


scala> df.where(col("id") isin (Seq("1") : _*)).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
+---+------+




여러 개의 단어가 있는지 확인하는 isin를 사용할 수 있다. 

scala> df.filter(df("name").isin(Seq("samuel", "jack"): _*)).show
+---+------+
| id|  name|
+---+------+
|  1|samuel|
|  2|  jack|
+---+------+



contains를 사용해 특정 단어가 포함되었는지 확인할 수 있다.



scala> df.where(df("name").contains("j")).show

+---+--------+

| id|    name|

+---+--------+

|  2|    jack|

|  3|jonathan|

+---+--------+



between을 사용해 범위를 안다.


scala> df.where(col("id") between (1,2)).show

+---+------+

| id|  name|

+---+------+

|  1|samuel|

|  2|  jack|

+---+------+



select를 이용해 query language처럼 사용할 수 있다. 


scala> df.select("name").where(col("id") between (1,2)).show

+------+

|  name|

+------+

|samuel|

|  jack|

+------+



컬럼을 dataframe에 추가할 수 있다. 


scala> df.withColumn("xx", col("id")).show

+---+--------+---+

| id|    name| xx|

+---+--------+---+

|  1|  samuel|  1|

|  2|    jack|  2|

|  3|jonathan|  3|

+---+--------+---+



컬럼을 dataframe에서 삭제할 수 있다. 


scala> df.drop(col("xxx"))

res56: org.apache.spark.sql.DataFrame = [id: int, name: string]


scala> df.show

+---+--------+

| id|    name|

+---+--------+

|  1|  samuel|

|  2|    jack|

|  3|jonathan|

+---+--------+




데이터 프레임에 새로운 컬럼을 udf를 이용해서 붙일 수 있다.  



scala> import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.udf


scala> case class Member(id:Int, name:String)

defined class Member


scala> case class Member(id:Int, name:String)

defined class Member


scala> val mudf=udf((id:Int, name:String) => Member(1*100, name))

mudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,StructType(StructField(id,IntegerType,false), StructField(name,StringType,true)),Some(List(IntegerType, StringType)))


scala> val df1 = df.withColumn("newColumn", mudf($"id", $"name"))

df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]


scala> df1.show

+---+--------+--------------+

| id|    name|     newColumn|

+---+--------+--------------+

|  1|  samuel|  [100,samuel]|

|  2|    jack|    [100,jack]|

|  3|jonathan|[100,jonathan]|

+---+--------+--------------+


Posted by '김용환'
,