Play2(scala) + Kafka producer/consumer 예제이다. 


kafka는 이제 분산 파일 큐이고 여러 consumer가 동시에 동일한 데이터를 받을 수 있다. 


관련 예제를 소개한다. 





1) Producer


topic은 "aa"로 결정한다.

package worker

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.{Date, Properties}

import scala.util.Random

class KafkaProducerWorker(brokerList: String, topic: String) {
private val rnd = new Random()
private val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[Integer, String](props)

def send(events: Int) {
val runtime = new Date().getTime()
val key = rnd.nextInt(100)
val msg = runtime.toString
val data = new ProducerRecord[Integer, String](topic, key, msg)
producer.send(data)
}
}




2) consumer

package worker

import java.util
import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

class KafkaConsumerWorker (val brokers: String,
val groupId: String,
val topic: String) {

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
var executor: ExecutorService = null
consumer.subscribe(util.Collections.singleton(this.topic))

def run(): Unit= {
Executors.newSingleThreadExecutor.execute(new Runnable {
override def run(): Unit = {
while(true) {
val consumerRecords = consumer.poll(Integer.MAX_VALUE)

println("size : " + consumerRecords.count())
while(consumerRecords.iterator.hasNext) {
val record = consumerRecords.iterator.next() println(s"topic = ${record.topic()}, partition = ${record.partition()}, " + s"offset = ${record.offset()}, key = ${record.key()}, value = ${record.value()}\n")

}
}
}
})
}

def shutdown() = {
if (consumer != null) consumer.close();
if (executor != null) executor.shutdown();
}
}




3) conumer 추가 - ApplicationTimer

@Singleton
class ApplicationTimer @Inject() (clock: Clock, appLifecycle: ApplicationLifecycle) {

private val start: Instant = clock.instant
Logger.info(s"ApplicationTimer demo: Starting application at $start.")

Logger.info(s"Starting Kafka Consume Worker")
private val kafkaConsumerWorker = new KafkaConsumerWorker("localhost:9092", "1", "aa")
kafkaConsumerWorker.run()

appLifecycle.addStopHook { () =>
kafkaConsumerWorker.shutdown()
Logger.info(s"Stopping kafkaConsumerWorker.")

val stop: Instant = clock.instant
val runningTime: Long = stop.getEpochSecond - start.getEpochSecond
Logger.info(s"Stopping application at ${clock.instant} after ${runningTime}s.")
Future.successful(())
}
}



4) producer web call

@Singleton
class AsyncController @Inject()(counter: Counter, actorSystem: ActorSystem) (implicit exec: ExecutionContext) extends Controller {

def index = Action.async {
getFutureMessage(1.second).map { msg => Ok(counter.nextCount().toString + " " + msg) }
}

val kafkaProducerWorker = new KafkaProducerWorker("localhost:9092", "aa")

private def getFutureMessage(delayTime: FiniteDuration): Future[String] = {
val promise: Promise[String] = Promise[String]()
actorSystem.scheduler.scheduleOnce(delayTime) {
kafkaProducerWorker.send(1)
promise.success("Hi!")
}
promise.future
}
}



결과를 보려면 $ curl http://localhost:9000으로 테스트한다.



size : 1

topic = aa, partition = 0, offset = 1680, key = 3, value = 1489127790823


size : 1

topic = aa, partition = 0, offset = 1681, key = 71, value = 1489127833413


size : 1

topic = aa, partition = 0, offset = 1682, key = 17, value = 1489127836895

Posted by '김용환'
,