spark 2.4부터 kubernetes에 연동할 수 있다.

스트리밍 애플리케이션을 개발해서 배포한 내용은 다음과 같다.



kubernetes에 spark streaming job을 실행하려면 다음과 같은 형태로 submit을 해야 한다.
$ bin/spark-submit \
    --master k8s://https://: \
    --deploy-mode cluster \
    --name spark-streaming-job \
    --class main.MainClass \
    --conf spark.executor.instances=1 \
    --conf spark.kubernetes.container.image= \
    local:///path/to/examples.jar


먼저 spark이 깔린 환경으로 dockernize한다. (-m을 사용하면 minikube라서 쓰지 않는다)

$ ./bin/docker-image-tool.sh -t spark-docker build
Sending build context to Docker daemon    259MB
Step 1/15 : FROM openjdk:8-alpine
8-alpine: Pulling from library/openjdk
8e402f1a9c57: Pull complete
4866c822999c: Pull complete
ec484ea07ed1: Pull complete
Digest: sha256:066ad5ab75cfdfbeaff8481f988b4e35a04fef5d24309da2bdd5af59b983b68f
Status: Downloaded newer image for openjdk:8-alpine


실행이 완료되면 다음과 같은 docker image를 생성된 것을 볼 수 있다. 

$ docker images
REPOSITORY                         TAG                 IMAGE ID            CREATED             SIZE
spark-r                            spark-docker        47533c66d1d8        13 hours ago        740MB
spark-py                           spark-docker        b6fec2b48ea6        13 hours ago        446MB
spark                              spark-docker        b91978355818        13 hours ago        355MB



도커 허브에 로그인한다.
$ docker login -u samuel.c 사설_저장소


사용한 spark 2.4.0 환경 도커로 태깅한다.
$ docker tag spark:spark-docker 사설_저장소/samuel_c/spark_docker

spark 2.4.0 환경 도커를 도커 허브에 푸시한다.
$ docker push 사설_저장소/samuel_c/spark_docker
 

spark streaming job 애플리케이션 디렉토리에서 jar를 얻는다.
$ sbt assembly

[info] Assembly up to date: /dev/spark-streaming-job/build/kafka-spark-streaming.jar


장비를 http 서버로 보낸다.
$ scp  -o GSSAPIAuthentication=yes /dev/commerce/spark-demos/build/kafka-spark-streaming.jar ftp장비_디렉토리


잡을 k8s에 submit 한다.
$ bin/spark-submit     \
     --master k8s://https://master_주소:6443    \ 
     --deploy-mode cluster    \
     --name spark-streaming   \  
     --class streaming.KafkaAvroDemo       \
     --conf spark.kubernetes.container.image=사설_저장소/samuel_c/spark_docker  \
     http://ftp장비/kafka-spark-streaming.jar


제대로 실행 중인지 확인한다. Error와 Running을 확인할 수 있다. 
$ kubectl get pods 

spark-streaming-1555032194366-driver   0/1     Error              0          4h
spark-streaming-1555047069996-driver   1/1     Running            0          35m




http 대신 docker에 기본 spark 도커를 기반으로 할 수 있다.


Dockerfile
FROM 사설_저장소/samuel_c/spark_docker

MAINTAINER datalake

ENV http_proxy 프록시
ENV https_proxy 프록시
ENV APP_HOME /app
RUN mkdir -p $APP_HOME
WORKDIR $APP_HOME

# Upload & build source
COPY . $APP_HOME
RUN ./sbt assembly



테스트를 이렇게 할 순 있지만..
docker run -i -t 사설_저장소/samuel_c/spark-demos  /bin/bash


http로 jar를 다운받는 게 가장 빠른 것 같다.

'scala' 카테고리의 다른 글

[sbt] 1.3.0  (0) 2019.09.06
scala cats 공부 자료.  (1) 2019.06.18
[spark] kubernetes(k8s) 배포하기  (0) 2019.04.12
[sbt] spark 앱에서 Failed to find data source: kafka 해결하기  (0) 2019.04.12
sbt 병렬 다운로드  (0) 2019.04.08
sbt assembly 에러  (0) 2019.04.08
Posted by 김용환 '김용환'

댓글을 달아 주세요

