https://knight76.tistory.com/entry/Spark-Streaming-%EB%8D%B0%EC%9D%B4%ED%84%B0%EB%A5%BC-DB%EC%97%90-%EC%A0%80%EC%9E%A5%ED%95%98%EB%8A%94-%EC%BD%94%EB%93%9C

 

[Spark] Streaming 데이터를 DB에 저장하는 코드

Spark에서 Streaming 데이터를 DB에 저장할 때. 일반적인 데이터 프레임에서 저장하는 방식을 사용할 수 없다. (만약 사용하면 streaming 데이터 프레임에서 그렇게 저장할 수 없다라는 에러가 나온다) 따라서 Sin..

knight76.tistory.com

 

 

이전 코드를 더 다듬어 DB 요청쪽 성능 효과를 얻는 코드를 두니, 참고하길 바란다.

 

 

jdbc 연결하는 클래스..

class JdbcSink(url: String, tablename: String) extends ForeachWriter[DeserializedFromKafkaRecord]{
  val driver = "com.mysql.cj.jdbc.Driver"
  var statement:Statement = _
  var connection:Connection  = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url)
    this.statement = connection.createStatement()
    true
  }

  override def process(record: DeserializedFromKafkaRecord): Unit = {
    if (StringUtils.isEmpty(record.value)) {
      throw new IllegalArgumentException
    }

    val value = record.value.replace("'", "").replace("\"", "")
    //println("insert into " + tablename + "(value) values(\"" + value + "\")")
    statement.executeUpdate("insert into " + tablename + "(value) values(\"" + value + "\")")
  }

  override def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }

 

jdbc 저장하는 JdbcSink 클래스를 object로 감싼 wrapper, 그래야 매번 DB 연객 인스턴스 생성이 없게 한다.

import org.apache.spark.sql.ForeachWriter
import streaming.KafkaAvroDemo._

object DBSink {

  val writer:ForeachWriter[DeserializedFromKafkaRecord] = new JdbcSink(sinkDBUrl, sinkTable)
  writer.open(0, 0)

}

 

실제 예시 코드이다. 

 

    val spark = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .config("spark.master", "local[2]")
      .getOrCreate()

    import spark.implicits._

    val ssc = new StreamingContext(spark.sparkContext, INTERVAL)

    val dataframe = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBrokers)
        .option("subscribe", kafkaTopic)
        .option("startingOffsets", "latest")
        .option("maxOffsetsPerTrigger", 20)
        .load()
        .map( x=> {
          val props = new Properties()
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistry)
          props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
          val vProps = new kafka.utils.VerifiableProperties(props)
          val decoder = new KafkaAvroDecoder(vProps)
          val avroSchema = new RestService(schemaRegistry).getLatestVersion(kafkaTopic + "-value")
          val messageSchema = new Schema.Parser().parse(avroSchema.getSchema)

          DeserializedFromKafkaRecord(decoder.fromBytes(x.getAs[Array[Byte]]("value"),
            messageSchema).asInstanceOf[GenericData.Record].toString)
        }
        )

    val query = dataframe
      .writeStream
        .foreachBatch((batchDF, batchId) => {

          batchDF.foreachPartition(rows => {
            rows.foreach(row => {
              DBSink.writer.process(row)
              println(row)
            })
          })
        })
      .outputMode("append")
      .start()
    query.awaitTermination()

 

 

 

 

Posted by 김용환 '김용환'

 

<마음에 드는 부분..>

고객센터를 자산으로 여기고 내재화했다.

그래서 이를 통해 고객이 무엇을 원하는지를 잘 파악해서 서비스에 반영했다.

 

 

일반 기업이 서비스가 물건을 사는 고객에게 제공하는 덤이나 공짜 선물 같은 부수적인 것으로 여긴다면, 자포스에게 서비스는 돈을 받고 파는 물건과 같고 브랜드를 알리고 고객의 충성도를 쌓기 위한 투자로 여겨진다.다시 말해 자포스는 '신발'을 파는 것이 아니라 '고객의 감동 체험'을 파는 것이다.

 

https://gsrealdesign.tistory.com/entry/자포스-닷컴-아마존은-왜-최고가에-자포스를-인수했나

 

자포스 닷컴 - 아마존은 왜 최고가에 자포스를 인수했나

www.zappos.com 은2009년 12억 달러(한화로 약 1조3천억 - GS SHOP의 2009년 매출 5천6백억) 매출을 올린 제화에서 시작해 의류, 가방, 액세서리 등으로 품목을 넓혀가고 있는 온라인 쇼핑몰 사이트다. 인터넷으..

gsrealdesign.tistory.com

 

 

Posted by 김용환 '김용환'

이 책은 평범한 개발자를 위한 책으로서, 그리고 좋은 멘토와 같은 책이니 추천 5개짜리이다.

 

