MySQL → Debezium → Kafka Connect 단계별 실습 가이드 (Docker Compose 포함)

GarionNachal·2026년 2월 4일

kafka

목록 보기
23/23

이 글은 로컬 환경에서 MySQL binlog 기반 CDC(Change Data Capture) 를 구성해, Debezium 커넥터가 변경 이벤트를 Kafka 토픽으로 흘려보내는 것까지를 “끝까지” 실습하는 가이드입니다. Debezium은 보통 Apache Kafka Connect 위에 배포하는 형태가 가장 흔합니다. Source

아키텍처 그림(개념 잡기):
Debezium Architecture
Source


목표

  • Docker Compose로 MySQL + Kafka + Kafka Connect(Debezium 포함) 실행
  • MySQL에 샘플 테이블 생성/변경
  • Kafka Connect REST API로 Debezium MySQL Source Connector 등록
  • Kafka 토픽에서 변경 이벤트를 확인

사전 준비물

  • Docker / Docker Compose 설치
  • 로컬 포트 사용:
    • MySQL: 3306
    • Kafka Connect: 8083

0) 실습 폴더 준비

mkdir debezium-mysql-lab && cd debezium-mysql-lab

1) Docker Compose 작성 (MySQL + Kafka + Debezium Connect)

Debezium 공식 예제 저장소에 MySQL 기반 compose 예시가 있습니다. (필요 시 비교 참고용)
debezium-examples tutorial compose Source

아래는 “최소 실습형” compose 예시입니다(핵심은 MySQL binlog 설정Kafka Connect 컨테이너(debezium/connect) 입니다.

파일명: docker-compose.yml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.5.3
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  mysql:
    image: mysql:8.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpw
      MYSQL_USER: debezium
      MYSQL_PASSWORD: dbz
      MYSQL_DATABASE: inventory
    command: >
      --server-id=223344
      --log-bin=mysql-bin
      --binlog_format=ROW
      --binlog_row_image=FULL
      --gtid_mode=ON
      --enforce_gtid_consistency=ON
    # 참고: Debezium MySQL 커넥터는 binlog를 읽으며 ROW 포맷을 사용합니다.
    # 관련 개념/설정은 커넥터 문서에서 확인 가능합니다.
    # [Source](https://debezium.io/documentation/reference/stable/connectors/mysql.html)
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-prootpw"]
      interval: 5s
      timeout: 3s
      retries: 30

  connect:
    image: debezium/connect:latest
    depends_on: [kafka, mysql]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: "kafka:9092"
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: "connect-configs"
      OFFSET_STORAGE_TOPIC: "connect-offsets"
      STATUS_STORAGE_TOPIC: "connect-status"
      # JSON으로 빠르게 확인하려고 converter를 JSON으로 둡니다
      KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      # Debezium는 Kafka Connect 기반으로 배포하는 게 일반적입니다.
      # [Source](https://debezium.io/documentation/reference/stable/architecture.html)

왜 MySQL에 binlog 설정이 필요할까?
Debezium MySQL 커넥터는 MySQL binlog(트랜잭션 로그)를 읽어 행 단위 변경(INSERT/UPDATE/DELETE)을 이벤트로 만듭니다. Source


2) 컨테이너 기동

docker compose up -d
docker compose ps

Kafka Connect REST가 살아있는지 확인:

curl -s http://localhost:8083/ | jq .
# jq가 없다면 jq 없이 그냥 출력해도 됩니다.

3) MySQL 접속 후 샘플 테이블 만들기

docker compose exec mysql mysql -uroot -prootpw

MySQL 콘솔에서 아래 실행:

USE inventory;

CREATE TABLE customers (
  id INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(255) NOT NULL,
  email VARCHAR(255),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO customers(name, email) VALUES
('alice', 'alice@example.com'),
('bob', 'bob@example.com');

UPDATE customers SET email='alice+1@example.com' WHERE name='alice';
DELETE FROM customers WHERE name='bob';

4) Debezium MySQL 커넥터 등록 (Kafka Connect REST API)

Debezium 튜토리얼에서도 “MySQL 커넥터를 배포하고 변경 이벤트를 확인”하는 흐름을 보여줍니다. Source

커넥터 이름 예: mysql-inventory-connector
토픽 프리픽스 예: dbz

curl -i -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",

      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",

      "database.server.id": "184054",
      "topic.prefix": "dbz",

      "database.include.list": "inventory",
      "table.include.list": "inventory.customers",

      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
  }'

여기서 중요한 포인트:

  • topic.prefix: Debezium이 발행할 Kafka 토픽 이름의 앞부분
  • database.include.list, table.include.list: 캡처 대상 범위를 좁혀서 실습을 단순하게
  • schema.history.internal.kafka.topic: 커넥터가 스키마 히스토리를 저장하는 내부 토픽 (Debezium 튜토리얼/커넥터 문서에 언급) Source

5) 커넥터 상태 확인

curl -s http://localhost:8083/connectors | jq .
curl -s http://localhost:8083/connectors/mysql-inventory-connector/status | jq .

Kafka Connect/커넥터 운영 기본은 Debezium 아키텍처 설명에서도 연결됩니다. Source


6) Kafka 토픽에서 변경 이벤트 확인

커넥터가 정상 등록되면 보통 다음과 같은 토픽이 생깁니다(환경/설정에 따라 다를 수 있음):

  • dbz.inventory.customers (핵심 변경 이벤트 토픽)

토픽 컨슘(간단 확인용):

docker compose exec kafka bash -lc "kafka-console-consumer --bootstrap-server kafka:9092 --topic dbz.inventory.customers --from-beginning"

이 상태에서 MySQL에 추가 변경을 넣어보세요:

docker compose exec mysql mysql -uroot -prootpw -e \
"USE inventory; INSERT INTO customers(name,email) VALUES ('charlie','charlie@example.com');"

콘솔 컨슈머에 새 이벤트가 찍히면 성공입니다.


(선택) 7) 자주 막히는 포인트 체크리스트

A. MySQL binlog_format이 ROW가 아니면?

Debezium MySQL 커넥터는 binlog를 읽고, 일반적으로 ROW 기반으로 변경을 캡처합니다. 설정/요건은 커넥터 문서를 확인하세요. Source

B. 스키마 히스토리 토픽이 문제를 일으키면?

Debezium은 커넥터 동작을 위해 스키마 히스토리 토픽을 사용합니다. 토픽 보존/삭제/권한 이슈가 나면 커넥터가 재기동 시 복구에 실패할 수 있어 운영에서 특히 주의합니다. 개념은 튜토리얼에도 나옵니다. Source


참고 자료

  • Debezium 아키텍처( Kafka Connect 기반 배포 ) Source
  • Debezium MySQL 커넥터( binlog 기반 ) Source
  • Debezium 공식 튜토리얼(MySQL로 이벤트 스트림 보기) Source
  • Debezium 예제 모음(Compose 포함) Source

profile
AI를 꿈꾸는 BackEnd개발자

0개의 댓글