[TIL] Debezium

Aryumka·2024년 6월 9일

Debezium 개요

카프카 기반으로 만들어져 있으며 카프카 커넥트와 호환 가능한(?) compatible한 특정 DBMS와 연동되는 커넥터를 제공한다.

커넥터는 DBMS 안에서 데이터 변경을 감지하고 각 변경 이벤트를 카프카 토픽으로 스트리밍한다. 컨슈머에서는 이벤트 레코드들을 읽을 수 있다.

어플리케이션이 꺼져도 데이터의 손실이 없다. 어플리케이션이 켜진 순간 멈췄던 곳에서 다시 읽기를 시작한다.

서비스 시작
ZooKeeper, Kafka, Debezium connector service 세가지 별도 서비스가 필요하당.

튜토리얼에서는 별도의 볼륨을 쓰지 않는다.

zookeeper

kafka

mysql

mysql으로부터 변경 이벤트가 발생할 DB 실행

카프카 커넥트 서비스

inventory DB를 모니터링하는 커넥터 등록

카프카 커넥트 서비스의 API를 이용, /connectors에 POST 요청을 제출한다.
등록된 커넥터 리스트 확인

등록된 커넥터의 task인 inventory-connector 확인

커넥터를 등록하면 일어나는 일(process)

...
2021-11-30 01:38:44,223 INFO   ||  [Worker clientId=connect-1, groupId=1] Tasks [inventory-connector-0] configs updated   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO   ||  [Worker clientId=connect-1, groupId=1] Handling task config update by restarting tasks []   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,224 INFO   ||  [Worker clientId=connect-1, groupId=1] Rebalance started   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,224 INFO   ||  [Worker clientId=connect-1, groupId=1] (Re-)joining group   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,227 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,230 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=3, memberId='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2021-11-30 01:38:44,231 INFO   ||  [Worker clientId=connect-1, groupId=1] Joined group at generation 3 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-7b087c69-8ac5-4c56-9e6b-ec5adabf27e8', leaderUrl='http://172.17.0.7:8083/', offset=4, connectorIds=[inventory-connector], taskIds=[inventory-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 4   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2021-11-30 01:38:44,232 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting task inventory-connector-0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
...

Debezium 로그 아웃풋은 thread specific한 정보를 mapped diagnostic contexts(MDC) 를 사용하여 제공한다.
ex) 커넥터 타입, 커넥터 논리명, 커넥터 활동(task, snapshot, binlog)

스냅샷 작업을 위해 DB에 락을 걸고 스키마 정보를 불러오는 등의 프로세스 진행

