[spark] dataframe 예제

scala 2017. 3. 15. 20:46



dataframe 예제이다.


scala> val df = Seq(("one", 1), ("one", 1), ("two", 1)).toDF("word", "count")

df: org.apache.spark.sql.DataFrame = [word: string, count: int]


scala> df.show()

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

| two|    1|

+----+-----+



scala> df.printSchema

root

 |-- word: string (nullable = true)

 |-- count: integer (nullable = false)




주의할 점은 where절 같이 비교하는 동등 비교를 하고 싶을 때 =만 넣으면 동작되지 않는다.



scala> df.filter(df("word") = "two" && df("count") = 1).show

<console>:29: error: value update is not a member of org.apache.spark.sql.DataFrame

       df.filter(df("word") = "two" && df("count") = 1).show

                 ^

<console>:29: error: value && is not a member of String

       df.filter(df("word") = "two" && df("count") = 1).show

                                    ^




filter에 ===를 넣어야 conditional 처럼 사용할 수 있다. 



scala> df.filter(df("word") === "one" && df("count") === 2).show

+----+-----+

|word|count|

+----+-----+

+----+-----+



scala> df.filter(df("word") === "one" && df("count") === 1).show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

+----+-----+



scala> df.filter(df("word") === "two" && df("count") === 1).show

+----+-----+

|word|count|

+----+-----+

| two|    1|

+----+-----+


not equals문은 =!= 이다. 


 scala> df.filter(df("word") =!= "two" && df("count") === 1).show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

+----+-----+




개수를 얻고 싶으면 count를 호출한다.


scala> df.filter(df("word") =!= "two" && df("count") === 1).count

res36: Long = 2




배열로 얻으려면 collect함수를 호출한다.


scala> df.filter(df("word") =!= "two" && df("count") === 1).collect

res37: Array[org.apache.spark.sql.Row] = Array([one,1], [one,1])




group by가 가능하고 max, min, mean, sum, count를 사용할 수 있다. 


scala> df.groupBy($"word").max().show

+----+----------+

|word|max(count)|

+----+----------+

| two|         1|

| one|         1|

+----+----------+



scala> df.groupBy($"word").min().show

+----+----------+

|word|min(count)|

+----+----------+

| two|         1|

| one|         1|

+----+----------+



scala> df.groupBy($"word").mean().show

+----+----------+

|word|avg(count)|

+----+----------+

| two|       1.0|

| one|       1.0|

+----+----------+



scala> df.groupBy($"word").sum().show

+----+----------+

|word|sum(count)|

+----+----------+

| two|         1|

| one|         2|

+----+----------+



scala> df.groupBy($"word").count().show

+----+-----+

|word|count|

+----+-----+

| two|    1|

| one|    2|

+----+-----+



describe 기능이 있다. 



scala> df.describe()

res67: org.apache.spark.sql.DataFrame = [summary: string, word: string ... 1 more field]


scala> df.describe().explain

== Physical Plan ==

LocalTableScan [summary#469, word#470, count#471]





table에 저장해서 sql처럼 사용할 수도 있다.


scala> df.registerTempTable("df")

warning: there was one deprecation warning; re-run with -deprecation for details


scala> sqlContext.sql("select word, count from df")

res43: org.apache.spark.sql.DataFrame = [word: string, count: int]


scala> sqlContext.sql("select word, count from df").show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

| two|    1|

+----+-----+


scala> sqlContext.sql("select word, count from df where word=\"one\" and count=1").show

+----+-----+

|word|count|

+----+-----+

| one|    1|

| one|    1|

+----+-----+





case cass를 이용해 dataframe을 생성할 수 있다. 


scala> case class Person(name: String, age: Int)

defined class Person


scala> val people = Seq(Person("Jacek", 42), Person("Patryk", 19), Person("Maksym", 5))

people: Seq[Person] = List(Person(Jacek,42), Person(Patryk,19), Person(Maksym,5))


scala> val df = spark.createDataFrame(people)

df: org.apache.spark.sql.DataFrame = [name: string, age: int]


scala> df.show

+------+---+

|  name|age|

+------+---+

| Jacek| 42|

|Patryk| 19|

|Maksym|  5|

+------+---+




특정 파일의 포맷도 읽을 수 있다. 


scala> val df = spark.read.format("com.databricks.spark.csv").option("header", "true").load("Cartier+for+WinnersCurse.csv")


scala> val df = spark.read.orc(path)


scala> val df = spark.read.parquet(path)

Posted by '김용환'
,




hadoop의 위치를 /a/b/c라고 하고 DataFrame에서 partitionBy를 다음처럼 사용한다면,


.write.mode(SaveMode.Append).partitionBy("command", "subcommand").orc(hourPath)


hadoop의 디렉토리 구조가 된다. 


/a/b/c/command=add/action=abc








Posted by '김용환'
,



kafka에서 스트림을 받아 spark stream으로 처리할 때, hdfs에 append를 할 수 없다. 


rdd에 있는 saveAsTextFile밖에 없다. 

rdd.saveAsTextFile(savePath)


대안으로 FileUtils.copyMerge가 있지만, stream 처리할 때는 쓸 수 없다.

org.apache.hadoop.fs.FileUtil 클래스

static boolean copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString)



