Spark에서 Sqlite DB 테이블을 읽어오는 예시이다.
spark jdbc로 데이터 프레임을 읽을 때 항상 모든 테이블 로우를 읽을 수 있을 뿐 아니라.
특정 쿼리의 데이터만 읽을 수 있다.
두가지 방법이 있는데.
첫 번째는 option("dbtable)에 쿼리를 추가하는 방법,
두 번째는 jdbc를 읽을 때 predicate(where절 같은 형태)를 추가하는 방법이 있다.
object JDBCMain extends SparkHelper {
def main(args: Array[String]): Unit = {
val driver = "org.sqlite.JDBC"
val path = "origin-source/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:${path}"
val tablename = "flight_info"
// driver loading
import java.sql.DriverManager
Class.forName("org.sqlite.JDBC")
val connection = DriverManager.getConnection(url)
println(connection.isClosed)
println(connection.close())
val pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
val newDbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.load()
newDbDataFrame.explain()
println("predicates--")
val props = new java.util.Properties
props.setProperty("driver", "org.sqlite.JDBC")
val predicates = Array(
"DEST_COUNTRY_NAME = 'Sweden' OR ORIGIN_COUNTRY_NAME = 'Sweden'",
"DEST_COUNTRY_NAME = 'Anguilla' OR ORIGIN_COUNTRY_NAME = 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates, props).count())
println(spark.read.jdbc(url, tablename, predicates, props).rdd.getNumPartitions)
val predicates2 = Array(
"DEST_COUNTRY_NAME != 'Sweden' OR ORIGIN_COUNTRY_NAME != 'Sweden'",
"DEST_COUNTRY_NAME != 'Anguilla' OR ORIGIN_COUNTRY_NAME != 'Anguilla'")
println(spark.read.jdbc(url, tablename, predicates2, props).count())
println(spark.read.jdbc(url, tablename, predicates2, props).rdd.getNumPartitions)
이 예시는 Spark Definitive Guide에 있고 깃허브(https://github.com/knight76/spark-definitive-guide-sbt)에 저장되어 있다.
'scala' 카테고리의 다른 글
[Spark] Streaming 데이터를 DB에 저장하는 코드 (0) | 2019.04.04 |
---|---|
[spark] No output operations registered, so nothing to execute 에러 (0) | 2019.04.03 |
[spark] 스파크 조인 전략 - 셔플 조인, 브로캐스트 조인 (shuffle join, broadcast join) (0) | 2019.02.27 |
[spark] Malformed class name 에러 해결하기 (0) | 2019.02.27 |
[spark] monotonically_increasing_id 예시 (0) | 2019.02.25 |