For a Spark Structured Streaming example on Spark 2.x, the following code is a good reference. It integrates Kafka with Cassandra: the job reads events from a Kafka topic and writes each one to a Cassandra table.
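The example assumes the Kafka source for Structured Streaming and the Spark Cassandra Connector are on the classpath. A minimal build.sbt sketch is shown below; the artifact versions are illustrative and should be matched to the Spark and Cassandra versions actually in use.

libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-sql"                 % "2.2.0",
  "org.apache.spark"   %% "spark-sql-kafka-0-10"      % "2.2.0",
  "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.5"
)

The SparkJob class below creates the keyspace and table once at startup and then streams Kafka records into Cassandra row by row.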
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{ForeachWriter, SparkSession}

class SparkJob extends Serializable {

  @transient lazy val logger = Logger.getLogger(this.getClass)
  logger.setLevel(Level.INFO)

  val sparkSession =
    SparkSession.builder
      .master("local[*]")
      .appName("kafka2Spark2Cassandra")
      .config("spark.cassandra.connection.host", "localhost")
      .getOrCreate()

  val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)

  // Create the keyspace and table here for the demo, NOT in prod
  connector.withSessionDo { session =>
    Statements.createKeySpaceAndTable(session, true)
  }

  // Writes a single parsed event to Cassandra.
  private def processRow(value: Commons.UserEvent) = {
    connector.withSessionDo { session =>
      session.execute(Statements.cql(value.user_id, value.time, value.event))
    }
  }

  def runJob() = {
    logger.info("Execution started with the following configuration")
    val cols = List("user_id", "time", "event")

    import sparkSession.implicits._

    // Subscribe to the Kafka topic and expose value/topic/partition as typed columns.
    val lines = sparkSession.readStream
      .format("kafka")
      .option("subscribe", "test.1")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)",
                  "CAST(topic AS STRING)",
                  "CAST(partition AS INTEGER)")
      .as[(String, String, Int)]

    // The value is sent as a semicolon-separated string, e.g. "userid_1;2015-05-01T00:00:00;some_value"
    val df =
      lines.map { line =>
        val columns = line._1.split(";")
        (columns(0), Commons.getTimeStamp(columns(1)), columns(2))
      }.toDF(cols: _*)

    df.printSchema()

    // Run your business logic here
    val ds = df.select($"user_id", $"time", $"event").as[Commons.UserEvent]

    // This ForeachWriter sink writes each row of the streaming output to Cassandra.
    val writer = new ForeachWriter[Commons.UserEvent] {
      override def open(partitionId: Long, version: Long) = true
      override def process(value: Commons.UserEvent) = processRow(value)
      override def close(errorOrNull: Throwable) = {}
    }

    val query =
      ds.writeStream.queryName("kafka2Spark2Cassandra").foreach(writer).start()

    query.awaitTermination()
    sparkSession.stop()
  }
}
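The job references two helper objects, Commons and Statements, that are not shown above. The sketch below reconstructs what they might look like purely from how the job uses them; the keyspace and table names (test.my_table), the timestamp format, and the default argument are assumptions, not the original implementation.

import java.sql.Timestamp
import java.text.SimpleDateFormat
import com.datastax.driver.core.Session

object Commons {

  // Event type the stream is deserialized into; field names match the Cassandra columns.
  case class UserEvent(user_id: String, time: Timestamp, event: String) extends Serializable

  // Parses values such as "2015-05-01T00:00:00" into a java.sql.Timestamp (format is an assumption).
  def getTimeStamp(value: String): Timestamp =
    new Timestamp(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss").parse(value).getTime)
}

object Statements extends Serializable {

  // INSERT statement for a single event; the timestamp is passed as epoch milliseconds.
  def cql(id: String, time: Timestamp, event: String): String =
    s"INSERT INTO test.my_table (user_id, time, event) VALUES ('$id', ${time.getTime}, '$event')"

  // Creates the keyspace and table; when dropTable is true an existing table is recreated.
  def createKeySpaceAndTable(session: Session, dropTable: Boolean = false): Unit = {
    session.execute(
      "CREATE KEYSPACE IF NOT EXISTS test WITH replication = " +
        "{'class': 'SimpleStrategy', 'replication_factor': 1}")
    if (dropTable) session.execute("DROP TABLE IF EXISTS test.my_table")
    session.execute(
      "CREATE TABLE IF NOT EXISTS test.my_table " +
        "(user_id text, time timestamp, event text, PRIMARY KEY (user_id, time))")
  }
}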
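To try the job end to end, something along the following lines can be used: a small entry point that starts the streaming job, and a producer that sends one semicolon-delimited message to the test.1 topic. The object names here are illustrative, not part of the original example.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative entry point: starts the streaming job defined above.
object Kafka2Spark2CassandraApp {
  def main(args: Array[String]): Unit = new SparkJob().runJob()
}

// Illustrative producer: sends one event in the expected "user_id;time;event" format.
object TestEventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("test.1", "userid_1;2015-05-01T00:00:00;some_value"))
    producer.close()
  }
}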