Spark에서 DataFrame으로 장난치다가

 requirement failed: Currently correlation calculation for columns with dataType string not supported.라는 에러를 만날 수 있다. 


이는 데이터 타입을 inferScheme을 통해 추론하는데 데이터 타입이 Long/Int로 변환되야 하는데 String 타입으로 변환된 컬럼 데이터를 corr라는 sql 함수로 계산하다가 에러가 발생한 것이다.


이럴 때는 명시적으로 StructType을 사용해 스키마를 지원하는 것이 좋다.



import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

val myManualSchema = StructType(Array(
StructField("InvoiceNo", LongType, true),
StructField("StockCode", StringType, true),
StructField("Description", StringType, true),
StructField("Quantity", LongType, true),
StructField("InvoiceDate", StringType, true),
StructField("UnitPrice", LongType, true),
StructField("CustomerID", StringType, true),
StructField("Country", StringType, true)
))

val df = spark.read.format("csv")
.option("header", true)
.schema(myManualSchema)
.load("origin-source/data/retail-data/by-day/2010-12-01.csv")


Posted by '김용환'
,



spark 앱을 로컬(Intellij)에서 테스트/실행할 때 

아래와 같이 SparkDriver 를 바인딩하지 못해 에러가 나는 경우가 종종 발견할 수 있다.




WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.


..java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries!






이럴 때는 다음 코드를 참조해서 SparkConf 인스턴스를 생성할 때 다음 코드로 수정한다. 더 이상 발생하지 않을 것이다.



lazy val sparkConf: SparkConf  = new SparkConf().setMaster("local[1]") //상관없음

                              .setAppName("spark-test") 

                              .set("spark.driver.host", "localhost"); // 반드시 필수




Posted by '김용환'
,


rawdata를 case class로 정의해서 spark Dataset으로 encoding할 때 아래 에러가 발생할 수 있다.



Error:(14, 31) Unable to find encoder for type Flight. An implicit Encoder[Flight] is needed to store Flight instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

    val flights = flightsDF.as[Flight]

Error:(14, 31) not enough arguments for method as: (implicit evidence$2: org.apache.spark.sql.Encoder[Flight])org.apache.spark.sql.Dataset[Flight].

Unspecified value parameter evidence$2.

    val flights = flightsDF.as[Flight]



먼저 case class를 main 앞에 위치시키고 import spark.implicits._ 를 main 안에 위치한다.



case class Flight(DEST_COUNTRY_NAME: String, 
           ORIGIN_COUNTRY_NAME: String, count: BigInt)

