카프카의 MirroMaker



https://cwiki.apache.org/confluence/display/KAFKA/KIP-3+-+Mirror+Maker+Enhancement

카프카 브로커의 특정 topic을 다른 곳으로 다른 브로커로 보낼 수 있는 간단한 기능이다.

복제가 된다는 점에서 훌륭한 툴이다.


(최근에 elasticsearch도 cross cluster replication를 선보였다.)



 replicator에 비하면 약한 부분이 있기는 하지만 무료이다.


https://docs.confluent.io/current/multi-dc-replicator/mirrormaker.html


사용법은 원체 간단하다. 

> bin/kafka-mirror-maker.sh

      --consumer.config consumer.properties

      --producer.config producer.properties --whitelist my-topic


특이할 점은 consumer 쓰레드 개수를  조절할 수 있다.


소스를 살펴보면,

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala


중요한 부분은 consumer는 multi-thread로 실행되고

producer는 while문에서 flush이 이루어지기 때문에. consumer 쪽 보다는 데이터가 많아진다면 producer쪽 이슈가 생길 수 있다. 


Posted by '김용환'
,

schema-registry HA 이슈

kafka 2019. 4. 5. 15:50

 

 

schema-registry는 distributed mode(clustering mode)로 실행 중인데.. 1대가 이유가 죽고 있다.


[2019-04-05 15:06:07,189] WARN [Consumer clientId=consumer-1, groupId=connect-cluster] 9 partitions have leader brokers without a matching listener, including [connect-offsets-0, connect-offsets-15, connect-offsets-9, connect-offsets-3, connect-offsets-24, connect-offsets-18, connect-offsets-12, connect-offsets-6, connect-offsets-21] (org.apache.kafka.clients.NetworkClient:1012)
[2019-04-05 15:06:07,279] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:227)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 30001ms
[2019-04-05 15:06:07,281] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)
[2019-04-05 15:06:07,286] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:226)
[2019-04-05 15:06:07,292] INFO Stopped http_8083@724c5cbe{HTTP/1.1,[http/1.1]}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:341)
[2019-04-05 15:06:07,293] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:167)
[2019-04-05 15:06:07,310] INFO Stopped o.e.j.s.ServletContextHandler@3fd2322d{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1040)
[2019-04-05 15:06:07,312] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:244)
[2019-04-05 15:06:07,312] INFO Herder stopping (org.apache.kafka.connect.runtime.distributed.DistributedHerder:398)
[2019-04-05 15:06:12,313] INFO Herder stopped (org.apache.kafka.connect.runtime.distributed.DistributedHerder:418)
[2019-04-05 15:06:12,313] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:70)

키프카 커넥트에서 사용하는 카프카  관리 topic에 이슈가 있어서 발생한 것이다.  topic 이름을 지우거나 새로 생성하거나 변경하니 문제가 발생하지 않는다.

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

3가지 토픽은 카프카 커넥트에서 관리하는 토픽이다.
카프카 커넥트를 distributed 모드로 실행하면 분산 태스크는 카프카 토픽 정보에 커넥터와 태스크 설정, 커넥터 오프셋, 커넥터 상태 정보를 저장한다.

카프카 커넥트(distributed 모드)를 실행하려 할때, 세 개의 내부 토픽을 확인한다. 만약 토픽이 없으면 주어진 설정을 기반으로 생성한다. 토픽에 주어진 replication, partition 등이 설정으로 있으면 설정한다. 만약 아래 설정이 주어지지 않으면 카프카의 auto 생성 설정 규칙에 따라 생성된다.

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

offset.storage.partitions=25
status.storage.partitions=5

 

 

 

Posted by '김용환'
,

kafka connect에서 kafka broker 설정과 schema registry 설정할 때 조심히 다뤄야 한다.

 

bootstrap.servers=google-test-kafka001.google.io:9092,google-test-kafka002.google.io:9092,google-test-kafka003.google.io:9092

key.converter.schema.registry.url=http://google-test-sr001.google.io:8081,http://google-test-sr002.google.io:8081,http://google-test-sr003.google.io:8081
value.converter.schema.registry.url=http://google-test-sr001.google.io:8081,http://google-test-sr002.google.io:8081,http://google-test-sr003.google.io:8081


 kafka broker는 그냥 host와 port만 지정하고

schema registry는 http를 포함한 host와 port(uri)를 지정한다.

 

알고 나면 매우 당연한데, 막상 설정하다 보면 실수하는 내용이다.

Posted by '김용환'
,

kafka-connect_1    | [2019-03-18 08:23:37,859] ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)

kafka-connect_1    | org.apache.kafka.connect.errors.ConnectException: Creation of database history topic failed, please create the topic manually

