Spark와 Kafka 연동하는 방식은 다음과 같다.
- 수신기 기반 접근 방식(Receiver-based approach)
- 다이렉트 스트림 접근 방식(Direct stream approach)
- 구조화된 스트리밍(Structured streaming)
1. 수신기 기반 접근 방식
수신기 기반 방식은 스파크와 카프카와의 첫 번째 통합 방식이었다. 수신기 접근 방식에서 드라이버는 익스큐터에서 카프카 브로커의 고급 API를 사용해 데이터를 가져올 수 있는 수신자를 실행한다. 수신자가 카프카 브로커에서 이벤트를 가져 오고 있기 때문에 수신자는 주키퍼(zookeeper)에 오프셋을 저장한다. 주키퍼는 카프카 클러스터에서도 사용된다. 주요 측면은 WAL(Write Ahead Log)의 사용이다. 수신자는 카프카에서 데이터를 소비하면서 WAL에 계속 저장한다. 따라서 문제가 발생해 익스큐터 또는 수신자가 손실되거나 재시작될 때 WAL을 사용해 이벤트를 복구하고 처리할 수 있다. 따라서이 로그 기반 설계는 내구성과 일관성을 모두 제공한다.
각 수신기는 카프카 토픽(topic)에서 이벤트의 입력 DStream을 생성하고 주키퍼에 카프카 토픽, 브로커, 오프셋 등을 쿼리한다.
사용하는 API는 KafkaUtils.createStream이다.
def createStream(
ssc: StreamingContext, // StreamingContext 오브젝트
zkQuorum: String, //주키퍼 쿼럼(quorum) (호스트이름:포트,호스트이름:포트,..)
groupId: String, //컨슈머의 그룹 id
topics: Map[String, Int], // 소비할 (토픽 이름, 파티션 개수) 맵입니다. 각 파티션은 자체 스레드에서 사용된다.
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
Storage level to use for storing the received objects
(default: StorageLevel.MEMORY_AND_DISK_SER_2)
): ReceiverInputDStream[(String, String)] //(카프카 메시지 키, 카프카 메시지 값) DStream
예제는 다음과 같다.
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
2. 다이렉트 스트림 접근 방식
다이렉트 스트림 접근 방식(direct stream approach)은 카프카 통합과 관련한 새로운 접근 방식이며 드라이버를 사용하여 브로커에 직접 연결하고 이벤트를 가져 오는 방식으로 동작한다. 주요 내용은 다이렉트 스트림 API를 사용하는 것이므로 스파크 태스크는 카프카 토픽/파티션 대비 스파크 파티션 비율을 볼 때 1:1 비율로 동작한다는 것이다. 다이렉트 스트림 기반 접근 방식은 HDFS 또는 WAL에 대한 의존성 때문에 유연하지 않다. 또한 이제 오프셋으로 바로 접근할 수 있기 때문에 멱등성 또는 트랜잭션 방식을 사용해 정확히 한 번만 처리할 수 있다.
수신자를 사용하지 않고 카프카 브로커에서 직접 메시지를 가져오는 입력 스트림을 생성한다. 입력 스트림은 카프카에서 가져온 각 메시지가 정확히 한 번 처리하는 트랜스포메이션에 포함되도록 보장할 수 있다.
다음과 같이 KafkaUtils.createDirectStream() API를 사용하여 다이렉트 스트림을 생성할 수 있다.
def createDirectStream[
K: ClassTag, // 카프카 메시지 키의 K 타입
V: ClassTag, // 카프카 메시지 값의 V 타입
KD <: Decoder[K]: ClassTag, // 카프카 메시지 키 디코더의 KD 타입
VD <: Decoder[V]: ClassTag, // 카프카 메시지 값 디코더의 VD 타입
R: ClassTag // 메시지 핸들러에서 리턴하는 R 타입
](
ssc: StreamingContext, //StreamingContext 오브젝트
KafkaParams: Map[String, String],
/*
카프카의 설정 매개변수(http://kafka.apache.org/documentation.html#configuration)를 참조한다.
host1:port1,host2:port2 형식으로 지정된 카프카 브로커(주키퍼 서버는 아님)과 함께 "metadata.broker.list"또는 "bootstrap.servers" 매개 변수를 설정해야 한다.
*/
fromOffsets: Map[TopicAndPartition, Long], // 스트림의 시작점(포함)을 정의하는 토픽/파티션 별 카프카 오프셋
messageHandler: MessageAndMetadata[K, V] => R // 각 메시지와 메타 데이터를 원한 타입으로 변환하는 함수
): InputDStream[R] // R 타입의 DStream
다이렉트 스트림 API에 대한 예는 다음과 같다.
val topicsSet = topics.split(",").toSet
val KafkaParams : Map[String, String] =
Map("metadata.broker.list" -> brokers,
"group.id" -> groupid )
val rawDstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, KafkaParams, topicsSet)
다이렉트 스트림 API는 카프카에서만 사용할 수 있어서 일반적으로 사용할 수 있는 방식이 아니다.
3. 구조화된 스트리밍(Structured streaming)
구조화된 스트리밍(structured streaming)은 아파치 스파크 2.0 이상에서 새로 도입되었다.
구조화 스트리밍(structured streaming)은 스파크 SQL 엔진 위에 구축된 확장 가능하고 내결함성 스트림 처리 엔진이다. 이는 DStream 패러다임 및 스파크 스트리밍 API와 관련된 이슈가 아니라 스트림 처리와 계산이 배치 처리에 가깝다. 구조화된 스트리밍 엔진은 정확히 한 번 스트림 처리, 처리 결과에 대한 증분 업데이트, 집계 등과 같은 내용을 처리한다.
다음은 카프카 소스 스트림 또는 카프카 소스에서 읽는 예이다.
val ds1 = spark
.read
.format("Kafka")
.option("Kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
val ds1 = spark
.readStream
.format("Kafka")
.option("Kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
또한 구조화된 스트리밍 API는 스파크 스트리밍의 큰 이슈를 해결할 수 있는 방법을 제공한다. 즉 스파크 스트리밍은 들어오는 데이터를 마이크로 배치로 처리하고 수신 시간을 데이터를 분할하는 수단으로 사용하므로 실제 이벤트 시간을 고려하지 않는다. 구조화된 스트리밍을 사용하면 수신되는 데이터에서 이런 이벤트 시간을 지정하여 최신 데이터가 자동으로 처리되도록 할 수 있다.
구조화된 스트리밍의 핵심 아이디어는 실시간 데이터 스트림을 이벤트가 스트림에서 처리될 때 연속적으로 추가되는 무제한 테이블(unbounded table)로 처리하는 것이다. 그리고 일반적으로 배치 데이터를 갖고 처리하는 것처럼 무제한 테이블에서 계산과 SQL 쿼리를 실행할 수 있다.
DStream은 시간이 지나면서 많은 데이터는 처리되어 결과를 생성한다. 따라서 무제한 입력 테이블은 결과 테이블을 생성하는 데 사용된다. 출력 또는 결과 테이블은 출력(output)이라고하는 외부 싱크(sink)에 저장될 수 있다.
스트림을 받는 예제는 다음과 같다.
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val inputLines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val words = inputLines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
query.start()
지연을 처리하기 위해 watermark를 사용할 수 있다. 다음은 그 예이다.
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val inputLines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.option("includeTimestamp", true)
.load()
val words = inputLines.as[(String, Timestamp)].flatMap(line =>
line._1.split(" ").map(word => (word, line._2))).toDF("word", "timestamp")
val windowedCounts = words.withWatermark("timestamp", "10 seconds")
.groupBy(window($"timestamp", "10 seconds", "10 seconds"), $"word").count().orderBy("window")
val query = windowedCounts.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "false")
query.start()