Example code that takes a Dataset read with Spark's readStream and writes it to Cassandra through a custom ForeachWriter.
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter

val spark = ...
val ds = spark.readStream
...

// CassandraConnector manages connection/session pooling on each executor.
val connector = CassandraConnector(spark.sparkContext.getConf)

// Inserts one row. withSessionDo borrows a pooled session and releases it when done.
// Interpolating values into the CQL string is fine for a demo; for real workloads,
// prepared statements (session.prepare + bind) avoid re-parsing and quoting problems.
def processRow(value: (String, String, String, String)) = {
  connector.withSessionDo { session =>
    session.execute(s"insert into test.log(ktag, ts, uuid, log) values('${value._1}', '${value._2}', '${value._3}', '${value._4}')")
  }
}

// ForeachWriter runs on the executors: open is called once per partition per batch,
// process once per row, and close at the end (errorOrNull is null on success).
val writer = new ForeachWriter[(String, String, String, String)] {
  override def open(partitionId: Long, version: Long) = true
  override def process(value: (String, String, String, String)) = {
    processRow(value)
  }
  override def close(errorOrNull: Throwable) = {
    if (errorOrNull != null) println(errorOrNull)
  }
}

val query = ds.writeStream.queryName("test").foreach(writer).start()
query.awaitTermination()
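The readStream portion above is elided; whatever the source, ds must end up as a Dataset[(String, String, String, String)] so that it matches the ForeachWriter's type parameter. A minimal sketch, assuming a hypothetical socket source emitting tab-separated ktag, ts, uuid, log fields (the host, port, and delimiter are illustrative, not from the original):

import spark.implicits._

// Hypothetical source: lines of "ktag\tts\tuuid\tlog" from a local socket.
val ds = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()                         // DataFrame with a single "value" column
  .as[String]
  .map { line =>
    val f = line.split("\t", -1)  // assumes exactly 4 tab-separated fields
    (f(0), f(1), f(2), f(3))
  }

Each tuple field then maps positionally onto the ktag, ts, uuid, and log columns in the insert statement above.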
Add the spark-cassandra-connector dependency to build.sbt:
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2"
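The insert statement assumes a test.log table with ktag, ts, uuid, and log columns already exists. Below is a sketch of a matching schema, created through the same connector before the stream starts; the replication settings and the choice of primary key are assumptions, not from the original:

// One-time setup; adjust replication and the primary key for your cluster and queries.
connector.withSessionDo { session =>
  session.execute(
    """CREATE KEYSPACE IF NOT EXISTS test
      |WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}""".stripMargin)
  session.execute(
    """CREATE TABLE IF NOT EXISTS test.log (
      |  ktag text, ts text, uuid text, log text,
      |  PRIMARY KEY (ktag, ts)
      |)""".stripMargin)
}

As a general rule, pick the connector release line that matches your Spark version; the 2.0.x connector targets Spark 2.0/2.1.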