Spark 코드에서는 Log4j의 logger를 직렬화할 수 없다. 따라서 아래 코드처럼 드라이버에서 만든 logger를 RDD 연산의 클로저 안에서 참조하면 문제가 된다.



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger

/**
 * Demonstrates the failure mode: a Log4j logger obtained on the driver is
 * referenced inside an RDD closure, so Spark has to serialize it with the task.
 *
 * This version is expected to abort at the `foreach` call with
 * `SparkException: Task not serializable`.
 */
object MyLog {
  def main(args: Array[String]): Unit = {
    // Set the root logger level to WARN.
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)

    // Create the SparkContext.
    val sc = new SparkContext(
      new SparkConf().setAppName("My App").setMaster("local[*]")
    )

    // Start the computation and print logging information.
    log.warn("Started")
    val start = 0
    val numbers = sc.parallelize(start to 100000)
    // The lambda references the driver-side `log`; this capture is what triggers the error.
    numbers.foreach(n => log.info("My number" + n))
    log.warn("Finished")
  }
}



아래와 같은 에러가 발생한다.


Exception in thread "main" org.apache.spark.SparkException: Task not serializable

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)

at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)

at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)

at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)

at MyLog$.main(MyLog.scala:19)

at MyLog.main(MyLog.scala)

Caused by: java.io.NotSerializableException






직렬화할 클래스를 만들고 extends Serializable을 추가한다. 즉, 직렬화 가능한 클래스를 하나 만들고 그 내부에서 RDD를 사용하는 함수를 하나 만든다. 이때 logger는 @transient lazy val로 선언해서 직렬화 대상에서 제외하고, 처음 사용할 때 생성되도록 한다.



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.rdd.RDD


/**
 * Serializable wrapper that applies a logging transformation to an RDD.
 *
 * @param n value added to every element before it is converted to a string
 */
class MyMapper(n: Int) extends Serializable {
  // @transient keeps the logger out of the serialized form of this instance;
  // lazy defers the lookup until the first time `log` is used.
  @transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")

  /**
   * Logs each element at WARN level and maps it to the string form of `element + n`.
   *
   * @param rdd input integers
   * @return an RDD holding `(element + n).toString` for every input element
   */
  def dosomething(rdd: RDD[Int]): RDD[String] =
    rdd.map { element =>
      log.warn("Serialization of: " + element)
      val shifted = element + n
      shifted.toString
    }
}

/** Companion factory so callers can write `MyMapper(n)` instead of `new MyMapper(n)`. */
object MyMapper {
  def apply(n: Int): MyMapper = new MyMapper(n)
}

/**
 * Entry point for the working version: per-element logging is delegated to
 * `MyMapper`, so the driver-side root logger is never referenced inside an
 * RDD closure.
 */
object MyLog {
  def main(args: Array[String]): Unit = {
    // Set the root logger level to WARN.
    val log = LogManager.getRootLogger
    log.setLevel(Level.WARN)

    // Create the SparkContext.
    val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
    val sc = new SparkContext(conf)

    try {
      // Start the computation and print logging information.
      log.warn("Started")
      val data = sc.parallelize(1 to 100000)
      val mapper = MyMapper(1)
      val other = mapper.dosomething(data)
      // collect() is the action that actually runs the map (and its logging).
      other.collect()
      log.warn("Finished")
    } finally {
      // Always release the context, even when the job throws.
      sc.stop()
    }
  }
}

에러 없이 잘 동작한다. 

Posted by 김용환 '김용환'