카프카 서버 성능에 크게 영향을 미치는 요인이 acks, retries, batch.size 이다..





https://www.slideshare.net/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600


https://www.youtube.com/watch?v=oQe7PpDDdzA


https://ko.hortonworks.com/blog/microbenchmarking-storm-1-0-performance/







Posted by '김용환'
,





kafka(kafka_2.11-0.10.1.0)에서 delete.topic.enable을 설정하지 않으면(delete.topic.enable=false) 토픽을 삭제하지 못하는 데모를 보여준다. 



1. 로컬에서 간단히 실행하기


- 주키퍼를 실행한다


[/usr/local/kafka_2.11-0.10.1.0] ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties


반대로 kafka 주키퍼 종료하려면 다음을 실행한다. 


[/usr/local/kafka_2.11-0.10.1.0] ./bin/zookeeper-server-stop.sh



2. kafka를 실행한다.


kafka를 실행하기 전에 먼저 설정이 delete.topic.enable 기본값(false)로 둔다.


[/usr/local/kafka_2.11-0.10.1.0] cat config/server.properties | grep delete.topic

#delete.topic.enable=true


[/usr/local/kafka_2.11-0.10.1.0] ./bin/kafka-server-start.sh  -daemon config/server.properties



x라는 토픽을 생성한다.


[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-topics.sh --create --zookeeper localhost --replication-factor 1 -partition 1 --topic x

Created topic "x".


[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-topics.sh --list --zookeeper localhost

x




토픽 잘 동작하는지 producer/consumer를 실행한다.


[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-console-producer.sh --broker-list localhost:9092 --topic x

aaa


[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic x --from-beginning

aaa



이제 x 토픽을 삭제하고 토픽 리스트를 보면 삭제라고 마크가 되어 있다.



[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-topics.sh --delete --zookeeper localhost --topic x

Topic x is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.



[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-topics.sh --list --zookeeper localhost

x - marked for deletion





없는 x 토픽에 추가하면, 다음과 같이 warning 에러가 많이 나온다. 


[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-console-producer.sh --broker-list localhost:9092 --topic x

aaa

[2017-12-06 14:59:35,645] WARN Error while fetching metadata with correlation id 0 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:35,846] WARN Error while fetching metadata with correlation id 1 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:35,951] WARN Error while fetching metadata with correlation id 2 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,059] WARN Error while fetching metadata with correlation id 3 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,168] WARN Error while fetching metadata with correlation id 4 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,277] WARN Error while fetching metadata with correlation id 5 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,383] WARN Error while fetching metadata with correlation id 6 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,492] WARN Error while fetching metadata with correlation id 7 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,599] WARN Error while fetching metadata with correlation id 8 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,706] WARN Error while fetching metadata with correlation id 9 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,815] WARN Error while fetching metadata with correlation id 10 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:36,924] WARN Error while fetching metadata with correlation id 11 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:37,031] WARN Error while fetching metadata with correlation id 12 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2017-12-06 14:59:37,140] WARN Error while fetching metadata with correlation id 13 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)







