kafka 0.10.1.1을 설치했다. 

kafka는 격변으로 변하고 있다.. 내부는 크게 변하는 것 같지는 않지만 API쪽은 계속 변하는 것 같다. 







kafka 0.10.1.1를 3대의 클러스터로 설치하는 내용이다. 



http://kafka.apache.org/downloads.html에서 확인한다. 


https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz 에서 0.10.1.1을 다운받는다. 



다운로드하고 압축을 푼다.


$ wget http://apache.mirror.cdnetworks.com/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz


$ tar zxvf kafka_2.11-0.10.1.1.tgz


$ sudo cp -r kafka_2.11-0.10.1.1 /usr/local/


$ sudo ln -s /usr/local/kafka_2.11-0.10.1.1 /usr/local/kafka




kafka에는 zookeeper를 내장되어 있다. 설정을 변경한다.


$ cd /usr/local/kafka/config


$ sudo vi zookeeper.properties

initLimit=5

syncLimit=2


server.1=alpha-googleplus-test1.google.cc:15000:16000

server.2=alpha-googleplus-test2.google.cc:15000:16000

server.3=alpha-googleplus-test3.google.cc:15000:16000





dataDir=/tmp/zookeeper이 기본값이기 때문에 

zookeeper에 잘못된 값을 지정한 것이 있으면 아래 내용으로 zookeeper id를 변경할 수 있다. 


$ sudo mkdir -p /tmp/zookeeper

$ sudo bash -c 'echo 1 > /tmp/zookeeper/myid'

$ cat /tmp/zookeeper/myid

1


2번째 장비에는  /tmp/zookeeper/myid에 2를 저장한다. 


3번째 장비에는  /tmp/zookeeper/myid에 3을 저장한다. 



이제 kafka의 주키퍼를 실행하기 위해 스크립트를 추가한다.



$ cd /usr/local/kafka/

$ sudo vi zk-start.sh


#!/bin/bash

nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 


$ sudo chmod 755 zk-start.sh

$ sudo ./zk-start.sh



차례로 3대의 서버에서 kafka의 zookeeper를 실행하면 정상적으로 동작한다. 



zookeeper를 종료하는 스크립트는 다음과 같다. 


$ cat zk-stop.sh

#!/bin/bash


KAFKA_HOME=/usr/local/kafka

$KAFKA_HOME/bin/zookeeper-server-stop.sh


$ sudo zk-stop.sh




이번에 kakfa 클러스터 설정을 지정한다. 


$ cd /usr/local/kafka/

$ sudo vi server.properties



zookeeper id에 맞게 broker.id=0를 수정한다.

id가 3번째이면 broker.id=3으로 수정한다. 


그리고 zookeeper.connect를 변경한다.


zookeeper.connect=localhost:2181

—>

zookeeper.connect=alpha-googleplus-test1.google.cc:2181,alpha-googleplus-test2.google.cc:2181,alpha-googleplus-test3.google.cc:2181




kafka 스크립트를 추가한다.


$ cat kafka-start.sh

#!/bin/bash


KAFKA_HOME=/usr/local/kafka


$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties




kafkaServer.out에에 정상적인지 실행되었는지 확인한다.


[2017-02-28 21:20:03,701] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser)

[2017-02-28 21:20:03,701] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser)

[2017-02-28 21:20:03,702] INFO [Kafka Server 3], started (kafka.server.KafkaServer)




kafka 데몬을 종료하는 스크립트를 추가한다.


$ cat kafka-stop.sh

#!/bin/bash


KAFKA_HOME=/usr/local/kafka


$KAFKA_HOME/bin/kafka-server-stop.sh



kafkaServer.out에 다음 로그가있다. 


[2017-02-28 21:21:34,672] INFO [Kafka Server 3], shut down completed (kafka.server.KafkaServer)

[2017-02-28 21:21:34,672] INFO EventThread shut down for session: 0x25a83bf8af20000 (org.apache.zookeeper.ClientCnxn)





kafka 토픽을 생성한다.


$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper alpha-googleplus-test1.google.cc --replication-factor 3 --partitions 1 --topic test

Created topic "test".


kafka 토픽이 제대로 생성되었는지 출력한다.


$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc

test



토픽을 변경한다. 


/usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper alpha-googleplus-test1.google.cc --topic test --partitions 2

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected

Adding partitions succeeded!



토픽의 대한 내용을 자세히 확인하기


$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper alpha-googleplus-test1.google.cc --topic test

Topic:test PartitionCount:2 ReplicationFactor:3 Configs:

Topic: test Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2

Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1




메시지 사용하는 방법이다.


producer와 consumer용 터미널가 있어야 한다. 


producer용 터미널에서 실행한다. 


$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list alpha-googleplus-test1.google.cc:9092 --topic test




consumer용 터미널에서 실행한다. 


$ /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper alpha-googleplus-test1.google.cc --topic test --from-beginning

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

...




producer 터미널에서 111을 입력하면 consumer 터미널에서 실행된다. 


consumer의 커맨드가 zookeeper 대신 bootstrap-server를 쓰라고 되어 있다. 아래와 같이 바꾸면 더 이상 warning 메시지는 발생하지 않는다. 


$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server alpha-googleplus-test1.google.cc:9092 --topic test --from-beginning


아까 입력했던 111이 출력된다. 


producer에서 222를 입력하면 consumer에서 222가 출력된다. 



참고로 디폴트 설정으로 토픽을 삭제할 수 없다. delete.topic.enable=false로 되어 있다. 



$ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper alpha-googleplus-test1.google.cc --topic test

Topic test is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.


$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc

test - marked for deletion





설정 파일을 수정하고 다시 실행해야 한다.


# Switch to enable topic deletion or not, default value is false

delete.topic.enable=true




따라서 kafka 설정에 delete.topic.enable=true으로 변경하고 재시작한다. 


$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc



test는 삭제되어 있다. 




다시 한번 요약하면 다음과 같다.


$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc

test


$ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper alpha-googleplus-test1.google.cc --topic test


$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc






만약 topic 생성 없이 kafka topic에 바로 publish하면 기본 상태로 topic이 생성되고 데이터가 추가된다. 파티션 개수 1, replication factor는 1이다. 


$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper alpha-googleplus-test1.google.cc --topic profile_image_to_talk

Topic:profile_image_to_talk PartitionCount:1 ReplicationFactor:1 Configs:

Topic: profile_image_to_talk Partition: 0 Leader: 1 Replicas: 1 Isr: 1




카프카 토픽에 설정을 추가하기 위해 다음 커맨드를 사용한다.


/usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic profile_image_to_talk --config <key>=<value>



토픽에서 구성을 제거하기 위해 다음 커맨드를 사용한다.


$ /usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic profile_image_to_talk --deleteconfig <key>=<value>



Posted by '김용환'
,