rdd를 dataframe으로 만드는 방법 (1.6)


1) SQLContext를 사용하는 방법


val sqlContext = new SQLContext(sc) 

import sqlContext.implicits._

rdd.toDF()





2) HiveContext를 이용해 DataFrame.createDataframe 이용


import scala.io.Source
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext 
import org.apache.spark.sql.hive.HiveContext
   
val peopleRDD = sc.textFile(filename)

val schemaString = "name age"

val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

val sqlContext = new HiveContext(sc)

val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
peopleDF.registerTempTable("people")

val results = sqlContext.sql("SELECT name FROM people")
results.collect().foreach(println)




'scala' 카테고리의 다른 글

[spark] spark summit 자료  (0) 2017.02.22
[scala] Array.transpose 예시  (0) 2017.02.17
[spark] foreachPartition 예시  (0) 2017.02.14
[zepplin] 여러 spark context 사용하기  (0) 2017.02.14
scala에서 uuid 생성하는 방법  (0) 2017.02.09
Posted by '김용환'
,




foreach 와 foreachPartition의 차이


foreach는 간단히 collection을 리스트로 출력한다.


val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

list.foreach(x => println(x))




foreachPartition는 RDD를 partition처리한다. 

partition 개수는 foreachPartition 전에 미리 지정되야 한다. 하지만 리턴 값이 필요없을 때 사용된다.


val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)

b.foreachPartition(x => println(x.reduce(_ + _))) 





리턴 값이 필요할 때는 mapPartition 또는 mapPartitionsWithIndex을 사용한다. 


val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 2)

b.foreachPartition(x => println(x.reduce(_ + _)))

val mapped = b.mapPartitionsWithIndex {

    (index, iterator) => {

        println("aaaa -> " + index)

        val myList = iterator.toList

        myList.map(x => x + " -> " + index).iterator

    }

}


mapped.collect()


res41: Array[String] = Array(1 -> 0, 2 -> 0, 3 -> 0, 4 -> 0, 5 -> 0, 6 -> 1, 7 -> 1, 8 -> 1, 9 -> 1, 10 -> 1)



데이터 프레임도 사용할 수 있다.


       dataframes.mapPartitions(_ grouped 10).foreach { batch =>

       val accountWithSep =

        batch.map {

        case Row(accountId: Int) => accountId.toString()

      }.mkString(",")

    




Posted by '김용환'
,

zepplin에서 spark을 테스트하다가 아래와 같은 에러를 만날 수 있다. 

여러 spark context가 쓰인 이유인데, allowMultipleContext를 true로 설정하면 문제가 없다. 



org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:


문제 해결


  val sparkConf = new SparkConf()

    .setAppName("abc")

    .set("spark.driver.allowMultipleContexts", "true");







Posted by '김용환'
,