linux, mac에서 마음대로 git clone https:// 사용 가능하다. 특히 젠킨스에서 많이 활용한다.

먼저 비밀 토큰을 생성한다.

https://help.github.com/en/articles/creating-a-personal-access-token-for-the-command-line

 

Creating a personal access token for the command line - GitHub Help

Creating a personal access token for the command line You can create a personal access token and use it in place of a password when performing Git operations over HTTPS with Git on the command line or the API. A personal access token is required to authent

help.github.com

그 다음 .netrc를 생성한다.

 

$ vi ~/.netrc

machine github.kakaocorp.com
login <비밀키>

 

잘 동작하는 볼 수 있다. 

 

Posted by '김용환'
,

 

 

카프카 커넥트는 자체적인 메타 데이터 topic이 있다.

https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java#L99
https://github.com/axbaretto/kafka/blob/master/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java#L208

버전 문제, 인증 문제, 타임 아웃 문제, 연결 문제등을 다루는데.. 이게 있어야 kafka connect가 동작되고 debezium이 동작된다.

 

 

debezium은 기본적으로 자체 replica 설정이 없다.
kafka broker 의 “default.replication.factor”를 읽고 저 정보가 없으면 1을 설정한다.
따라서 HA 기본 되는 테스트는 replication 설정을 제대로 받고 시작해야 되야 한다.
https://github.com/debezium/debezium/blob/v0.9.3.Final/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java

 

debezium/debezium

Change data capture for a variety of databases. https://debezium.io Please log issues in our JIRA at https://issues.jboss.org/projects/DBZ/issues - debezium/debezium

github.com

 


https://github.com/debezium/debezium/blob/v0.9.3.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SourceInfo.java 에서는 debezium 메타 데이터를 나타난다.

 

 

ebezium의 history db topic은 io.debezium.relational.histor 패키지에서 담당한다.
기본은 kafka topic이지만 파일, 메모리(ReadWriteLock)로도 저장할 수 있다. 데이터베이스, 테이블에 대한 모든 DDL을 저장한다. 따라서 topic이 database, table 로 나눠서 생성된다.  (우리가 앞으로 설정할 때 카프카 기본 설정이 매우 중요할 수 밖에 없다.. kafka broker 의 “default.replication.factor”를 읽고 저 정보가 없으면 1을 설정하기에..)


파티션은 1개짜리, Consumer와 Producer로 구성되어 있다.


데이터베이스, 테이블에 대한 history를 producer가 topic에 저장(storeRecords)하고,
저 topic을 읽으면(recoverRecords) DDL 데이터를 읽을 수 있다.

sink-spark-streaming을 개발하면서 실수한 부분은 history topic을 읽지 않고 data topic만 읽었다는 부분이다. 
제대로 개발하려면 debezium lib만 import해서 함께 사용한다면 완벽한 Database Replication DB를 만들 수 있다. 

 

debezium 기본값

DEFAULT_MAX_QUEUE_SIZE(max.queue.size) = 8192;
DEFAULT_MAX_BATCH_SIZE(max.batch.size)= 2048;
DEFAULT_POLL_INTERVAL_MILLIS(poll.interval.ms)= 500;
snapshot.delay.ms=0

 

 

debezium에서 snapshot 모드로 수억개의 테이블을 읽을 때, 언제마다 flush를 할까?

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java#L545

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java#L251

https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/function/BufferedBlockingConsumer.java

https://github.com/debezium/debezium/blob/master/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java#L436


(코드 참 간결하고 예쁘다…)
=> select * from 테이블' 를 수행하고 row 데이터를 자바 큐에 저장하는데,  큐는 내부적으로 루프를 돌며 2048 (max.batch.size)만큼 읽은 후 kafka에 produce한다.

 

(kafka connect) org.apache.kafka.connect.runtime.WorkerSourceTask.execute   -->
(debezium) 설정의 Connector인 io.debezium.connector.mysql.MySqlConnector를 찾는다.
taskClass()를 호출해 MySqlConnectorTask를 실행한다. 기본 디폴트로 MySqlConnectorTask의 start즉,
      io.debezium.connector.common.BaseSourceTask.start()이 실행된다.

처음에 bin 로그를 읽는 작업은 MySqlConnectorTask에 담겨 있다….

 

 

 