object FlightMain extends SparkHelper {
def main(args: Array[String]): Unit = {

// 3.2
import sparkSession.implicits._

val flightsDF = sparkSession.read.parquet("origin-source/data/flight-data/parquet/2010-summary.parquet")
val flights = flightsDF.as[Flight]


Posted by '김용환'
,



scala-guice는 google-guice를 이용해 scala quill에 맞게 inject를 구현한 간단 코드이다.


libraryDependencies ++= Dependencies.guice


val guice = Seq(
"com.google.inject" % "guice" % "4.2.2",
"com.google.inject.extensions" % "guice-assistedinject" % "4.2.2",
"net.codingwell" %% "scala-guice" % "4.2.2"
)




scala quill 를 사용할 때 데이터 타입 관련해서 Encoder/Decoder로 사용할 Implicits를 구현한다.


package com.google.quill

import java.util.Date

import io.getquill.MappedEncoding
import org.joda.time.DateTime

object Implicits {
implicit val dateTimeDecoder = MappedEncoding[Date, DateTime](Decoders.fromDateField)
implicit val dateTimeEncoder = MappedEncoding[DateTime, Date](_.toDate)
}

object Decoders {
def fromDateField(date: Date): DateTime = {
new DateTime(date)
}
}




데이터베이스에서 사용하는 Orders 클래스를 정의한다.



package com.google.datalake.dao.shopping

import org.joda.time.DateTime

case class Orders(
id: Long,
payment_id: Long,
refund_id: Option[Long],
channel_id: Long,
seller_id: Long,
buyer_id: Option[Long],
buyer_user_id: Long, #...
)



properties 정보

orderDB.dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource
orderDB.dataSource.url="jdbc:mysql://test.google.com:3306/buy?useTimezone=true&serverTimezone=UTC&characterEncoding=UTF-8"
orderDB.dataSource.user=xxx
orderDB.dataSource.password=xxx
orderDB.dataSource.cachePrepStmts=true
orderDB.dataSource.prepStmtCacheSize=250




실제 Inject 중심 클래스를 정의한다. google-guice의 AbstractModule를 상속하고 scala-guide의 ScalaModule를 믹싱한 DataBaseModule을 정의한다. 

package com.google.datalake.db.modules

import com.google.inject._
import com.google.datalake.db.SelectOrderDB
import io.getquill.{MysqlJdbcContext, SnakeCase}
import net.codingwell.scalaguice.ScalaModule
import com.google.inject.AbstractModule

class DataBaseModule extends AbstractModule with ScalaModule {

override def configure(): Unit = {
bind(classOf[SelectOrderDB]).asEagerSingleton()
}

@Provides
def provideDataBaseSource(): MysqlJdbcContext[SnakeCase] = {
new MysqlJdbcContext(SnakeCase, "orderDB")
}
}


quill을 이용한 DAO 코드이다. MysqlJdbcContext를 inject해서 사용할 수 있게 한다.


package com.google.datalake.db

import com.google.inject.Inject
import com.google.datalake.dao.shopping.Orders
import io.getquill.{MysqlJdbcContext, SnakeCase}

class SelectOrderDB @Inject()(val ctx: MysqlJdbcContext[SnakeCase]) {

import ctx._

def findById(id: Long) = {
val q = quote {
query[Orders].filter(_.id == lift(id))
}
ctx.run(q)
}

def findByIds(ids: List[Long]): Unit = {
val q = quote {
query[Orders]
.filter(p => liftQuery(ids).contains(p.id))
}
ctx.run(q)
}
}


실제 테스크 코드는 다음과 같다.

package com.google.datalake.db

import com.google.inject.Guice
import com.google.datalake.db.modules.DataBaseModule
import org.scalatest.FunSuite

class SelectOrderDBTest extends FunSuite {

val injector = Guice.createInjector(new DataBaseModule()).getInstance(classOf[SelectOrderDB])
val selectOrderDB = injector.asInstanceOf[SelectOrderDB]

test("find by id") {
val row = selectOrderDB.findById(71721303)

println(row)
}

}







Posted by '김용환'
,




google guice는 dependency inject 경량 프레임워크이다.


이를 scala(spark)에서 사용하려면 여러 방법이 있겠지만 내가 아는 방법은 2가지이다.



1. scala-guice를 사용한다.


https://github.com/codingwell/scala-guice


실제로 scala-guice를 사용해 구현해보니 경량스럽게 개발이 가능하다.



google guice와 scala-guice 개념을 이해하는데 도움되는 글


https://www.tutorialspoint.com/guice/guice_provides_annotation.htm


https://www.journaldev.com/2403/google-guice-dependency-injection-example-tutorial


https://medium.freecodecamp.org/a-hands-on-session-with-google-guice-5f25ce588774


AbstractModule을 상속하지 않아도 쉽게 구현 가능하다.

그리고 복잡한 형태의 Module을 사용할 수 있다.






2. finatra의 TwitterModule를 사용한다.


TwitterModule은 scala-guice/google-guice 기반이고 finatra 관련 프로젝트라서 엄청난 dependency를 갖고 있다. 


https://twitter.github.io/finatra/user-guide/getting-started/modules.html#module-configuration-in-servers



TwitterModule은 google-guice의 AbstraceModule을 구현한 것으로 무겁지만(너무 많은 dependency) 쉬운 개발이 가능하고 훨씬 기능이 많다.







Posted by '김용환'
,

ujson (scala) 사용 예시

scala 2019. 1. 25. 15:46


scala에서 json 파싱할 때, 사용할 수 있는 간결한 라이브러리로 ujson이 있다. upickle이 있긴 하지만 아직은 불편하다.


http://www.lihaoyi.com/post/uJsonfastflexibleandintuitiveJSONforScala.html



사용 예시이다.



import ujson.Value


object UjsonSampleMain {

  def main(args: Array[String]): Unit = {

    val input =

      "{\n  \"id\": \"c730433b-082c-4984-9d66-855c243266f0\",\n  \"price\": 10,\n \"emptyValue\": \"\",\n \"name\": \"Foo\",\n " +

        " \"counts\": [1, 2, 3],\n  \"values\": {\n    \"bar\": true,\n    \"baz\": 100.001,\n    \"qux\": [\"a\", \"b\"]\n  }\n}"


    // json을 읽는다

    val data: Value.Value = ujson.read(input)


    // 'id' 값을 읽는다

    println(data("id").isNull)

    println(data("id"))

    println


    // type

    println(data("id"))          // Value.Value

    println(data("id").render()) // String


    // 'counts' 값을 읽는다

    println(data("counts"))

    println


    // 'name' 값을 읽은 후 reverse한 값을 기존 json에 저장한다

    data("name") = data("name").str.reverse

    println(data.render())

    println


    // json의 Map 데이터는 obj에서 관리된다

    println(data.obj)

    println


    // json 문자열을 Map으로 확인한다.

    data.obj.foreach(tuple => println(tuple._1 + ":" + tuple._2))

    println


    // json의 values 문자열을 Map으로 확인한다.

    data.obj("values").obj.foreach { case (k, v) => println(k + ":" + v) }

    println


    // json의 values.bar의 값을 boolean으로 읽는다.

    println(data.obj("values").obj("bar").bool)

    println


    // 숫자 값

    println(data("price").num.intValue())


    try {

      // 잘못된 json

      val wrong = "}" + input + "{"

      ujson.read(wrong)

    } catch {

      case _: Throwable => println("wrong json")

    }

  }

  

  

결과



false

"c730433b-082c-4984-9d66-855c243266f0"


"c730433b-082c-4984-9d66-855c243266f0"

"c730433b-082c-4984-9d66-855c243266f0"

[1,2,3]


{"id":"c730433b-082c-4984-9d66-855c243266f0","price":10,"emptyValue":"","name":"ooF","counts":[1,2,3],"values":{"bar":true,"baz":100.001,"qux":["a","b"]}}


Map(id -> "c730433b-082c-4984-9d66-855c243266f0", price -> 10, emptyValue -> "", name -> "ooF", counts -> [1,2,3], values -> {"bar":true,"baz":100.001,"qux":["a","b"]})


id:"c730433b-082c-4984-9d66-855c243266f0"

price:10

emptyValue:""

name:"ooF"

counts:[1,2,3]

values:{"bar":true,"baz":100.001,"qux":["a","b"]}


bar:true

baz:100.001

qux:["a","b"]


true


10

wrong json




Posted by '김용환'
,



sbt test를 실행하다 다음과 같이 종료 에러가 나면.. (Intellij에서는 이상이 없다)



19/01/17 22:16:40 ERROR Utils: uncaught error in thread spark-listener-group-streams, stopping SparkContext

java.lang.InterruptedException

at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)





build.sbt에 다음 파일을 추가한다.


fork in run := true 


로 수정한다. info 로그를 error로 처리하는 것이 맘에 들지 않지만 .. 작동은 된다.





Intellij와 sbt를 함께 사용하고 있기 떄문에 그럴 수 있는데.


console 커맨드를 사용해 scala 코드를 실행하면 SBT와 동일한 가상 시스템에서 코드가 실행된다. 상황에 따라 System.exit 호출이나 종료되지 않은 스레드와 같이 SBT가 중단될 수 있다. 


테스트 결과 JVM이 종료되면 SBT를 다시 시작해야 하는 상황을 피하기 위해 JVM을 포크하는 fork in run := true를 추가한다.

Posted by '김용환'
,




SparkSession을 종료하지 않으면 아래와 같은 에러가 발생할 수 있다. 




Driver stacktrace:

19/01/17 19:42:45 INFO DAGScheduler: Job 1 failed: count at FirstSparkSample.scala:21, took 0.194101 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 22, localhost, executor driver): java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V




SparkSession을 stop하는 코드를  추가한다.


sparksession.stop()

Posted by '김용환'
,

[spark] 셔플 관련 자료

scala 2018. 12. 4. 14:39



spark에는 여러 셔플(shuffle) 함수의 성능적인 부분을 설명한 자료이다.





Everyday I'm Shuffling - Tips for Writing Better Spark Programs, Strata San Jose 2015 from Databricks




책으로 보려면, 아래 책을 참고하깅 바란다. 


Scala and Spark for Big Data Analytics: Explore the concepts of functional programming, data streaming, and machine learning 


https://www.amazon.com/Scala-Spark-Big-Data-Analytics/dp/1785280848


Posted by '김용환'
,





스파크는 선형 회귀 알고리즘의 RDD 기반 구현을 제공한다. 

https://spark.apache.org/docs/latest/mllib-linear-methods.html#regression




SGD(stochastic gradient descent)를 사용해 정규화가 필요 없는 선형 회귀 모델을 학습시킬 수 있다. 이는 다음과 같은 최소 제곱 회귀(least squares regression) 공식을 풀 수 있다.


정규화가 없는 선형 회귀 모델은 최소 제곱 회귀 공식을 사용할 수 있다는 뜻인데. 이게 즉, 평균 제곱 오차와 같다는 내용입니다. 


https://thebook.io/006958/part02/ch03/03/



f(가중치) = 1/n ||A 가중치-y||2 (즉, 평균 제곱 오차(MSE, mean squared error)다)




여기서 데이터 행렬은 n개의 로우를 가지며, 입력 RDD는 A의 로우 셋을 보유하고 각각은 해당 오른쪽 레이블 y를 가진다. 





https://github.com/apache/

spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala를 참조한다.





참고로, 예측값과 타깃값의 차이를 제 곱하여 더한 후에 샘플의 개수로 평균을 내면 평균 제곱 오차입니다.

https://en.wikipedia.org/wiki/Mean_squared_error






Posted by '김용환'
,