Kafka Source Connector - Debezium

Log·2022년 11월 26일
0

Kafka

목록 보기
9/9
post-thumbnail

Debezium

Stream changes from your database.

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

  • 데이터베이스의 변경사항을 캡처하는 분산 서비스
  • 각 데이터베이스 테이블 내의 모든 행 수준 변경을 변경 이벤트 스트림에 기록하고, 애플리케이션은 이러한 스트림을 읽기만 하면 변경 이벤트가 발생한 동일한 순서로 변경 이벤트를 볼 수 있다.

Debezium with docker

기존에 진행했던 도커 이미지와 연결되는 kafka source connector server를 제작하고자 한다.

MySQL setting

Debezium 설정 전, mysql을 docker로 띄워서 진행할 것이며, cdc가 이루어 질 수 있는 환경을 설정하고자 한다.

Docker image 생성
docker run \
    --name mysql-container \
    -e MYSQL_ROOT_PASSWORD=admin \
    -e MYSQL_USER=debezium \
    -e MYSQL_PASSWORD=debezium \
    -d -p 3306:3306 \
    mysql:latest
Docker image 접속 & database, table 생성
# exec docker image
docker exec -it mysql-container /bin/bash

## in docker bash
mysql -u root -p 
이진 로그 설정 여부
SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name='log_bin';
+----------------+
| variable_value |
+----------------+
| ON             |
+----------------+
이진 로깅 형식 확인
SELECT variable_value
FROM performance_schema.global_variables
WHERE variable_name='binlog_format';
+----------------+
| variable_value |
+----------------+
| ROW            |
+----------------+
database 및 table 생성
CREATE DATABASE debezium_test;
USE debezium_test;
CREATE TABLE IF NOT EXISTS TestBinLogStream(
	id int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
	status varchar(30),
	createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 
                     ON UPDATE CURRENT_TIMESTAMP
);
기존 kafka 클러스터와 네트워크 연결(쉽게 접속을 위해)

(실제로는 kafka-source-connector과 mysql, source-connector과 kafka클러스터 이렇게 네트워크가 연결되면 될 것 같다.)

docker network connect test-network mysql-container

Debezium Docker image 생성

연결 Test

SQL 실행

INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=1;
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
INSERT INTO TestBinLogStream (status) VALUES ("submitted");
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=2;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=1;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=2;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=3;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=4;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=3;
UPDATE TestBinLogStream SET status = 'pickedUp' WHERE id=5;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=4;
UPDATE TestBinLogStream SET status = 'completed' WHERE id=5;
DELETE FROM TestBinLogStream WHERE id = 3;

결과 확인

$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --list
__consumer_offsets
connect-configs
connect-offsets
connect-status
debezium_test
debezium_test.debezium_test.TestBinLogStream
kafka.client.tutorial
kafka.client.tutorial.consumer
schemahistory.debezium_test
schemahistory.testdb
stream_log
streams-application-__assignor-__leader
test_kafka
topic_kafka_test
# comsumer 
$KAFKA_HOME/kafka_2.12-3.3.1/bin/kafka-console-consumer.sh  --bootstrap-server kafka_tutorial:9092  --topic debezium_test.debezium_test.TestBinLogStream  --from-beginning
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium_test.debezium_test.TestBinLogStream.Envelope","version":1},"payload":{"before":null,"after":{"id":1,"status":"completed","createdAt":"2022-11-26T10:12:19Z","updated_at":"2022-11-26T10:12:19Z"},"source":{"version":"2.0.0.Final","connector":"mysql","name":"debezium_test","ts_ms":1669458279000,"snapshot":"first","db":"debezium_test","sequence":null,"table":"TestBinLogStream","server_id":0,"gtid":null,"file":"binlog.000002","pos":19791,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669458279803,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium_test.debezium_test.TestBinLogStream.Envelope","version":1},"payload":{"before":null,"after":{"id":2,"status":"completed","createdAt":"2022-11-26T10:12:19Z","updated_at":"2022-11-26T10:12:19Z"},"source":{"version":"2.0.0.Final","connector":"mysql","name":"debezium_test","ts_ms":1669458279000,"snapshot":"true","db":"debezium_test","sequence":null,"table":"TestBinLogStream","server_id":0,"gtid":null,"file":"binlog.000002","pos":19791,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669458279804,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium_test.debezium_test.TestBinLogStream.Envelope","version":1},"payload":{"before":null,"after":{"id":4,"status":"completed","createdAt":"2022-11-26T10:12:19Z","updated_at":"2022-11-26T10:12:19Z"},"source":{"version":"2.0.0.Final","connector":"mysql","name":"debezium_test","ts_ms":1669458279000,"snapshot":"true","db":"debezium_test","sequence":null,"table":"TestBinLogStream","server_id":0,"gtid":null,"file":"binlog.000002","pos":19791,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669458279805,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"status"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"createdAt"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"updated_at"}],"optional":true,"name":"debezium_test.debezium_test.TestBinLogStream.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"debezium_test.debezium_test.TestBinLogStream.Envelope","version":1},"payload":{"before":null,"after":{"id":5,"status":"completed","createdAt":"2022-11-26T10:12:19Z","updated_at":"2022-11-26T10:12:19Z"},"source":{"version":"2.0.0.Final","connector":"mysql","name":"debezium_test","ts_ms":1669458279000,"snapshot":"last","db":"debezium_test","sequence":null,"table":"TestBinLogStream","server_id":0,"gtid":null,"file":"binlog.000002","pos":19791,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669458279805,"transaction":null}}

참고 문서

profile
열심히 정리하는 습관 기르기..

0개의 댓글