1) producer 주의 사항


producer를 개발할 때 bootstrap.servers, key.serializer를 반드시 추가해야 한다.


 org.apache.kafka.common.config.ConfigException: Missing required configuration "bootstrap.servers" which has no default value.


org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.




* boostrarp.servers : 9092포트로 떠 있는 kafka 서버이다. 

* key.serializer : 메시지 키를 serialization할 때 사용하는 정보이다.



다음은 producer 예제이다. 


Properties properties = new Properties();

properties.put("client.id", "5");

properties.put("bootstrap.servers", "127.0.0.1:9092");

properties.put("request.timeout.ms", String.valueOf(timeout));

roperties.put("request.required.acks", String.valueOf(requestRequiredAcks));

properties.put("value.serializer", MyDocumentSerializer.class.getName());

properties.put("key.serializer", MyIntegerSerializer.class.getName());

properties.put("serializer.class", MyDocumentSerializer.class.getName());

properties.put("partitioner.class", MyPartitioner.class.getName());


KafkaProducer<String, Document> producer = new KafkaProducer<>(properties);




2) consumer 주의 사항

consumer를 개발할 때 반드시 group.id, zookeeper.connect를 추가해야 한다. 



* group.id : consumr group에서 사용하는 unique 그룹 값이다. 해당 그룹으로 읽어야 offset을 서로 공유하게 되어 중복 데이터를 읽지 않도록 한다. (테스트해보면 쉽게 이해 되는 값이다)


* zookeeper.connect : kafka zookeeper 정보이다. 



만약 두 property를 주지 않으면 다음과 같은 에러가 발생한다. 


java.lang.IllegalArgumentException: requirement failed: Missing required property 'group.id'

at scala.Predef$.require(Predef.scala:224)

at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177)



cosumer 예제

Properties props = new Properties();

props.put("group.id", "1");

props.put("zookeeper.connect", "localhost:2185");

props.put("zookeeper.session.timeout.ms", "4000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

props.put("autocommit.enable", true);

ConsumerConfig consumerConfig = new ConsumerConfig(props);

ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);




구체적인 필드에 대한 설명은 아래 문서를 봐야 한다. 


https://kafka.apache.org/documentation/


Posted by '김용환'
,