이 글은 로컬 환경에서 MySQL binlog 기반 CDC(Change Data Capture) 를 구성해, Debezium 커넥터가 변경 이벤트를 Kafka 토픽으로 흘려보내는 것까지를 “끝까지” 실습하는 가이드입니다. Debezium은 보통 Apache Kafka Connect 위에 배포하는 형태가 가장 흔합니다. Source
아키텍처 그림(개념 잡기):
Source
33068083mkdir debezium-mysql-lab && cd debezium-mysql-lab
Debezium 공식 예제 저장소에 MySQL 기반 compose 예시가 있습니다. (필요 시 비교 참고용)
debezium-examples tutorial compose Source
아래는 “최소 실습형” compose 예시입니다(핵심은 MySQL binlog 설정과 Kafka Connect 컨테이너(debezium/connect) 입니다.
파일명:
docker-compose.yml
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.5.3
depends_on: [zookeeper]
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092"
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
mysql:
image: mysql:8.0
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: rootpw
MYSQL_USER: debezium
MYSQL_PASSWORD: dbz
MYSQL_DATABASE: inventory
command: >
--server-id=223344
--log-bin=mysql-bin
--binlog_format=ROW
--binlog_row_image=FULL
--gtid_mode=ON
--enforce_gtid_consistency=ON
# 참고: Debezium MySQL 커넥터는 binlog를 읽으며 ROW 포맷을 사용합니다.
# 관련 개념/설정은 커넥터 문서에서 확인 가능합니다.
# [Source](https://debezium.io/documentation/reference/stable/connectors/mysql.html)
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-prootpw"]
interval: 5s
timeout: 3s
retries: 30
connect:
image: debezium/connect:latest
depends_on: [kafka, mysql]
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: "kafka:9092"
GROUP_ID: "1"
CONFIG_STORAGE_TOPIC: "connect-configs"
OFFSET_STORAGE_TOPIC: "connect-offsets"
STATUS_STORAGE_TOPIC: "connect-status"
# JSON으로 빠르게 확인하려고 converter를 JSON으로 둡니다
KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
KEY_CONVERTER_SCHEMAS_ENABLE: "false"
VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
# Debezium는 Kafka Connect 기반으로 배포하는 게 일반적입니다.
# [Source](https://debezium.io/documentation/reference/stable/architecture.html)
왜 MySQL에 binlog 설정이 필요할까?
Debezium MySQL 커넥터는 MySQL binlog(트랜잭션 로그)를 읽어 행 단위 변경(INSERT/UPDATE/DELETE)을 이벤트로 만듭니다. Source
docker compose up -d
docker compose ps
Kafka Connect REST가 살아있는지 확인:
curl -s http://localhost:8083/ | jq .
# jq가 없다면 jq 없이 그냥 출력해도 됩니다.
docker compose exec mysql mysql -uroot -prootpw
MySQL 콘솔에서 아래 실행:
USE inventory;
CREATE TABLE customers (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
email VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO customers(name, email) VALUES
('alice', 'alice@example.com'),
('bob', 'bob@example.com');
UPDATE customers SET email='alice+1@example.com' WHERE name='alice';
DELETE FROM customers WHERE name='bob';
Debezium 튜토리얼에서도 “MySQL 커넥터를 배포하고 변경 이벤트를 확인”하는 흐름을 보여줍니다. Source
커넥터 이름 예:
mysql-inventory-connector
토픽 프리픽스 예:dbz
curl -i -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "mysql-inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "dbz",
"database.include.list": "inventory",
"table.include.list": "inventory.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}'
여기서 중요한 포인트:
topic.prefix: Debezium이 발행할 Kafka 토픽 이름의 앞부분database.include.list, table.include.list: 캡처 대상 범위를 좁혀서 실습을 단순하게schema.history.internal.kafka.topic: 커넥터가 스키마 히스토리를 저장하는 내부 토픽 (Debezium 튜토리얼/커넥터 문서에 언급) Sourcecurl -s http://localhost:8083/connectors | jq .
curl -s http://localhost:8083/connectors/mysql-inventory-connector/status | jq .
Kafka Connect/커넥터 운영 기본은 Debezium 아키텍처 설명에서도 연결됩니다. Source
커넥터가 정상 등록되면 보통 다음과 같은 토픽이 생깁니다(환경/설정에 따라 다를 수 있음):
dbz.inventory.customers (핵심 변경 이벤트 토픽)토픽 컨슘(간단 확인용):
docker compose exec kafka bash -lc "kafka-console-consumer --bootstrap-server kafka:9092 --topic dbz.inventory.customers --from-beginning"
이 상태에서 MySQL에 추가 변경을 넣어보세요:
docker compose exec mysql mysql -uroot -prootpw -e \
"USE inventory; INSERT INTO customers(name,email) VALUES ('charlie','charlie@example.com');"
콘솔 컨슈머에 새 이벤트가 찍히면 성공입니다.
Debezium MySQL 커넥터는 binlog를 읽고, 일반적으로 ROW 기반으로 변경을 캡처합니다. 설정/요건은 커넥터 문서를 확인하세요. Source
Debezium은 커넥터 동작을 위해 스키마 히스토리 토픽을 사용합니다. 토픽 보존/삭제/권한 이슈가 나면 커넥터가 재기동 시 복구에 실패할 수 있어 운영에서 특히 주의합니다. 개념은 튜토리얼에도 나옵니다. Source