Posted by '김용환'
,

 

snapshot 모드를 실행할 때 나오는 로그치다. 참고용으로 저장한다.


[2019-04-16 11:33:38,642] INFO Completed snapshot in 00:35:57.362 (io.debezium.connector.mysql.SnapshotReader:659)
[2019-04-16 11:33:38,809] INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader:199)
[2019-04-16 11:33:38,819] INFO GTID set purged on server: 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-74163567,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745 (io.debezium.connector.mysql.BinlogReader:299)
[2019-04-16 11:33:38,819] INFO Attempting to generate a filtered GTID set (io.debezium.connector.mysql.MySqlTaskContext:306)
[2019-04-16 11:33:38,819] INFO GTID set from previous recorded offset: 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-84617433,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745 (io.debezium.connector.mysql.MySqlTaskContext:307)
[2019-04-16 11:33:38,819] INFO GTID set available on server: 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-88280749,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745 (io.debezium.connector.mysql.MySqlTaskContext:314)
[2019-04-16 11:33:38,820] INFO Final merged GTID set to use when connecting to MySQL: 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-84617433,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745 (io.debezium.connector.mysql.MySqlTaskContext:335)
[2019-04-16 11:33:38,820] INFO Registering binlog reader with GTID set: 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-84617433,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745 (io.debezium.connector.mysql.BinlogReader:304)
[2019-04-16 11:33:38,820] INFO Creating thread debezium-mysqlconnector-test-binlog-client (io.debezium.util.Threads:263)
[2019-04-16 11:33:38,823] INFO Creating thread debezium-mysqlconnector-test-binlog-client (io.debezium.util.Threads:263)
Apr 16, 2019 11:33:38 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to cdc-test-slv.mydb.google.io:3306 at 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-84617433,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745 (sid:18405, cid:561)
[2019-04-16 11:33:38,913] INFO Connected to MySQL binlog at cdc-test-slv.mydb.google.io:3306, starting at GTIDs 9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-84617433,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745 and binlog file ‘mysql-bin.000388’, pos=51906298, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader:970)
[2019-04-16 11:33:38,913] INFO Creating thread debezium-mysqlconnector-test-binlog-client (io.debezium.util.Threads:263)

→ bin 로그, GTID 값은 덤프후 새로 생성..


[2019-04-16 10:57:41,295] INFO Step 0: disabling autocommit and enabling repeatable read transactions (io.debezium.connector.mysql.SnapshotReader:198)
[2019-04-16 10:57:41,298] INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader:220)
[2019-04-16 10:57:41,313] INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader:239)
[2019-04-16 10:57:41,313] INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader:691)
[2019-04-16 10:57:41,314] INFO using binlog ‘mysql-bin.000388’ at position ‘51906298’ and gtid ‘9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-84617433,
b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745’ (io.debezium.connector.mysql.SnapshotReader:703)
[2019-04-16 10:57:41,314] INFO Step 4: read list of available databases (io.debezium.connector.mysql.SnapshotReader:260)
[2019-04-16 10:57:41,315] INFO list of available databases is: [information_schema, mysql, performance_schema, sys, test, voucher] (io.debezium.connector.mysql.SnapshotReader:268)
[2019-04-16 10:57:41,315] INFO Step 5: read list of available tables in each database (io.debezium.connector.mysql.SnapshotReader:277)
[2019-04-16 10:57:41,318] INFO ‘mysql.columns_priv’ is filtered out, discarding (io.debezium.connector.mysql.SnapshotReader:298)

