Spark에서
Streaming 데이터를 DB에 저장할 때. 일반적인 데이터 프레임에서 저장하는 방식을 사용할 수 없다.
(만약 사용하면 streaming 데이터 프레임에서 그렇게 저장할 수 없다라는 에러가 나온다)
따라서 Sink 형태(ForeachWriter 상속) 방식을 사용해야 한다.
(단순한 형태의 구현이다 )
예시) Spark Streaming Data Frame쪽 소스
val writer:ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrl, sinkTable); val query = dataframe .writeStream .foreach(writer) .outputMode("append") .start() query.awaitTermination() |
예시) ForeachWriter를 구현한 JdbcSink.scala 파일
package streaming import java.sql.Statement import java.sql.Connection import java.sql.DriverManager import org.apache.commons.lang3.StringUtils import org.apache.spark.sql.ForeachWriter class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord]{ val driver = "com.mysql.cj.jdbc.Driver" var statement:Statement = _ var connection:Connection = _ def open(partitionId: Long, version: Long): Boolean = { Class.forName(driver) connection = DriverManager.getConnection(url) this.statement = connection.createStatement() true } override def process(record: DeserializedFromKafkaRecord): Unit = { if (StringUtils.isEmpty(record.value)) { throw new IllegalArgumentException } val value = record.value.replace("'", "").replace("\"", "") statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")") } override def close(errorOrNull: Throwable): Unit = { connection.close() } } |
'scala' 카테고리의 다른 글
sbt 병렬 다운로드 (0) | 2019.04.08 |
---|---|
sbt assembly 에러 (0) | 2019.04.08 |
[spark] No output operations registered, so nothing to execute 에러 (0) | 2019.04.03 |
[spark] spark에서 DB 테이블 데이터를 읽는 방법 (0) | 2019.02.28 |
[spark] 스파크 조인 전략 - 셔플 조인, 브로캐스트 조인 (shuffle join, broadcast join) (0) | 2019.02.27 |