debezium에서 사용하는 bin 로그 라이브러리는 shyiko이고 NIFI에서도 사용되는 검증된 라이브러리이다.

https://github.com/shyiko/mysql-binlog-connector-java

 

 

실제 Debezium에서 shyiko를 통해 bin 로그를 읽어오는 코드이다. 

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java

Posted by '김용환'
,

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector

 

Kafka Connect Deep Dive – JDBC Source Connector | Confluent

The JDBC source connector for Kafka Connect enables you to pull data (source) from a database into Apache Kafka®, and to push data (sink) from a Kafka topic to a database. Almost all relational databases provide a JDBC driver, including Oracle, Microsoft S

www.confluent.io

 

카프카 커넥트에 대한 설명이 있다. 

 

재미있는 부분은 아래와 같다.

증분 데이터를 DB에서 직접 읽을 수 있는 방법을 설명한다. 실제로 잘 동작하니 참고하면 좋다.

    • MySQL

      CREATE TABLE foo ( … UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );
    • Postgres

      CREATE TABLE foo ( … UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Courtesy of https://techblog.covermymeds.com/databases/on-update-timestamps-mysql-vs-postgres/ CREATE FUNCTION update_updated_at_column() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN NEW.update_ts = NOW(); RETURN NEW; END; $$; CREATE TRIGGER t1_updated_at_modtime BEFORE UPDATE ON foo FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
    • Oracle

      CREATE TABLE foo ( … CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP , ); CREATE OR REPLACE TRIGGER TRG_foo_UPD BEFORE INSERT OR UPDATE ON foo REFERENCING NEW AS NEW_ROW FOR EACH ROW BEGIN SELECT SYSDATE INTO :NEW_ROW.UPDATE_TS FROM DUAL; END; /
  •  

 

 

 

 

Posted by '김용환'
,

postgress--> debezium -> confluent platform 연동 사례
https://www.linkedin.com/pulse/change-data-capture-postgresql-via-debezium-part-1-paolo-scarpino/

불러오는 중입니다...

 

상용에 적용한 경우는 아니었다고 한다.

그러나 상용에 적용할 만 하다!!!

 

 

Posted by '김용환'
,

schema registry는 clustering 지원/mulit-idc 지원합니다.
https://docs.confluent.io/current/schema-registry/docs/multidc.html

 

https://docs.confluent.io/current/schema-registry/docs/multidc.html

 

docs.confluent.io

 

그리고 zk, kafka 만 있으면 된다. schema registry 간의 통신은 없다 .

Posted by '김용환'
,

kafka -avro 연동과 관련된 좋은 자료이다. 


https://blog.cloudera.com/blog/2018/07/robust-message-serialization-in-apache-kafka-using-apache-avro-part-1/
https://blog.cloudera.com/blog/2018/07/robust-message-serialization-in-apache-kafka-using-apache-avro-part-2/
https://blog.cloudera.com/blog/2018/08/robust-message-serialization-in-apache-kafka-using-apache-avro-part-3/ 

Posted by '김용환'
,

http://blog.christianposta.com/microservices/the-hardest-part-about-microservices-data/


https://www.slideshare.net/ceposta/the-hardest-part-of-microservices-your-data

 

https://youtu.be/MrV0DqTqpFU

 

 

-> consistenty에 대한 얘기를 나누며

debezium을 설명한다. 

Posted by '김용환'
,

https://wecode.wepay.com/posts/streaming-databases-in-realtime-with-mysql-debezium-kafka

 

Streaming databases in realtime with MySQL, Debezium, and Kafka

Principal Software Engineer Change data capture has been around for a while, but some recent developments in technology have given it new life. Notably, using Kafka as a backbone to stream your database data in realtime has become increasingly common. If y

wecode.wepay.com

 

발표 자료와 슬라이드

https://www.confluent.io/kafka-summit-sf17/database-streaming-at-wepay-with-kafka-debezium

불러오는 중입니다...

 

Posted by '김용환'
,

https://debezium.io/blog/2019/02/05/debezium-0-9-0-final-released/

(구글 번역기를 돌리고 중요한 부분만 발췌 및 정리)

직접 debezium을 사용해보고 테스트 한 후, 이 부분을 안 읽고 넘어가는게 좋은 것 같다. 

 

*Debezium이란 무엇인가?*
Debezium은 데이터베이스에서 로우 레벨의 변경 사항을 캡처하여 애플리케이션이 변경 내용을보고 응답 할 수 있게 하는 분산 서비스 집합입니다. Debezium은 각 데이터베이스 테이블에 커밋 된 모든 로우 레벨 변경 내용을 트랜잭션 로그에 기록합니다. 각 애플리케이션은 관심있는 트랜잭션 로그를 읽기 만하면 모든 이벤트가 발생한 순서와 동일한 순서로 표시된다.

 

*"Debezium"이라는 이름은 어디에서 왔는가?*
이름은 여러 데이터베이스의 약어와 같은 " DB "와 주기율표의 많은 요소 이름에 사용되는 " -ium "접미사 의 조합이다.

 


*변경 데이터 캡처(CDC)란 무엇입니까?*
변경 데이터 캡처는 다른 소프트웨어가 데이터 변경 사항에 응답할 수 있도록 데이터의 변경 사항을 모니터링하고 캡처하는 용어이다.  Debezium은 기본적으로 다양한 데이터베이스 시스템 모니터링을 지원 하는 현대적이고 분산 오픈 소스 변경 데이터 캡처 플랫폼이다.

 


*Debezium이 모니터링 할 수있는 데이터베이스는 무엇입니까?*
MySQL 데이터베이스 서버 , MongoDB 복제 세트 또는 샤드 클러스터 , PostgreSQL 서버, Oracle Server (XStream 기반) 용 Debezium 커넥터 와 Debezium 0.8 / 0.9부터의 미리보기 버전으로 출시된 SQL Server 데이터베이스가 있다.

 


*Debezium의 용도는?*
Debezium의 주요 용도는 데이터베이스의 데이터가 변경 될 때마다 애플리케이션이 거의 즉시 응답 할 수 있게 하는 것이다. 애플리케이션은 추가, 업데이트, 삭제 이벤트로 무엇이든 할 수 있다. 이벤트를 사용해 캐시에서 데이터를 제거 시기를 알 수 있다. 또한 데이터로 검색 색인을 업데이트할 수 있다. 하나 이상의 모바일 장치에 푸시 알림을 보낼 수 있다. 

 


*Debezium이 분산 시스템인 이유는 무엇인가?*
Debezium은 결함과 실패를 허용하도록 설계되었고 수행하는 유일한 방법은 분산 시스템을 사용하는 것이다. Debezium은 모니터링 프로세스 또는 커넥터를 여러 시스템에 분산시킬 수 있고 문제가 발생하면 커넥터를 다시 시작할 수 있다.

 

*애플리케이션이 단일 데이터베이스를 직접 모니터링 할 수 있는가?*
예. Debezium 커넥터를 내장할 수 있다.

 


*Debezium 플랫폼 구성은?
Debezium 시스템은 여러 부분으로 구성되어 있다. Apache Kafka 브로커 클러스터는 Debezium이 모든 이벤트를 기록하고 모든 이벤트는 모든 이벤트를 사용하는 영구, 복제, 파티션된 트랜잭션 로그를 제공한다. 

각 Debezium 커넥터는 하나의 데이터베이스 클러스터 및 서버를 모니터링한다. 

모든 커넥터는 이벤트와 다른 정보를 주키퍼가 아닌 Apache Kafka에 저장한다. Apache Kafka는 각 테이블의 이벤트를 별도 토픽(topic) 유지, 복제, 분할한다. 

 

*Debezium은 원본 데이터베이스에 어떤 영향을 주는가?*
Debezium이 모니터링하기 전에 데이터베이스를 설정할 작업이 있다. 예를 들어, MySQL 서버는 로우 레벨의 binlog를 사용하고 binlog를 읽는 권한을 가진 사용자를 하나 생성해 설정해야 한다. Debezium 커넥터는 권한 있는 사용자를 포함하여 올바른 정보로 구성되야 한다.

Debezium 커넥터는 원본 데이터베이스 내부에 정보를 저장하지 않는다. 그러나 커넥터를 실행하면 원본 데이터베이스에 추가 부하가 발생할 수 있다.

 


*데이터베이스의 이벤트는 어떻게 구성해야 하는가?*
대부분의 커넥터는 단일 데이터베이스 테이블에 대한 모든 이벤트를 단일 토픽에 저장한다. 또한 토픽 내의 모든 이벤트는 완전히 순서화되어 모든 이벤트의 순서를 유지한다. (에러가 발생해 이벤트가 복제되더라도 모든 이벤트를 적용한 후 최종 결과는 그대로 유지된다)


*순서


Kafka Connect 서비스는 커넥터의 이벤트를 직렬화하여 Kafka에 저장한다. JSON 변환기는 매우 일반적이며 매우 간단하지만 전체 이벤트 정보를 직렬화 할 수밖에 없다. 따라서 JSON에 표시된 이벤트는 실제로 길고 크다.

Confluent의 Avro Converter 는 두 가지 면에서 뛰어나다. 첫째, 커넥터의 스키마를 Apache Avro 스키마로 변환하므로 페이로드를 매우 컴팩트한 바이너리 포맷으로 직렬화할 수 있다. 둘째, 연속적으로 많은 이벤트가 동일한 스키마(Avro 스키마)를 사용하고 별도의 스키마 레지스트리에 이러한 Avro 스키마를 등록함으로써 각 직렬화된 이벤트에 스키마 버전의 작은 식별자를 배치 할 수 있다. Avro Converter와 Schema Registry는 함께 작동하여 각 스키마의 히스토리를 추적할 수 있다.

한편, 동일한 Avro Converter는 컴팩트 이진 포맷 이벤트를 디코딩하고, 해당 메시지에서 사용하는 스키마 버전의 식별자를 읽습니다. 스키마 버전이 Schema Registry에서 Avro 스키마를 다운로드하는 것을 아직 보지 못한 경우, 마지막으로 Avro 스키마를 사용하여 이벤트의 바이너리 페이로드를 디코딩한다. 반복적으로 많은 이벤트가 동일한 스키마(Avro 스키마 버전)를 공유하므로 변환기는 원시 압축 이벤트를 소비자가 예상하는 동일한 스키마와 페이로드로 간단하게 디코딩할 수 있다.

 


*Confluent의 Avro Converter는 어떻게 사용합니까?*
Confluent의 Avro Converter 와 Debezium을 함께 사용할 수 있다. Avro Converter는 훨씬 똑똑하고 기본적으로 사용되는 JSON 변환보다 훨씬 더 콤팩트 이벤트 메시지를 직렬화한다.

Debezium 커넥터를 Kafka Connect 작업자 서비스에 배포하는 경우 Avro 변환기 JAR을 사용할 수 있는지 확인하고 Avro 변환기를 사용하도록 작업자 서비스를 구성한다. 예를 들어 변환기를 Confluent Schema Registry로 지정해야 한다. 그다음 Debezium 커넥터 (또는 실제로 다른 Kafka Connect 커넥터)를 작업자 서비스에 배치하기만 하면 된다. 

 



*Debezium이 중지되거나 충돌하면 어떻게 되는가?*


데이터베이스의 변경 이벤트를 사용하기 위해 애플리케이션은 Kafka 브로커에 연결하고 데이터베이스와 관련된 항목에 대한 모든 이벤트를 사용하는 Kafka 소비자를 생성한다. 소비자는 주기적으로 각 토픽의 위치(일명 오프셋)를 저장하도록 설정된다. 애플리케이션이 정상적으로 종료되고 소비자를 종료하면 소비자는 각 항목의 마지막 이벤트에 대한 오프셋을 저장한다. 나중에 애플리케이션이 재시작되면 소비자는 해당 오프셋을 검색하여 각 항목의 바로 다음 이벤트를 읽는다. 따라서 정상적인 운영 시나리오에서 애플리케이션은 모든 이벤트를 정확히 한 번(at once) 본다.

애플리케이션이 예기치 않게 중단되면 재시작할 때 애플리케이션의 사용자는 각 항목에 대해 마지막에 저장된 오프셋을 조회하고 각 항목의 마지막 오프셋에서 이벤트를 시작한다. 대부분의 경우 애플리케이션은 종료 이전에 본 것과 동일한 이벤트를 보고(그러나 오프셋을 저장한 후) 아직 보지 못한 이벤트가 뒤따른다. 따라서 애플리케이션은 모든 이벤트를 최소한 한 번 봅니다. 애플리케이션은 오프셋를 더 자주 저장한다면 클라이언트의 성능과 처리량에 부정적인 영향을 미친다.

Kafka 소비자는 각 토픽에서 가장 최근의 오프셋으로 연결하고 읽기를 시작하도록 구성할 수 있다. 이로 인해 이벤트가 누락될 수 있다. 

 


*Debezium이 멈추거나 종료될 때 어떻게 해야 하는가?


Debezium의 동작은 중단된 컴포넌트에 따라 다르다. Kafka 브로커 중 상당수가 정지하거나 종료된다면 각 항목 파티션이 최소 동기화 개수가 복제본 개수보다 적을 경우 커넥터와 애플리케이션이 차단된다. 카프카 브로커를 재시작하거나 새로운 브로커를 새로 얻어야 동작한다. 따라서 동기화된 복제본의 최소 수는 가용성에 매우 큰 영향을 미치며 일관성을 유지하기 위해서는 항상 1이상(3이 아닌 경우)이어야 한다.

Kafka Connect 서비스는 각 커넥터의 위치와 오프셋을 주기적으로 저장하도록 설정된다. 클러스터에 있는 Kafka Connect 서비스 인스턴스 중 하나가 정상적으로 중지되면 해당 프로세스에서 실행중인 모든 커넥터가 정상적으로 중지되며(즉, 모든 위치와 오프셋이 저장된다) 해당 커넥터가 다른 Kafka Connect 서비스 인스턴스에서 다시 시작된다. 커넥터가 재시작되면 중복 이벤트가 기록되지 않고 중단된 부분부터 정확하게 이벤트를 계속 저장한다.

Kafka Connect 서비스 클러스터에서 실행 중인 커넥터 중 하나가 정상적으로 중지되면 현재 작업을 완료하고 Kafka에서 최신 위치 및 오프셋을 저장한다. 다운 스트림 애플리케이션은 새로운 이벤트가 추가될 때까지 기다릴 것이다.

클러스터의 Kafka Connect 서비스 인스턴스 중 예기치 않게 충돌이 발생 하면 충돌한 프로세스에서 실행중인 모든 커넥터가 동일한 클러스터의 다른 Kafka Connect 서비스 인스턴스에서 다시 시작된다. 그러나 이러한 커넥터가 재시작되면 충돌이 발생하기 전에 커넥터에서 마지막으로 기록한 오프셋에서 시작하여 데이터베이스의 이벤트를 기록하기 시작한다 . 이는 새로 재시작 한 커넥터가 충돌 이전에 이전에 기록한 것과 동일한 이벤트를 기록 할 수 있음을 의미하며 이러한 중복은 항상 다운 스트림을 스트리밍하는 애플리케이션에서 볼 수 있다.

 


*모니터링되는 데이터베이스가 중지되거나 종료되면 어떻게 될까?*
Debezium이 모니터하는 데이터베이스가 중지되거나 종료하면 Debezium 커넥터가 통신을 다시 설정하려 한다. Debezium은 정기적으로 커넥터의 위치와 오프셋을 Kafka에 저장하기에 커넥터가 통신을 설정하면 커넥터는 마지막으로 저장된 위치와 오프셋에서 계속해서 읽어야 한다.

 


*애플리케이션에서 소비하려 할 때 중복 이벤트가 발생하는 이유는 무엇인가?*
모든 시스템이 명목상으로 실행 중이거나 일부 또는 모든 시스템이 정상적으로 종료되면 애플리케이션에서 모든 이벤트를 정확히 한 번 볼 수 있다 . 그러나  잘못되면 한 번 이상 이벤트를 볼 수 있다 .

Debezium의 시스템이 충돌하면 항상 마지막 위치/오프셋을 저장할 수있는 것은 아니다. 다시 시작되면 마지막으로 있었던 것으로 시작하여 복구되고 소비하는 애플리케이션은 항상 모든 이벤트를 볼 수 있지만 복구 중 중복 된 메시지가 나타날 수 있다.

또한 네트워크 오류로 Debezium 커넥터가 쓰기 확인을받지 못하게되어 동일한 이벤트가 한 번 이상 저장된다.


*카프카 란 무엇인가?*
Apache Kafka 는 모든 메시지를 복제, 파티션 분할, 완전히 정렬 된 트랜잭션 로그에 저장하는 빠르고 확장 가능하고 내구성이 뛰어난 분산 메시징 시스템이다. 소비자는 로그에서 소비자의 오프셋를 ​​추적하고 다른 모든 소비자들과 마찬가지로 이 오프셋을 제어 할 수 있다. 즉, 특정 소비자는 로그의 처음부터 시작할 수 있고 다른 사용자는 가장 최근에 저장된 메시지를 따라 잡을 수 있는다. 카프카는 동적 브로커 집단으로 운영된다. 각 로그 파티션은 여러 브로커에 복제되므로 모든 브로커가 실패하더라도 클러스터에 여전히 여러 복사본이 존재한다.

Debezium 커넥터는 모든 이벤트를 Kafka 클러스터에 기록하고 애플리케이션은 Kafka를 통해 이벤트를 사용한다.

*카프카 커넥트 란 무엇입니까?*
Kafka Connect는 Apache Kafka와 다른 시스템간에 데이터를 확장 가능하고 안정적으로 스트리밍하기 위한 프레임워크이다. Kafka 커뮤니티에 최근에 추가된 기능으로, 대량의 데이터를 Kafka 안팎으로 이동시키는 커넥터를 간단하게 정의할 수 있고 프레임 워크는 커넥터의 오프셋을 올바르게 저장하는 데 많은 노력을 기울인다. Kafka Connect 서비스에는 커넥터 관리 및 배포를위한 RESTful API가 있습니다. 서비스는 클러스터 될 수 있으며 커넥터가 항상 실행 중인지 확인하여 클러스터에 커넥터를 자동으로 분배한다.

Debezium은 Kafka Connect 프레임 워크를 사용한다. Debezium의 모든 커넥터는 Kafka 커넥터 소스 커넥터 이며, Kafka Connect 서비스를 사용하여 배치 및 관리 할 수 ​​있다.



출처 : https://debezium.io/docs/faq/

Posted by '김용환'
,

트위터는 2013년부터

루비온 레일즈의 모노리식 구조에서 SOA(service oriented architecture)로 점진적인 변화를 주고 있다.

현재는 99.9%로 레일즈를 걷어냈고 스칼라(finagle, finatra)를 사용해 개발하고 있다.

 

 

https://blog.twitter.com/engineering/en_us/a/2013/new-tweets-per-second-record-and-how.html

 

New Tweets per second record, and how!

New Tweets per second record, and how!

blog.twitter.com

https://blog.twitter.com/engineering/en_us/a/2011/finagle-a-protocol-agnostic-rpc-system.html

 

Finagle: A Protocol-Agnostic RPC System

Finagle: A Protocol-Agnostic RPC System

blog.twitter.com

 

 

thrift idl은 다음과 같다. scrooge를 사용해서 약간 특이하게 되어 있다. 

namespace java idl.thrift
#@namespace scala idl.thrift


service HelloService {
  string hi();
}

finangle server는 간단하게 아래와 같이 구현할 수 있다.

 

package example

import com.twitter.finagle.Thrift
import com.twitter.util.{Await, Future}
import idl.thrift.HelloService

object Server {

  class HelloImpl extends HelloService.MethodPerEndpoint {
    def hi(): Future[String] = Future.value("Hello World")
  }

  val server = Thrift.server.serveIface("localhost:8080", new HelloImpl)

  def main(args: Array[String]): Unit = {
    println("Starting thrift server(8080)...")
    Await.result(server)
  }
}

 

 

클라이언트는 다음과 같이 사용할 수 있다. 

package client

import com.twitter.finagle.Thrift
import com.twitter.util.Await
import idl.thrift.HelloService

object ScalaEchoClient {
  def main(args: Array[String]): Unit = {
    println("Starting Scala client...")

    val methodPerEndpoint: HelloService.MethodPerEndpoint =
      Thrift.client.build[HelloService.MethodPerEndpoint]("localhost:8080")

    val response = methodPerEndpoint.hi()
    response onSuccess {
      result: String => println(result)
    }

    Await.result(response)
  }

 

Posted by '김용환'
,


nginx에 json 로그가 없을 것이라 생각했는데.. 찾아보니 json을 제공한다. 
http://nginx.org/en/docs/http/ngx_http_log_module.html#log_format

그냥 지원하지 않을 것 같은 느낌인데.. 사실은 이미 지원하고 있다. 


Syntax:    log_format name [escape=default|json|none] string ...;
Default:    
log_format combined “...“;
Context:    http

 


log_format json_combined escape=json
 ‘{’
   ‘“time_local”:“$time_local”,’
   ‘“remote_addr”:“$remote_addr”,’
   ‘“remote_user”:“$remote_user”,’
   ‘“request”:“$request”,’
   ‘“status”: “$status”,’
   ‘“body_bytes_sent”:“$body_bytes_sent”,’
   ‘“request_time”:“$request_time”,’
   ‘“http_referrer”:“$http_referer”,’
   ‘“http_user_agent”:“$http_user_agent”’
 ‘}’;

 

 

access_log /var/log/nginx/access.log json_combined;

 


특히 기본 모듈에서 제공하는 기능이라 따로 모듈 추가를 하지 않아도 된다. 

 

 




Posted by '김용환'
,