
CDC는 데이터베이스에서 발생하는 Insert Update Delete와 같은 변경 사항을 실시간에 가깝게 감지해서 외부 시스템으로 전달하는 기술이다.
일정 주기마다 전체 테이블을 조회하는 방식이 아니라 데이터베이스 내부의 트랜잭션 로그를 읽어서 변경된 데이터만 추출한다.
이 덕분에 원본 DB에 주는 부하는 적고 변경 이력은 빠르게 외부로 흘려 보낼 수 있다.
복제는 데이터베이스 간 데이터를 동일하게 맞추는 것이 목적이다.
주로 장애 대비나 읽기 부하 분산을 위해 사용하고 동일한 데이터를 다른 DB 인스턴스에 그대로 복사한다.
CDC는 데이터 그 자체보다 변경 행위에 초점을 둔다.
어떤 row가 생성되었는지 어떤 값으로 수정되었는지 어떤 row가 삭제되었는지를 이벤트 형태로 외부로 내보낸다.
정리하면
로그 기반 CDC는 데이터베이스가 내부적으로 유지하는 트랜잭션 로그를 읽어 변경 사항을 추출하는 방식이다.
로그에는 Insert Update Delete가 어떻게 수행되었는지가 기록되며 CDC 시스템은 이 로그를 분석해 실제로 어떤 레코드가 어떻게 바뀌었는지 확인한다.
로그 기반 CDC에서 자주 등장하는 개념은 다음과 같다.
Statement 기반
실행된 SQL 문장을 로그에 기록한다. 예를 들어 UPDATE orders SET status = 'DONE' WHERE id = 10 같은 형태가 로그에 남는다.
어떤 컬럼의 값이 어떻게 바뀌었는지 직접적으로 드러나지 않고 조건에 따라 결과가 달라질 수 있어 재현성이 떨어진다.
Row 기반
변경된 레코드 값을 통째로 로그에 기록한다.
Insert는 after 값만 Update는 before와 after 값을 모두 Delete는 before 값만 기록한다.
Debezium 같은 CDC 도구는 이 Row 기반 로그를 읽어서 변경 메시지를 만든다.
Row 기반 로그는 어떤 컬럼까지 포함할지에 따라 보통 다음과 같이 나뉜다.
full
레코드의 모든 컬럼을 그대로 기록한다.
minimal
PK와 변경에 필요한 컬럼만 기록한다.
noblob
full과 비슷하지만 용량이 큰 blob text 컬럼은 제외한다.
Debezium은 로그 기반 CDC를 Kafka Connect Source Connector 형태로 구현한 오픈소스 프레임워크다.
MySQL PostgreSQL Oracle SQL Server 등 다양한 DB의 트랜잭션 로그를 읽어서 Kafka 토픽으로 변경 이벤트를 보낸다.
JDBC Source Connector와 비교하면 다음과 같은 차이가 있다.
그래서 Debezium은
{
"name": "mysql_cdc_oc_source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "192.168.56.101",
"database.port": "3306",
"database.user": "connect_dev",
"database.password": "connect_dev",
"database.allowPublicKeyRetrieval": "true",
"database.server.id": "184054",
"database.server.name": "mysql_oc",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.mysql_oc",
"table.include.list": "om.orders,om.order_items",
"include.schema.changes": "true",
"decimal.handling.mode": "precise",
"time.precision.mode": "connect",
"database.connectionTimeZone": "Asia/Seoul",
"snapshot.mode": "initial"
}
}
name : Connector 이름
connector.class : 사용할 Debezium Connector 클래스
tasks.max : 생성할 Task 최대 개수
database.hostname : CDC 대상 MySQL 서버 주소
database.port : MySQL 포트
database.user : Binlog 읽기 권한이 있는 사용자
database.password : 위 사용자 비밀번호
database.allowPublicKeyRetrieval : MySQL 8 환경에서 public key를 획득하도록 허용
database.server.id : Replication용 server id
database.server.name : Kafka 토픽 prefix가 되는 논리적 서버명
database.history.kafka.bootstrap.servers : 스키마 변경 이력을 저장할 Kafka 브로커
database.history.kafka.topic : 스키마 변경 이력 저장 토픽
table.include.list : CDC 대상 테이블 목록
include.schema.changes : 테이블의 DDL 변경 사항을 Kafka로 전송
decimal.handling.mode : decimal의 precision scale 유지 방식
time.precision.mode : 시간 타입의 정밀도 표현 방식
database.connectionTimeZone : 날짜 시간 해석 기준 타임존
snapshot.mode : Snapshot 초기 수행 방식
하나의 Source Connector로 여러 테이블을 추출할 수 있다.
하나의 테이블은 하나의 Topic으로 생성된다.
Source 테이블의 PK는 자동으로 Kafka 메시지의 Key로 생성된다.
토픽 이름은 기본적으로 database.server.name.database.table 형식이다.
SMT를 통해 토픽 이름을 변경할 수 있다.
Source 테이블의 DDL 변경 사항을 Kafka에 저장한다.
메시지는 JDBC Source와 다르게 before after 구조다.
JDBC Sink Connector에서 사용하려면 SMT로 after-only 형태로 바꾸어야 한다.
tombstone 메시지 생성을 위해 SMT를 설정해야 한다.
date datetime numeric decimal 등은 Sink가 이해할 수 있는 타입으로 변환해야 한다 .
{
"before": {
"order_id": 1001,
"status": "READY"
},
"after": {
"order_id": 1001,
"status": "SHIPPED"
},
"op": "u",
"ts_ms": 1710000000100
}
메시지 구조는 다음과 같다.
null 메세지는 after만 존재한다 JDBC Sink Connector와 연동하기 위해서는 Debezium 메시지를 after-only 형태로 바꾸는 것이 일반적이다.
가장 많이 사용하는 SMT는 ExtractNewRecordState이다.
{
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
SMT 적용 후 메시지는 다음과 같이 변환된다.
{
"order_id": 1001,
"status": "SHIPPED"
}