An error occurred in my Spark Streaming code at the part that uses .format("kafka").

   val dataframe = spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", kafkaBrokers)
       .option("subscribe", kafkaTopic)
       .option("startingOffsets", "latest")
       .option("maxOffsetsPerTrigger", 20)
       .load()
       .map( x => {


The error says it cannot find a kafka data source.
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at streaming.KafkaAvroDemo$.main(KafkaAvroDemo.scala:89)

The code here is just the data-source lookup, and it did not change between 2.4.0 and 2.4.1.
https://github.com/apache/spark/blob/v2.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L651
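
For context, lookupDataSource resolves a short name like "kafka" via java.util.ServiceLoader over META-INF/services/org.apache.spark.sql.sources.DataSourceRegister. A quick way to see what is actually registered on the classpath (a sketch, runnable in spark-shell or a scratch main; if "kafka" does not show up in the output, the Kafka provider's service entry never made it into the jar):

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// the same discovery mechanism DataSource.lookupDataSource relies on:
// list every registered provider by its short name ("kafka", "avro", "csv", ...)
ServiceLoader.load(classOf[DataSourceRegister])
  .asScala
  .map(_.shortName())
  .foreach(println)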


Looking at build.sbt, it is set up as below. Could the issue be in here somewhere?
assemblyMergeStrategy in assembly := {
 case "application.conf" => MergeStrategy.concat
 case PathList("META-INF", xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}

Deleting code didn't turn up any hints either, so it didn't look like an ordinary library conflict.
Wondering whether something under META-INF was colliding on the same name, I tried the following:

assemblyMergeStrategy in assembly := {
 case "application.conf" => MergeStrategy.concat
 case PathList("META-INF", xs @ _*) => MergeStrategy.filterDistinctLines
 case x => MergeStrategy.first
}


Oho... there it is, finally: an overlap. That must be the part.

[error] deduplicate: different file contents found in the following:
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql_2.11/2.4.0/spark-sql_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.0/spark-avro_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
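
For reference, those three jars correspond to dependencies along these lines (a sketch reconstructed from the cache paths above, Scala 2.11 / Spark 2.4.0; the scoping in the real build.sbt may differ):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",
  "org.apache.spark" %% "spark-avro"           % "2.4.0"
)

Each one ships its own copy of the DataSourceRegister service file, which is exactly why the three copies collide during assembly.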

For the kafka source to load, the middle of the three overlapping entries (the one from spark-sql-kafka) has to make it into the assembled jar:
/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister


So fixing this part should get things working. The simple merge strategies (first, last, discard, rename) cannot solve it: first/last keep only one of the three files, dropping the providers registered in the other two, and discard removes them all. The file is a ServiceLoader registry with one provider class per line, so the copies need to be merged, not picked.
assemblyMergeStrategy in assembly := {
 case "application.conf" => MergeStrategy.concat
 case PathList("META-INF", xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}


TO_BE_CONTINUED (tomorrow)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:651
    } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {

Would adding case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat work? (It can only fire if it comes before the catch-all PathList("META-INF", xs @ _*) => MergeStrategy.discard case, since the merge strategy is a plain pattern match evaluated top to bottom.)

samuel.c [10:38 AM]
This time an Exception in thread "main" java.io.IOException: No FileSystem for scheme: file came up:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: file
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:89)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)


The blanket META-INF discard was also dropping Hadoop's META-INF/services/org.apache.hadoop.fs.FileSystem files, leaving Hadoop unable to map the file:// scheme to an implementation. So I add case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines to the sbt assembly policy (filterDistinctLines concatenates the copies while removing duplicate lines).
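
As a side note, an alternative workaround for this particular error is to pin the implementations in the Hadoop configuration instead of relying on the service files surviving the merge (a sketch; the hdfs line assumes hadoop-hdfs is on the classpath):

// map the schemes to implementations explicitly, bypassing the
// META-INF/services lookup that the assembly merge broke
spark.sparkContext.hadoopConfiguration.set(
  "fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
spark.sparkContext.hadoopConfiguration.set(
  "fs.hdfs.impl", classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)

Fixing the merge strategy is the cleaner route, though, since it repairs the jar for every FileSystem implementation at once.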

samuel.c [11:22 AM]
assemblyMergeStrategy in assembly := {
 case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
 case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines
 case "application.conf" => MergeStrategy.concat
 case PathList("META-INF", xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}
With this, everything works.
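
To sanity-check the assembled jar end to end, a minimal read-and-print job like this does the trick (a sketch; KafkaSmokeTest, the broker address, and the topic are placeholders, not from the original code):

import org.apache.spark.sql.SparkSession

object KafkaSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-smoke-test").getOrCreate()

    val df = spark.readStream
      .format("kafka")                                      // resolves via the merged DataSourceRegister
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
      .option("subscribe", "test")                          // placeholder topic
      .option("startingOffsets", "latest")
      .load()

    // starting the query also touches the checkpoint path, which is
    // where "No FileSystem for scheme: file" used to blow up
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}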