spark streaming 코드에서     .format(“kafka”)을 사용한 부분에서 에러가 발생했다.

   val dataframe = spark.readStream
       .format(“kafka”)
       .option(“kafka.bootstrap.servers”, kafkaBrokers)
       .option(“subscribe”, kafkaTopic)
       .option(“startingOffsets”, “latest”)
       .option(“maxOffsetsPerTrigger”, 20)
       .load()
       .map( x=> {


에러는 kafka 관련 data source가 없다는 것이다.
Exception in thread “main” org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of “Structured Streaming + Kafka Integration Guide”.;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at streaming.KafkaAvroDemo$.main(KafkaAvroDemo.scala:89)

소스는 그냥 읽는 부분이고, 2.4.0, 2.4.1 소스 변화는 없다.
https://github.com/apache/spark/blob/v2.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L651


build.sbt를 보면.. 아래와 같이 되어 있는데. 어딘가 이슈가 있는 건 아닐까??
assemblyMergeStrategy in assembly := {
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}

코드를 삭제하더라도 힌트는 보이지 않는다. 라이브러리 충돌은 아닌 것 같다..
혹시 META-INF에서 클래스 이름 충돌은 난 것은 아닐까 생각하며.. 아래와 같이 해봤다.

assemblyMergeStrategy in assembly := {
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.filterDistinctLines
 case x => MergeStrategy.first
}


오호… 드뎌 겹치는 부분이었다.. 헐.. 저부분인가보다.

[error] deduplicate: different file contents found in the following:
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql_2.11/2.4.0/spark-sql_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.0/spark-avro_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

kafka 라이브러리를 읽을 려면 아마도 중간에 겹친 녀석이 sbt에 포함되야 한다..
/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister


따라서 이 부분을 수정하면 먼가 될 것 같다..   간단하게 머징 전략의 first, last, discard, name으로 는 이 문제를 해결할 수 없다..
assemblyMergeStrategy in assembly := {
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}


TO_BE_CONTINUED(내일)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:651
                ```} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {```
apache/sparkAdded by GitHub

case “META-INF/services/org.apache.spark.sql.sources.DataSourceRegister” => MergeStrategy.concat 하면 될까??

samuel.c [10:38 AM]
이번에는 Exception in thread “main” java.io.IOException: No FileSystem for scheme: file 에러가 발생해서

Exception in thread “main” java.io.IOException: No FileSystem for scheme: file
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.sql.execution.streaming.StreamExecution.(StreamExecution.scala:89)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.(MicroBatchExecution.scala:48)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)


case PathList(“META-INF”, “services”, “org.apache.hadoop.fs.FileSystem”) => MergeStrategy.filterDistinctLines를 sbt assembly 정책에 추가한다.

samuel.c [11:22 AM]
assemblyMergeStrategy in assembly := {
 case “META-INF/services/org.apache.spark.sql.sources.DataSourceRegister” => MergeStrategy.concat
 case PathList(“META-INF”, “services”, “org.apache.hadoop.fs.FileSystem”) => MergeStrategy.filterDistinctLines
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}
이렇게 하니 잘 동작한다..

Posted by 김용환 '김용환'

댓글을 달아 주세요

sbt 병렬 다운로드

scala 2019. 4. 8. 19:33

project 디렉토리 하위에 plugins.sbt 파일에 다음 라이브러리를 추가한다.

addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3")

 

sbt의 시퀀스 다운로드로 인해 고통 받는 이들을 위해..

Posted by 김용환 '김용환'

댓글을 달아 주세요

sbt assembly 에러

scala 2019. 4. 8. 19:30

패키지가 비슷한 라이브러리 간에 경합이 발생할 때 build.sbt 파일에 다음을 추가한다. first만 묶는다.

assemblyMergeStrategy in assembly := {
case "application.conf" => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}

 

 

간단하게 해결하는 방법이다.. 그러나 이 방법이 모든 내용을 해결해 주지 않는다. 

 

https://knight76.tistory.com/entry/1111-10 참조..

Posted by 김용환 '김용환'

댓글을 달아 주세요

Spark에서

Streaming 데이터를 DB에 저장할 때. 일반적인 데이터 프레임에서 저장하는 방식을 사용할 수 없다.

(만약 사용하면 streaming 데이터 프레임에서 그렇게 저장할 수 없다라는 에러가 나온다)

 

따라서 Sink 형태(ForeachWriter 상속) 방식을 사용해야 한다. 

(단순한 형태의 구현이다 )

 

 

예시) Spark Streaming Data Frame쪽 소스

val writer:ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrlsinkTable);
val query = dataframe
.writeStream
.foreach(writer)
.outputMode("append")
.start()
query.awaitTermination()

 

 