.... (모든 테이블 목록 출력)
\
[2019-04-16 10:57:41,333] INFO Step 6: generating DROP and CREATE statements to reflect current database schemas: (io.debezium.connector.mysql.SnapshotReader:368)
[2019-04-16 10:57:41,337] INFO SET character_set_server=utf8mb4, collation_server=utf8mb4_general_ci; (io.debezium.connector.mysql.SnapshotReader:833)
[2019-04-16 10:57:41,344] INFO Cluster ID: qhNIgHPxQ5aLflVfzbwd8g (org.apache.kafka.clients.Metadata:285)
[2019-04-16 10:57:41,350] INFO DROP TABLE IF EXISTS `voucher`.`b2b_order_snapshot_test` (io.debezium.connector.mysql.SnapshotReader:833)
[2019-04-16 10:57:41,351] INFO DROP DATABASE IF EXISTS `voucher` (io.debezium.connector.mysql.SnapshotReader:833)
[2019-04-16 10:57:41,353] INFO CREATE DATABASE `voucher` (io.debezium.connector.mysql.SnapshotReader:833)
[2019-04-16 10:57:41,354] INFO USE `voucher` (io.debezium.connector.mysql.SnapshotReader:833)
[2019-04-16 10:57:41,371] WARN Column is missing a character set: status VARCHAR(50) NOT NULL DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:308)
[2019-04-16 10:57:41,371] WARN Using UTF-8 charset by default for column without charset: status VARCHAR(50) NOT NULL DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:284)
[2019-04-16 10:57:41,372] WARN Column is missing a character set: receiver_type VARCHAR(30) NOT NULL DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:308)
[2019-04-16 10:57:41,372] WARN Using UTF-8 charset by default for column without charset: receiver_type VARCHAR(30) NOT NULL DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:284)
[2019-04-16 10:57:41,372] WARN Column is missing a character set: receiver_phone_token VARCHAR(200) DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:308)
[2019-04-16 10:57:41,372] WARN Using UTF-8 charset by default for column without charset: receiver_phone_token VARCHAR(200) DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:284)
[2019-04-16 10:57:41,374] WARN Column is missing a character set: created_by VARCHAR(45) NOT NULL DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:308)
[2019-04-16 10:57:41,374] WARN Using UTF-8 charset by default for column without charset: created_by VARCHAR(45) NOT NULL DEFAULT VALUE (io.debezium.connector.mysql.MySqlValueConverters:284)
[2019-04-16 10:57:41,375] INFO CREATE TABLE `b2b_order_snapshot_test` (
`id` bigint(11) unsigned NOT NULL AUTO_INCREMENT,
`api_callback_url` varchar(200) DEFAULT NULL COMMENT ‘API ?? URL’,
`trace_id` varchar(100) DEFAULT NULL COMMENT ‘?? ID’,
[2019-04-16 10:57:41,385] INFO Step 7: releasing global read lock to enable MySQL writes (io.debezium.connector.mysql.SnapshotReader:431)
[2019-04-16 10:57:41,387] INFO Step 7: blocked writes to MySQL for a total of 00:00:00.074 (io.debezium.connector.mysql.SnapshotReader:437)
[2019-04-16 10:57:41,387] INFO Step 8: scanning contents of 1 tables while still in transaction (io.debezium.connector.mysql.SnapshotReader:452)
[2019-04-16 10:57:41,391] INFO Step 8: - scanning table ‘voucher.b2b_order_snapshot_test’ (1 of 1 tables) (io.debezium.connector.mysql.SnapshotReader:498)

[2019-04-16 11:33:38,642] INFO Step 8: scanned 75237376 rows in 1 tables in 00:35:57.255 (io.debezium.connector.mysql.SnapshotReader:567)
[2019-04-16 11:33:38,642] INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader:598)]

 


관련 코드는
https://github.com/debezium/debezium/blob/v0.9.3.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java#L202에 있다.

 

 

Posted by '김용환'
,


https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnector.java

아주 간단한 구조로 되어 있어 pooling이나 failover에 적합한 코드로 개발된 것이 아니다.

 

그러나 mha에 대한 DB gtid를 저장할 수 있다… switching/failover에 대해 db 정보를 Map으로 관리한다.
https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/GtidSet.java

 

debezium/debezium

Change data capture for a variety of databases. https://debezium.io Please log issues in our JIRA at https://issues.jboss.org/projects/DBZ/issues - debezium/debezium

github.com

 

 

 

Posted by '김용환'
,

 

sbt-spark을 개발하면서 중복 라이브러는 다음과 같이 사용했다. 

sbt의 Merge 전략을 다음과 같이 변경했다.

assemblyMergeStrategy in assembly := {
 case “META-INF/services/org.apache.spark.sql.sources.DataSourceRegister” => MergeStrategy.concat
 case PathList(“META-INF”, “services”, “org.apache.hadoop.fs.FileSystem”) => MergeStrategy.filterDistinctLines
 case “application.conf” => MergeStrategy.concat
 case PathList(“META-INF”, xs @ _*) => MergeStrategy.discard
 case x => MergeStrategy.first
}

