spark streaming 코드에서     .format(“kafka”)을 사용한 부분에서 에러가 발생했다.

   val dataframe = spark.readStream
       .format(“kafka”)
       .option(“kafka.bootstrap.servers”, kafkaBrokers)
       .option(“subscribe”, kafkaTopic)
       .option(“startingOffsets”, “latest”)
       .option(“maxOffsetsPerTrigger”, 20)
       .load()
       .map( x=> {


에러는 kafka 관련 data source가 없다는 것이다.
Exception in thread “main” org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of “Structured Streaming + Kafka Integration Guide”.;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
    at streaming.KafkaAvroDemo$.main(KafkaAvroDemo.scala:89)

소스는 그냥 읽는 부분이고, 2.4.0, 2.4.1 소스 변화는 없다.
https://github.com/apache/spark/blob/v2.4.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L651


build.sbt를 보면.. 아래와 같이 되어 있는데. 어딘가 이슈가 있는 건 아닐까??
assemblyMergeStrategy in assembly := {
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}

코드를 삭제하더라도 힌트는 보이지 않는다. 라이브러리 충돌은 아닌 것 같다..
혹시 META-INF에서 클래스 이름 충돌은 난 것은 아닐까 생각하며.. 아래와 같이 해봤다.

assemblyMergeStrategy in assembly := {
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.filterDistinctLines
 case x => MergeStrategy.first
}


오호… 드뎌 겹치는 부분이었다.. 헐.. 저부분인가보다.

[error] deduplicate: different file contents found in the following:
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql_2.11/2.4.0/spark-sql_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
[error] /Users/samuel.kim/.coursier/cache/v1/https/repo1.maven.org/maven2/org/apache/spark/spark-avro_2.11/2.4.0/spark-avro_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister

kafka 라이브러리를 읽을 려면 아마도 중간에 겹친 녀석이 sbt에 포함되야 한다..
/org/apache/spark/spark-sql-kafka-0-10_2.11/2.4.0/spark-sql-kafka-0-10_2.11-2.4.0.jar:META-INF/services/org.apache.spark.sql.sources.DataSourceRegister


따라서 이 부분을 수정하면 먼가 될 것 같다..   간단하게 머징 전략의 first, last, discard, name으로 는 이 문제를 해결할 수 없다..
assemblyMergeStrategy in assembly := {
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}


TO_BE_CONTINUED(내일)
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:651
                ```} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {```
apache/sparkAdded by GitHub

case “META-INF/services/org.apache.spark.sql.sources.DataSourceRegister” => MergeStrategy.concat 하면 될까??

samuel.c [10:38 AM]
이번에는 Exception in thread “main” java.io.IOException: No FileSystem for scheme: file 에러가 발생해서

Exception in thread “main” java.io.IOException: No FileSystem for scheme: file
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.sql.execution.streaming.StreamExecution.(StreamExecution.scala:89)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.(MicroBatchExecution.scala:48)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:275)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:316)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)


case PathList(“META-INF”, “services”, “org.apache.hadoop.fs.FileSystem”) => MergeStrategy.filterDistinctLines를 sbt assembly 정책에 추가한다.

samuel.c [11:22 AM]
assemblyMergeStrategy in assembly := {
 case “META-INF/services/org.apache.spark.sql.sources.DataSourceRegister” => MergeStrategy.concat
 case PathList(“META-INF”, “services”, “org.apache.hadoop.fs.FileSystem”) => MergeStrategy.filterDistinctLines
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}
이렇게 하니 잘 동작한다..

'scala' 카테고리의 다른 글

scala cats 공부 자료.  (1) 2019.06.18
[spark] kubernetes(k8s) 배포하기  (0) 2019.04.12
sbt 병렬 다운로드  (0) 2019.04.08
sbt assembly 에러  (0) 2019.04.08
[Spark] Streaming 데이터를 DB에 저장하는 코드  (0) 2019.04.04
Posted by '김용환'
,

<뽀모도로>

 

