[spark] log4j 직렬화하기 - org.apache.spark.SparkException: Task not serializable 해결하기
scala 2018. 3. 25. 02:41spark 코드에 Log4j의 logger를 직렬화를 진행할 수 없다..
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger
object MyLog {
def main(args: Array[String]):Unit= {
// 로그 레벨을 WARN으로 설정한다
val log = LogManager.getRootLogger
log.setLevel(Level.WARN)
// SparkContext를 생성한다
val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
val sc = new SparkContext(conf)
//계산을 시작하고 로깅 정보를 출력한다
log.warn("Started")
val i = 0
val data = sc.parallelize(i to 100000)
data.foreach(i => log.info("My number"+ i))
log.warn("Finished")
}
}
아래와 같은 에러가 발생한다.
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
at MyLog$.main(MyLog.scala:19)
at MyLog.main(MyLog.scala)
Caused by: java.io.NotSerializable
직렬화할 클래스를 만들고 extends Serializable을 추가한다. 즉, 직렬화 클래스를 하나 만들어서 내부어세 RDD를 사용하는 함수를 하나 만든다.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.LogManager
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark._
import org.apache.spark.rdd.RDD
class MyMapper(n: Int) extends Serializable{
@transient lazy val log = org.apache.log4j.LogManager.getLogger("myLogger")
def dosomething(rdd: RDD[Int]): RDD[String] =
rdd.map{ i =>
log.warn("Serialization of: " + i)
(i + n).toString
}
}
object MyMapper{
def apply(n: Int): MyMapper = new MyMapper(n)
}
object MyLog {
def main(args: Array[String]):Unit= {
// 로그 레벨을 WARN으로 설정한다
val log = LogManager.getRootLogger
log.setLevel(Level.WARN)
// SparkContext를 생성한다
val conf = new SparkConf().setAppName("My App").setMaster("local[*]")
val sc = new SparkContext(conf)
//계산을 시작하고 로깅 정보를 출력한다
log.warn("Started")
val data = sc.parallelize(1 to 100000)
val mapper = MyMapper(1)
val other = mapper.dosomething(data)
other.collect()
log.warn("Finished")
}
}
에러 없이 잘 동작한다.
'scala' 카테고리의 다른 글
[spark] 스파크 잡 튜닝 및 아키텍처 URL 펌질 (0) | 2018.03.29 |
---|---|
[spark] 스파크 애플리케이션 튜닝 방법 - 펌 (0) | 2018.03.26 |
[spark] 스파크 MLlib으로 비정상 데이터를 찾기에 좋은 참조 자료 (0) | 2018.03.14 |
[spark] 여러 모드에서 스파크 잡 실행하기 예제 (0) | 2018.03.14 |
[play2] 간단한 인증 방식 구현 예제(basic authentication) (0) | 2018.02.20 |