참고 자료 :https://github.com/sbt/sbt-assembly

Posted by '김용환'
,

schema registry 에서 사용하는 _schemas 에는 다음 정보가 저장되어 있다.

$ /usr/local/confluent-5.1.2/bin/kafka-console-consumer  --bootstrap-server kafka-test01.google.io:9092,kafka-test02.google.io:9092,kafka-test03.google.io:9092 --from-beginning --topic _schemas1

{“subject”:“test-key”,“version”:1,“id”:1,“schema”:“{\“type\“:\“record\“,\“name\“:\“SchemaChangeKey\“,\“namespace\“:\“io.debezium.connector.mysql\“,\“fields\“:[{\“name\“:\“databaseName\“,\“type\“:\“string\“}],\“connect.name\“:\“io.debezium.connector.mysql.SchemaChangeKey\“}”,“deleted”:false}
{“subject”:“test-value”,“version”:1,“id”:2,“schema”:“{\“type\“:\“record\“,\“name\“:\“SchemaChangeValue\“,\“namespace\“:\“io.debezium.connector.mysql\“,\“fields\“:[{\“name\“:\“source\“,\“type\“:{\“type\“:\“record\“,\“name\“:\“Source\“,\“fields\“:[{\“name\“:\“version\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“connector\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“name\“,\“type\“:\“string\“},{\“name\“:\“server_id\“,\“type\“:\“long\“},{\“name\“:\“ts_sec\“,\“type\“:\“long\“},{\“name\“:\“gtid\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“file\“,\“type\“:\“string\“},{\“name\“:\“pos\“,\“type\“:\“long\“},{\“name\“:\“row\“,\“type\“:\“int\“},{\“name\“:\“snapshot\“,\“type\“:[{\“type\“:\“boolean\“,\“connect.default\“:false},\“null\“],\“default\“:false},{\“name\“:\“thread\“,\“type\“:[\“null\“,\“long\“],\“default\“:null},{\“name\“:\“db\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“table\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“query\“,\“type\“:[\“null\“,\“string\“],\“default\“:null}],\“connect.name\“:\“io.debezium.connector.mysql.Source\“}},{\“name\“:\“databaseName\“,\“type\“:\“string\“},{\“name\“:\“ddl\“,\“type\“:\“string\“}],\“connect.name\“:\“io.debezium.connector.mysql.SchemaChangeValue\“}”,“deleted”:false}
{“subject”:“user-key”,“version”:1,“id”:3,“schema”:“{\“type\“:\“record\“,\“name\“:\“Key\“,\“namespace\“:\“test.test.user\“,\“fields\“:[{\“name\“:\“id\“,\“type\“:\“long\“}],\“connect.name\“:\“test.test.user.Key\“}”,“deleted”:false}
{“subject”:“user-value”,“version”:1,“id”:4,“schema”:“{\“type\“:\“record\“,\“name\“:\“Envelope\“,\“namespace\“:\“test.test.user\“,\“fields\“:[{\“name\“:\“before\“,\“type\“:[\“null\“,{\“type\“:\“record\“,\“name\“:\“Value\“,\“fields\“:[{\“name\“:\“id\“,\“type\“:\“long\“},{\“name\“:\“username\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“vin\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“address\“,\“type\“:[\“null\“,\“string\“],\“default\“:null}],\“connect.name\“:\“test.test.user.Value\“}],\“default\“:null},{\“name\“:\“after\“,\“type\“:[\“null\“,\“Value\“],\“default\“:null},{\“name\“:\“source\“,\“type\“:{\“type\“:\“record\“,\“name\“:\“Source\“,\“namespace\“:\“io.debezium.connector.mysql\“,\“fields\“:[{\“name\“:\“version\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“connector\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“name\“,\“type\“:\“string\“},{\“name\“:\“server_id\“,\“type\“:\“long\“},{\“name\“:\“ts_sec\“,\“type\“:\“long\“},{\“name\“:\“gtid\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“file\“,\“type\“:\“string\“},{\“name\“:\“pos\“,\“type\“:\“long\“},{\“name\“:\“row\“,\“type\“:\“int\“},{\“name\“:\“snapshot\“,\“type\“:[{\“type\“:\“boolean\“,\“connect.default\“:false},\“null\“],\“default\“:false},{\“name\“:\“thread\“,\“type\“:[\“null\“,\“long\“],\“default\“:null},{\“name\“:\“db\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“table\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“query\“,\“type\“:[\“null\“,\“string\“],\“default\“:null}],\“connect.name\“:\“io.debezium.connector.mysql.Source\“}},{\“name\“:\“op\“,\“type\“:\“string\“},{\“name\“:\“ts_ms\“,\“type\“:[\“null\“,\“long\“],\“default\“:null}],\“connect.name\“:\“test.test.user.Envelope\“}”,“deleted”:false}

