kafka 0.10.1.1을 설치했다.
kafka는 격변으로 변하고 있다.. 내부는 크게 변하는 것 같지는 않지만 API쪽은 계속 변하는 것 같다.
kafka 0.10.1.1를 3대의 클러스터로 설치하는 내용이다.
http://kafka.apache.org/downloads.html에서 확인한다.
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz 에서 0.10.1.1을 다운받는다.
다운로드하고 압축을 푼다.
$ wget http://apache.mirror.cdnetworks.com/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
$ tar zxvf kafka_2.11-0.10.1.1.tgz
$ sudo cp -r kafka_2.11-0.10.1.1 /usr/local/
$ sudo ln -s /usr/local/kafka_2.11-0.10.1.1 /usr/local/kafka
kafka에는 zookeeper를 내장되어 있다. 설정을 변경한다.
$ cd /usr/local/kafka/config
$ sudo vi zookeeper.properties
initLimit=5
syncLimit=2
server.1=alpha-googleplus-test1.google.cc:15000:16000
server.2=alpha-googleplus-test2.google.cc:15000:16000
server.3=alpha-googleplus-test3.google.cc:15000:16000
dataDir=/tmp/zookeeper이 기본값이기 때문에
zookeeper에 잘못된 값을 지정한 것이 있으면 아래 내용으로 zookeeper id를 변경할 수 있다.
$ sudo mkdir -p /tmp/zookeeper
$ sudo bash -c 'echo 1 > /tmp/zookeeper/myid'
$ cat /tmp/zookeeper/myid
1
2번째 장비에는 /tmp/zookeeper/myid에 2를 저장한다.
3번째 장비에는 /tmp/zookeeper/myid에 3을 저장한다.
이제 kafka의 주키퍼를 실행하기 위해 스크립트를 추가한다.
$ cd /usr/local/kafka/
$ sudo vi zk-start.sh
#!/bin/bash
nohup bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ sudo chmod 755 zk-start.sh
$ sudo ./zk-start.sh
차례로 3대의 서버에서 kafka의 zookeeper를 실행하면 정상적으로 동작한다.
zookeeper를 종료하는 스크립트는 다음과 같다.
$ cat zk-stop.sh
#!/bin/bash
KAFKA_HOME=/usr/local/kafka
$KAFKA_HOME/bin/zookeeper-server-stop.sh
$ sudo zk-stop.sh
이번에 kakfa 클러스터 설정을 지정한다.
$ cd /usr/local/kafka/
$ sudo vi server.properties
zookeeper id에 맞게 broker.id=0를 수정한다.
id가 3번째이면 broker.id=3으로 수정한다.
그리고 zookeeper.connect를 변경한다.
zookeeper.connect=localhost:2181
—>
zookeeper.connect=alpha-googleplus-test1.google.cc:2181,alpha-googleplus-test2.google.cc:2181,alpha-googleplus-test3.google.cc:2181
kafka 스크립트를 추가한다.
$ cat kafka-start.sh
#!/bin/bash
KAFKA_HOME=/usr/local/kafka
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
kafkaServer.out에에 정상적인지 실행되었는지 확인한다.
[2017-02-28 21:20:03,701] INFO Kafka version : 0.10.1.1 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-28 21:20:03,701] INFO Kafka commitId : f10ef2720b03b247 (org.apache.kafka.common.utils.AppInfoParser)
[2017-02-28 21:20:03,702] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
kafka 데몬을 종료하는 스크립트를 추가한다.
$ cat kafka-stop.sh
#!/bin/bash
KAFKA_HOME=/usr/local/kafka
$KAFKA_HOME/bin/kafka-server-stop.sh
kafkaServer.out에 다음 로그가있다.
[2017-02-28 21:21:34,672] INFO [Kafka Server 3], shut down completed (kafka.server.KafkaServer)
[2017-02-28 21:21:34,672] INFO EventThread shut down for session: 0x25a83bf8af20000 (org.apache.zookeeper.ClientCnxn)
kafka 토픽을 생성한다.
$ /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper alpha-googleplus-test1.google.cc --replication-factor 3 --partitions 1 --topic test
Created topic "test".
kafka 토픽이 제대로 생성되었는지 출력한다.
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc
test
토픽을 변경한다.
/usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper alpha-googleplus-test1.google.cc --topic test --partitions 2
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
토픽의 대한 내용을 자세히 확인하기
$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper alpha-googleplus-test1.google.cc --topic test
Topic:test PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: test Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
메시지 사용하는 방법이다.
producer와 consumer용 터미널가 있어야 한다.
producer용 터미널에서 실행한다.
$ /usr/local/kafka/bin/kafka-console-producer.sh --broker-list alpha-googleplus-test1.google.cc:9092 --topic test
consumer용 터미널에서 실행한다.
$ /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper alpha-googleplus-test1.google.cc --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
...
producer 터미널에서 111을 입력하면 consumer 터미널에서 실행된다.
consumer의 커맨드가 zookeeper 대신 bootstrap-server를 쓰라고 되어 있다. 아래와 같이 바꾸면 더 이상 warning 메시지는 발생하지 않는다.
$ /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server alpha-googleplus-test1.google.cc:9092 --topic test --from-beginning
아까 입력했던 111이 출력된다.
producer에서 222를 입력하면 consumer에서 222가 출력된다.
참고로 디폴트 설정으로 토픽을 삭제할 수 없다. delete.topic.enable=false로 되어 있다.
$ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper alpha-googleplus-test1.google.cc --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc
test - marked for deletion
설정 파일을 수정하고 다시 실행해야 한다.
# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true
따라서 kafka 설정에 delete.topic.enable=true으로 변경하고 재시작한다.
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc
test는 삭제되어 있다.
다시 한번 요약하면 다음과 같다.
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc
test
$ /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper alpha-googleplus-test1.google.cc --topic test
$ /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper alpha-googleplus-test1.google.cc
만약 topic 생성 없이 kafka topic에 바로 publish하면 기본 상태로 topic이 생성되고 데이터가 추가된다. 파티션 개수 1, replication factor는 1이다.
$ /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper alpha-googleplus-test1.google.cc --topic profile_image_to_talk
Topic:profile_image_to_talk PartitionCount:1 ReplicationFactor:1 Configs:
Topic: profile_image_to_talk Partition: 0 Leader: 1 Replicas: 1 Isr: 1
카프카 토픽에 설정을 추가하기 위해 다음 커맨드를 사용한다.
/usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic profile_image_to_talk --config <key>=<value>
토픽에서 구성을 제거하기 위해 다음 커맨드를 사용한다.
$ /usr/local/kafka/bin/kafka-topics.sh --alter --zookeeper localhost:2181/chroot --topic profile_image_to_talk --deleteconfig <key>=<value>