Below is the earlier code refined further to improve performance on the DB request side; use it as a reference.
First, the class that opens the JDBC connection:
import java.sql.{Connection, DriverManager, Statement}

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.ForeachWriter

class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord] {
  val driver = "com.mysql.cj.jdbc.Driver"
  var statement: Statement = _
  var connection: Connection = _

  // Opens the JDBC connection and creates the statement reused by process().
  override def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url)
    statement = connection.createStatement()
    true
  }

  // Strips quote characters from the value and inserts it into the sink table.
  override def process(record: DeserializedFromKafkaRecord): Unit = {
    if (StringUtils.isEmpty(record.value)) {
      throw new IllegalArgumentException
    }
    val value = record.value.replace("'", "").replace("\"", "")
    //println("insert into " + tablename + "(value) values(\"" + value + "\")")
    statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}
Next, a wrapper that holds the JdbcSink class (which does the JDBC writes) in an object, so that a new DB connection instance is not created every time:
import org.apache.spark.sql.ForeachWriter
import streaming.KafkaAvroDemo._

object DBSink {
  val writer: ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrl, sinkTable)
  writer.open(0, 0)
}
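Because DBSink is an object, its initializer runs once per executor JVM, so the connection is opened a single time and then reused by every partition and micro-batch. Note that the streaming query below never calls close() on the shared writer; one possible refinement, which is an assumption on my part and not part of the original code, is to register a JVM shutdown hook:

object DBSink {
  val writer: ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrl, sinkTable)
  writer.open(0, 0)

  // Assumed refinement: close the shared connection when the executor JVM exits,
  // since the streaming query never invokes close() on this singleton writer.
  sys.addShutdownHook(writer.close(null))
}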
Here is the actual example code that ties it together:
import java.util.Properties

import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDecoder, KafkaAvroDeserializerConfig}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession
  .builder()
  .appName("KafkaSparkStreaming")
  .config("spark.master", "local[2]")
  .getOrCreate()

import spark.implicits._

// Read the Kafka topic as a streaming Dataset and decode each Avro value
// into a DeserializedFromKafkaRecord via the Confluent Schema Registry.
val dataframe = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", kafkaTopic)
  .option("startingOffsets", "latest")
  .option("maxOffsetsPerTrigger", 20)
  .load()
  .map(x => {
    val props = new Properties()
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry)
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    val vProps = new kafka.utils.VerifiableProperties(props)
    val decoder = new KafkaAvroDecoder(vProps)

    // Fetch the latest schema registered for the topic's value.
    val avroSchema = new RestService(schemaRegistry).getLatestVersion(kafkaTopic + "-value")
    val messageSchema = new Schema.Parser().parse(avroSchema.getSchema)

    DeserializedFromKafkaRecord(decoder.fromBytes(x.getAs[Array[Byte]]("value"),
      messageSchema).asInstanceOf[GenericData.Record].toString)
  })

// Write each micro-batch through the shared JdbcSink held by the DBSink object.
val query = dataframe
  .writeStream
  .foreachBatch { (batchDF: Dataset[DeserializedFromKafkaRecord], batchId: Long) =>
    batchDF.foreachPartition { rows: Iterator[DeserializedFromKafkaRecord] =>
      rows.foreach { row =>
        DBSink.writer.process(row)
        println(row)
      }
    }
  }
  .outputMode("append")
  .start()

query.awaitTermination()
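The snippets above rely on constants pulled in from streaming.KafkaAvroDemo (kafkaBrokers, kafkaTopic, schemaRegistry, sinkDBUrl, sinkTable), whose actual values are not shown in this post. The following is only a hypothetical sketch of what that configuration object might contain:

package streaming

object KafkaAvroDemo {
  // All values below are placeholders, not the ones used in the original post.
  val kafkaBrokers = "localhost:9092"
  val kafkaTopic = "avro-demo-topic"
  val schemaRegistry = "http://localhost:8081"
  val sinkDBUrl = "jdbc:mysql://localhost:3306/demo?user=demo&password=demo"
  val sinkTable = "kafka_messages"
}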