나는 첫 회사에서 다닐 때부터 느꼈던 "나는 모자란 사람이다" 라는 생각이 항상 있다.

능력이 부족한 사람이기에 쉽게 가능했고,

블로그도 그냥 내가 검색해서 찾기 위한 로깅으로 사용 중이긴 하지만, 부족하다라는 생각부터 시작한 것이다.

 

누군가에게 도움이나 된다면 참 좋겠다 생각하기도 했지만..

여전히 난 잘 모른다. 많이 물어보고 책을 보며 지금껏 살고 있다.

뛰어난 사람이 되기 보다는 그냥 개발자로서 버티면서 살아가는게 쉽지 않으면서 여기까지 온게 신기하고 하다.

 

'프로그래머의 길, 멘토에게 묻다' 책에서 "가장 뒤떨어진 이가 되라" 라는 내용이 참 와닿았다. 

내가 그렇게 일해왔기 때문일런지 모르겠다. 제대로 모른다고 생각해서 지금까지 살아남았으니..

"일하면서 성찰하기", "부숴도 괜찮은 장난감", "실패에서 배우라", "배운 것을 공유하라", "배운 것을 기록하라"도 내가 평상시에 추구하는 개발 태도이니. 비슷한 것 같기도 하다. 그렇다고 해서 미친듯이 개발을 추구하는 사람도 아니라서 지금까지 일하고 있는 것이 아닐까 싶기도 하다.

 

"지속적인 동기 부여"가 가장 어려운 부분인 것 같다. 왜 내가 개발을 해야 하나? 하는 질문은 꽤나 어렵기도 하다.

이 질문은 재미있어서 라고 답하기 어려운 부분이 있다. 내가 먼가 미친듯이 재미있어서 일하는 게 때로는 조직원의 성장을 파괴(조직원의 조직 이동, 질투..)하기도 하는 것을 느꼈다.

그래서 아마도 좋은 먹거리(좋은 성과)는 누군가에게 주고 나는 정리 안된 뭔가를 해야 하는 것에 동기 부여하려니 조금은 쉽지 않다.

그래도 이게 어쩌면 지속적인 동기 부여가 되기도 한다. 때로는 강하게 커뮤니케이션하는 나 자신에 괴리감에 있기도 하고..

 

신기술, 잘 모르는 프로그래밍 언어, 신선한 아키텍처에 대한 신선함, 부족함, 금전적인 부분이 동기부여가 되기도 한다. 

 

 

이 책이 주는 묘미는 좋은 선배가 얘기해 주는 멘토와 같다.

 

직장을 계속 다니며 다니는 평범한 개발자에게는 필요한 내용이 가득 담겨 있으니 보면 좋을 것 같다.

 

 

 

 

 

Posted by 김용환 '김용환'

helm을 통해 kubernetes jenkins을 설치할 때. 주의해야 할 점이 있다.

 

 

 

jenkins 잡이 실행, 에러 정보가 모두 pods로 쌓인다. (Completed|Error)

 

 

처음에는 대수롭게 생각하지 않았는데. 이게 싸이면 결국 kubernetes 클러스터는 자원 부족으로 문제가 발생한다.

 

그래서 아래와 같은 커맨드로 종료하거나..

 

 

kubectl delete pod $(kubectl get pods | grep -E "Completed|Error" | awk '{print $1}') —force —grace-period=0

 

 

kubernetes job 등록해서.. 삭제하도록 해야 한다.

 

spec:
schedule: "*/30 * * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: kubectl-runner
image: wernight/kubectl
command: ["sh", "-c", "kubectl get jobs | awk '$4 ~ /[2-9]d$/ || $3 ~ 1' | awk '{print $1}' | xargs kubectl delete job"]
restartPolicy: Never

 

 

Posted by 김용환 '김용환'

 

 

https://dzone.com/articles/java-garbage-collection-3

 

자바 11 : Epsilon and the Z garbage collector (ZGC).
자바 12 : Shenandoah Garbage Collector (edited) 

큰 메모리를 기반의 자바 애플리케이션에 ZGC를 사용하면 g1 gc 보다 좋다고 한다. 

 

Posted by 김용환 '김용환'

 

https://dzone.com/articles/build-trust-with-scrum

 

스크럼에 신뢰가 중요하다라는 내용의 글이다.

 

신뢰 부족하면 관계가 깨지고 투명성이 사라진다. 사람들은 방어하기 시작한다. 아무도 팀으로 일하거나 목표를 위해 협업하기를 원하지 않는다.



“누군가가 당신을 신뢰하기를 원한다면 먼저 신뢰해야 한다.“라고 말하거 누군가를 신뢰하기 위해 먼저 용감해야 한다고 말할 수 있다.
프로젝트 오너, 개발자들이 서로 신뢰해야 프로젝트는 성공해야 한다는 아주 기본적인 내용이다.

 

