With Spark 1.5 and 1.6, only json4s 3.2.x can be used.

While working with json4s, I ran into the following error.



org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
	at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:333)
	at org.apache.spark.rdd.RDD$$anonfun$flatMap$1.apply(RDD.scala:332)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.flatMap(RDD.scala:332)
	at stat.googleStat2$.run(CommentStat2.scala:29)
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$
Serialization stack:
	- object not serializable (class: org.json4s.DefaultFormats$, value: org.json4s.DefaultFormats$@2b999ee8)
	- field (class: stat.CommentStat2$$anonfun$2, name: formats$1, type: class org.json4s.DefaultFormats$)
	- object (class stat.CommentStat2$$anonfun$2, <function1>)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)




I had tried to reuse a single implicit formats, declared once at class level like this, and that is exactly what raised the error:

class MyJob {
  implicit val formats = DefaultFormats

  rdd.map(x => x.extract[Double])
    .filter(y => y.extract[Int] == 18)
}

The closures capture formats as a field (formats$1 in the stack trace above), so Spark tries to serialize the DefaultFormats$ singleton.





To get around it, formats has to be declared inside each closure where it is used:

class MyJob {
  rdd.map { x =>
    implicit val formats = DefaultFormats
    x.extract[Double]
  }
  ...
  .filter { y =>
    implicit val formats = DefaultFormats
    y.extract[Int] == 18
  }
}
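An alternative that avoids repeating the declaration in every closure (not from the original post, and the JsonSupport name is purely illustrative) is to put the implicit in a top-level object. An object member is reached through a static module reference, so the closure captures no formats field at all, and the object initializes independently in each executor JVM:

```scala
import org.json4s.DefaultFormats

// Hypothetical helper object; nothing here is serialized with the closure.
object JsonSupport {
  implicit val formats = DefaultFormats
}

class MyJob {
  def run(rdd: org.apache.spark.rdd.RDD[org.json4s.JValue]) =
    rdd.map { x =>
      import JsonSupport._  // implicit resolved via static object access, no field captured
      x.extract[Double]
    }
}
```

Each executor builds its own DefaultFormats on first use of the object instead of deserializing one shipped from the driver, so this sidesteps the NotSerializableException with json4s 3.2 as well.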



Absurdly, Formats, the supertype of DefaultFormats, only started extending Serializable in json4s 3.3.

(In other words, DefaultFormats is only serializable from json4s 3.3 on.)

trait Formats extends Serializable


https://github.com/json4s/json4s/commit/961fb27f5e69669fddc6bae77079a999fc6f04a1
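So on a build that can use json4s 3.3 or later, the convenient class-level style should work as originally intended, since the captured formats field is now serializable (a sketch under that assumption, not tested against the post's setup):

```scala
import org.json4s.DefaultFormats

// With json4s 3.3+, Formats extends Serializable, so the formats field
// captured by the closure (formats$1) serializes without error.
class MyJob {
  implicit val formats = DefaultFormats

  def run(rdd: org.apache.spark.rdd.RDD[org.json4s.JValue]) =
    rdd.map(x => x.extract[Double])
}
```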





But if you are on Spark 1.5 or 1.6, and therefore limited to json4s 3.2, you have to write it the inconvenient per-closure way.



Posted by '김용환'