1. 어떤 일을 할지 정한다.

2. 뽀모 도로(타이머)를 25분에 맞춘다.

3. 타이머가 끝날 때까지 그 일을 한다.

4. 짧게 쉰다(보통 5분)

5. 매 4회의 ‘뽀모도로마다 길게 쉰다(15~30분).

 

<레거시 시스템>

 

실행 관례의 도입 자체를 관리자나 팀 구성원에게 설득하지 말고 현재 일하는 방식과 비교해 가져올 이익에 집중해야 한다.

빠른 피드백 루프, 요구사항과 비용에 대한 더 나은 이해, 지식 공유, 줄어드는 버그, 전체적으로 자동화되고 릴리즈가 빨라지는 일들이 기술적 실행 관례를 도입함으로써 얻을 수 있는 가치들이다.

 

테스크 코드를 머저 개발하기!! 

 

 

<채용>

 

소프트웨어 장인이 직장을 찾을 때는 특정한 프로젝트나 펜시한 기술, 괜찮은 급여만을 쫓지는 않는다. 소프트웨어 장인은 생산적인 파트너십과 아침에 일어날 때마다 일하러 가는 것이 행복한 직장을 찾는다.

 

<사기>

개발자들의 낮은 사기는 소프트웨어 프로젝트 실패의 주된 이유 중 하나이다.

 

소프트웨어 장인을 팀에 들이는 것은 기술적인 문제 해결에 도움이 될 뿐만 아니라 열정을 불어 넣고

혁신을 일으키는 데 지지자이자 동맹이 되어 준다는 것이다.

 

배움의 문화 만들기

 

--- 아무도 참여하려 하지 않는다면,

모범을 보여라 (TDD)

관심을 보이는 사람들에게 집중하라

강제하지 마라

모두를 변화시켜 들지 말라

모임에 대한 약속을 제때하라

허락을 구하지 마라

투덜대지 마라

리듬을 만들라

 

 

 

 

 

 

 

Posted by '김용환'
,

 

sudo /usr/local/confluent-5.1.2/bin/kafka-avro-console-consumer  --bootstrap-server  서버목록      --property print.key=true     --property schema.registry.url=스키마-리지스트리 --topic user --consumer.config /home/www/consumer.properties

 consumer에서 맨 마지막 읽은 부분부터 읽고 싶으면 다음 설정을 적용한다.

<consumer.propertie>

auto.offset.reset=latest
group.id=usergroup



정말 잘 설명된 문서

https://free-strings.blogspot.com/2016/04/blog-post_27.html

https://docs.confluent.io/current/clients/consumer.html

https://kafka.apache.org/documentation/

Posted by '김용환'
,

decimal.handling.mode의 기본 모드는 precise로서 Bas64인코딩 정보를 kafka에 전달한다.
string으로 변경하면 문자로 제대로 잘 저장하기에 정밀도 이슈, BigDecimal 이슈를 모두 해결한다.

"decimal.handling.mode": "string",

 

0.9.1 버전에서는 mysq의 decimal 타입일 때 exception이 발생했었는데. 

0.9.2 버전과 "decimal.handling.mode": "string",를 추가하면 괜찮을 듯 싶다.

Posted by '김용환'
,

sbt 병렬 다운로드

scala 2019. 4. 8. 19:33

project 디렉토리 하위에 plugins.sbt 파일에 다음 라이브러리를 추가한다.

addSbtPlugin("io.get-coursier" % "sbt-coursier" % "1.0.3")

 

sbt의 시퀀스 다운로드로 인해 고통 받는 이들을 위해..

Posted by '김용환'
,

sbt assembly 에러

scala 2019. 4. 8. 19:30

패키지가 비슷한 라이브러리 간에 경합이 발생할 때 build.sbt 파일에 다음을 추가한다. first만 묶는다.

assemblyMergeStrategy in assembly := {
case "application.conf" => MergeStrategy.concat
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}

 

 