-----
스키마 변경이 바뀌는 시점은 alter 문이 실행될 때가 아닌 alter 이후에 데이터가 들어오는 정보가 있을 때 변경된다.

{“subject”:“user-value”,“version”:2,“id”:5,“schema”:“{\“type\“:\“record\“,\“name\“:\“Envelope\“,\“namespace\“:\“test.test.user\“,\“fields\“:[{\“name\“:\“before\“,\“type\“:[\“null\“,{\“type\“:\“record\“,\“name\“:\“Value\“,\“fields\“:[{\“name\“:\“id\“,\“type\“:\“long\“},{\“name\“:\“username\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“vin\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“address\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“aaaaa\“,\“type\“:[\“null\“,\“string\“],\“default\“:null}],\“connect.name\“:\“test.test.user.Value\“}],\“default\“:null},{\“name\“:\“after\“,\“type\“:[\“null\“,\“Value\“],\“default\“:null},{\“name\“:\“source\“,\“type\“:{\“type\“:\“record\“,\“name\“:\“Source\“,\“namespace\“:\“io.debezium.connector.mysql\“,\“fields\“:[{\“name\“:\“version\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“connector\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“name\“,\“type\“:\“string\“},{\“name\“:\“server_id\“,\“type\“:\“long\“},{\“name\“:\“ts_sec\“,\“type\“:\“long\“},{\“name\“:\“gtid\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“file\“,\“type\“:\“string\“},{\“name\“:\“pos\“,\“type\“:\“long\“},{\“name\“:\“row\“,\“type\“:\“int\“},{\“name\“:\“snapshot\“,\“type\“:[{\“type\“:\“boolean\“,\“connect.default\“:false},\“null\“],\“default\“:false},{\“name\“:\“thread\“,\“type\“:[\“null\“,\“long\“],\“default\“:null},{\“name\“:\“db\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“table\“,\“type\“:[\“null\“,\“string\“],\“default\“:null},{\“name\“:\“query\“,\“type\“:[\“null\“,\“string\“],\“default\“:null}],\“connect.name\“:\“io.debezium.connector.mysql.Source\“}},{\“name\“:\“op\“,\“type\“:\“string\“},{\“name\“:\“ts_ms\“,\“type\“:[\“null\“,\“long\“],\“default\“:null}],\“connect.name\“:\“test.test.user.Envelope\“}”,“deleted”:false}


alter add column 후 데이터 입력 => schema1에 저장 
alter drop column 후 데이터 입력 -> schema1에 반영되지 않는다.

Posted by '김용환'
,

ALTER TABLE `test`.`user`
ADD COLUMN `xxx` VARCHAR(45) NULL AFTER `address`;
추가하면.

history topic에는 다음이 추가된다.

{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554888803,
   “file” : “mysql-bin.000022”,
   “pos” : 38264335,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1656775,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745",
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “ALTER TABLE `test`.`user` \nADD COLUMN `xxx` VARCHAR(45) NULL AFTER `address`”
}

 

— 다음과 같이 수행하면..

ALTER TABLE `test`.`user`
DROP COLUMN `xxx`;

history topic에 다음이 전달된다.
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554888844,
   “file” : “mysql-bin.000022”,
   “pos” : 38264514,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1656776,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745",
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “ALTER TABLE `test`.`user` \nDROP COLUMN `xxx`”
}

Posted by '김용환'
,

테스트 해보니 debezium의 history topic에 정의된 내용은 다음과 같다.


