Debezium 개요
카프카 기반으로 만들어져 있으며 카프카 커넥트와 호환 가능한(?) compatible한 특정 DBMS와 연동되는 커넥터를 제공한다.
커넥터는 DBMS 안에서 데이터 변경을 감지하고 각 변경 이벤트를 카프카 토픽으로 스트리밍한다. 컨슈머에서는 이벤트 레코드들을 읽을 수 있다.
어플리케이션이 꺼져도 데이터의 손실이 없다. 어플리케이션이 켜진 순간 멈췄던 곳에서 다시 읽기를 시작한다.
서비스 시작
ZooKeeper, Kafka, Debezium connector service 세가지 별도 서비스가 필요하당.
튜토리얼에서는 별도의 볼륨을 쓰지 않는다.
zookeeper

kafka

mysql

mysql으로부터 변경 이벤트가 발생할 DB 실행

카프카 커넥트 서비스

inventory DB를 모니터링하는 커넥터 등록

카프카 커넥트 서비스의 API를 이용, /connectors에 POST 요청을 제출한다.
등록된 커넥터 리스트 확인

등록된 커넥터의 task인 inventory-connector 확인

커넥터를 등록하면 일어나는 일(process)
...
2021-11-30 01:38:44,223 INFO || [Worker clientId=connect-1, groupId=1] Tasks [inventory-connector-0] configs updated [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Handling task config update by restarting tasks [] [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] Rebalance started [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,224 INFO || [Worker clientId=connect-1, groupId=1] (Re-)joining group [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,227 INFO || [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,230 INFO || [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'} [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,231 INFO || [Worker clientId=connect-1, groupId=1] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', leaderUrl='http://172.17.0.7:8083/', offset=4, connectorIds=[inventory-connector], taskIds=[inventory-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 4 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO || [Worker clientId=connect-1, groupId=1] Starting task inventory-connector-0 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
...
Debezium 로그 아웃풋은 thread specific한 정보를 mapped diagnostic contexts(MDC) 를 사용하여 제공한다.
ex) 커넥터 타입, 커넥터 논리명, 커넥터 활동(task, snapshot, binlog)
스냅샷 작업을 위해 DB에 락을 걸고 스키마 정보를 불러오는 등의 프로세스 진행
...
2021-11-30 01:38:44,534 INFO MySQL|dbserver1|snapshot Snapshot step 1 - Preparing [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Snapshot step 2 - Determining captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO MySQL|dbserver1|snapshot Read list of available databases [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot list of available databases is: [information_schema, inventory, mysql, performance_schema, sys] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO MySQL|dbserver1|snapshot Read list of available tables in each database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,548 INFO MySQL|dbserver1|snapshot snapshot continuing with database(s): [inventory] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,551 INFO MySQL|dbserver1|snapshot Snapshot step 3 - Locking captured tables [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,552 INFO MySQL|dbserver1|snapshot Flush and obtain global read lock to prevent writes to database [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,557 INFO MySQL|dbserver1|snapshot Snapshot step 4 - Determining snapshot offset [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,560 INFO MySQL|dbserver1|snapshot Read binlog position of MySQL primary server [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot using binlog 'mysql-bin.000003' at position '156' and gtid '' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot Snapshot step 5 - Reading structure of captured tables [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO MySQL|dbserver1|snapshot All eligible tables schema should be captured, capturing: [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand] [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,058 INFO MySQL|dbserver1|snapshot Reading structure of database 'inventory' [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,187 INFO MySQL|dbserver1|snapshot Snapshot step 6 - Persisting schema history [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,273 INFO MySQL|dbserver1|snapshot Releasing global read lock to enable MySQL writes [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Writes to MySQL tables prevented for a total of 00:00:00.717 [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO MySQL|dbserver1|snapshot Snapshot step 7 - Snapshotting data [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Snapshotting contents of 6 tables while still in transaction [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.addresses' (1 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,276 INFO MySQL|dbserver1|snapshot For table 'inventory.addresses' using select statement: 'SELECT `id`, `customer_id`, `street`, `city`, `state`, `zip`, `type` FROM `inventory`.`addresses`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,295 INFO MySQL|dbserver1|snapshot Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.customers' (2 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO MySQL|dbserver1|snapshot For table 'inventory.customers' using select statement: 'SELECT `id`, `first_name`, `last_name`, `email` FROM `inventory`.`customers`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.geom' (3 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,305 INFO MySQL|dbserver1|snapshot For table 'inventory.geom' using select statement: 'SELECT `id`, `g`, `h` FROM `inventory`.`geom`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Finished exporting 3 records for table 'inventory.geom'; total duration '00:00:00.011' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.orders' (4 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO MySQL|dbserver1|snapshot For table 'inventory.orders' using select statement: 'SELECT `order_number`, `order_date`, `purchaser`, `quantity`, `product_id` FROM `inventory`.`orders`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Finished exporting 4 records for table 'inventory.orders'; total duration '00:00:00.008' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products' (5 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO MySQL|dbserver1|snapshot For table 'inventory.products' using select statement: 'SELECT `id`, `name`, `description`, `weight` FROM `inventory`.`products`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,343 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products'; total duration '00:00:00.017' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot Exporting data from table 'inventory.products_on_hand' (6 of 6 tables) [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO MySQL|dbserver1|snapshot For table 'inventory.products_on_hand' using select statement: 'SELECT `product_id`, `quantity` FROM `inventory`.`products_on_hand`' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,353 INFO MySQL|dbserver1|snapshot Finished exporting 9 records for table 'inventory.products_on_hand'; total duration '00:00:00.009' [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,355 INFO MySQL|dbserver1|snapshot Snapshot - Final stage [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
2021-11-30 01:38:45,356 INFO MySQL|dbserver1|snapshot Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]] [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
스냅샷 이후 본격적으로 지속적인 binlog 읽기 상태로 전환
...
2021-11-30 01:38:45,362 INFO MySQL|dbserver1|streaming Starting streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
Nov 30, 2021 1:38:45 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/156 (sid:184054, cid:13)
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|binlog Connected to MySQL binlog at mysql:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]] [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,392 INFO MySQL|dbserver1|streaming Waiting for keepalive thread to start [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,393 INFO MySQL|dbserver1|binlog Creating thread debezium-mysqlconnector-dbserver1-binlog-client [io.debezium.util.Threads]
...
DB에 발생하는 다양한 변경 이벤트를 캡쳐한다.
wathch-topic 유틸리티 실행

프로덕션 레벨에서는 실제 카프카 컨슈머를 써야 함
create 하지 않았는데,,?
페이로드



페이로드


페이로드

카프카 커넥트 인스턴스를 종료해도 다시 시작한 순간부터 이벤트 조회 시작함.