Starting with Spark 2.4, Spark can be integrated with Kubernetes.

Here is how I developed and deployed a streaming application.



To run a Spark streaming job on Kubernetes, you have to submit it in a form like the following (local:// refers to a jar path that already exists inside the container image):
$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-streaming-job \
    --class main.MainClass \
    --conf spark.executor.instances=1 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar


First, dockerize an environment with Spark installed. (The -m option builds against minikube's Docker daemon, so it is not used here.)

$ ./bin/docker-image-tool.sh -t spark-docker build
Sending build context to Docker daemon    259MB
Step 1/15 : FROM openjdk:8-alpine
8-alpine: Pulling from library/openjdk
8e402f1a9c57: Pull complete
4866c822999c: Pull complete
ec484ea07ed1: Pull complete
Digest: sha256:066ad5ab75cfdfbeaff8481f988b4e35a04fef5d24309da2bdd5af59b983b68f
Status: Downloaded newer image for openjdk:8-alpine


Once the build finishes, you can see that the following Docker images have been created.

$ docker images
REPOSITORY                         TAG                 IMAGE ID            CREATED             SIZE
spark-r                            spark-docker        47533c66d1d8        13 hours ago        740MB
spark-py                           spark-docker        b6fec2b48ea6        13 hours ago        446MB
spark                              spark-docker        b91978355818        13 hours ago        355MB



Log in to the Docker registry.
$ docker login -u samuel.c 사설_저장소


Tag the Spark 2.4.0 image for the registry.
$ docker tag spark:spark-docker 사설_저장소/samuel_c/spark_docker

Push the Spark 2.4.0 image to the registry.
$ docker push 사설_저장소/samuel_c/spark_docker
 

Build the jar in the Spark streaming job application directory.
$ sbt assembly

[info] Assembly up to date: /dev/spark-streaming-job/build/kafka-spark-streaming.jar
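
For reference, the fixed jar name and output path come from sbt-assembly settings in build.sbt; a minimal sketch, where the exact values are assumptions matched to the output above and the class submitted below:

// build.sbt sketch (sbt-assembly 0.14.x style); the values are assumptions,
// chosen to match the assembly output above and the submitted main class.
mainClass in assembly := Some("streaming.KafkaAvroDemo")
assemblyJarName in assembly := "kafka-spark-streaming.jar"
assemblyOutputPath in assembly := file("build/kafka-spark-streaming.jar")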


Copy the jar to a machine that serves it over HTTP.
$ scp  -o GSSAPIAuthentication=yes /dev/commerce/spark-demos/build/kafka-spark-streaming.jar ftp장비_디렉토리


Submit the job to k8s.
$ bin/spark-submit \
    --master k8s://https://master_주소:6443 \
    --deploy-mode cluster \
    --name spark-streaming \
    --class streaming.KafkaAvroDemo \
    --conf spark.kubernetes.container.image=사설_저장소/samuel_c/spark_docker \
    http://ftp장비/kafka-spark-streaming.jar
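
For context, streaming.KafkaAvroDemo is a plain Structured Streaming entry point. A minimal sketch of what such a class might look like; the broker, topic, and console sink here are placeholders, and the real job decodes Avro:

package streaming

import org.apache.spark.sql.SparkSession

// Minimal sketch of a Structured Streaming entry point like KafkaAvroDemo.
// Broker, topic, and the console sink are assumptions, not the real job.
object KafkaAvroDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("spark-streaming")
      .getOrCreate()

    val frame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092") // assumption
      .option("subscribe", "demo-topic")                 // assumption
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    frame.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}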


Check that it is actually running. Below you can see one driver in Error state and one in Running state.
$ kubectl get pods 

spark-streaming-1555032194366-driver   0/1     Error              0          4h
spark-streaming-1555047069996-driver   1/1     Running            0          35m




Instead of fetching the jar over HTTP, you can also build your own image on top of the base Spark Docker image.


Dockerfile
FROM 사설_저장소/samuel_c/spark_docker

MAINTAINER datalake

ENV http_proxy 프록시
ENV https_proxy 프록시
ENV APP_HOME /app
RUN mkdir -p $APP_HOME
WORKDIR $APP_HOME

# Upload & build source
COPY . $APP_HOME
RUN ./sbt assembly



You can test it like this, but..
docker run -i -t 사설_저장소/samuel_c/spark-demos  /bin/bash


Downloading the jar over HTTP seems to be the fastest option.

[sbt] Resolving "Failed to find data source: kafka" in a Spark app

The error came from the part of the Spark streaming code that uses .format("kafka").

   val dataframe = spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", kafkaBrokers)
       .option("subscribe", kafkaTopic)
       .option("startingOffsets", "latest")
       .option("maxOffsetsPerTrigger", 20)
       .load()
       .map( x=> {


The error says the kafka data source cannot be found.
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at streaming.KafkaAvroDemo$.main(KafkaAvroDemo.scala:89)

The Spark source involved is just the data source lookup, and there is no change between the 2.4.0 and 2.4.1 sources.
https://github.com/apache/spark/blob/v2.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L651
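
One thing to keep in mind: the kafka source ships in a separate artifact (spark-sql-kafka-0-10), not in spark-sql itself. Judging from the jars that show up in the deduplicate error further down, the dependencies are presumably along these lines; versions and scopes are assumptions:

// build.sbt sketch; versions/scopes are assumptions inferred from the jars
// that appear in the deduplicate error below.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.0",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",
  "org.apache.spark" %% "spark-avro"           % "2.4.0"
)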


Looking at build.sbt, the merge strategy is set up as below. Could the issue be somewhere in here?
assemblyMergeStrategy in assembly := {
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

Even after removing code, no hints turn up, and it does not look like a library conflict.
Wondering whether there might be a name collision under META-INF, I tried the following.

assemblyMergeStrategy in assembly := {
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.filterDistinctLines
  case x => MergeStrategy.first
}


Aha... so these really are overlapping files. That must be the part.

[error] deduplicate: different file contents found in the following:
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql_2.11/2.4.0/spark-sql_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.0/spark-avro_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

For the kafka data source to be readable, the overlapping copy in the middle, the one from spark-sql-kafka-0-10, probably has to make it into the assembly (it is the file that registers org.apache.spark.sql.kafka010.KafkaSourceProvider, which is what .format("kafka") resolves to):
/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister


So fixing this part should get things working. The simple merge strategies (first, last, discard, rename) cannot solve it: first and last keep only one jar's copy of the registry file, and discard drops it altogether, so the kafka entry is lost either way.
assemblyMergeStrategy in assembly := {
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}


TO_BE_CONTINUED (tomorrow)

The check that produces this error message is at sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:651:
    } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {

Would case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat work?

This time, an Exception in thread "main" java.io.IOException: No FileSystem for scheme: file error came up:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: file
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:89)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:48)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)


So I add case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines to the sbt assembly merge strategy.

assemblyMergeStrategy in assembly := {
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", "services", "org.apache.hadoop.fs.FileSystem") => MergeStrategy.filterDistinctLines
  case "application.conf" => MergeStrategy.concat
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
With this setup, it works fine.
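
A quick way to double-check the merge is to list the data sources registered in the assembled jar; a minimal sketch, assuming Scala 2.11 / Spark 2.4 and the fat jar on the classpath:

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.DataSourceRegister

// Prints the short name of every data source registered via
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
// After the concat merge, "kafka" and "avro" should be in the list.
object ListDataSources {
  def main(args: Array[String]): Unit = {
    ServiceLoader.load(classOf[DataSourceRegister])
      .asScala
      .foreach(r => println(r.shortName()))
  }
}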
