Debezium을 이용한 PostgreSQL CDC(Change Data Capture)

이현우·2022년 3월 31일
0

Change Data Capture?

  • 데이터베이스에서 변경 데이터 캡처(change data capture, CDC)는 변경된 데이터를 사용하여 동작을 취할 수 있도록 데이터를 결정하고 추적하기 위해 사용되는 여러 소프트웨어 디자인 패턴들의 모임이다.

Debezium

  • Debezium은 기존 데이터베이스 이벤트를 스트림으로 바꾸는 변환하는 플랫폼으로 어플리케이션은 데이터베이스내의 행 레벨의 변경을 확인 할 수 있다.

Debezium 아키텍쳐

  • debezium connectors는 MySQL, PostgreSQL 데이터베이스에 대한 CDC를 지원합니다.
  • kafka Connect는 kafka broker와 별도의 서비스로 운영됩니다.
  • sink connector를 이용하여 다른 데이터베이스로의 변환이 가능합니다.

Start Example

    1. Start Zookeeper
    1. Start Kafka
    1. Start PostgreSQL
    1. Start PostgrSQL command line client
    1. Start Kafka Connect
    1. Start consumer
    1. Use JDBCSinkConnector

1. Start Zookeeper

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8

-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name zookeeper
컨테이너의 이름입니다.
-p 2181:2181 -p 2888:2888 -p 3888:3888
컨테이너의 포트 3개를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 다른 컨테이너가 Zookeeper와 통신할 수 있습니다.

  • 실행확인
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO  [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO  [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO  [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
...
port 0.0.0.0/0.0.0.0:2181  

2. Start Kafka

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8

-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name kafka
컨테이너의 이름입니다.
-p 9092:9092
컨테이너의 9092 포트를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 다른 컨테이 너가 kafka와 통신할 수 있습니다.
--link zookeeper:zookeeper
동일한 Docker 호스트에서 실행 중인 컨테이너에서 zookeeper를 찾고 있다는 것을 컨테이너 에게 알립니다.

  • 실행확인
...
2017-09-21 07:16:59,085 - INFO  [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO  [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
...
2017-09-21 07:16:59,649 - INFO  [main:Logging$class@70] - [Kafka Server 1], started

3. Start PostgreSQL

docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password debezium/postgres:1.8

-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name postgres
컨테이너의 이름입니다.
-p 5432:5432
컨테이너의 5432 포트를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 다른 컨테이 너가 postgres와 통신할 수 있습니다.
-e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password
debezium postgreSQL 유저명과 비밀번호입니다.

  • 실행확인
...
server started
CREATE DATABASE
...

4. Start PostgreSQL command line client

  • docker 터미널 실행
docker exec -ti postgres /bin/bash
  • postgres config 확인
cat /var/lib/postgresql/data/postgresql.conf
# LOGGING
# log_min_error_statement = fatal
# log_min_messages = DEBUG1

# CONNECTION
listen_addresses = '*'

# MODULES
shared_preload_libraries = 'decoderbufs,wal2json'

# REPLICATION
wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4             # max number of walsender processes (change requires restart)
#wal_keep_segments = 4          # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s       # in milliseconds; 0 disables
max_replication_slots = 4       # max number of replication slots (change requires restart)
  • wal_level
    - minimal : 갑작스런 종료로부터 다시 시작하는데 필요한 최소 정보.
    - archive : 데이터베이스 엔진이 WAL 보관을 할 수 있도록 함.
    - hot_standby : 데이터베이스 엔진이 서버의 읽기 전용 복제본을 생성할 수 있도록 함.
    - logical : WAL 데이터를 다른 시스템이 사용할 수 있도록 하는데 필요한 모든 정보.
  • max_wal_senders
    - WAL 발신자는 WAL을 수신자로 보내기 위해 데이터베이스에서 실행되는 프로세스입니다.
  • psql 실행
psql -U postgresuser
  • Sample Data 생성
CREATE DATABASE dbbase;
\c dbbase
CREATE TABLE sample (
    id int,
    name varchar(8),
    age int,
    datetime_created timestamp,
    datetime_updated timestamp,
    primary key(id)
);
ALTER TABLE sample replica identity FULL;
  • replica identity (DEFAULT / NOTHING / FULL / INDEX)
    - WAL에 기록되는 세부 정보의 양을 결정하는 옵션.
    - DEFAULT : 기본 키 열의 이전 값을 기록합니다.
    - NOTHING : 이전 행의 대한 정보를 기록하지 않습니다.
    - FULL : 모든 열에 대한 이전 정보를 기록합니다.
    - INDEX : 명명된 인덱스에 포함된 열의 이전 값을 기록합니다.

5. Start Kafka Connect

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.8

-it
터미널의 표준 입력 및 출력이 컨테이너에 연결됩니다.
--rm
컨테이너가 중지되면 제거됩니다.
--name connect
컨테이너의 이름입니다.
-p 8083:8083
컨테이너의 8083 포트를 Docker 호스트의 동일한 포트에 매핑합니다. 이를 통해 컨테이너 외 부의 어플리케이션은 Kafka Connect의 REST API를 사용하여 새 컨테이너 인스턴스를 설정하 고 관리 할 수 있습니다.
--link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres
zookeeper, kafka, postgres 컨테이너에 연결합니다.
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses
debezium 이미지에 필요한 환경 변수를 설정합니다.

  • 실행확인
...
2020-02-06 15:48:33,939 INFO   ||  Kafka version: 3.0.0   [org.apache.kafka.common.utils.AppInfoParser]
...
2020-02-06 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
  • kafka connect 서비스 상태 확인
curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"cb8625948210849f"}
  • kafka connect에 등록된 커넥터 목록 확인
curl -H "Accept:application/json" localhost:8083/connectors/
[]
  • kafka connect 등록
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample"}}'
  • kafka connect에 등록된 커넥터 목록 확인
curl -H "Accept:application/json" localhost:8083/connectors/
[fulfillment-connector]
  • connect의 task 확인
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/fulfillment-connector
HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)

{
  "name": "fulfillment-connector",
  ...
  "tasks": [
    {
      "connector": "fulfillment-connector",  
      "task": 0
    }
  ]
}

6. Start consumer

  • Sample consumer 확인
docker run -it --rm --name consumer --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a fulfillment.public.sample | grep --line-buffered '^{' | sudo python3 -u <filepath>/samplestream.py > <filepath>/samplestream.txt

-watch-topic
fulfillment.public.sample topic을 봅니다.
-a
topic이 생성된 이후의 모든 이벤트를 봅니다.

  • 실행확인
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic fulfillment.public.sample:
  • insert data
insert into sample values (1001, 'H1', 10, now(), now());
  • consumer 확인
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32",
...
"payload":{"before":null,"after":{"id":1001,"name":"H1","age":10,"datetime_created":1648719890915312,"datetime_updated":1648719890915312},"source":{"version":"1.8.1.Final","connector":"postgresql","name":"fulfillment","ts_ms":1648719890941,"snapshot":"false","db":"dbbase","sequence":"[null,\"23719176\"]","schema":"public","table":"sample","txId":556,"lsn":23719176,"xmin":null},"op":"c","ts_ms":1648719891348,"transaction":null}}
  • samplestream.txt 확인
    - Insert, Update, Delete Evnet시 한줄 씩 생성되는 것을 볼 수 있다.
tail -f samplestream.txt
1001,H1,10,1648719890915312,1648719890915312,2022-03-31-20-00-49,c
1001,H1,11,1648719890915312,1648719890915312,2022-03-31-20-00-49,u
1001,None,None,None,None,2022-03-31-20-00-49,d
1002,H2,20,1648724488499908,1648724488499908,2022-03-31-20-01-28,c
  • samplestream.py
import json
import os
import sys
from datetime import datetime

def parse_crud(payload, op_type):
    current_ts = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')  
    
    out_list = []
    out_list.append(payload.get('id'))
    out_list.append(payload.get('name'))
    out_list.append(payload.get('age'))
    out_list.append(payload.get('datetime_created'))
    out_list.append(payload.get('datetime_updated'))
    out_list.append(current_ts)
    out_list.append(op_type)

    return out_list

def parse_payload(input_raw_json):
    input_json = json.loads(input_raw_json)
    op_type = input_json.get('payload', {}).get('op')

    if op_type == 'c':
        return parse_crud(
            input_json.get('payload', {}).get('after', {}),
            op_type
        )
    elif op_type == 'd':
        return parse_crud(
            input_json.get('payload', {}).get('before', {}),
            op_type
        )
    elif op_type == 'u':
        return parse_crud(
            input_json.get('payload', {}).get('after', {}),
            op_type
        )

    return []

for line in sys.stdin:
    data = parse_payload(line)
    log_str = ','.join([str(elt) for elt in data])
    print(log_str, flush=True)
  • stop docker
docker stop $(docker ps -aq)

7. Use JdbcSinkConnector

  • jdbcSinkConnector는 confluent에서 개발되었기 때문에 kafka connector에 파일을 추가해 주어야 한다.
    - /kafka/libs 폴더에 PostgreSQL JDBC driver를 넣어야 한다.
    - /kafka/connect/kafka-connect-jdbc 폴더에 Kafka Connect JDBC을 넣어야 한다.
  • debezium-examples/end-to-end-demo/debezium-jdbc/의 Dockerfile을 빌드해서 사용한다.
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka jdbc-sink
  • connector 삭제 방법
curl -i -X DELETE -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/fulfillment-connector
  • connector 생성
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{"name": "source-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample",
"transforms": "route","transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "targetsample"}}'
  • sinkConnector 생성
curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "jdbc-sink","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1","topics": "targetsample","connection.url": "jdbc:postgresql://postgres:5432/dbbase?user=postgresuser&password=password", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true",  "insert.mode": "upsert",  "pk.fields": "id",  "pk.mode": "record_key", "delete.enabled": "true" }}'
  • sample table에 Insert, Update, Delete시 target table에 데이터의 변경을 확인 할 수 있다.
  • Data 확인
select * from sample;
select * from targetsample;

Reference

위키피디아 Change Data Capture
debezium 1.8 공식문서
postgreSQL 14 공식문서
confluent jdbcSinkConnector 공식문서

0개의 댓글