[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic x --from-beginning

[2017-12-06 14:59:22,928] WARN Error while fetching metadata with correlation id 1 : {x=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)





2. delete.topic.enable=true로 설정한 후 kafka 실행하기 




[/usr/local/kafka_2.11-0.10.1.0] cat config/server.properties | grep delete.topic

delete.topic.enable=true



그리고, kafka를 재시작(Stop/start)한다.



토픽 만들고 다시 삭제해본다. 정상적이다.


[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-topics.sh --create --zookeeper localhost --replication-factor 1 -partition 1 --topic x

Created topic "x".


[/usr/local/kafka_2.11-0.10.1.0]  bin/kafka-topics.sh --list --zookeeper localhost

x


[/usr/local/kafka_2.11-0.10.1.0] bin/kafka-topics.sh --delete --zookeeper localhost --topic x

Topic x is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.


[/usr/local/kafka_2.11-0.10.1.0]  bin/kafka-topics.sh --list --zookeeper localhost

test




3. 이슈


delete.topic.enable을 true로 설정했다 하더라도 토픽 삭제가 안되는 경우가 발생할 수 있다. 


예)


https://stackoverflow.com/questions/23976670/when-how-does-a-topic-marked-for-deletion-get-finally-removed


https://stackoverflow.com/questions/44564606/how-can-i-remove-kafka-topics-marked-for-deletion



이럴 때는 재시작을 하거나..  (실제로 재시작을 통해서 삭제된 경우가 있었음..)


설정의 log.dir (보통 /tmp 디렉토리)의 파일을 삭제하고 재시작하면 된다고 한다..ㅡㅡ;






4. 


참고로 한글 토픽은 생성할 수 없다. ASCII만 된다. 일부 특수 문자만 허용한다.



bin/kafka-topics.sh --create --zookeeper localhost --replication-factor 1 -partition 1 --topic 우와

Error while executing topic command : topic name 우와 is illegal,  contains a character other than ASCII alphanumerics, '.', '_' and '-'

[2017-12-06 15:02:33,492] ERROR org.apache.kafka.common.errors.InvalidTopicException: topic name 우와 is illegal,  contains a character other than ASCII alphanumerics, '.', '_' and '-'

 (kafka.admin.TopicCommand$)

Posted by '김용환'
,



kafka 업글 또는 운영할 때 server.properties에 아래 설정을 잘 저장해야 한다. 


버전업할 때는 특히 아래 설정을 잘 조정하면서 진행할 수 있다. 

inter.broker.protocol.version=0.10.1.0

log.message.format.version=0.10.1.0



참조

https://kafka.apache.org/documentation/

http://christianposta.com/kafka-docs/_book/getting-started/upgrading.html

Posted by '김용환'
,





2017년 초부터 yahoo의 kafka monitor가 0.10.0.*을 지원한다..

(다행히)



https://github.com/yahoo/kafka-manager/pull/282


https://github.com/yahoo/kafka-manager/commit/dd80fd88a45d0c91e7b9e0cda732ae46e5a5c122



[Kafka 0.8.1.1 or 0.8.2.* or 0.9.0.* or 0.10.0.*](http://kafka.apache.org/downloads.html)



Posted by '김용환'
,


먼저 zookeeper 서버를 실행한다. 


$./bin/zookeeper-server-start.sh config/zookeeper.properties


kafka 서버를 실행한다. 최소 3대


$ ./bin/kafka-server-start.sh config/server-1.properties



$ ./bin/kafka-server-start.sh config/server-2.properties



$ ./bin/kafka-server-start.sh config/server-3.properties



각 설정 파일에 순서대로 설정을 변경한다.


listeners=PLAINTEXT://:포트

broker.id=아이디

log.dirs=/tmp/kafka-logs_파일번호



토픽을 생성한다.


$./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test



만약 토픽 생성시 에러가 발생했다면 카프카 서버가 없어서 발생할 수 있다. 


Error while executing topic command : replication factor: 1 larger than available brokers: 0

[2017-09-14 17:10:04,315] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: replication factor: 1 larger than available brokers: 0

 (kafka.admin.TopicCommand$)

 

   

 


실제 데이터 입력한다.


$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test




 bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test


Posted by '김용환'
,

[kafka] 복제(replication)

kafka 2017. 3. 17. 13:42


kafka에 복제(replication) 개념이 있고, 아래 위키에서 잘 설명하고 있다. nosql을 공부하는 사람이라면 충분히 이해할 수 있을 것이다.


동기 복제와 비동기 복제 때문에 ack관련 설정이 있기 때문에 적당히 kafka를 이해할 필요가 있을 것 같다. 



https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication





복제 관련 운영 툴은 다음과 같다.


https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools



kafka-preferred-replica-election.sh 


큰 클러스터에서 카프카는 브로커간에 리드 복제본이 균등하게 분산되게 한다. 브로커 종료가 실패하면 분산 기능이 균형을 이루지 못한다. 균등하게 분산하기 위해 카프카에 클러스터의 브로커간에 리드 복제본을 배포한다. 



kafka-preferred-replica-election.sh 툴은 주키퍼 목록을 리드 복제본이 이동되야 하는 토픽 파티션 목록으로 변경한다. 컨트롤러는 기본 복제본이 리더가 아님을 알게 되면 브로커에 기본 복제본을 파티션 리더로 지정하라는 요청을 보낸다. 기본 복제본이 ISR(in-sync replicas) 목록에 없으면 컨트롤러는 데이터를 손실하지 않도록 작업을 실패 처리한다.




$ bin/kafka-preferred-replica-election.sh --zookeeper localhost:12913/kafka --path-to-json-file topicPartitionList.json



topicPartionList.json 파일은 다음과 같다.


{"partitions": [  

    {"topic": "topic", "partition": "0"}, 

    {"topic": "topic", "partition": "1"}, 

    {"topic": "topic", "partition": "2"}, 

    {"topic": "topic2", "partition": "0"}, 

    {"topic": "topic2", "partition": "1"}, 

    {"topic": "topic2", "partition": "2"},  

  ] 




kafka-reassign-partitions.sh을 사용하면 복제본의 파티션 정책을 변경할 수 있다.


$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute 



increase-replication-factor.json 파일은 다음과 같다.

 {"partitions":[{"topic":"smackTopic","partition":0,"replicas":[2,3]}], "version":1 } 






Posted by '김용환'
,


최근 kafka를 사용할 때 버전에 따라 계속 프로퍼티가 바뀐다. (더 상세히 말하면 WARN 에러가 발생할 수 있다)


따라서 최대한 org.apache.kafka.clients.producer.ProducerConfig 또는 org.apache.kafka.clients.producer.ConsumerConfig를 참조하는 것이 제일 좋다.


예시) producer config

private val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
val producer = new KafkaProducer[String, String](props)



예시2) consumer config

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "1000")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")


0.10.1 이전 버전 만 해도 consumer api에 zookeeper설정과 zookeeper timeout 설정이 있었지만, 


broker 목록으로 다 통일되었다. 


