최근 kafka를 사용할 때 버전에 따라 계속 프로퍼티가 바뀐다. (더 상세히 말하면 WARN 에러가 발생할 수 있다)


따라서 최대한 org.apache.kafka.clients.producer.ProducerConfig 또는 org.apache.kafka.clients.producer.ConsumerConfig를 참조하는 것이 제일 좋다.


예시) producer config

private val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
val producer = new KafkaProducer[String, String](props)



예시2) consumer config

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "1000")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")


0.10.1 이전 버전 만 해도 consumer api에 zookeeper설정과 zookeeper timeout 설정이 있었지만, 


broker 목록으로 다 통일되었다. 


zookeeper 목록은 커맨드에서나 쓴다. 





Posted by '김용환'
,