간단하게 해결하는 방법이다.. 그러나 이 방법이 모든 내용을 해결해 주지 않는다. 

 

https://knight76.tistory.com/entry/1111-10 참조..

Posted by '김용환'
,

 

 

kubernetes api 서버에 요청하는 예시이다. 

 

 

 

$ APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')
$ TOKEN=$(kubectl get secret $(kubectl get serviceaccount default -o jsonpath='{.secrets[0].name}') -o jsonpath='{.data.token}' | base64 --decode )
$ curl $APISERVER/api --header "Authorization: Bearer $TOKEN" --insecure
{
  "kind": "APIVersions",
  "versions": [
    "v1"
  ],
  "serverAddressByClientCIDRs": [
    {
      "clientCIDR": "0.0.0.0/0",
      "serverAddress": "10.194.26.99:6443"
    }
  ]
}

 

 

 

 

$ curl $APISERVER/version --header "Authorization: Bearer $TOKEN" --insecure
{
  "major": "1",
  "minor": "11",
  "gitVersion": "v1.11.5",
  "gitCommit": "753b2dbc622f5cc417845f0ff8a77f539a4213ea",
  "gitTreeState": "xx",
  "buildDate": "xxxx",
  "goVersion": "go1xxxx",
  "compiler": "gc",
  "platform": "linux/amd64"
}

 

 

 

이제 swaggerapi를 호출한다. 어떤 api가 나오는지 보여준다. 다양한 api를 호출할 수 있다. 

 

 

$ curl $APISERVER/swaggerapi --header "Authorization: Bearer $TOKEN" --insecure
{
  "swaggerVersion": "1.2",
  "apis": [
   {
    "path": "/version",
    "description": "git code version from which this is built"
   },
   {
    "path": "/apis",
    "description": "get available API versions"
   },
   {
    "path": "/logs",
    "description": "get log files"
   },
   {
    "path": "/api/v1",
    "description": "API at /api/v1"
   },
   {
    "path": "/api",
    "description": "get available API versions"
   },
   {
    "path": "/apis/authentication.k8s.io/v1",
    "description": "API at /apis/authentication.k8s.io/v1"
   },
   {
    "path": "/apis/authentication.k8s.io/v1beta1",
    "description": "API at /apis/authentication.k8s.io/v1beta1"
   },
   {
    "path": "/apis/authentication.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/authorization.k8s.io/v1",
    "description": "API at /apis/authorization.k8s.io/v1"
   },
   {
    "path": "/apis/authorization.k8s.io/v1beta1",
    "description": "API at /apis/authorization.k8s.io/v1beta1"
   },
   {
    "path": "/apis/authorization.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/autoscaling/v1",
    "description": "API at /apis/autoscaling/v1"
   },
   {
    "path": "/apis/autoscaling/v2beta1",
    "description": "API at /apis/autoscaling/v2beta1"
   },
   {
    "path": "/apis/autoscaling",
    "description": "get information of a group"
   },
   {
    "path": "/apis/batch/v1",
    "description": "API at /apis/batch/v1"
   },
   {
    "path": "/apis/batch/v1beta1",
    "description": "API at /apis/batch/v1beta1"
   },
   {
    "path": "/apis/batch",
    "description": "get information of a group"
   },
   {
    "path": "/apis/certificates.k8s.io/v1beta1",
    "description": "API at /apis/certificates.k8s.io/v1beta1"
   },
   {
    "path": "/apis/certificates.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/extensions/v1beta1",
    "description": "API at /apis/extensions/v1beta1"
   },
   {
    "path": "/apis/extensions",
    "description": "get information of a group"
   },
   {
    "path": "/apis/networking.k8s.io/v1",
    "description": "API at /apis/networking.k8s.io/v1"
   },
   {
    "path": "/apis/networking.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/policy/v1beta1",
    "description": "API at /apis/policy/v1beta1"
   },
   {
    "path": "/apis/policy",
    "description": "get information of a group"
   },
   {
    "path": "/apis/rbac.authorization.k8s.io/v1",
    "description": "API at /apis/rbac.authorization.k8s.io/v1"
   },
   {
    "path": "/apis/rbac.authorization.k8s.io/v1beta1",
    "description": "API at /apis/rbac.authorization.k8s.io/v1beta1"
   },
   {
    "path": "/apis/rbac.authorization.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/scheduling.k8s.io/v1beta1",
    "description": "API at /apis/scheduling.k8s.io/v1beta1"
   },
   {
    "path": "/apis/scheduling.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/storage.k8s.io/v1",
    "description": "API at /apis/storage.k8s.io/v1"
   },
   {
    "path": "/apis/storage.k8s.io/v1beta1",
    "description": "API at /apis/storage.k8s.io/v1beta1"
   },
   {
    "path": "/apis/storage.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/apps/v1",
    "description": "API at /apis/apps/v1"
   },
   {
    "path": "/apis/apps/v1beta2",
    "description": "API at /apis/apps/v1beta2"
   },
   {
    "path": "/apis/apps/v1beta1",
    "description": "API at /apis/apps/v1beta1"
   },
   {
    "path": "/apis/apps",
    "description": "get information of a group"
   },
   {
    "path": "/apis/admissionregistration.k8s.io/v1beta1",
    "description": "API at /apis/admissionregistration.k8s.io/v1beta1"
   },
   {
    "path": "/apis/admissionregistration.k8s.io/v1alpha1",
    "description": "API at /apis/admissionregistration.k8s.io/v1alpha1"
   },
   {
    "path": "/apis/admissionregistration.k8s.io",
    "description": "get information of a group"
   },
   {
    "path": "/apis/events.k8s.io/v1beta1",
    "description": "API at /apis/events.k8s.io/v1beta1"
   },
   {
    "path": "/apis/events.k8s.io",
    "description": "get information of a group"
   }
  ],
  "apiVersion": "",
  "info": {
   "title": "",
   "description": ""
  }
 }

