https://knight76.tistory.com/entry/Spark-Streaming-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%A5%BC-DB%EC%97%90-%EC%A0%80%EC%9E%A5%ED%95%98%EB%8A%94-%EC%BD%94%EB%93%9C

 

[Spark] Streaming 데이터를 DB에 저장하는 코드

Spark에서 Streaming 데이터를 DB에 저장할 때. 일반적인 데이터 프레임에서 저장하는 방식을 사용할 수 없다. (만약 사용하면 streaming 데이터 프레임에서 그렇게 저장할 수 없다라는 에러가 나온다) 따라서 Sin..

knight76.tistory.com

 

 

이전 코드를 더 다듬어 DB 요청쪽 성능 효과를 얻는 코드를 두니, 참고하길 바란다.

 

 

jdbc 연결하는 클래스..

class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord]{
  val driver = "com.mysql.cj.jdbc.Driver"
  var statement:Statement = _
  var connection:Connection  = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url)
    this.statement = connection.createStatement()
    true
  }

  override def process(record: DeserializedFromKafkaRecord): Unit = {
    if (StringUtils.isEmpty(record.value)) {
      throw new IllegalArgumentException
    }

    val value = record.value.replace("'", "").replace("\"", "")
    //println("insert into " + tablename + "(value) values(\"" + value + "\")")
    statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }

 

jdbc 저장하는 JdbcSink 클래스를 object로 감싼 wrapper, 그래야 매번 DB 연객 인스턴스 생성이 없게 한다.

import org.apache.spark.sql.ForeachWriter
import streaming.KafkaAvroDemo._

object DBSink {

  val writer:ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrl, sinkTable)
  writer.open(0, 0)

}

 

실제 예시 코드이다. 

 

    val spark = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .config("spark.master", "local[2]")
      .getOrCreate()

    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, INTERVAL)

    val dataframe = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBrokers)
        .option("subscribe", kafkaTopic)
        .option("startingOffsets", "latest")
        .option("maxOffsetsPerTrigger", 20)
        .load()
        .map( x=> {
          val props = new Properties()
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry)
          props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
          val vProps = new kafka.utils.VerifiableProperties(props)
          val decoder = new KafkaAvroDecoder(vProps)
          val avroSchema = new RestService(schemaRegistry).getLatestVersion(kafkaTopic + "-value")
          val messageSchema = new Schema.Parser().parse(avroSchema.getSchema)

          DeserializedFromKafkaRecord(decoder.fromBytes(x.getAs[Array[Byte]]("value"),
            messageSchema).asInstanceOf[GenericData.Record].toString)
        }
        )

    val query = dataframe
      .writeStream
        .foreachBatch((batchDF, batchId) => {

          batchDF.foreachPartition(rows => {
            rows.foreach(row => {
              DBSink.writer.process(row)
              println(row)
            })
          })
        })
      .outputMode("append")
      .start()
    query.awaitTermination()

 

 

 

 

Posted by '김용환'
,