schema-registry HA 이슈

kafka 2019. 4. 5. 15:50

 

 

schema-registry는 distributed mode(clustering mode)로 실행 중인데.. 1대가 이유가 죽고 있다.


[2019-04-05 15:06:07,189] WARN [Consumer clientId=consumer-1, groupId=connect-cluster] 9 partitions have leader brokers without a matching listener, including [connect-offsets-0, connect-offsets-15, connect-offsets-9, connect-offsets-3, connect-offsets-24, connect-offsets-18, connect-offsets-12, connect-offsets-6, connect-offsets-21] (org.apache.kafka.clients.NetworkClient:1012)
[2019-04-05 15:06:07,279] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:227)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 30001ms
[2019-04-05 15:06:07,281] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2019-04-05 15:06:07,286] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:226)
[2019-04-05 15:06:07,292] INFO Stopped http_8083@724c5cbe{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:341)
[2019-04-05 15:06:07,293] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:167)
[2019-04-05 15:06:07,310] INFO Stopped o.e.j.s.ServletContextHandler@3fd2322d{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1040)
[2019-04-05 15:06:07,312] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:244)
[2019-04-05 15:06:07,312] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder:398)
[2019-04-05 15:06:12,313] INFO Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder:418)
[2019-04-05 15:06:12,313] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:70)

키프카 커넥트에서 사용하는 카프카  관리 topic에 이슈가 있어서 발생한 것이다.  topic 이름을 지우거나 새로 생성하거나 변경하니 문제가 발생하지 않는다.

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

3가지 토픽은 카프카 커넥트에서 관리하는 토픽이다.
카프카 커넥트를 distributed 모드로 실행하면 분산 태스크는 카프카 토픽 정보에 커넥터와 태스크 설정, 커넥터 오프셋, 커넥터 상태 정보를 저장한다.

카프카 커넥트(distributed 모드)를 실행하려 할때, 세 개의 내부 토픽을 확인한다. 만약 토픽이 없으면 주어진 설정을 기반으로 생성한다. 토픽에 주어진 replication, partition 등이 설정으로 있으면 설정한다. 만약 아래 설정이 주어지지 않으면 카프카의 auto 생성 설정 규칙에 따라 생성된다.

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

offset.storage.partitions=25
status.storage.partitions=5

 

 

 

Posted by '김용환'
,

java.lang.RuntimeException: Missing scala-library.jar    

불러오는 중입니다...

 

build.sbt 파일의 스칼라 버전이 실제로 로컬에 저장되어 있지 않으면. 에러가 난다.

 

즉 build.sbt 파일에는 2.11.11을 적어두고 로컬 머신에 2.11.8이 설치되어 있을 때 .sbt 에러가 발생할 수 있다.

scalaVersion := "2.11.11"

 

 

scalaVersion := "2.11.11"을 scalaVersion := "2.11.8"로 변경하든지 scala 2.11.11로 설치하면 된다.

Posted by '김용환'
,

kafka connect에서 kafka broker 설정과 schema registry 설정할 때 조심히 다뤄야 한다.

 

bootstrap.servers=google-test-kafka001.google.io:9092,google-test-kafka002.google.io:9092,google-test-kafka003.google.io:9092

key.converter.schema.registry.url=http://google-test-sr001.google.io:8081,http://google-test-sr002.google.io:8081,http://google-test-sr003.google.io:8081
value.converter.schema.registry.url=http://google-test-sr001.google.io:8081,http://google-test-sr002.google.io:8081,http://google-test-sr003.google.io:8081


 kafka broker는 그냥 host와 port만 지정하고

schema registry는 http를 포함한 host와 port(uri)를 지정한다.

 

알고 나면 매우 당연한데, 막상 설정하다 보면 실수하는 내용이다.

Posted by '김용환'
,

Spark에서

Streaming 데이터를 DB에 저장할 때. 일반적인 데이터 프레임에서 저장하는 방식을 사용할 수 없다.

(만약 사용하면 streaming 데이터 프레임에서 그렇게 저장할 수 없다라는 에러가 나온다)

 

따라서 Sink 형태(ForeachWriter 상속) 방식을 사용해야 한다. 

(단순한 형태의 구현이다 )

 

 

예시) Spark Streaming Data Frame쪽 소스

val writer:ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrlsinkTable);
val query = dataframe
.writeStream
.foreach(writer)
.outputMode("append")
.start()
query.awaitTermination()

 

 

예시) ForeachWriter를 구현한 JdbcSink.scala 파일 

package streaming

import java.sql.Statement
import java.sql.Connection
import java.sql.DriverManager

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.ForeachWriter

