docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8
-it
The terminal's standard input and output are attached to the container.
--rm
The container is removed when it is stopped.
--name zookeeper
The name of the container.
-p 2181:2181 -p 2888:2888 -p 3888:3888
Maps three of the container's ports to the same ports on the Docker host. This allows other containers to communicate with Zookeeper.
Starting up in standalone mode
ZooKeeper JMX enabled by default
Using config: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,417 - INFO [main:QuorumPeerConfig@134] - Reading configuration from: /zookeeper/conf/zoo.cfg
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@78] - autopurge.snapRetainCount set to 3
2017-09-21 07:15:55,419 - INFO [main:DatadirCleanupManager@79] - autopurge.purgeInterval set to 1
...
port 0.0.0.0/0.0.0.0:2181
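To confirm from the host that Zookeeper is really listening, you can poke the mapped port directly. Below is a minimal sketch in Python, assuming the port mapping above; note that Zookeeper 3.5+ only answers whitelisted four-letter commands (srvr is whitelisted by default, though the image's configuration may differ).

import socket

# Send Zookeeper's four-letter "srvr" command and print the reply.
# Assumes port 2181 is mapped to localhost as in the docker run above.
with socket.create_connection(("localhost", 2181), timeout=5) as sock:
    sock.sendall(b"srvr")
    print(sock.recv(4096).decode())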
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8
-it
The terminal's standard input and output are attached to the container.
--rm
The container is removed when it is stopped.
--name kafka
The name of the container.
-p 9092:9092
Maps port 9092 in the container to the same port on the Docker host. This allows other containers to communicate with Kafka.
--link zookeeper:zookeeper
Tells the container that it can find Zookeeper in the container named zookeeper running on the same Docker host.
...
2017-09-21 07:16:59,085 - INFO [main-EventThread:ZkClient@713] - zookeeper state changed (SyncConnected)
2017-09-21 07:16:59,218 - INFO [main:Logging$class@70] - Cluster ID = LPtcBFxzRvOzDSXhc6AamA
...
2017-09-21 07:16:59,649 - INFO [main:Logging$class@70] - [Kafka Server 1], started
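If you want to check the broker from the host as well, the third-party kafka-python package (an assumption, not part of this setup) can list the topics the broker knows about. Keep in mind that the debezium/kafka image advertises the container's internal address, so this sketch only works where that address is routable from the client, e.g. directly on a Linux Docker host:

from kafka import KafkaConsumer

# List topics via the broker address the container advertises
# (172.17.0.3:9092 in the logs further below); adjust to your environment.
consumer = KafkaConsumer(bootstrap_servers="172.17.0.3:9092")
print(consumer.topics())
consumer.close()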
docker run -it --rm --name postgres -p 5432:5432 -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password debezium/postgres:1.8
-it
The terminal's standard input and output are attached to the container.
--rm
The container is removed when it is stopped.
--name postgres
The name of the container.
-p 5432:5432
Maps port 5432 in the container to the same port on the Docker host. This allows other containers to communicate with postgres.
-e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=password
The username and password for the PostgreSQL server in the debezium/postgres image.
...
server started
CREATE DATABASE
...
docker exec -ti postgres /bin/bash
cat /var/lib/postgresql/data/postgresql.conf
# LOGGING
# log_min_error_statement = fatal
# log_min_messages = DEBUG1
# CONNECTION
listen_addresses = '*'
# MODULES
shared_preload_libraries = 'decoderbufs,wal2json'
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
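Logical decoding only works when these settings are in effect, so it is worth verifying them over a live connection. A minimal sketch using the third-party psycopg2 package (an assumption, not part of the images above), with the credentials and port mapping from the docker run command:

import psycopg2

# Check the settings Debezium's Postgres connector depends on.
conn = psycopg2.connect(host="localhost", port=5432, user="postgresuser",
                        password="password", dbname="postgres")
with conn.cursor() as cur:
    for setting in ("wal_level", "max_wal_senders", "max_replication_slots"):
        cur.execute("SHOW " + setting)
        print(setting, "=", cur.fetchone()[0])
conn.close()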
psql -U postgresuser
CREATE DATABASE dbbase;
\c dbbase
CREATE TABLE sample (
    id int,
    name varchar(8),
    age int,
    datetime_created timestamp,
    datetime_updated timestamp,
    primary key(id)
);
ALTER TABLE sample replica identity FULL;
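REPLICA IDENTITY FULL makes PostgreSQL log the entire old row for UPDATE and DELETE statements, which is what lets Debezium populate the before field of its change events. You can confirm the setting with a catalog query; a sketch reusing the psycopg2 pattern from above:

import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, user="postgresuser",
                        password="password", dbname="dbbase")
with conn.cursor() as cur:
    # relreplident: 'd' = default (primary key), 'f' = full,
    # 'n' = nothing, 'i' = index
    cur.execute("SELECT relreplident FROM pg_class "
                "WHERE oid = 'public.sample'::regclass")
    print(cur.fetchone()[0])  # expect 'f' after the ALTER TABLE above
conn.close()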
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:1.8
-it
The terminal's standard input and output are attached to the container.
--rm
The container is removed when it is stopped.
--name connect
The name of the container.
-p 8083:8083
Maps port 8083 in the container to the same port on the Docker host. This allows applications outside the container to use Kafka Connect's REST API to set up and manage new connector instances.
--link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres
Links the container to the zookeeper, kafka, and postgres containers.
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses
Sets the environment variables required by the Debezium image.
...
2020-02-06 15:48:33,939 INFO || Kafka version: 3.0.0 [org.apache.kafka.common.utils.AppInfoParser]
...
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset -1 [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2020-02-06 15:48:34,485 INFO || [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
curl -H "Accept:application/json" localhost:8083/
{"version":"3.0.0","commit":"cb8625948210849f"}
curl -H "Accept:application/json" localhost:8083/connectors/
[]
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "fulfillment-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample"}}'
curl -H "Accept:application/json" localhost:8083/connectors/
["fulfillment-connector"]
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/fulfillment-connector
HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)
{
  "name": "fulfillment-connector",
  ...
  "tasks": [
    {
      "connector": "fulfillment-connector",
      "task": 0
    }
  ]
}
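The response above only shows the connector's definition. Kafka Connect also exposes a /status endpoint that reports whether the connector and its tasks are actually RUNNING; a standard-library-only sketch:

import json
from urllib.request import urlopen

# Query the connector's runtime state through the Connect REST API.
with urlopen("http://localhost:8083/connectors/fulfillment-connector/status") as resp:
    status = json.load(resp)
print(status["connector"]["state"])                 # e.g. RUNNING
print([task["state"] for task in status["tasks"]])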
docker run -it --rm --name consumer --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.8 watch-topic -a fulfillment.public.sample | grep --line-buffered '^{' | sudo python3 -u <filepath>/samplestream.py > <filepath>/samplestream.txt
watch-topic
Watches the fulfillment.public.sample topic.
-a
Watches all events since the topic was created, not just new ones.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic fulfillment.public.sample:
insert into sample values (1001, 'H1', 10, now(), now());
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32",
...
"payload":{"before":null,"after":{"id":1001,"name":"H1","age":10,"datetime_created":1648719890915312,"datetime_updated":1648719890915312},"source":{"version":"1.8.1.Final","connector":"postgresql","name":"fulfillment","ts_ms":1648719890941,"snapshot":"false","db":"dbbase","sequence":"[null,\"23719176\"]","schema":"public","table":"sample","txId":556,"lsn":23719176,"xmin":null},"op":"c","ts_ms":1648719891348,"transaction":null}}
tail -f samplestream.txt
1001,H1,10,1648719890915312,1648719890915312,2022-03-31-20-00-49,c
1001,H1,11,1648719890915312,1648719890915312,2022-03-31-20-00-49,u
1001,None,None,None,None,2022-03-31-20-00-49,d
1002,H2,20,1648724488499908,1648724488499908,2022-03-31-20-01-28,c
import json
import sys
from datetime import datetime


def parse_crud(payload, op_type):
    # Flatten one row image (before/after) into a list of values, stamped
    # with the local processing time and the operation type.
    payload = payload or {}  # 'before'/'after' can be null in the event
    current_ts = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    out_list = [
        payload.get('id'),
        payload.get('name'),
        payload.get('age'),
        payload.get('datetime_created'),
        payload.get('datetime_updated'),
        current_ts,
        op_type,
    ]
    return out_list


def parse_payload(input_raw_json):
    # 'c' (create) and 'u' (update) carry the new row in 'after';
    # 'd' (delete) carries the old row in 'before'.
    input_json = json.loads(input_raw_json)
    op_type = input_json.get('payload', {}).get('op')
    if op_type in ('c', 'u'):
        return parse_crud(input_json.get('payload', {}).get('after'), op_type)
    elif op_type == 'd':
        return parse_crud(input_json.get('payload', {}).get('before'), op_type)
    return []


# Read raw change events from stdin (one JSON document per line) and
# emit one CSV line per event.
for line in sys.stdin:
    data = parse_payload(line)
    log_str = ','.join([str(elt) for elt in data])
    print(log_str, flush=True)
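To sanity-check the script without running the whole pipeline, you can call parse_payload directly on a hand-written event. The event below is hypothetical and trimmed to just the fields the script reads; run it in the same module as the functions above:

sample_event = '{"payload": {"op": "c", "after": {"id": 1, "name": "H1", "age": 10, "datetime_created": 0, "datetime_updated": 0}}}'
print(parse_payload(sample_event))
# [1, 'H1', 10, 0, 0, '<processing timestamp>', 'c']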
docker stop $(docker ps -aq)
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres jdbc-sink
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/fulfillment-connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors/ -d '{"name": "source-connector", "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgresuser", "database.password": "password", "database.dbname" : "dbbase", "database.server.name": "fulfillment", "table.include.list": "public.sample",
"transforms": "route","transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "targetsample"}}'
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "jdbc-sink","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1","topics": "targetsample","connection.url": "jdbc:postgresql://postgres:5432/dbbase?user=postgresuser&password=password", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "insert.mode": "upsert", "pk.fields": "id", "pk.mode": "record_key", "delete.enabled": "true" }}'
select * from sample;
select * from targetsample;
Wikipedia: Change Data Capture
Debezium 1.8 official documentation
PostgreSQL 14 official documentation
Confluent JDBC Sink Connector official documentation