...
2021-11-30 01:38:44,534 INFO   MySQL|dbserver1|snapshot  Snapshot step 1 - Preparing   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO   MySQL|dbserver1|snapshot  Snapshot step 2 - Determining captured tables   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,535 INFO   MySQL|dbserver1|snapshot  Read list of available databases   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO   MySQL|dbserver1|snapshot  	 list of available databases is: [information_schema, inventory, mysql, performance_schema, sys]   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,537 INFO   MySQL|dbserver1|snapshot  Read list of available tables in each database   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,548 INFO   MySQL|dbserver1|snapshot  	snapshot continuing with database(s): [inventory]   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,551 INFO   MySQL|dbserver1|snapshot  Snapshot step 3 - Locking captured tables [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand]   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,552 INFO   MySQL|dbserver1|snapshot  Flush and obtain global read lock to prevent writes to database   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,557 INFO   MySQL|dbserver1|snapshot  Snapshot step 4 - Determining snapshot offset   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,560 INFO   MySQL|dbserver1|snapshot  Read binlog position of MySQL primary server   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO   MySQL|dbserver1|snapshot  	 using binlog 'mysql-bin.000003' at position '156' and gtid ''   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO   MySQL|dbserver1|snapshot  Snapshot step 5 - Reading structure of captured tables   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:44,562 INFO   MySQL|dbserver1|snapshot  All eligible tables schema should be captured, capturing: [inventory.addresses, inventory.customers, inventory.geom, inventory.orders, inventory.products, inventory.products_on_hand]   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,058 INFO   MySQL|dbserver1|snapshot  Reading structure of database 'inventory'   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,187 INFO   MySQL|dbserver1|snapshot  Snapshot step 6 - Persisting schema history   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,273 INFO   MySQL|dbserver1|snapshot  Releasing global read lock to enable MySQL writes   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO   MySQL|dbserver1|snapshot  Writes to MySQL tables prevented for a total of 00:00:00.717   [io.debezium.connector.mysql.MySqlSnapshotChangeEventSource]
2021-11-30 01:38:45,274 INFO   MySQL|dbserver1|snapshot  Snapshot step 7 - Snapshotting data   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO   MySQL|dbserver1|snapshot  Snapshotting contents of 6 tables while still in transaction   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,275 INFO   MySQL|dbserver1|snapshot  Exporting data from table 'inventory.addresses' (1 of 6 tables)   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,276 INFO   MySQL|dbserver1|snapshot  	 For table 'inventory.addresses' using select statement: 'SELECT `id`, `customer_id`, `street`, `city`, `state`, `zip`, `type` FROM `inventory`.`addresses`'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,295 INFO   MySQL|dbserver1|snapshot  	 Finished exporting 7 records for table 'inventory.addresses'; total duration '00:00:00.02'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO   MySQL|dbserver1|snapshot  Exporting data from table 'inventory.customers' (2 of 6 tables)   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,296 INFO   MySQL|dbserver1|snapshot  	 For table 'inventory.customers' using select statement: 'SELECT `id`, `first_name`, `last_name`, `email` FROM `inventory`.`customers`'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO   MySQL|dbserver1|snapshot  	 Finished exporting 4 records for table 'inventory.customers'; total duration '00:00:00.008'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,304 INFO   MySQL|dbserver1|snapshot  Exporting data from table 'inventory.geom' (3 of 6 tables)   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,305 INFO   MySQL|dbserver1|snapshot  	 For table 'inventory.geom' using select statement: 'SELECT `id`, `g`, `h` FROM `inventory`.`geom`'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO   MySQL|dbserver1|snapshot  	 Finished exporting 3 records for table 'inventory.geom'; total duration '00:00:00.011'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO   MySQL|dbserver1|snapshot  Exporting data from table 'inventory.orders' (4 of 6 tables)   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,316 INFO   MySQL|dbserver1|snapshot  	 For table 'inventory.orders' using select statement: 'SELECT `order_number`, `order_date`, `purchaser`, `quantity`, `product_id` FROM `inventory`.`orders`'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO   MySQL|dbserver1|snapshot  	 Finished exporting 4 records for table 'inventory.orders'; total duration '00:00:00.008'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO   MySQL|dbserver1|snapshot  Exporting data from table 'inventory.products' (5 of 6 tables)   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,325 INFO   MySQL|dbserver1|snapshot  	 For table 'inventory.products' using select statement: 'SELECT `id`, `name`, `description`, `weight` FROM `inventory`.`products`'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,343 INFO   MySQL|dbserver1|snapshot  	 Finished exporting 9 records for table 'inventory.products'; total duration '00:00:00.017'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO   MySQL|dbserver1|snapshot  Exporting data from table 'inventory.products_on_hand' (6 of 6 tables)   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,344 INFO   MySQL|dbserver1|snapshot  	 For table 'inventory.products_on_hand' using select statement: 'SELECT `product_id`, `quantity` FROM `inventory`.`products_on_hand`'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,353 INFO   MySQL|dbserver1|snapshot  	 Finished exporting 9 records for table 'inventory.products_on_hand'; total duration '00:00:00.009'   [io.debezium.relational.RelationalSnapshotChangeEventSource]
2021-11-30 01:38:45,355 INFO   MySQL|dbserver1|snapshot  Snapshot - Final stage   [io.debezium.pipeline.source.AbstractSnapshotChangeEventSource]
2021-11-30 01:38:45,356 INFO   MySQL|dbserver1|snapshot  Snapshot ended with SnapshotResult [status=COMPLETED, offset=MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]]   [io.debezium.pipeline.ChangeEventSourceCoordinator]
...

스냅샷 이후 본격적으로 지속적인 binlog 읽기 상태로 전환

...
2021-11-30 01:38:45,362 INFO   MySQL|dbserver1|streaming  Starting streaming   [io.debezium.pipeline.ChangeEventSourceCoordinator]
...
Nov 30, 2021 1:38:45 AM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to mysql:3306 at mysql-bin.000003/156 (sid:184054, cid:13)
2021-11-30 01:38:45,392 INFO   MySQL|dbserver1|binlog  Connected to MySQL binlog at mysql:3306, starting at MySqlOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.mysql.Source:STRUCT}, sourceInfo=SourceInfo [currentGtid=null, currentBinlogFilename=mysql-bin.000003, currentBinlogPosition=156, currentRowNumber=0, serverId=0, sourceTime=2021-11-30T01:38:45.352Z, threadId=-1, currentQuery=null, tableIds=[inventory.products_on_hand], databaseName=inventory], snapshotCompleted=true, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0], restartGtidSet=null, currentGtidSet=null, restartBinlogFilename=mysql-bin.000003, restartBinlogPosition=156, restartRowsToSkip=0, restartEventsToSkip=0, currentEventLengthInBytes=0, inTransaction=false, transactionId=null, incrementalSnapshotContext =IncrementalSnapshotContext [windowOpened=false, chunkEndPosition=null, dataCollectionsToSnapshot=[], lastEventKeySent=null, maximumKey=null]]   [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,392 INFO   MySQL|dbserver1|streaming  Waiting for keepalive thread to start   [io.debezium.connector.mysql.MySqlStreamingChangeEventSource]
2021-11-30 01:38:45,393 INFO   MySQL|dbserver1|binlog  Creating thread debezium-mysqlconnector-dbserver1-binlog-client   [io.debezium.util.Threads]
...

DB에 발생하는 다양한 변경 이벤트를 캡쳐한다.

  1. Create 이벤트 조회

wathch-topic 유틸리티 실행

프로덕션 레벨에서는 실제 카프카 컨슈머를 써야 함

create 하지 않았는데,,?

페이로드

  1. Update 이벤트


페이로드

  1. Delete 후 이벤트

페이로드

카프카 커넥트 인스턴스를 종료해도 다시 시작한 순간부터 이벤트 조회 시작함.

profile
아륨까라고 읽습니다.

0개의 댓글