[spark] spark cassandra connector - 스파크에 카산드라 연동하는 라이브러리
RDD를 쉽게 카산드라 테이블에 변환하고 저장하는 커넥터가 대표적으로 2개가 있다.
칼리오페라고 있는데. 사실상 더 개발이 되지 않고 있다.
https://github.com/tuplejump/calliope
val transformer = CqlRowReader.columnListMapper[Employee]("deptid", "empid", "first_name", "last_name")
import transformer._
val cas = CasBuilder.cql3.withColumnFamily("cql3_test", "emp_read_test")
val casrdd = sc.cql3Cassandra[Employee](cas)
val result = casrdd.collect().toList
result must have length (5)
result should contain(Employee(20, 105, "jack", "carpenter"))
result should contain(Employee(20, 106, "john", "grumpy"))
간단한 예제는 다음과 같다.
import org.apache.spark._
import com.datastax.spark.connector._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "ip")
val sc = new SparkContext("spark://ip:7077", "test", conf)
val testRDD = sc.cassandraTable("testKs", "kv")
println(testRDD.count)
println(testRDD.first)
println(testRDD.map(_.getInt("value")).sum)
val col = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
col.saveToCassandra("testKs", "kv", SomeColumns("key", "value"))