아래처럼 사용할 수 있긴 한데..
def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
val hdfs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
if (!HdfsUtil.exists(dstPath)) HdfsUtil.mkdir(dstPath)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, ssc.sparkContext.hadoopConfiguration, null)
}

단점은 hadoop 3.0에서 사라졌다!!!


https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/release/3.0.0-alpha1/RELEASENOTES.3.0.0-alpha1.html

https://issues.apache.org/jira/browse/HADOOP-12967



Removed FileUtil.copyMerge.



해결 할 수 있는 방법으로 rdd를 Dataframe으로 바꾼 후 저장할 때 orc로 append할 수 있다.


https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path, boolean, org.apache.hadoop.conf.Configuration, java.lang.String)

stream
.flatMap(line => CustomLog.parse(line))
.repartition(1).foreachRDD { rdd =>
val df = rdd.map { log =>
(log.host, log.stdate, log.sttime)
}.toDF("host", "stdate", "sttime").coalesce(1)
.write.mode(SaveMode.Append).orc(hourPath)
}







Posted by '김용환'
,




 missing EOF at '...' near ''hdfs://..' 에러는 Location 밑에 tblproperties이 없어서 에러가 발생할 수 있다.



 LOCATION 

 'hdfs://hadoop/goolgleplus/log'

 tblproperties ("orc.compress"="NONE")





external 테이블에 location과 stored as orc라는 프로퍼티를 함꼐 추가하면 에러가 발생할 수 있다.

Posted by '김용환'
,




spark context에 집합 오퍼레이션이 있다. 


scala> val rdd1 = sc.parallelize(List("Spark","Scala"))

rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24


scala> val rdd2 = sc.parallelize(List("Akka","Scala"))

rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[51] at parallelize at <console>:24




합집합


scala> rdd1.union(rdd2).collect()

res11: Array[String] = Array(Spark, Scala, Akka, Scala)



교집합.


scala> rdd1.intersection(rdd2).collect()

res12: Array[String] = Array(Scala)



카테시안


scala> rdd1.cartesian(rdd2).collect()

res13: Array[(String, String)] = Array((Spark,Akka), (Spark,Scala), (Scala,Akka), (Scala,Scala))



차집합(A-B)


scala> rdd1.subtract(rdd2).collect()

res14: Array[String] = Array(Spark)





join 함수는(K, V)와 (K, W)를 호출해  (K, (V, W))인 새로운 RDD를 생성한다.



scala> val hash1 = sc.parallelize( Seq(("1", "A"), ("2", "B"), ("3", "C"), ("1","D")))

hash1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[64] at parallelize at <console>:24


scala> val hash2 = sc.parallelize( Seq(("1", "W"), ("2", "X"), ("3", "Y"), ("2","Z")))

hash2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[65] at parallelize at <console>:24




scala> hash1.join(hash2).collect()

res15: Array[(String, (String, String))] = Array((1,(A,W)), (1,(D,W)), (2,(B,X)), (2,(B,Z)), (3,(C,Y)))




cogroup 함수는 (K, V)를 (K, Iterable<V>)로 변환한다.



