Play2(scala) + Kafka producer/consumer 예제이다.
Kafka는 분산 커밋 로그 기반의 메시지 큐이며, 서로 다른 consumer group에 속한 여러 consumer가 동일한 데이터를 동시에 받을 수 있다.
관련 예제를 소개한다.
1) Producer
topic은 "aa"로 결정한다.
package worker
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.{Date, Properties}
import scala.util.Random
/**
 * Small wrapper around a Kafka producer that publishes timestamp messages to one topic.
 *
 * Both key and value are sent as strings so that the record types agree with the
 * `StringSerializer` configured for `key.serializer` and `value.serializer`.
 *
 * @param brokerList value used for the producer's `bootstrap.servers` setting
 * @param topic      topic every record is published to
 */
class KafkaProducerWorker(brokerList: String, topic: String) {
  private val rnd = new Random()

  private val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

  // The key type must match key.serializer: handing an Integer key to StringSerializer
  // fails at send time, so keys are carried as strings.
  val producer = new KafkaProducer[String, String](props)

  /**
   * Publishes a single record whose key is a random number in [0, 100) rendered as a
   * string and whose value is the current epoch time in milliseconds.
   *
   * The send is asynchronous; the future returned by the client is not inspected here.
   *
   * @param events currently unused; exactly one record is sent per call
   *               (NOTE(review): confirm whether this was meant to be a record count)
   */
  def send(events: Int): Unit = {
    val key = rnd.nextInt(100).toString
    val msg = new Date().getTime.toString
    producer.send(new ProducerRecord[String, String](topic, key, msg))
  }

  /** Closes the underlying producer and releases its resources. */
  def close(): Unit = producer.close()
}
2) consumer
package worker
}
import java.util
import java.util.Properties
import java.util.concurrent.{ExecutorService, Executors}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
/**
 * Subscribes to a single topic and prints every received record from a background thread.
 *
 * The consumer is only ever polled and closed on the worker thread started by [[run]];
 * [[shutdown]] interrupts a blocked poll via `wakeup()`, which is the one consumer
 * method intended to be called from another thread.
 *
 * @param brokers value used for the consumer's `bootstrap.servers` setting
 * @param groupId consumer group id
 * @param topic   topic to subscribe to
 */
class KafkaConsumerWorker (val brokers: String,
                           val groupId: String,
                           val topic: String) {
  import org.apache.kafka.common.errors.WakeupException

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)

  // Set by run(); remains null until the polling thread has been started.
  // Volatile because run() and shutdown() may be called from different threads.
  @volatile var executor: ExecutorService = null

  consumer.subscribe(util.Collections.singleton(this.topic))

  /**
   * Starts a single background thread that polls the topic in a loop and prints each
   * batch size followed by one line per record. Returns immediately.
   */
  def run(): Unit = {
    val pool = Executors.newSingleThreadExecutor
    // Keep a handle on the pool so shutdown() can actually stop it.
    executor = pool
    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          while (true) {
            val consumerRecords = consumer.poll(Integer.MAX_VALUE)
            println("size : " + consumerRecords.count())
            // Obtain the iterator once: each call to iterator() starts over from the
            // first record, so re-calling it inside the loop would never advance.
            val records = consumerRecords.iterator
            while (records.hasNext) {
              val record = records.next()
              println(s"topic = ${record.topic()}, partition = ${record.partition()}, " +
                s"offset = ${record.offset()}, key = ${record.key()}, value = ${record.value()}\n")
            }
          }
        } catch {
          // Raised by poll() after shutdown() called wakeup(); this is the normal exit path.
          case _: WakeupException => ()
        } finally {
          // Close on the polling thread, the only thread that touches the consumer.
          consumer.close()
        }
      }
    })
  }

  /**
   * Stops the worker. If the polling thread is running, it is woken up and left to close
   * the consumer itself; this call then waits briefly for it to finish. If [[run]] was
   * never called, the consumer is closed directly.
   */
  def shutdown(): Unit = {
    val pool = executor
    if (pool == null) {
      consumer.close()
    } else {
      consumer.wakeup()
      pool.shutdown()
      // Give the polling thread a moment to close the consumer cleanly.
      pool.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)
    }
  }
}
3) consumer 추가 - ApplicationTimer
/**
 * Eagerly created singleton that logs application start and stop times and ties a
 * Kafka consumer worker to the application lifecycle.
 *
 * On construction it records the start instant, starts a [[KafkaConsumerWorker]] for
 * brokers "localhost:9092", group "1" and topic "aa", and registers a stop hook that
 * shuts the worker down and logs how long the application ran.
 *
 * @param clock        source of the start/stop instants
 * @param appLifecycle lifecycle the stop hook is registered with
 */