zookeeper 목록은 커맨드에서나 쓴다. 





Posted by '김용환'
,




# linkedin

linked 기술 블로그에 따르면 kafka를 중앙 pub/sub 구조의 큐로 잘 사용해 피드 시스템을 처리하고 있다. 재미있는 것은 avro도 사용하고 있다는 점이다. 


https://engineering.linkedin.com/blog/2016/04/kafka-ecosystem-at-linkedin


이미지 : https://content.linkedin.com/content/dam/engineering/site-assets/images/blog/posts/2016/04/KafkaEcosystem1.jpg




#uber


카프카 데이터 피드를 사용해 분당 수백 번의 승차 정보를 로그 데이터로 저장한 후, 해당 데이터를 Amazon S3에 대량 로드한다. 로컬 데이터 센터의 변경 데이터 로그를 스트리밍한다. json 데이터를 수집한 후 spark-hadoop(paquet)를  사용한다.


https://www.datanami.com/2015/10/05/how-uber-uses-spark-and-hadoop-to-optimize-customer-experience/









#  twitter


하루 50억 세션을 실시간으로 처리하려면 카프카를 스트림 처리 인프라로 사용해야 한다.


https://blog.twitter.com/2015/handling-five-billion-sessions-a-day-in-real-time






# netflix


카프카는 실시간 모니터링 및 이벤트 처리를 위한 넷플릭스의 데이터 파이프 라인의 백본이다.


http://techblog.netflix.com/2013/12/announcing-suro-backbone-of-netflixs.html



# spotify


https://www.meetup.com/ko-KR/stockholm-hug/events/121628932/?eventId=121628932



# yahoo


https://yahooeng.tumblr.com/post/109994930921/kafka-yahoo

Posted by '김용환'
,

1) producer 주의 사항


producer를 개발할 때 bootstrap.servers, key.serializer를 반드시 추가해야 한다.


 org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.


org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.




* boostrarp.servers : 9092포트로 떠 있는 kafka 서버이다. 

* key.serializer : 메시지 키를 serialization할 때 사용하는 정보이다.



다음은 producer 예제이다. 


Properties properties = new Properties();

properties.put("client.id", "5");

properties.put("bootstrap.servers", "127.0.0.1:9092");

properties.put("request.timeout.ms", String.valueOf(timeout));

roperties.put("request.required.acks", String.valueOf(requestRequiredAcks));

properties.put("value.serializer", MyDocumentSerializer.class.getName());

properties.put("key.serializer", MyIntegerSerializer.class.getName());

properties.put("serializer.class", MyDocumentSerializer.class.getName());

properties.put("partitioner.class", MyPartitioner.class.getName());


KafkaProducer<String, Document> producer = new KafkaProducer<>(properties);




2) consumer 주의 사항

consumer를 개발할 때 반드시 group.id, zookeeper.connect를 추가해야 한다. 



* group.id : consumr group에서 사용하는 unique 그룹 값이다. 해당 그룹으로 읽어야 offset을 서로 공유하게 되어 중복 데이터를 읽지 않도록 한다. (테스트해보면 쉽게 이해 되는 값이다)


* zookeeper.connect : kafka zookeeper 정보이다. 



만약 두 property를 주지 않으면 다음과 같은 에러가 발생한다. 


java.lang.IllegalArgumentException: requirement failed: Missing required property 'group.id'

at scala.Predef$.require(Predef.scala:224)

at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177)



cosumer 예제

Properties props = new Properties();

props.put("group.id", "1");

props.put("zookeeper.connect", "localhost:2185");

props.put("zookeeper.session.timeout.ms", "4000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

props.put("autocommit.enable", true);

ConsumerConfig consumerConfig = new ConsumerConfig(props);

ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);




구체적인 필드에 대한 설명은 아래 문서를 봐야 한다. 


https://kafka.apache.org/documentation/


Posted by '김용환'
,





org.apache.kafka.clients.producer.Partitioner이 0.10.1.1버전의 다음처럼 변경되었다. 


kafka 0.8


public int partition(Object key, int partitions)



kafka 0.10.1.1


public interface Partitioner extends Configurable {


    /**

     * Compute the partition for the given record.

     *

     * @param topic The topic name

     * @param key The key to partition on (or null if no key)

     * @param keyBytes The serialized key to partition on( or null if no key)

     * @param value The value to partition on or null

     * @param valueBytes The serialized value to partition on or null

     * @param cluster The current cluster metadata

     */

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);


    /**

     * This is called when partitioner is closed.

     */

    public void close();


}




0.8에서는 partitoning을 다음처럼 진행했었다. 


public int partition(Object key, int partitions) {

...

    return Integer.valueOf(key) % partitions;

}






partition 개수를 얻어오려면 Cluster를 통해서 partition을 얻도록 한다. 



@Override

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

int partitionsCount = partitions.size();


int id = (Integer) key;

if (id < 0) {

return random.nextInt(partitionsCount);

}

int partitionId = id % partitionsCount;

return partitionId;

}


Posted by '김용환'
,