예시) ForeachWriter를 구현한 JdbcSink.scala 파일 

package streaming

import java.sql.Statement
import java.sql.Connection
import java.sql.DriverManager

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.ForeachWriter

class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord]{
val driver = "com.mysql.cj.jdbc.Driver"
var statement:Statement = _
var connection:Connection = _

def open(partitionId: Long, version: Long): Boolean = {
Class.forName(driver)
connection = DriverManager.getConnection(url)
this.statement = connection.createStatement()
true
}

override def process(record: DeserializedFromKafkaRecord): Unit = {
if (StringUtils.isEmpty(record.value)) {
throw new IllegalArgumentException
}

val value = record.value.replace("'", "").replace("\"", "")
statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")")
}

override def close(errorOrNull: Throwable): Unit = {
connection.close()
}
}

 

Posted by 김용환 '김용환'

댓글을 달아 주세요

spark streaming 코딩하다 다음과 같은 에러가 발생했다. 

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168)

 

Dstream을 처리할 때는 spark의 모든 API가 전부 다 허용되지 않는다.

 

Output operations allow DStream’s data to be pushed out to external systems like a database or a file systems. Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:

 

Dstream 처리에 허영되는 api는 다음과 같다.

print()
foreachRDD()
saveAsObjectFiles()
saveAsTextFiles()
saveAsHadoopFiles()

http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

Posted by 김용환 '김용환'

댓글을 달아 주세요


Spark에서 Sqlite DB 테이블을 읽어오는 예시이다. 


spark jdbc로 데이터 프레임을 읽을 때 항상 모든 테이블 로우를 읽을 수 있을 뿐 아니라.


특정 쿼리의 데이터만 읽을 수 있다.




두가지 방법이 있는데. 