개발 뿐 일까. 모은 일이 깨지는 것은 다 신뢰 문제였단 것 같다. 

 

 

Posted by 김용환 '김용환'

scala에서 커맨드 매개 변수를 처리하려고 보니..

scopt는 너무 복잡한 것 같다.

 

아래와 같이 단순하게 커맨드 라인 매개 변수 처리는 간단한 것이 좋은 것 같다. 

예제는 다음과 같다.

 

object App {
  def main(args: Array[String]): Unit = {

    def nextOption(map : Map[Symbol, Any], list: List[String]) : Map[Symbol, Any] = {
      list match {
        case Nil => map
        case "-broker" :: value :: tail =>
          nextOption(map ++ Map('broker -> value.toString), tail)
        case "-topic" :: value :: tail =>
          nextOption(map ++ Map('topic -> value.toString), tail)
        case "-schema_registry" :: value :: tail =>
          nextOption(map ++ Map('schema_registry -> value.toString), tail)
        case "-sink_jdbc_url" :: value :: tail =>
          nextOption(map ++ Map('sink_jdbc_url -> value.toString), tail)
        case "-sink_table" :: value :: tail =>
          nextOption(map ++ Map('sink_table -> value.toString), tail)
        case "-group_id" :: value :: tail =>
          nextOption(map ++ Map('group_id -> value.toString), tail)
        case option :: tail => println("Unknown option "+option)
          sys.exit(1)
      }
    }
    val options = nextOption(Map(), args.toList)
    println(options)

    val kafkaBroker = options.get('broker)
    val kafkaTopic = options.get('topic)
    val schemaRegistry = options.get('schema_registry)
    val sinkJdbcUrl = options.get('sink_jdbc_url)
    val sinkTable = options.get('sink_table)
    val consumerGroupId = options.get('group_id)

    println(s"$kafkaBroker")
    println(s"$kafkaTopic")
    println(s"$schemaRegistry")
    println(s"$sinkJdbcUrl")
    println(s"$sinkTable")
    println(s"$consumerGroupId")

  }
}

 

결과

 

Map('group_id -> mygroup, 'broker -> localhost:9092, 'sink_table -> user_sink, 'topic -> user, 'schema_registry -> localhost:8081, 'sink_jdbc_url -> jdbc_url)
Some(localhost:9092)
Some(user)
Some(localhost:8081)
Some(jdbc_url)
Some(user_sink)
Some(mygroup)

Posted by 김용환 '김용환'

예시는 다음과 같다. ruby와 python과 비슷하다. 

 

주의할 점은 toString으로 변환할 때이다. 

 

val symbol1 = 'Symbol
val symbol2 = 'Symbol1
println(symbol1 eq symbol2) // false

val symbol3 = Symbol("symbol")
val symbol4 = Symbol("symbol")
println(symbol3 eq symbol4) // true

symbol3 match {
  case Symbol("symbol") => println("ok") // ok
  case _ => println("xx")
}

println('Symbol.toString == symbol1.toString) // true
println('Symbol.toString eq symbol1.toString) // false

자바 때문에 깜박할 수 있는데.

스칼라의 ==는 자바의 equals이다 (값 비교), 따로서 당연히 true

스칼라의 eq는 자바의 주소 값 비교이다. 따라서 당연히 false.

 

 

 

보통 scala에서는 java String을 intern한 값이기에 ==, eq 모두 같은 값이다.

 

val a = "a"
val b = "a"

println(a == b) // true
println(a eq b) // true
Posted by 김용환 '김용환'

 

http://www.kyobobook.co.kr/product/detailViewKor.laf?ejkGb=KOR&mallGb=KOR&barcode=9791160073492&orderClick=LAG&Kc=

 

플랫폼의 생각법

구글, 페이스북, 아마존 등 공급자와 소비자라는 두 개...

www.kyobobook.co.kr

 

이 책은 대학생들과 IT 직장인이라면 보면 좋은 책 같다.

 

내게 감동이 있었던 것은 카카오 모빌리티를 통해서 택시 기사의 소득이 올라갔다는 점이다. 단순히 콜 호출 받기만, 조금 더 편해진 줄 알았는데, 수익까지 올라갔다는 것은 뺏는 시장이 아니라 서로 윈윈하는 시장이 되었다는 점이다.

 

우버나 리프트가 기존 택시 기사의 소득이 들어들게 하고 공급자가 저가 소득자가 된다면 공유 플랫폼이 정상적인지 다시 한번 생각하게 되는 좋은 책이었다. 그 전에는 플랫폼 사업자의 성공만 봤다면, 이 책을 통해 사업적 플랫폼 기업에 대해 다시 생각할 수 있게 되어서 좋았다

Posted by 김용환 '김용환'