scala> hash1.cogroup(hash2).collect()

res16: Array[(String, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(A, D),CompactBuffer(W))), (2,(CompactBuffer(B),CompactBuffer(X, Z))), (3,(CompactBuffer(C),CompactBuffer(Y))))

Posted by '김용환'
,


Intellij에서 컴파일할 때 org.jboss.interceptor#jboss-interceptor-api;1.1 not found 예외가 발생할 수 있다.


not found: org.jboss.interceptor#jboss-interceptor-api;1.1


원인은 Maven central repository의 SHA-1 정도가 달라서 나는 문제이어서 JBoss쪽 저장소를 추가하면 Intellij 컴파일이 될 것이다 



sbt가 0.13이라면 ~/.sbt/0.13/global.sbt 에 다음 내용을 추가한다


resolvers += "JBoss" at "https://repository.jboss.org/"




sbt가 0.13.10이라면 ~/.sbt/0.13.10/global.sbt 에 다음 내용을 추가한다


resolvers += "JBoss" at "https://repository.jboss.org/"

Posted by '김용환'
,




spark에서 특정 단어의 개수를 찾는 예시이다. 이 예시는 pv, uv를 뽑는데 도움이 되는 코드이다. 



예제


 https://kodejava.org/how-do-i-format-a-date-into-ddmmyyyy/를 참조했다.

$ cat  xxx.txt

     Date date = Calendar.getInstance().getTime();


        // Display a date in day, month, year format

        DateFormat formatter = new SimpleDateFormat("dd/MM/yyyy");

        String today = formatter.format(date);

        System.out.println("Today : " + today);


        // Display date with day name in a short format

        formatter = new SimpleDateFormat("EEE, dd/MM/yyyy");

        today = formatter.format(date);

        System.out.println("Today : " + today);


        // Display date with a short day and month name

        formatter = new SimpleDateFormat("EEE, dd MMM yyyy");

        today = formatter.format(date);

        System.out.println("Today : " + today);


        // Formatting date with full day and month name and show time up to

        // milliseconds with AM/PM

        formatter = new SimpleDateFormat("EEEE, dd MMMM yyyy, hh:mm:ss.SSS a");

        today = formatter.format(date);

        System.out.println("Today : " + today);




scala> val codes = sc.textFile("xxx.txt")

codes: org.apache.spark.rdd.RDD[String] = xxx.txt MapPartitionsRDD[1] at textFile at <console>:24


scala> val lower = codes.map( line =>line.toLowerCase)

lower: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26


scala> val words = lower.flatMap(line => line.split("\\s+"))

words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at <console>:28


scala> val counts = words.map(word => (word, 1))

counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:30


scala> val frequency = counts.reduceByKey(_ + _)

frequency: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[5] at reduceByKey at <console>:32


scala> val invFrequency = frequency.map(_.swap)

invFrequency: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[6] at map at <console>:34


scala> invFrequency.top(10).foreach(println)

(23,)

(9,=)

(6,date)

(5,//)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)



이를 다음처럼 축약해서 쓸 수 있다. 



scala> val result = sc.textFile("xxx.txt").map( line =>line.toLowerCase).flatMap(line => line.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)

result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[21] at map at <console>:24


scala> result.top(10).foreach(println)

(23,)

(9,=)

(6,date)

(5,//)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)





필요없는 코드는 다음처럼 stopWords를 만들어 필터링할 수 있다. 


scala> val stopWords = Set("", "=", "//", ")", "(", ";", ":", "+", "-", "\"")

stopWords: scala.collection.immutable.Set[String] = Set("", =, ), ", -, ;, //, +, (, :)


scala> val result = sc.textFile("xxx.txt").map( line =>line.toLowerCase).flatMap(line => line.split("\\s+")).filter(! stopWords.contains(_)).map(word => (word, 1)).reduceByKey(_ + _).map(_.swap)

result: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[34] at map at <console>:26


scala> result.top(10).foreach(println)

(6,date)

(4,with)

(4,today);)

(4,today)

(4,system.out.println("today)

(4,new)

(4,formatter.format(date);)

(4,formatter)

(3,name)

(3,display)



Posted by '김용환'
,