kafka-connect_1    | at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:348)

kafka-connect_1    | at io.debezium.connector.mysql.MySqlSchema.intializeHistoryStorage(MySqlSchema.java:266)

kafka-connect_1    | at io.debezium.connector.mysql.MySqlTaskContext.initializeHistoryStorage(MySqlTaskContext.java:196)

kafka-connect_1    | at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:137)

kafka-connect_1    | at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47)

kafka-connect_1    | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:198)

kafka-connect_1    | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)

kafka-connect_1    | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)

kafka-connect_1    | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

kafka-connect_1    | at java.util.concurrent.FutureTask.run(FutureTask.java:266)

kafka-connect_1    | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

kafka-connect_1    | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

kafka-connect_1    | at java.lang.Thread.run(Thread.java:748)

kafka-connect_1    | Caused by: java.util.concurrent.TimeoutException

kafka-connect_1    | at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)

kafka-connect_1    | at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)

kafka-connect_1    | at io.debezium.relational.history.KafkaDatabaseHistory.getKafkaBrokerConfig(KafkaDatabaseHistory.java:353)

kafka-connect_1    | at io.debezium.relational.history.KafkaDatabaseHistory.initializeStorage(KafkaDatabaseHistory.java:337)

kafka-connect_1    | ... 12 more



원인은 kafka connect 포트 이슈이다.



docker compose를 사용할 때. 

kafka를 9092로 띄우면 docker 이슈가 발생한다. 따라서 저 에러가 나면 해당 포트(9092)에 데몬 이슈가 있으니 아래 링크를 참고해서 kafka:29092로 변경하고 관련해서 kafka_connect 컴포넌트에서 kafka:29092에 연결하도록 변경한다. 




https://rmoff.net/2018/08/02/kafka-listeners-explained/



'kafka' 카테고리의 다른 글

schema-registry HA 이슈  (0) 2019.04.05
kafka connect 설정 주의 사항  (0) 2019.04.04
stream stream-join 정보와 mjoin  (0) 2019.03.02
[펌] kafka burrow api  (0) 2018.11.20
[kafka] enable.auto.commit , auto.commit.interval.ms  (0) 2018.10.22
Posted by '김용환'
,


여러 stream 데이터를 하나의 데이터로 join해 준다면 얼마나 좋을까?

이슈는 상태(state)를 관리해야 하기에 메모리 이슈가 있다.




yelp는 mjoin 알고리즘을 열심히 작업 중이다..

https://engineeringblog.yelp.com/2018/12/joinery-a-tale-of-unwindowed-joins.html





큐의 스트림처리 방식으로는 stream stream-join이라는 개념이 있다.



apache spark에서는 watermark를 활용한다.


https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html



https://dzone.com/articles/spark-stream-stream-join


https://blog.codecentric.de/en/2017/02/crossing-streams-joins-apache-kafka/






메모리 이슈가 있고 역시 타임아웃 이슈가 있어서 완벽히 진행하려면..

데이터를 스토리에 쌓고. 계속 데이터가 도착할 때마다 스토리지를 호출해 데이터가 다 들어올 때까지 쿼리를 날리는 수 밖에 없는 것 같다..



Posted by '김용환'
,

[펌] kafka burrow api

kafka 2018. 11. 20. 14:46


카프카 api stat, metric 지표를 보기 위한 burrow(https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint)라는 오픈 소스 툴이 있다.



kafka에서 사용하고 싶은 http endpoint를 보고 호출한다.


https://github.com/linkedin/Burrow/wiki/HTTP-Endpoint#request-endpoints


예)

http://burrow.google.io:8000/v3/kafka/mycluster/topic/logs

{"error":false,"message":"topic offsets returned","offsets":[152223734586,152224559773,152224774276,152224723888,152224644847,152224641838,152224508250,152224383547,152215033727,152215053018,152214434045,152214530227,152215253175,152214990582,152215431432,152213601661,152215273301,152215092394,152214795862,152215123946,152215194674,152215391037,152214877453,152215137734,152215680569,152215360097,152214928462,152215484025,152214933673,152214661665,152214049830,152215021533,152215748420,152214945335,152215126831,152215051384,152214863230,152214966710,152215634739,152214820473,152215165668,152215071434,152214866458,152214865355,152214934334,152214662023,152214830751,152214573022,152215197587,152214836785],"request":{"url":"/v3/kafka/mycluster/topic/logs","host":"burrow-google"}}







Posted by '김용환'
,