class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord]{
val driver = "com.mysql.cj.jdbc.Driver"
var statement:Statement = _
var connection:Connection = _

def open(partitionId: Long, version: Long): Boolean = {
Class.forName(driver)
connection = DriverManager.getConnection(url)
this.statement = connection.createStatement()
true
}

override def process(record: DeserializedFromKafkaRecord): Unit = {
if (StringUtils.isEmpty(record.value)) {
throw new IllegalArgumentException
}

val value = record.value.replace("'", "").replace("\"", "")
statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")")
}

override def close(errorOrNull: Throwable): Unit = {
connection.close()
}
}

 

Posted by '김용환'
,

spark streaming 코딩하다 다음과 같은 에러가 발생했다. 

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:168)

 

Dstream을 처리할 때는 spark의 모든 API가 전부 다 허용되지 않는다.

 

Output operations allow DStream’s data to be pushed out to external systems like a database or a file systems. Since the output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:

 

Dstream 처리에 허영되는 api는 다음과 같다.

print()
foreachRDD()
saveAsObjectFiles()
saveAsTextFiles()
saveAsHadoopFiles()

http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

Posted by '김용환'
,

 

kibana 5.x에서는 index-pattern id가 index 이름이었는데.

kibana 6.x에서는 index-pattern id가 uuid로 바뀌었다. 

 

 키바나 내부 api를 사용해 어떤 save object가 있는지 확인할 수 있다. 

 

curl -s http://kibana.internal.google.io:5601/api/saved_objects/_find?type=index-pattern

{
      "type": "index-pattern",
      "id": "b1d1bed0-464d-11e9-9577-0b28abc59fe5",
      "attributes": {
        "title": "google_search_admin*",
        "timeFieldName": "customer",
        "fields": "[{\"name\":\"_id\",\"type\":\

...

}

 

 

kibana index 변경을 하고 싶으면 index_pattern을 수정해야 한다.

 


https://qiita.com/NAO_MK2/items/2d03d9db1cd7b0ceae04

Posted by '김용환'
,

gradle에서 proxy 설정을 다음과 같이 진행하니 제대로 proxy

 

$ ./gradlew -Dhttp.proxyHost=proxy.igoogle.com -Dhttp.proxyPort=8082 - Dhttps.proxyHost=proxy.igoogle.com -Dhttps.proxyPort=8082 clean build

 

실행 안된다.

 

애플리케이션의 gradle.properties로 변경해야 동작한다.

 

 

systemProp.http.proxyHost=...
systemProp.http.proxyPort=..
systemProp.http.nonProxyHosts=...
systemProp.https.proxyHost=...
systemProp.https.proxyPort=..
systemProp.https.nonProxyHosts=....

Posted by '김용환'
,

MCN

scribbling 2019. 4. 1. 18:18

공부차원에서 작성한 내용..

 

 MCN (Multi Channel Network)

: 유튜브, 아프리카TV 등 동영상 사이트에서 사용되는 방식으로 인기가 높은 중소 창작자(1인 포함)의 콘텐츠 유통/판매/저작권 관리/광고 유치/자금 지원등에 도움을 주고 콘텐츠로부터 나온 수익을 창작자와 나눠 갖는 미디어 사업을 의미한다고 한다..

 

 

https://terms.naver.com/entry.nhn?docId=3543412&cid=42171&categoryId=58478

Posted by '김용환'
,

 

spring boot 2 - JPA를 사용 중에 javax.persistence.TransactionRequiredException: Executing an update/delete query 에러가 발생했다. 

 

 

이전 코드는 아래와 같았는데..

 

@Modifying
@Query(value="update user set vin = ?2 where username = ?1", nativeQuery = true)
void update(String username, String vin);

 

 

Transactional을 추가하니 잘 동작된다. 

 

주의 할 점은 javax.transaction.Transactional을 사용하면 안된다. spring 앱 개발할 때는 spring 만 사용하면 된다.

import org.springframework.transaction.annotation.Transactional;

@Transactional
@Modifying
@Query(value="update user set vin = ?2 where username = ?1", nativeQuery = true)
void update(String username, String vin);

 

 

Posted by '김용환'
,

spring boot2에 jpa에서 native query 를 사용하다가.

 

Validation failed for query for method... 에러를 만났다.

 

@Modifying

@Query("update user u set u.vin = ?2 where u.username = ?1")

void update(String username, String vin);

 

다음과 같이 nativeQuery인지를 알려줘야 더 이상 에러가 발생하지 않는다.

 

@Modifying
@Query(value="update user u set u.vin = ?2 where u.username = ?1", nativeQuery = true)
void update(String username, String vin);

Posted by '김용환'
,