Kafka Connect has metadata topics of its own.

https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L99
https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L208

These classes deal with version mismatches, authentication failures, timeouts, and connection problems. These metadata topics must exist for Kafka Connect to run, and Debezium runs on top of Kafka Connect.
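KafkaOffsetBackingStore essentially replays a log-compacted offsets topic into an in-memory map on startup. A rough stdlib-only sketch of that replay idea (the record type and method names here are illustrative stand-ins, not Debezium's or Connect's actual API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: replaying a compacted key/value log into a map,
// the way KafkaOffsetBackingStore rebuilds connector offsets on startup.
public class OffsetReplaySketch {

    // A stand-in for a Kafka record on the offsets topic.
    public record Record(String key, String value) {}

    // Later records win, and a null value (tombstone) deletes the key --
    // the same semantics log compaction gives the real offsets topic.
    public static Map<String, String> replay(List<Record> log) {
        Map<String, String> data = new LinkedHashMap<>();
        for (Record r : log) {
            if (r.value() == null) {
                data.remove(r.key());
            } else {
                data.put(r.key(), r.value());
            }
        }
        return data;
    }

    public static void main(String[] args) {
        List<Record> log = new ArrayList<>();
        log.add(new Record("task-0", "offset=10"));
        log.add(new Record("task-0", "offset=25")); // newer offset replaces the old one
        log.add(new Record("task-1", "offset=3"));
        System.out.println(replay(log)); // only the latest value per key survives
    }
}
```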
Debezium has no replication-factor setting of its own by default.
It reads the broker's "default.replication.factor" and, if that value is missing, falls back to 1.
So any test that assumes HA must start with the replication settings properly configured on the broker.
https://github.com/debezium/debezium/blob/v0.9.3.Final/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java
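The fallback described above can be sketched as follows (a simplified stand-in for what KafkaDatabaseHistory does, not its actual code):

```java
import java.util.Map;

// Sketch of the fallback: use the broker's "default.replication.factor"
// if present, otherwise fall back to 1.
public class ReplicationFactorSketch {

    public static short replicationFactor(Map<String, String> brokerConfig) {
        String v = brokerConfig.get("default.replication.factor");
        if (v == null || v.isBlank()) {
            return 1; // the fallback when the broker exposes nothing
        }
        return Short.parseShort(v);
    }

    public static void main(String[] args) {
        System.out.println(replicationFactor(Map.of("default.replication.factor", "3"))); // 3
        System.out.println(replicationFactor(Map.of())); // 1 -- no HA unless the broker says otherwise
    }
}
```

This is why a bare-defaults broker quietly gives you a replication factor of 1 on the history topic.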
https://github.com/debezium/debezium/blob/v0.9.3.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java shows Debezium's metadata.

Debezium's history DB topic is handled by the io.debezium.relational.history package.
The default store is a Kafka topic, but the history can also be kept in a file or in memory (guarded by a ReadWriteLock). It stores every DDL statement for the databases and tables, so topics end up created per database and table. (This is why the Kafka broker defaults will matter so much when we configure things later: Debezium reads the broker's "default.replication.factor" and falls back to 1 when it is missing.)


The topic has a single partition, and the implementation consists of a consumer and a producer.


The producer stores the database/table history into the topic (storeRecords),
and reading that topic back (recoverRecords) recovers the DDL data.
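The in-memory variant mentioned above (a list of DDL statements behind a ReadWriteLock, with a store/recover split) can be sketched like this; the method names mirror the description above but the class itself is illustrative, not Debezium's code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of an in-memory database history: DDL statements are appended
// under a write lock and replayed under a read lock.
public class MemoryHistorySketch {
    private final List<String> ddl = new ArrayList<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    public void storeRecord(String statement) {
        lock.writeLock().lock();
        try {
            ddl.add(statement); // history is append-only
        } finally {
            lock.writeLock().unlock();
        }
    }

    public List<String> recoverRecords() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(ddl); // replay everything, in original order
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        MemoryHistorySketch history = new MemoryHistorySketch();
        history.storeRecord("CREATE TABLE t (id INT)");
        history.storeRecord("ALTER TABLE t ADD COLUMN name VARCHAR(10)");
        System.out.println(history.recoverRecords()); // both DDL statements, in order
    }
}
```

Recovery replays every stored DDL statement in order, which is how a restarted connector rebuilds the table schemas it needs to decode the binlog.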

A mistake I made while developing sink-spark-streaming was reading only the data topics and never the history topic.
Done properly, importing the debezium lib and using both together lets you build a complete database replication setup.

Debezium defaults

DEFAULT_MAX_QUEUE_SIZE (max.queue.size) = 8192
DEFAULT_MAX_BATCH_SIZE (max.batch.size) = 2048
DEFAULT_POLL_INTERVAL_MILLIS (poll.interval.ms) = 500
snapshot.delay.ms = 0


When Debezium reads a table with hundreds of millions of rows in snapshot mode, how often does it flush?

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java#L545

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java#L251

https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/function/BufferedBlockingConsumer.java

https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java#L436


(The code is remarkably concise and clean...)
=> It runs 'select * from <table>' and pushes each row into a Java queue; internally the queue is drained in a loop, and after reading up to 2048 rows (max.batch.size) the batch is produced to Kafka.
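The drain-and-produce loop can be sketched with a plain BlockingQueue and the defaults listed earlier; this is a stdlib-only illustration of the batching behavior, not Debezium's BufferedBlockingConsumer itself:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the snapshot batching loop: rows go into a bounded queue
// (max.queue.size = 8192) and are drained in batches of at most
// max.batch.size = 2048 before being handed to the producer.
public class BatchFlushSketch {
    static final int MAX_QUEUE_SIZE = 8192;
    static final int MAX_BATCH_SIZE = 2048;

    public static List<List<String>> flushInBatches(BlockingQueue<String> queue) {
        List<List<String>> batches = new ArrayList<>();
        while (!queue.isEmpty()) {
            List<String> batch = new ArrayList<>(MAX_BATCH_SIZE);
            queue.drainTo(batch, MAX_BATCH_SIZE); // take at most one batch's worth
            batches.add(batch); // the real code would produce this batch to Kafka here
        }
        return batches;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
        for (int i = 0; i < 5000; i++) {
            queue.put("row-" + i); // rows streamed from "select * from <table>"
        }
        List<List<String>> batches = flushInBatches(queue);
        System.out.println(batches.size()); // 5000 rows -> 3 batches (2048 + 2048 + 904)
    }
}
```

The bounded queue is also what applies backpressure: if Kafka falls behind, the snapshot reader blocks on put() once 8192 rows are buffered.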

(kafka connect) org.apache.kafka.connect.runtime.WorkerSourceTask.execute -->
(debezium) looks up the connector class from the configuration, io.debezium.connector.mysql.MySqlConnector,
then calls taskClass() to run MySqlConnectorTask. By default, MySqlConnectorTask's start, i.e.
      io.debezium.connector.common.BaseSourceTask.start(), is executed.

The initial work of reading the binlog lives in MySqlConnectorTask...

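The handoff above (framework finds the configured connector class, asks it for taskClass(), instantiates the task reflectively, and calls start()) can be sketched like this; the interfaces here are simplified stand-ins for Kafka Connect's, not the real API:

```java
// Sketch of the Connect startup handoff: connector class -> taskClass() -> start().
public class TaskStartupSketch {

    interface SourceConnector {
        Class<? extends SourceTask> taskClass();
    }

    interface SourceTask {
        String start();
    }

    // Plays the role of io.debezium.connector.mysql.MySqlConnector.
    public static class MySketchConnector implements SourceConnector {
        public Class<? extends SourceTask> taskClass() {
            return MySketchTask.class;
        }
    }

    // Plays the role of MySqlConnectorTask.
    public static class MySketchTask implements SourceTask {
        public String start() {
            return "reading binlog"; // the real task begins snapshot/binlog reading here
        }
    }

    // What WorkerSourceTask does, reduced to its reflective core.
    public static String runTask(String connectorClassName) {
        try {
            SourceConnector connector = (SourceConnector)
                    Class.forName(connectorClassName).getDeclaredConstructor().newInstance();
            SourceTask task = connector.taskClass().getDeclaredConstructor().newInstance();
            return task.start();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not start task", e);
        }
    }

    public static void main(String[] args) {
        // The connector class name comes from config, exactly as a string.
        System.out.println(runTask("TaskStartupSketch$MySketchConnector"));
    }
}
```

This is why the connector config only ever names the connector class: everything else, including which task class runs, is discovered at runtime.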
Posted by '김용환'