카프카(Kafka) 컨슈머는 토픽(topic)에서 메시지를 읽는다. 갑작스럽게 종료되면 종료되기 전에 어딘가까지 읽었다는 위치(오프셋(offset))을 저장한다. 오프셋(offset)은 파티션에서 수신되는 각 메시지에 대해 계속 증가하는 정수 값인 메타 데이터 조각(piece)입니다. 각 메시지는 파티션에 고유한 오프셋 값을 갖는다.


카프카의 각 메시지는 고유한 오프셋을 갖고 오프셋은 특정 파티션에서 해당 메시지의 위치를 나타낸다.


컨슈머가 파티션에서 메시지를 읽으면 카프카는 마지막으로 사용한 메시지의 오프셋을 알 수 있다. 카프카 오프셋은 _consumer_offsets라는 토픽에 저장되며 컨슈머는 컨슘 메시지를 잊지 않고 중지한 부분부터 재시작할 수 있다.


어떻게 디폴트로 저장되는지 보려면 다음 값을 확인할 수 있다. 


enable.auto.commit  (기본값은 true)

auto.commit.interval.ms (기본값은 5000)

 

즉 컨슈머는 기본적으로 매 5초마다 카프카(Kafka)에 오프셋을 자동 커밋(commit)하거나 지정 토픽에서 데이터를 가져올 때마다 최신 오프셋을 커밋한다



만약 중복 처리를 최대한 하고 싶지 않다면 메시지의 오프셋을 수동으로 커밋(commit)한다. 


그리고 enable.auto.commit 속성의 값을 false로 변경해야 한다.


(자연스럽게 auto.commit.interval.ms 값은 무시된다.)


 


Posted by '김용환'
,




카프라 토픽에 저장된 데이터가 너무 많아지면 lag의 값은 점점 커진다.


lag이 커지면 처리되야 할 양은 많아지고, 결국 카프카가 저장하는 토픽 저장소의 한계도 넘어서게 된다.


따라서, 이럴 때 간단히 해결할 수 있는 방법은 파티션을 늘리는 것이다. 


topic에 할당한 파티션(partion)의 개수가 5이면 10개로 늘리면, 조금씩 lag이 줄어든다..

10개이면 20개로 늘리면 조금씩 lag이 줄어든다.



consumer group의 병렬 처리 정도는 consume하는 파티션의 수에 의해 제한된다. 따라서 일반적으로 파티션이 많아지면 당연히 처리량, throghput이 높아진다.




그렇다면 파티션을 늘리는 것이 답일까??

먼가 kafka에 영향을 주지 않을까 고민하던데 차에. 아래 문서를 보게 되었다.





http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/




요점 - 발 번역



1. 파티션 개수가 많아지면 오픈 파일 핸들(open file handle)을 더 필요로 한다.


각 파티션은 브로커의 파일 시스템의 디렉토리에 매핑된다. 해당 디렉토리에는 로그 세그먼트 당 두 개의 파일 (인덱스 용, 실제 데이터 용)이 존재한다. 각 브로커는 모든 로그 세그먼트의 인덱스와 데이터 파일 모두에 대한 파일 핸들을 연다. 따라서 파티션이 많을수록 기본 운영 체제에서 열린 파일 핸들 제한을 구성한다. 따라서 카프카 서버 운영자 입장에서는 open file handle(fd) 개수 더 커지지 않도록 운영할 필요가 있다.



2. 파티션 개수가 많아지면 비-가용성이 증가된다.


장애 상황에 이슈가 발생할 수 있다. 브로커가 불명확하게 종료될 비-가용성은 파티션 수에 비례한다. 브로커에 총 2000개의 파티션이 있고 각 파티션에 2 개의 복제본이 있다고 가정하면 해당 브로커는 약 1000개의 파티션을 위한 리더가 될 것이다.  해당 브로커가 불완전히 실패하면 1000개의 파티션을 모두 동시에 사용할 수 없게 된다. 단일 파티션에 대한 새로운 리더를 선출하는 데 5ms가 걸린다고 가정하면 1000개의 모든 파티션에 대한 새로운 리더를 뽑는는 데 최대 5 초가 소요된다. 일부 파티션의 경우 비-가용성은 5초+실패 감지 시간이 될 것이다. 


불행한 경우, 실패한 브로커가 컨트롤러라면 심각하다. 페일 오버는 자동으로 발생하지만 새 컨트롤러는 초기화하는 동안 ZooKeeper에서 모든 파티션의 일부 메타 데이터를 읽어야 한다. 예를 들어 카프카 클러스터에 10,000 개의 파티이 있고 ZooKeeper에서 메타 데이터를 초기화하는 데 파티션 당 2ms가 걸리면 사용 불가능한 창에 20초가 더 걸릴 수 있다.