/usr/local/confluent-5.1.2/bin/kafka-console-consumer  --bootstrap-server kafka-test01.google.io:9092,kafka-test02.google.io:9092,kafka-test03.google.io:9092 --from-beginning --topic schema_changes_user
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “ddl” : “SET character_set_server=utf8mb4, collation_server=utf8mb4_general_ci;”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “ddl” : “DROP TABLE IF EXISTS `test`.`user`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “DROP DATABASE IF EXISTS `test`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “CREATE DATABASE `test`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “USE `test`”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “file” : “mysql-bin.000022",
   “pos” : 30567369,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1639464,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “snapshot” : true
 },
 “databaseName” : “test”,
 “ddl” : “CREATE TABLE `user` (\n  `id` bigint(20) NOT NULL AUTO_INCREMENT,\n  `username` varchar(1000) DEFAULT NULL,\n  `vin` varchar(1000) DEFAULT NULL,\n  `address` text,\n  PRIMARY KEY (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=626927 DEFAULT CHARSET=utf8mb4"
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554884849,
   “file” : “mysql-bin.000022”,
   “pos” : 33223606,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1645437,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745",
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “CREATE USER ‘sj’@‘%’ IDENTIFIED WITH ‘mysql_native_password’ AS ‘*B1F9ACC9F58F4DA857A97AC2BA02FE1A51A82F32’”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554884857,
   “file” : “mysql-bin.000022",
   “pos” : 33242552,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1645480,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “GRANT SELECT, INSERT, UPDATE, DELETE, CREATE, DROP, INDEX, ALTER, CREATE TEMPORARY TABLES, EXECUTE ON *.* TO ‘sj’@‘%’”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554884990,
   “file” : “mysql-bin.000022”,
   “pos” : 33577581,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1646234,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745",
   “server_id” : 1944318
 },
 “databaseName” : “”,
 “ddl” : “GRANT REPLICATION SLAVE ON *.* TO ‘sj’@‘%’”
}
{
 “source” : {
   “server” : “test”
 },
 “position” : {
   “ts_sec” : 1554885199,
   “file” : “mysql-bin.000022",
   “pos” : 34099376,
   “gtids” : “9a227629-491c-11e9-ae9d-fa163ea8d9a7:1-1647408,b0be07a9-491c-11e9-ae12-fa163ec49735:1-13745”,
   “server_id” : 1944318
 },
“databaseName” : “”,
 “ddl” : “GRANT REPLICATION SLAVE ON *.* TO ‘test_ddl’@‘%’”
}

Posted by '김용환'
,


debezium-kafka 테스트하면서 발견한 장애 처리 내용이다.


1. 특정 토픽에 카프카들이 동기화를 못할 때가 있다.
  -> 카프카 올릴때 컨플루언트에 에러 공유를 위해 기본으로 생성되는 토픽들이 있다.


2. 해당 토픽을 수동으로 지우니깐 주키퍼가 오동작하며, 카프카가 제대로 동작이 되지 않는다.


3. 카프카 주키퍼 다 내리고 모든 토픽 다 지운 후, 다시 데몬을 올리면 정상적으로 올라간다.


그러나 기존 데이터들은 모두 날아감.. 따라서 debezium에서 저장하는 토픽은 모두 삭제되어 다시 listener에 추가한다.

Posted by '김용환'
,

 

다음 커맨드를 실행하면 mysql binlog를 볼 수 있다.

show binlog events in ‘바이너리로그파일명’ from 포지션번호 limit 10

 


{“id”:631028}    {“before”:{“test.test.user.Value”:{“id”:631028,“username”:{“string”:“Random-8d04a798-89df-4260-87f7-8f91d6da458e”},“vin”:{“string”:“c485b8d4-f5d4-459f-b686-52b6a5f8f343"},“address”:null}},“after”:null,“source”:{“version”:{“string”:“0.9.2.Final”},“connector”:{“string”:“mysql”},“name”:“test”,“server_id”:1944318,“ts_sec”:1554885981,“gtid”:{“string”:“9a227629-491c-11e9-ae9d-fa163ea8d9a7:1651774"},“file”:“mysql-bin.000001",“pos”:36040188,“row”:0,“snapshot”:{“boolean”:false},“thread”:{“long”:1616},“db”:{“string”:“test”},“table”:{“string”:“user”},“query”:null},“op”:“d”,“ts_ms”:{“long”:1554885981898}}


file : 바이너리 로그 파일명,  pos에는 포지션 번호이다.

 

Posted by '김용환'
,