Spark에서 Sqlite DB 테이블을 읽어오는 예시이다. 


spark jdbc로 데이터 프레임을 읽을 때 항상 모든 테이블 로우를 읽을 수 있을 뿐 아니라.


특정 쿼리의 데이터만 읽을 수 있다.




두가지 방법이 있는데. 


첫 번째는 option("dbtable)에  쿼리를 추가하는 방법,


두 번째는 jdbc를 읽을 때 predicate(where절 같은 형태)를 추가하는 방법이 있다.




object JDBCMain extends SparkHelper {
def main(args: Array[String]): Unit = {
val driver = "org.sqlite.JDBC"
val path = "origin-source/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:${path}"
val tablename = "flight_info"

// driver loading
import java.sql.DriverManager
Class.forName("org.sqlite.JDBC")
val connection = DriverManager.getConnection(url)
println(connection.isClosed)
println(connection.close())

val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
val newDbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.load()
newDbDataFrame.explain()

println("predicates--")
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates, props).count())
println(spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions)

val predicates2 = Array(
"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates2, props).count())
println(spark.read.jdbc(url, tablename, predicates2, props).rdd.getNumPartitions)



이 예시는 Spark Definitive Guide에 있고 깃허브(https://github.com/knight76/spark-definitive-guide-sbt)에 저장되어 있다. 



Posted by '김용환'
,