따라서 최악의 경우를 피하려면 브로커 당 파티션 수를 2개에서 4000개 밑으로 제한하고 클러스터의 총 파티션 수를 수만 개로 제한하는 것이 좋다.



3. 파티션 개수가 많아지면 end-to-end 대기 시간이 증가된다.


카프카의 end-to-end 대기 시간은 producer가 메시지를 퍼블리싱하고 consumer가 메시지를 읽을 때까지의 시간으로 정의된다. 메시지 커밋 시간은 end-to-end 대기 시간의 상당 부분을 차지한다. default로 카프카 브로커는 두 개의 브로커 간에 복제본을 공유하는 모든 파티션에 대해 단일 스레드를 사용하여 다른 브로커의 데이터를 복제한다. 한 브로커에서 다른 브로커로 1000개의 파티션을 복제하면 약 20ms의 대기 시간이 추가될 수 있다. 즉 end-to-end 대기 시간이 최소 20ms임을 의미한다. 


end-to-end 대기 시간을 줄이려면 브로커 당 파티션 수를 100 x 카프카 클러스터의 브로커 수 x replica factor로 제한하는 것이 좋다. 



4. 파티션 개수가 많아지면 producer/consumer 클라이언트에서 많은 메모리를 필요로 한다.

0.8.2 릴리스에서 producoer는 파티션 당 메시지를 버퍼한다. 충분한 데이터가 축적되거나 충분한 시간이 경과하면 축적 된 메시지가 버퍼에서 제거되고 브로커로 전송된다.


파티션 수를 늘리면 producer의 더 많은 파티션에 메시지가 저장하게 된다. 사용된 메모리 총량이 설정된 메모리 제한을 초과할 수 있다. 이 경우 producer는 새로운 메시지를 차단하거나 삭제해야 하며 어느 쪽도 이상적이지 않다. 


따라서 producer가 생성하는 파티션 당 최소 수십 KB를 할당하도록 하고 파티션 수가 크게 늘어날 경우 총 메모리 양을 조정해야 한다.


비슷한 문제가 consumer에도 존재한다. consumer는 파티션 당 메시지 덩어리를 가져온다. 파티션이 클수록 더 많은 메모리가 필요하다.

Posted by '김용환'
,




kafka에서 retention.ms 수정하는 방법은 다음 예처럼 간단한다. 


$ ./bin/kafka-topics.sh  --create  --zookeeper zkserver -replication-factor 1 --partitions 1 --topic samuel.test


$ ./bin/kafka-topics.sh  --describe --zookeeper zkserver --topic samuel.test

Topic:samuel.test PartitionCount:1 ReplicationFactor:1 Configs:

Topic: samuel.test Partition: 0 Leader: 25 Replicas: 25 Isr: 25

$ ./bin/kafka-topics.sh --zookeeper zkserver --alter --topic samuel.test --config retention.ms=86400000



$ ./bin/kafka-topics.sh  --describe --zookeeper zkserver --topic samuel.test

Topic:samuel.test PartitionCount:1 ReplicationFactor:1 Configs:retention.ms=86400000

Topic: samuel.test Partition: 0 Leader: 25 Replicas: 25 Isr: 25

Posted by '김용환'
,



0.10.1.0에서 특정 노드에서 다음 카프카 에러 발생


복제가 실패하는 에러가 나온다.




 Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5db63c3b (kafka.server.ReplicaFetcherThread)

java.io.IOException: Connection to 1 was disconnected before the response was read

        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)

        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)

        at scala.Option.foreach(Option.scala:257)

        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)

        at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)

        at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)

        at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)

        at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)

        at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)

        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)

        at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)

        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)

        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)


로그 내용을 보면 다음과 같다.

        

33676:[2018-01-18 12:54:53,992] INFO Partition [__consumer_offsets,34] on broker 1: Shrinking ISR for partition [__consumer_offsets,34] from 3,2,1 to 1 (kafka.cluster.Partition)

33687:[2018-01-18 12:55:06,164] INFO Partition [__consumer_offsets,34] on broker 1: Expanding ISR for partition [__consumer_offsets,34] from 1 to 1,2 (kafka.cluster.Partition)

33689:[2018-01-18 12:55:06,262] INFO Partition [__consumer_offsets,34] on broker 1: Expanding ISR for partition [__consumer_offsets,34] from 1,2 to 1,2,3 (kafka.cluster.Partition)

33691:[2018-01-18 12:55:23,963] INFO Partition [__consumer_offsets,34] on broker 1: Shrinking ISR for partition [__consumer_offsets,34] from 1,2,3 to 1 (kafka.cluster.Partition)



        

https://issues.apache.org/jira/browse/KAFKA-4477  


0.10.1.1에서 픽스 되었다고 한다.      

Posted by '김용환'
,