1) producer 주의 사항
producer를 개발할 때 bootstrap.servers, key.serializer를 반드시 추가해야 한다.
org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.
org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
* boostrarp.servers : 9092포트로 떠 있는 kafka 서버이다.
* key.serializer : 메시지 키를 serialization할 때 사용하는 정보이다.
다음은 producer 예제이다.
Properties properties = new Properties();
properties.put("client.id", "5");
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("request.timeout.ms", String.valueOf(timeout));
roperties.put("request.required.acks", String.valueOf(requestRequiredAcks));
properties.put("value.serializer", MyDocumentSerializer.class.getName());
properties.put("key.serializer", MyIntegerSerializer.class.getName());
properties.put("serializer.class", MyDocumentSerializer.class.getName());
properties.put("partitioner.class", MyPartitioner.class.getName());
KafkaProducer<String, Document> producer = new KafkaProducer<>(properties);
2) consumer 주의 사항
consumer를 개발할 때 반드시 group.id, zookeeper.connect를 추가해야 한다.
* group.id : consumr group에서 사용하는 unique 그룹 값이다. 해당 그룹으로 읽어야 offset을 서로 공유하게 되어 중복 데이터를 읽지 않도록 한다. (테스트해보면 쉽게 이해 되는 값이다)
* zookeeper.connect : kafka zookeeper 정보이다.
만약 두 property를 주지 않으면 다음과 같은 에러가 발생한다.
java.lang.IllegalArgumentException: requirement failed: Missing required property 'group.id'
at scala.Predef$.require(Predef.scala:224)
at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177)
cosumer 예제
Properties props = new Properties();
props.put("group.id", "1");
props.put("zookeeper.connect", "localhost:2185");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("autocommit.enable", true);
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
구체적인 필드에 대한 설명은 아래 문서를 봐야 한다.
https://kafka.apache.org/documentation/