dataframe 예제이다.
scala> val df = Seq(("one", 1), ("one", 1), ("two", 1)).toDF("word", "count")
df: org.apache.spark.sql.DataFrame = [word: string, count: int]
scala> df.show()
+----+-----+
|word|count|
+----+-----+
| one| 1|
| one| 1|
| two| 1|
+----+-----+
scala> df.printSchema
root
|-- word: string (nullable = true)
|-- count: integer (nullable = false)
주의할 점은 where절 같이 비교하는 동등 비교를 하고 싶을 때 =만 넣으면 동작되지 않는다.
scala> df.filter(df("word") = "two" && df("count") = 1).show
<console>:29: error: value update is not a member of org.apache.spark.sql.DataFrame
df.filter(df("word") = "two" && df("count") = 1).show
^
<console>:29: error: value && is not a member of String
df.filter(df("word") = "two" && df("count") = 1).show
^
filter에 ===를 넣어야 conditional 처럼 사용할 수 있다.
scala> df.filter(df("word") === "one" && df("count") === 2).show
+----+-----+
|word|count|
+----+-----+
+----+-----+
scala> df.filter(df("word") === "one" && df("count") === 1).show
+----+-----+
|word|count|
+----+-----+
| one| 1|
| one| 1|
+----+-----+
scala> df.filter(df("word") === "two" && df("count") === 1).show
+----+-----+
|word|count|
+----+-----+
| two| 1|
+----+-----+
not equals문은 =!= 이다.
scala> df.filter(df("word") =!= "two" && df("count") === 1).show
+----+-----+
|word|count|
+----+-----+
| one| 1|
| one| 1|
+----+-----+
개수를 얻고 싶으면 count를 호출한다.
scala> df.filter(df("word") =!= "two" && df("count") === 1).count
res36: Long = 2
배열로 얻으려면 collect함수를 호출한다.
scala> df.filter(df("word") =!= "two" && df("count") === 1).collect
res37: Array[org.apache.spark.sql.Row] = Array([one,1], [one,1])
group by가 가능하고 max, min, mean, sum, count를 사용할 수 있다.
scala> df.groupBy($"word").max().show
+----+----------+
|word|max(count)|
+----+----------+
| two| 1|
| one| 1|
+----+----------+
scala> df.groupBy($"word").min().show
+----+----------+
|word|min(count)|
+----+----------+
| two| 1|
| one| 1|
+----+----------+
scala> df.groupBy($"word").mean().show
+----+----------+
|word|avg(count)|
+----+----------+
| two| 1.0|
| one| 1.0|
+----+----------+
scala> df.groupBy($"word").sum().show
+----+----------+
|word|sum(count)|
+----+----------+
| two| 1|
| one| 2|
+----+----------+
scala> df.groupBy($"word").count().show
+----+-----+
|word|count|
+----+-----+
| two| 1|
| one| 2|
+----+-----+
describe 기능이 있다.
scala> df.describe()
res67: org.apache.spark.sql.DataFrame = [summary: string, word: string ... 1 more field]
scala> df.describe().explain
== Physical Plan ==
LocalTableScan [summary#469, word#470, count#471]
table에 저장해서 sql처럼 사용할 수도 있다.
scala> df.registerTempTable("df")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> sqlContext.sql("select word, count from df")
res43: org.apache.spark.sql.DataFrame = [word: string, count: int]
scala> sqlContext.sql("select word, count from df").show
+----+-----+
|word|count|
+----+-----+
| one| 1|
| one| 1|
| two| 1|
+----+-----+
scala> sqlContext.sql("select word, count from df where word=\"one\" and count=1").show
+----+-----+
|word|count|
+----+-----+
| one| 1|
| one| 1|
+----+-----+
case cass를 이용해 dataframe을 생성할 수 있다.
scala> case class Person(name: String, age: Int)
defined class Person
scala> val people = Seq(Person("Jacek", 42), Person("Patryk", 19), Person("Maksym", 5))
people: Seq[Person] = List(Person(Jacek,42), Person(Patryk,19), Person(Maksym,5))
scala> val df = spark.createDataFrame(people)
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> df.show
+------+---+
| name|age|
+------+---+
| Jacek| 42|
|Patryk| 19|
|Maksym| 5|
+------+---+
특정 파일의 포맷도 읽을 수 있다.
scala> val df = spark.read.format("com.databricks.spark.csv").option("header", "true").load("Cartier+for+WinnersCurse.csv")
scala> val df = spark.read.orc(path)
scala> val df = spark.read.parquet(path)
'scala' 카테고리의 다른 글
[spark 1.6] hive 접근하기 (0) | 2017.03.22 |
---|---|
[scala] null var 사용할 때 타입 사용하기 (0) | 2017.03.16 |
[spark] dataframe의 partitionby 사용시 hadoop 디렉토리 구조 (0) | 2017.03.15 |
[spark] kafka stream을 append처리 (0) | 2017.03.15 |
[spark] 집합 함수 - union, intersection, cartesian, subtract, join, cogroup 예제 (0) | 2017.03.15 |