Below is example code that saves a Dataset read with Spark's readStream to Cassandra through a custom ForeachWriter.

import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.ForeachWriter

val spark = ...

// readStream is a parameterless method in Scala, so it is called without parentheses.
val ds = spark.readStream
...

val connector = CassandraConnector.apply(spark.sparkContext.getConf)

def processRow(value: (String, String, String, String)) = {
  // withSessionDo borrows a pooled session and releases it automatically,
  // so there is no need to open and close a session by hand.
  connector.withSessionDo { session =>
    session.execute(s"insert into test.log(ktag, ts, uuid, log) values('${value._1}', '${value._2}', '${value._3}', '${value._4}')")
  }
}

val writer = new ForeachWriter[(String, String, String, String)] {
  // Returning true accepts every partition; a stricter sink could skip
  // already-processed (partitionId, version) pairs here.
  override def open(partitionId: Long, version: Long) = true

  override def process(value: (String, String, String, String)) = {
    processRow(value)
  }

  // errorOrNull is null when the partition completed without an error.
  override def close(errorOrNull: Throwable) = {
    println(errorOrNull)
  }
}

val query = ds.writeStream.queryName("test").foreach(writer).start

query.awaitTermination()
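
The insert statement assumes the test keyspace and log table already exist. The sketch below creates them through the same connector; the column types and the primary key are assumptions inferred from the insert statement, so adjust them to the real schema.

// Hypothetical one-time schema setup, inferred from the insert statement above.
connector.withSessionDo { session =>
  session.execute(
    "create keyspace if not exists test with replication = " +
      "{'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute(
    "create table if not exists test.log(" +
      "ktag text, ts text, uuid text, log text, primary key(ktag, ts))")
}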
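
Interpolating values straight into the CQL string is fine for a quick test, but it re-parses the statement on every row and breaks if a field ever contains a quote. A minimal prepared-statement variant of processRow might look like this (processRowPrepared is a hypothetical name, not part of the original code):

// Sketch: bind values through a prepared statement instead of interpolating them.
def processRowPrepared(value: (String, String, String, String)): Unit = {
  connector.withSessionDo { session =>
    // Prepared inline for brevity; a long-running job would prepare once and
    // reuse the statement rather than calling prepare on every row.
    val stmt = session.prepare(
      "insert into test.log(ktag, ts, uuid, log) values(?, ?, ?, ?)")
    session.execute(stmt.bind(value._1, value._2, value._3, value._4))
  }
}
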
Add spark-cassandra-connector to build.sbt.

libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.2"
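
The 2.0.x line of the connector targets Spark 2.0/2.1, so the matching Spark dependency in the same build.sbt would look roughly like this (the exact version is an assumption; match it to the cluster):

// Assumed Spark version; spark-cassandra-connector 2.0.x targets Spark 2.0/2.1.
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.1" % "provided"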



Posted by '김용환'