첫 번째는 option("dbtable)에  쿼리를 추가하는 방법,


두 번째는 jdbc를 읽을 때 predicate(where절 같은 형태)를 추가하는 방법이 있다.




object JDBCMain extends SparkHelper {
def main(args: Array[String]): Unit = {
val driver = "org.sqlite.JDBC"
val path = "origin-source/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:${path}"
val tablename = "flight_info"

// driver loading
import java.sql.DriverManager
Class.forName("org.sqlite.JDBC")
val connection = DriverManager.getConnection(url)
println(connection.isClosed)
println(connection.close())

val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
val newDbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.load()
newDbDataFrame.explain()

println("predicates--")
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates, props).count())
println(spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions)

val predicates2 = Array(
"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates2, props).count())
println(spark.read.jdbc(url, tablename, predicates2, props).rdd.getNumPartitions)



이 예시는 Spark Definitive Guide에 있고 깃허브(https://github.com/knight76/spark-definitive-guide-sbt)에 저장되어 있다. 



Posted by 김용환 '김용환'

댓글을 달아 주세요



스파크 (spark)  join은 두가지 전략이 있다.


셔플 조인(shuffle join)과 브로드 캐스트 조인(broadcast join)이 있다.


이 기반에는 wide dependency와 narrow dependency가  있다. 즉 최대한 driver와 executor 간 데이터 교환의 차이를 설명한 것으로서 개발 코드에 따라 성능이 달라진다.

(https://knight76.tistory.com/entry/spark-%ED%8E%8C%EC%A7%88-wide-dependecy-narrow-dependency)



셔플 조인은 조인된 데이터를 동일한 executor로 이동하기 위해 셔플 연산을 사용한다. 각 로우를 해시로 표현 및 생성한 후 적절한 장소에 보낸다. 따라서 데이터 이동이 많이 발생한다. 내부적으로 merge sort join보다 더 많은 비용이 드는 해싱을 사용한다.


따라서 join시 데이터  흐름이 narrow해지고 executor간의 데이터 복사 비용으로 속도가 떨어지게 된다.






브로드캐스트 조인은 데이터를  executor로 복사하지만,  executor간에 데이터복사가 일어나지 않기 때문에 속도가 빨라질 수 있다.


(사실 정확하게 말하면, 데이터의 양/join에 따라 브로드캐스트 조인이 좋을지, 셔플 조인이 좋을지는 테스트해봐야 한다. 이를 위해 spark sql  optimizer가 그걸 내부적으로 결정하기도 한다)






아래 코드는 Spark Definitve Guide의 예제 코드인데, explain으로 어떤 조인이 사용되었는지 알 수 있다. 


package spark.example.tutorial.chapter08

import spark.example.common.SparkHelper

object JoinMain extends SparkHelper {
def main(args: Array[String]): Unit = {
import spark.implicits._

val person = Seq(
(0, "Bill Chaambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")

val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley"))
.toDF("id", "degree", "department", "school")

person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")

val joinExpr1 = person.col("graduate_program") === graduateProgram.col("id")
person.join(graduateProgram, joinExpr1).explain()

import org.apache.spark.sql.functions.broadcast
val joinExpr2 = person.col("graduate_program") === graduateProgram.col("id")
person.join(broadcast(graduateProgram), joinExpr2).explain()

}
}


보면 둘다 broadcast join이다. spark 내부  optimizer가 힌트를 주든 안주든 내부적으로 broadcast 조인을 할 수 있게 되었다.


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildLeft

:- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[2, int, false] as bigint)))

:  +- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- LocalTableScan [id#26, degree#27, department#28, school#29]


== Physical Plan ==

*(1) BroadcastHashJoin [graduate_program#11], [id#26], Inner, BuildRight

:- LocalTableScan [id#9, name#10, graduate_program#11, spark_status#12]

+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))

   +- LocalTableScan [id#26, degree#27, department#28, school#29]





관련 내용은 참조글은 아래 글을 기반으로 한다.


https://www.waitingforcode.com/apache-spark-sql/shuffle-join-spark-sql/read


https://henning.kropponline.de/2016/12/11/broadcast-join-with-spark/






Posted by 김용환 '김용환'

댓글을 달아 주세요



spark에서 UDF (UserDefinedFunction)을 사용하다 아래와 같은 에러가 발생했다.


Exception in thread "main" java.lang.InternalError: Malformed class name

at java.lang.Class.getSimpleName(Class.java:1330)

at org.apache.spark.sql.execution.aggregate.ScalaUDAF.nodeName(udaf.scala:451)



아래와 같은 UDF 코드인데. 



object UDFExampleDemo {


    def main(args: Array[String]) {

    

        class BoolAnd extends UserDefinedAggregateFunction {

...

        }

        ...

    }

}



아래와 같이 변경해야 에러가 발생하지 않는다. 



class BoolAnd extends UserDefinedAggregateFunction {

...

}

 

 

object UDFExampleDemo {


    def main(args: Array[String]) {

    

        ...

    }

}





node name을 얻을 려면 udaf의 class의 getSimpleName을 호출한다.

override def nodeName: String = udaf.getClass.getSimpleName


java의 Class코드를 보면, 

Malformed class name을 발생하는 코드 앞에 주석 설명이 잘 되어 있다.  getSimpleName은 적당한 깊이의  클래스의 simple name을 읽어 오는 것만 허락한다.  따라서 udaf를 상속한 클래스의 위치가 main 안에 두면 안된다.





 java.lang.Class의 getSimpleName 코드



public String getSimpleName() {
if (isArray())
return getComponentType().getSimpleName()+"[]";

String simpleName = getSimpleBinaryName();
if (simpleName == null) { // top level class
simpleName = getName();
return simpleName.substring(simpleName.lastIndexOf(".")+1); // strip the package name
}
// According to JLS3 "Binary Compatibility" (13.1) the binary
// name of non-package classes (not top level) is the binary
// name of the immediately enclosing class followed by a '$' followed by:
// (for nested and inner classes): the simple name.
// (for local classes): 1 or more digits followed by the simple name.
// (for anonymous classes): 1 or more digits.

// Since getSimpleBinaryName() will strip the binary name of
// the immediatly enclosing class, we are now looking at a
// string that matches the regular expression "\$[0-9]*"
// followed by a simple name (considering the simple of an
// anonymous class to be the empty string).

// Remove leading "\$[0-9]*" from the name
int length = simpleName.length();
if (length < 1 || simpleName.charAt(0) != '$')
throw new InternalError("Malformed class name");


Posted by 김용환 '김용환'

댓글을 달아 주세요



Spark 데이터 프레임의 StatFunctions 패키지 함수 중 monotonically_increasing_id를 사용하면

데이터 프레임의 로우에 할당된 고유 ID를 출력한다.



import org.apache.spark.sql.functions.monotonically_increasing_id
df.select(monotonically_increasing_id()).show(5)



결과


+-----------------------------+

|monotonically_increasing_id()|

+-----------------------------+

|                            0|

|                            1|

|                            2|

|                            3|

|                            4|

+-----------------------------+

only showing top 5 rows


Posted by 김용환 '김용환'

댓글을 달아 주세요