RDD를 쉽게 카산드라 테이블에 변환하고 저장하는 커넥터가 대표적으로 2개가 있다.
칼리오페라고 있는데. 사실상 더 개발이 되지 않고 있다.
https://github.com/tuplejump/calliope
val transformer = CqlRowReader.columnListMapper[Employee]("deptid", "empid", "first_name", "last_name")
import transformer._
val cas = CasBuilder.cql3.withColumnFamily("cql3_test", "emp_read_test")
val casrdd = sc.cql3Cassandra[Employee](cas)
val result = casrdd.collect().toList
result must have length (5)
result should contain(Employee(20, 105, "jack", "carpenter"))
result should contain(Employee(20, 106, "john", "grumpy"))
간단한 예제는 다음과 같다.
import org.apache.spark._
import com.datastax.spark.connector._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "ip")
val sc = new SparkContext("spark://ip:7077", "test", conf)
val testRDD = sc.cassandraTable("testKs", "kv")
println(testRDD.count)
println(testRDD.first)
println(testRDD.map(_.getInt("value")).sum)
val col = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
col.saveToCassandra("testKs", "kv", SomeColumns("key", "value"))
'scala' 카테고리의 다른 글
[brew] 맥 OS에서 scala 업그레이드 (0) | 2017.04.07 |
---|---|
[scala] case object와 object의 차이, (0) | 2017.04.03 |
[scala] 스칼라 의존성, 패키지 검색하는 웹 - http://spark-packages.org (0) | 2017.03.30 |
[spark] RDD join 예제 (0) | 2017.03.29 |
[spark2.0] dataframe의 filter,where,isin,select,contains,col,between,withColumn, 예제 (0) | 2017.03.28 |