RDD를 쉽게 카산드라 테이블에 변환하고 저장하는 커넥터가 대표적으로 2개가 있다. 


칼리오페라고 있는데. 사실상 더 개발이 되지 않고 있다. 


https://github.com/tuplejump/calliope


      val transformer = CqlRowReader.columnListMapper[Employee]("deptid", "empid", "first_name", "last_name")


      import transformer._


      val cas = CasBuilder.cql3.withColumnFamily("cql3_test", "emp_read_test")


      val casrdd = sc.cql3Cassandra[Employee](cas)


      val result = casrdd.collect().toList


      result must have length (5)

      result should contain(Employee(20, 105, "jack", "carpenter"))

      result should contain(Employee(20, 106, "john", "grumpy"))




많이 사용되는 cassandra connector는 datastax에서 개발되었다. 

현재 버전까지 잘 지원되고 있다. 

https://github.com/datastax/spark-cassandra-connector


간단한 예제는 다음과 같다. 


import org.apache.spark._ 

import com.datastax.spark.connector._


val conf = new SparkConf(true).set("spark.cassandra.connection.host", "ip") 

val sc = new SparkContext("spark://ip:7077", "test", conf)


val testRDD = sc.cassandraTable("testKs", "kv") 

println(testRDD.count) 

println(testRDD.first)

println(testRDD.map(_.getInt("value")).sum) 


val col = sc.parallelize(Seq(("key3", 3), ("key4", 4))) 

col.saveToCassandra("testKs", "kv", SomeColumns("key", "value"))      




Posted by '김용환'
,