테스트 해보니 debezium의 history topic에 정의된 내용은 다음과 같다.


/usr/local/confluent-5.1.2/bin/kafka-console-consumer  --bootstrap-server kafka-test01.google.io:9092,kafka-test02.google.io:9092,kafka-test03.google.io:9092 --from-beginning --topic schema_changes_user
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “ddl” : “SET character_set_server=utf8mb4, collation_server=utf8mb4_general_ci;”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “ddl” : “DROP TABLE IF EXISTS `test`.`user`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “DROP DATABASE IF EXISTS `test`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “CREATE DATABASE `test`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “USE `test`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “CREATE TABLE `user` (\n  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n  `username` varchar(1000) DEFAULT NULL,\n  `vin` varchar(1000) DEFAULT NULL,\n  `address` text,\n  PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=626927 DEFAULT CHARSET=utf8mb4"
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554884849,
   “file” : “mysql-bin.000022”,
   “pos” : 33223606,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1645437,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745",
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “CREATE USER ‘sj’@‘%’ IDENTIFIED WITH ‘mysql_native_password’ AS ‘*B1F9ACC9F58F4DA857A97AC2BA02FE1A51A82F32’”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554884857,
   “file” : “mysql-bin.000022",
   “pos” : 33242552,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1645480,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, INDEX, ALTER, CREATE TEMPORARY TABLES, EXECUTE ON *.* TO ‘sj’@‘%’”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554884990,
   “file” : “mysql-bin.000022”,
   “pos” : 33577581,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1646234,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745",
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “GRANT REPLICATION SLAVE ON *.* TO ‘sj’@‘%’”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554885199,
   “file” : “mysql-bin.000022",
   “pos” : 34099376,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1647408,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “server_id” : 1944318
 },
“databaseName” : “”,
 “ddl” : “GRANT REPLICATION SLAVE ON *.* TO ‘test_ddl’@‘%’”
}

Posted by '김용환'
,


debezium-kafka 테스트하면서 발견한 장애 처리 내용이다.


1. 특정 토픽에 카프카들이 동기화를 못할 때가 있다.
  -> 카프카 올릴때 컨플루언트에 에러 공유를 위해 기본으로 생성되는 토픽들이 있다.


2. 해당 토픽을 수동으로 지우니깐 주키퍼가 오동작하며, 카프카가 제대로 동작이 되지 않는다.


3. 카프카 주키퍼 다 내리고 모든 토픽 다 지운 후, 다시 데몬을 올리면 정상적으로 올라간다.


그러나 기존 데이터들은 모두 날아감.. 따라서 debezium에서 저장하는 토픽은 모두 삭제되어 다시 listener에 추가한다.

Posted by '김용환'
,

 

다음 커맨드를 실행하면 mysql binlog를 볼 수 있다.

show binlog events in ‘바이너리로그파일명’ from 포지션번호 limit 10

 


