An error occurred in my Spark Streaming code at the part that calls .format("kafka").
val dataframe = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", 20)
  .load()
  .map( x => {
    …
The error says there is no Kafka data source:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at streaming.KafkaAvroDemo$.main(KafkaAvroDemo.scala:89)
The relevant Spark source is just the lookup code, and it did not change between 2.4.0 and 2.4.1:
https://github.com/apache/spark/blob/v2.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L651
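For reference, lookupDataSource resolves a short name like "kafka" via the JVM ServiceLoader, which reads every META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath. Here is a minimal diagnostic sketch of my own (assuming the assembly jar is on the classpath) to see what actually got registered:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Lists the data source short names registered on the classpath.
// If "kafka" is not printed here, .format("kafka") will fail with
// the AnalysisException above.
object ListRegisteredSources {
  def main(args: Array[String]): Unit = {
    ServiceLoader.load(classOf[DataSourceRegister]).asScala
      .foreach(ds => println(ds.shortName()))
  }
}
```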
Looking at build.sbt, it is configured as below. Could the issue be somewhere in here?
assemblyMergeStrategy in assembly := {
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
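Worth noting what that last META-INF rule matches: sbt-assembly splits each jar entry path into segments, so PathList("META-INF", xs @ _*) catches everything under META-INF, including the services registry. A plain-Scala illustration of the same match (not sbt-assembly's actual code, just the pattern):

```scala
// The catch-all rule: any entry whose first path segment is "META-INF"
// is discarded, which silently drops the service registry files too.
val entry = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"
entry.split("/").toList match {
  case "META-INF" :: rest => println("discarded: " + rest.mkString("/"))
  case _                  => println("kept")
}
```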
Deleting code didn't surface any hints either, so it didn't look like a library conflict. Wondering whether there might be a name collision under META-INF, I tried the following:
assemblyMergeStrategy in assembly := {
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.filterDistinctLines
  case x => MergeStrategy.first
}
Oho… so there was overlapping content after all. That must be the spot:
[error] deduplicate: different file contents found in the following:
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql_2.11/2.4.0/spark-sql_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.0/spark-avro_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
For the Kafka data source to be found, the overlapping entry from the jar in the middle has to make it into the assembly:
/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
So fixing this part should get somewhere. The simple merge strategies (first, last, discard, name) cannot solve this problem; see the sketch after the config below. The current rule just throws the file away:
assemblyMergeStrategy in assembly := {
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
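Why first, last, and name cannot work here: each of the three jars ships its own copy of the file with different lines (spark-sql registers the built-in formats, spark-sql-kafka-0-10 registers the Kafka provider, spark-avro registers Avro), and lookupDataSource needs all of them, so picking any single copy loses the others' providers. The merged file should look roughly like this (illustrative and abbreviated; the exact lines come from the jars themselves):

```
# META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (merged)
org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
# ... the rest of spark-sql's built-in entries ...
org.apache.spark.sql.kafka010.KafkaSourceProvider
org.apache.spark.sql.avro.AvroFileFormat
```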
TO_BE_CONTINUED (tomorrow)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:651
```} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {```
Would `case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat` do the trick??
This time, an `Exception in thread "main" java.io.IOException: No FileSystem for scheme: file` error came up:
Exception in thread "main" java.io.IOException: No FileSystem for scheme: file
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:89)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
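This is the same failure mode: the blanket META-INF discard also drops META-INF/services/org.apache.hadoop.fs.FileSystem, so Hadoop's FileSystem.getFileSystemClass can no longer map the file:// scheme to an implementation. Both hadoop-common and hadoop-hdfs ship this service file with different lines, so it needs a line-merging strategy as well. Roughly (illustrative, not the exact contents):

```
# META-INF/services/org.apache.hadoop.fs.FileSystem (merged)
org.apache.hadoop.fs.LocalFileSystem          # hadoop-common, handles file://
org.apache.hadoop.fs.viewfs.ViewFileSystem    # hadoop-common
org.apache.hadoop.hdfs.DistributedFileSystem  # hadoop-hdfs, handles hdfs://
```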
So I added `case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines` to the sbt assembly merge strategy:
assemblyMergeStrategy in assembly := {
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
With this, everything works fine.
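For what it's worth, the route the error message itself suggests also avoids all of this: skip bundling the Kafka source into the fat jar and pass it at submit time with `spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0`, as the "Structured Streaming + Kafka Integration Guide" describes.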