Posted by '김용환'
,

kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}'

Posted by '김용환'
,

 

구글이 대단한 회사이지만 어쩌면 이런 부분은 우리 나라와 비슷한 건 아닐까 생각이 들었다. 

 

원격 협업이라는게 원래 어렵다는 점이다. 그냥 참고 하는 것 같다는 생각도 든다.

 

특이한 점은 플레이북(playbook)으로 가이드한다는 점이 눈에 띈다.

 

 

https://www.blog.google/inside-google/working-google/working-together-when-were-not-together/

 

원격 협업은 어렵고, 로컬 협업은 좋다. 그래서 멀리 떨어진 사무실 간의 협업을 위해 개인사 질문(주말에 머했는지) 등을 물어보라고 가이드를 한다. 업무 시간을 가정하는 대신 동료와 상담할 때 시간을 내어 동료에게 물어보라고 가이드한다. 

대면하는것이 더 좋을 수 있으니 직접 회의를 위해 여행하는 것도 추천한다.

 

 

'scribbling' 카테고리의 다른 글

리눅스 서버의 한글이 잘 안보일 때, 내 환경을 다시 돌아봐야 겠다.  (0) 2019.06.18
펌) 마이그레이션 전략  (0) 2019.04.24
MCN  (0) 2019.04.01
[펌] 파이썬 3 치트시트  (0) 2019.03.30
최한시 , 최번시  (2) 2019.03.26
Posted by '김용환'
,

(kafka connect)debezium에서 DB를 읽을 때 해당 데이터에 대한 토픽이름이 조금 길다.

 

"서버이름.로지컬DB이름.테이블이름" 인데.

 

이를 다음과 같은 설정을 사용해 '테이블 이름'만 토픽 이름으로 지정할 수 있다.

 

실제로 이렇게 써야 괜찮다.

        "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"

 

 

참고

https://debezium.io/docs/connectors/mysql/

https://debezium.io/docs/configuration/topic-routing/

 

 

 

Posted by '김용환'
,