{“id”:631028}    {“before”:{“test.test.user.Value”:{“id”:631028,“username”:{“string”:“Random-8d04a798-89df-4260-87f7-8f91d6da458e”},“vin”:{“string”:“c485b8d4-f5d4-459f-b686-52b6a5f8f343"},“address”:null}},“after”:null,“source”:{“version”:{“string”:“0.9.2.Final”},“connector”:{“string”:“mysql”},“name”:“test”,“server_id”:1944318,“ts_sec”:1554885981,“gtid”:{“string”:“9a227629-491c-11e9-ae9d-fa163ea8d9a7:1651774"},“file”:“mysql-bin.000001",“pos”:36040188,“row”:0,“snapshot”:{“boolean”:false},“thread”:{“long”:1616},“db”:{“string”:“test”},“table”:{“string”:“user”},“query”:null},“op”:“d”,“ts_ms”:{“long”:1554885981898}}


file : 바이너리 로그 파일명,  pos에는 포지션 번호이다.

 

Posted by '김용환'
,

debezium에서 mysql long 타입 때문에 precison 이슈가 생길 수 있다. 

debezium은 기본적으로 precise로서 base64 인코딩을 진행하는데, 

경험상 string 처리가 잘 되는 것 같다.

 

세부 내용

decimal.handling.mode의 기본 모드는 precise로서 Bas64인코딩 정보를 kafka에 전달한다.
string으로 변경하면 문자로 제대로 잘 저장하기에 정밀도 이슈, BigDecimal 이슈를 모두 해결한다.


decimal.handling.mode”: “string”,

 

Posted by '김용환'
,

debezium + schema registry 테스트를 하는 중에

The retention policy of the schema topic is incorrect 라는 에러가 갑자기 발생했다. 

이유는 정확치 않았지만. kafka replication이 제대로 동작안되면 문제가 발생할 수 있다. 


The retention policy of the schema topic " + topic + " is incorrect. "
                + "You must configure the topic to 'compact' cleanup policy to avoid Kafka "
                + "deleting your schemas after a week. "
                + "Refer to Kafka documentation for more details on cleanup policies"

https://github.com/confluentinc/schema-registry/blob/master/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java#L262 코드를 보면. 아래 조건에 해당되면 에러를 발생한다


if (retentionPolicy == null || !TopicConfig.CLEANUP_POLICY_COMPACT.equals(retentionPolicy)) {
..

}



topic마다 config를 확인해봐야 한다. 
https://docs.confluent.io/current/installation/configuration/topic-configs.html

 

 

Posted by '김용환'
,

schema registry 설정에 zk에 저장하는 kafkasotre 필드의 정보를 변경하니 두 schema registry 서버가 잘 동작한다.


# The name of the topic to store schemas in
kafkastore.topic=_schemas1 (edited) 

 

schema registry끼리는 서로 통신하지 않고 kafka만 바라본다.

Posted by '김용환'
,

debezium은 before/after 데이터 변경에 대해  row의 변화에 대한 정보를 json으로 출력한다. 

 

gtid가 두가지 정보가 온다. : 이후에 sequence number가 오는데 debezium이 주는걸까? mysql일까?
“gtid”:{“string”:“9a227629-491c-11e9-ae9d-fa163ea8d9a7:128519”}
“gtid”:{“string”:“9a227629-491c-11e9-ae9d-fa163ea8d9a7:128520"}



(내생각) 이 정보를 배경으로 (아마도 kafka) at once를 구현하는 것처럼 보인다..

로그를 보면. 그런식으로 먼가 계산하는 것처럼 보인다.
INFO Connected to MySQL binlog at cdc-test.mydb.daumkakao.io:3306, starting at GTIDs 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-129873 and binlog file ‘mysql-bin.000015’, pos=60032311, skipping 0 events plus 1 rows (io.debezium.connector.mysql.BinlogReader:970) (edited) 

 


Debezium 로그를 보면, shyiko 바이너리 클라이언트는 어디까지 읽었는지.. 알린다.

 


 com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to cdc-test.mydb.daumkakao.io:3306 at 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-129869 (sid:18405, cid:628)
INFO Connected to MySQL binlog at cdc-test.mydb.daumkakao.io:3306, starting at GTIDs 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-129869 and binlog file ‘mysql-

bin.000015’, pos=60030849, skipping 0 events plus 1 rows (io.debezium.connector.mysql.BinlogReader:970)

 

그리고  jpa와 같은 delete 쿼리에 transaction이 같이 날아다는 데. 이 때문에 null이 출력한다.(사실은 debezium이 처리하고 싶지 않은 데이터는 모두 null로 출력된다, debezium은 insert, update, delete 정보를 출력한다.

debezimum 소스를 보면, delete event일 때 두 메시지를 보낸다. 한다. null이 바로 tombstone message이다.

{“id”:49615}  {“before…“op”:“d”,“ts_ms”:{“long”:1554456897149}}
{“id”:49615}    null

 




https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/transforms/UnwrapFromEnvelope.java#L34
when delete event is emitted by database then Debezium emits two messages: a delete message and a tombstone message that serves as a signal to Kafka compaction process.

 

 

debezium 설정에 “tombstones.on.delete”: “false”, 로 설정하니 delete 이벤트가 날아갈 때 null이 더 이상 나오지 않는다.
{“id”:49615}  {“before…“op”:“d”,“ts_ms”:{“long”:1554456897149}}

 

https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java#L32

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RecordMakers.java

 

debezium/debezium

Change data capture for a variety of databases. https://debezium.io Please log issues in our JIRA at https://issues.jboss.org/projects/DBZ/issues - debezium/debezium

github.com

 

 

 

 

 

 

Posted by '김용환'
,

 

debezium에서 사용하는 schema_changed 관련된 topic 연동하면서 아래와 같이 에러가 나고 debezium이 동작되지 않는 버그가 있다. 


WARN [Consumer clientId=kc_debezium_connector_test-dbhistory, groupId=kc_debezium_connector_test-dbhistory] 1 partitions have leader brokers without a matching listener, including [schema_changes_test-0] (org.apache.kafka.clients.NetworkClient:1012)

ERROR WorkerSourceTask{id=kc_debezium_connector_test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60001ms
       at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:273)


문제는 debezium에서 사용하는 schema_changed 관련된 topic 연동하면서 이슈가 있다.

그래서, debezium 카프카 커넥터를의 history db(schema_changed_test-> schema_changed_user)를 변경하니 이슈가 해결되었다.  즉, 복구할 수 없는 잘못된 메타 데이터로 인해 발생된 것으로 보인다.



org.apache.kafka.connect.errors.ConnectException: The db history topic is missing. You may attempt to recover it by reconfiguring the connector to SCHEMA_ONLY_RECOVERY

 


그래서 기존 debezium connector 설정을 삭제하고..
snapshot.mode”:“schema_only_recovery”을 추가해서 kafka connector를 추가했더니,, consume이 성공했다.

 

$  sudo ./kafka-avro-console-consumer     --bootstrap-server http://loki-test-kafka001.dakao.io:9092,http://loki-test-kafka002.dakao.io:9092,http://loki-test-kafka003.dakao.io:9092        --property print.key=true     --property schema.registry.url=http://loki-test-sr001.dakao.io:8081,http://loki-test-sr002.dakao.io:8081     --topic user

 

 

https://debezium.io/docs/connectors/mysql/에 따르면..
snapshot 모드의 schema_only_recovery는 데이터베이스 히스토리 토픽의 엉킨(corrupted) 데이터를 복구할 수 있다. 반면, snapshot 모드의 schema_only는 데이터를 읽지 않고 현재 테이블 스키마만 읽는 다.

schema_only_recovery 모드는 주기적으로 데이터베이스 히스토리 토픽을 삭제하는데 사용된다. 사실 schema_only_recovery 모드를 사용하지 않으면 토픽을 삭제하기 않기 때문에 주의해야 한다.
따라서 schema_only_recovery 모드를 사용하려면 먼저 데이터베이스 히스토리 토픽을 삭제하고 적용하라고 적혀 있다.

 

카프카 커넥트 또는 debezium은 메타 정보를 카프카 topic에 저장하기 때문에 제대로 replication이 되지 않으면 문제가 발생할 수 있다. 따라서 테스트를 진행할 때 replication 개수를 3정도는 하도록 설정해야 테스트하는데.. 이슈가 생기지 않는다.

작은 replication 설정 때문에 테스크에 문제가 많이 생길 수 있다. 

 

 

 

Posted by '김용환'
,

키프카 커넥트에서 사용하는 카프카  관리 topic에 이슈가 있어서 카프카가 동작 안될 수 있다.  

아마도 topic 에 저장된 메타 데이터가 문제가 있어서 발생한 것 같다. 카프카 커넥트 설정의 topic 이름을 바꾸니 문제가 발생하지 않는다.

config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses

3가지 토픽은 카프카 커넥트에서 관리하는 토픽이다.


카프카 커넥트를 distributed 모드로 실행하면 분산 태스크는 카프카 토픽 정보에 커넥터와 태스크 설정, 커넥터 오프셋, 커넥터 상태 정보를 저장한다.

카프카 커넥트(distributed 모드)를 실행하려 할때, 세 개의 내부 토픽을 확인한다. 만약 토픽이 없으면 주어진 설정을 기반으로 생성한다. 토픽에 주어진 replication, partition 등이 설정으로 있으면 설정한다. 만약 아래 설정이 주어지지 않으면 카프카의 auto 생성 설정 규칙에 따라 생성된다.

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

offset.storage.partitions=25
status.storage.partitions=5

Posted by '김용환'
,

kafka connect 0.9x의 가장 큰 이슈는 

mysql 드라이버를 com.mysql.cj.jdbc.driver로 하드 코딩되어 있다는 점이다. 

(잘 사용할려면 시간이 좀 필요하다)

 

따라서 하위 버전(5.x) mysql 드라이버를 사용하려하면 java.lang.NoClassDefFoundError: com/mysql/cj/jdbc/Driver 예외가 발생한다.

Sink Connector에 mysql-connector 8 대신 mysql-connector 5로 바꿔서 사용하고 싶지만 실제로 사용할 수 없다.

Sink Connector 테스트 종료. (현재 버전까지는 사용할 수 없는 형태임)
https://github.com/confluentinc/kafka-connect-jdbc/issues/573

Posted by '김용환'
,