@Singleton
class ApplicationTimer @Inject() (clock: Clock, appLifecycle: ApplicationLifecycle) {

  // Captured once at construction; used later to compute the running time.
  private val startedAt: Instant = clock.instant
  Logger.info(s"ApplicationTimer demo: Starting application at $startedAt.")

  Logger.info("Starting Kafka Consume Worker")
  private val consumerWorker = new KafkaConsumerWorker("localhost:9092", "1", "aa")
  consumerWorker.run()

  appLifecycle.addStopHook { () =>
    // Stop the consumer first, then report.
    consumerWorker.shutdown()
    Logger.info("Stopping kafkaConsumerWorker.")

    val stoppedAt: Instant = clock.instant
    val elapsedSeconds: Long = stoppedAt.getEpochSecond - startedAt.getEpochSecond
    Logger.info(s"Stopping application at ${clock.instant} after ${elapsedSeconds}s.")

    // The hook has nothing asynchronous left to wait for.
    Future.successful(())
  }
}
4) producer web call
/**
 * Controller whose `index` action publishes one Kafka message after a delay and then
 * responds with the next counter value followed by a greeting.
 *
 * @param counter     supplies the count shown in the response
 * @param actorSystem provides the scheduler used to delay the work
 * @param exec        execution context for the scheduled task and future callbacks
 */
@Singleton
class AsyncController @Inject()(counter: Counter, actorSystem: ActorSystem)
                               (implicit exec: ExecutionContext) extends Controller {

  // Producer targeting broker "localhost:9092" and topic "aa".
  val kafkaProducerWorker = new KafkaProducerWorker("localhost:9092", "aa")

  /** Responds with "<count> <message>" once the delayed message future completes. */
  def index = Action.async {
    getFutureMessage(1.second).map { msg => Ok(s"${counter.nextCount()} $msg") }
  }

  /**
   * Schedules a Kafka send after `delayTime` and returns a future of the greeting.
   *
   * The promise is completed with the outcome of the send: if the send throws, the
   * future fails instead of never completing, so the request cannot hang.
   *
   * @param delayTime how long to wait before sending
   * @return future completed with "Hi!" after a successful send, or failed with the
   *         exception thrown by the send
   */
  private def getFutureMessage(delayTime: FiniteDuration): Future[String] = {
    val promise: Promise[String] = Promise[String]()
    actorSystem.scheduler.scheduleOnce(delayTime) {
      promise.complete(scala.util.Try {
        kafkaProducerWorker.send(1)
        "Hi!"
      })
    }
    promise.future
  }
}
결과를 보려면 `$ curl http://localhost:9000` 명령으로 테스트한다.
size : 1
topic = aa, partition = 0, offset = 1680, key = 3, value = 1489127790823
size : 1
topic = aa, partition = 0, offset = 1681, key = 71, value = 1489127833413
size : 1
topic = aa, partition = 0, offset = 1682, key = 17, value = 1489127836895
'scala' 카테고리의 다른 글
컬렉션의 forall과 exists 메소드 (0) | 2017.03.13 |
---|---|
[Akka] invalid ActorSystem name Exception 해결하기 (0) | 2017.03.13 |
[scala] 컬렉션의 head,last,init,lastOption,tail 예제 (0) | 2017.03.09 |
[scala] 컬렉션의 take,drop,dropRight,takeWhile,slice 예제 (0) | 2017.03.09 |
Intellij에서 scala-play2 환경 구축하기 (0) | 2017.03.08 |