Spark에서

Streaming 데이터를 DB에 저장할 때. 일반적인 데이터 프레임에서 저장하는 방식을 사용할 수 없다.

(만약 사용하면 streaming 데이터 프레임에서 그렇게 저장할 수 없다라는 에러가 나온다)

 

따라서 Sink 형태(ForeachWriter 상속) 방식을 사용해야 한다. 

(단순한 형태의 구현이다 )

 

 

예시) Spark Streaming Data Frame쪽 소스

val writer:ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrlsinkTable);
val query = dataframe
.writeStream
.foreach(writer)
.outputMode("append")
.start()
query.awaitTermination()

 

 

예시) ForeachWriter를 구현한 JdbcSink.scala 파일 

package streaming

import java.sql.Statement
import java.sql.Connection
import java.sql.DriverManager

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.ForeachWriter

class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord]{
val driver = "com.mysql.cj.jdbc.Driver"
var statement:Statement = _
var connection:Connection = _

def open(partitionId: Long, version: Long): Boolean = {
Class.forName(driver)
connection = DriverManager.getConnection(url)
this.statement = connection.createStatement()
true
}

override def process(record: DeserializedFromKafkaRecord): Unit = {
if (StringUtils.isEmpty(record.value)) {
throw new IllegalArgumentException
}

val value = record.value.replace("'", "").replace("\"", "")
statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")")
}

override def close(errorOrNull: Throwable): Unit = {
connection.close()
}
}

 

Posted by '김용환'
,