'2019/04/04'에 해당되는 글 2건

  1. 2019.04.04 kafka connect 설정 주의 사항
  2. 2019.04.04 [Spark] Streaming 데이터를 DB에 저장하는 코드

kafka connect에서 kafka broker 설정과 schema registry 설정할 때 조심히 다뤄야 한다.

 

bootstrap.servers=google-test-kafka001.google.io:9092,google-test-kafka002.google.io:9092,google-test-kafka003.google.io:9092

key.converter.schema.registry.url=http://google-test-sr001.google.io:8081,http://google-test-sr002.google.io:8081,http://google-test-sr003.google.io:8081
value.converter.schema.registry.url=http://google-test-sr001.google.io:8081,http://google-test-sr002.google.io:8081,http://google-test-sr003.google.io:8081


 kafka broker는 그냥 host와 port만 지정하고

schema registry는 http를 포함한 host와 port(uri)를 지정한다.

 

알고 나면 매우 당연한데, 막상 설정하다 보면 실수하는 내용이다.

Posted by '김용환'
,

Spark에서

Streaming 데이터를 DB에 저장할 때. 일반적인 데이터 프레임에서 저장하는 방식을 사용할 수 없다.

(만약 사용하면 streaming 데이터 프레임에서 그렇게 저장할 수 없다라는 에러가 나온다)

 

따라서 Sink 형태(ForeachWriter 상속) 방식을 사용해야 한다. 

(단순한 형태의 구현이다 )

 

 

예시) Spark Streaming Data Frame쪽 소스

val writer:ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrlsinkTable);
val query = dataframe
.writeStream
.foreach(writer)
.outputMode("append")
.start()
query.awaitTermination()

 

 

예시) ForeachWriter를 구현한 JdbcSink.scala 파일 

package streaming

import java.sql.Statement
import java.sql.Connection
import java.sql.DriverManager

import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.ForeachWriter

class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord]{
val driver = "com.mysql.cj.jdbc.Driver"
var statement:Statement = _
var connection:Connection = _

def open(partitionId: Long, version: Long): Boolean = {
Class.forName(driver)
connection = DriverManager.getConnection(url)
this.statement = connection.createStatement()
true
}

override def process(record: DeserializedFromKafkaRecord): Unit = {
if (StringUtils.isEmpty(record.value)) {
throw new IllegalArgumentException
}

val value = record.value.replace("'", "").replace("\"", "")
statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")")
}

override def close(errorOrNull: Throwable): Unit = {
connection.close()
}
}

 

Posted by '김용환'
,