[spark] DataFrame examples

scala 2017. 3. 15. 20:46



Here are some DataFrame examples.


scala> val df = Seq(("one", 1), ("one", 1), ("two", 1)).toDF("word", "count")

df: org.apache.spark.sql.DataFrame = [word: string, count: int]


scala> df.show()

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

| two|    1|

+----+-----+



scala> df.printSchema

root

 |-- word: string (nullable = true)

 |-- count: integer (nullable = false)
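
In spark-shell the toDF conversion and the $"column" syntax work out of the box because the shell imports the SparkSession implicits automatically. As a rough sketch, a standalone application (assuming a SparkSession named spark) would need something like the following:

// Assumed setup for a standalone app; spark-shell already provides `spark` and these implicits.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dataframe-example")
  .master("local[*]")
  .getOrCreate()

// Enables Seq(...).toDF(...) and the $"column" syntax used below.
import spark.implicits._

val df = Seq(("one", 1), ("one", 1), ("two", 1)).toDF("word", "count")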




One thing to watch out for: when you want an equality comparison, as in a WHERE clause, using just = does not work.



scala> df.filter(df("word") = "two" && df("count") = 1).show

<console>:29: error: value update is not a member of org.apache.spark.sql.DataFrame

       df.filter(df("word") = "two" && df("count") = 1).show

                 ^

<console>:29: error: value && is not a member of String

       df.filter(df("word") = "two" && df("count") = 1).show

                                    ^




You have to use === in filter to express the condition.



scala> df.filter(df("word") === "one" && df("count") === 2).show

+----+-----+

|word|count|

+----+-----+

+----+-----+



scala> df.filter(df("word") === "one" && df("count") === 1).show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

+----+-----+



scala> df.filter(df("word") === "two" && df("count") === 1).show

+----+-----+

|word|count|

+----+-----+

| two|    1|

+----+-----+


The not-equals operator is =!=.


scala> df.filter(df("word") =!= "two" && df("count") === 1).show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

+----+-----+
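
For reference, the same filter can be written with the $-column syntax (also provided by the spark-shell implicits); a minimal equivalent sketch:

// Same condition as above, using $"column" instead of df("column").
df.filter($"word" =!= "two" && $"count" === 1).show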




To get the number of rows, call count.


scala> df.filter(df("word") =!= "two" && df("count") === 1).count

res36: Long = 2




To get the results as an array, call the collect function.


scala> df.filter(df("word") =!= "two" && df("count") === 1).collect

res37: Array[org.apache.spark.sql.Row] = Array([one,1], [one,1])
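
Each element of the returned array is a Row, so the values can be pulled out by position according to the schema above. A minimal sketch:

// word is column 0 (String) and count is column 1 (Int) in this schema.
val rows = df.filter(df("word") =!= "two" && df("count") === 1).collect
rows.foreach { row =>
  val word = row.getString(0)
  val cnt  = row.getInt(1)
  println(s"$word -> $cnt")
}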




group by is supported, and max, min, mean, sum, and count can be used.


scala> df.groupBy($"word").max().show

+----+----------+

|word|max(count)|

+----+----------+

| two|         1|

| one|         1|

+----+----------+



scala> df.groupBy($"word").min().show

+----+----------+

|word|min(count)|

+----+----------+

| two|         1|

| one|         1|

+----+----------+



scala> df.groupBy($"word").mean().show

+----+----------+

|word|avg(count)|

+----+----------+

| two|       1.0|

| one|       1.0|

+----+----------+



scala> df.groupBy($"word").sum().show

+----+----------+

|word|sum(count)|

+----+----------+

| two|         1|

| one|         2|

+----+----------+



scala> df.groupBy($"word").count().show

+----+-----+

|word|count|

+----+-----+

| two|    1|

| one|    2|

+----+-----+
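
If several aggregates are needed in one pass, groupBy can be combined with agg and the functions from org.apache.spark.sql.functions; a minimal sketch:

// Compute several aggregates per word in a single groupBy.
import org.apache.spark.sql.functions.{max, min, sum, count}

df.groupBy($"word")
  .agg(max($"count"), min($"count"), sum($"count"), count($"count"))
  .show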



There is also a describe feature.



scala> df.describe()

res67: org.apache.spark.sql.DataFrame = [summary: string, word: string ... 1 more field]


scala> df.describe().explain

== Physical Plan ==

LocalTableScan [summary#469, word#470, count#471]
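
describe only builds a DataFrame of summary statistics (count, mean, stddev, min, max); calling show on it prints them. A short sketch:

// Print summary statistics for all columns, or only for selected ones.
df.describe().show
df.describe("count").show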





You can also register the DataFrame as a table and query it like SQL.


scala> df.registerTempTable("df")

warning: there was one deprecation warning; re-run with -deprecation for details


scala> sqlContext.sql("select word, count from df")

res43: org.apache.spark.sql.DataFrame = [word: string, count: int]


scala> sqlContext.sql("select word, count from df").show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

| two|    1|

+----+-----+


scala> sqlContext.sql("select word, count from df where word=\"one\" and count=1").show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

+----+-----+
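
registerTempTable is deprecated in Spark 2.x, which is what the warning above refers to; the non-deprecated equivalent is createOrReplaceTempView, and the query can go through spark.sql directly. A minimal sketch:

// Spark 2.x style: register a temp view on the SparkSession and query it.
df.createOrReplaceTempView("df")
spark.sql("select word, count from df where word = 'one' and count = 1").show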





A DataFrame can also be created from a case class.


scala> case class Person(name: String, age: Int)

defined class Person


scala> val people = Seq(Person("Jacek", 42), Person("Patryk", 19), Person("Maksym", 5))

people: Seq[Person] = List(Person(Jacek,42), Person(Patryk,19), Person(Maksym,5))


scala> val df = spark.createDataFrame(people)

df: org.apache.spark.sql.DataFrame = [name: string, age: int]


scala> df.show

+------+---+

|  name|age|

+------+---+

| Jacek| 42|

|Patryk| 19|

|Maksym|  5|

+------+---+
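
With the same case class, the collection can also be turned into a typed Dataset (again relying on the spark-shell implicits); a minimal sketch:

// A Dataset[Person] keeps the Person type, so filters can use plain Scala functions.
val ds = people.toDS()
ds.filter(_.age > 18).show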




Files in specific formats can be read as well.


scala> val df = spark.read.format("com.databricks.spark.csv").option("header", "true").load("Cartier+for+WinnersCurse.csv")


scala> val df = spark.read.orc(path)


scala> val df = spark.read.parquet(path)
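
In Spark 2.x the CSV reader is built in, so the com.databricks format name is not required, and the same DataFrameWriter API writes these formats back out. A sketch with placeholder output paths:

// Built-in CSV reader (Spark 2.0+); the header option uses the first row as column names.
val csvDf = spark.read.option("header", "true").csv("Cartier+for+WinnersCurse.csv")

// Write back out in columnar formats; "output_path" is a placeholder.
csvDf.write.parquet("output_path/parquet")
csvDf.write.orc("output_path/orc")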

Posted by '김용환'