# MySQL 8.0 instance used as the JDBC sink target of the CDC pipeline.
# NOTE(review): presumably nested under the compose file's top-level
# `services:` key — confirm and indent accordingly.
mysql-sink:
  image: mysql:8.0
  container_name: mysql-sink
  ports:
    # host 3307 -> container 3306; quoted so the mapping is always a string
    - "3307:3306"
  environment:
    # Credentials are hard-coded here; acceptable for local testing only.
    MYSQL_ROOT_PASSWORD: sasd
    MYSQL_USER: mysqluser
    MYSQL_PASSWORD: mysqlpw
  command:
    - --character-set-server=utf8mb4
    - --collation-server=utf8mb4_unicode_ci
  volumes:
    # Bind mount of the MySQL data directory onto the host.
    - /Users/jiminlee/TempCodes/CDC/mysql-sink/data:/var/lib/mysql
docker-compose -f docker-compose.yml up -d
docker-compose 파일을 수정하여 mysql sink용 컨테이너를 쉽게 생성해 주었다.
mysql -u root -p
-- Create the sink schema and switch to it.
CREATE DATABASE sinkdb;
USE sinkdb;

-- Sink-side accounts table, keyed by account_id.
CREATE TABLE accounts (
    account_id       VARCHAR(255) PRIMARY KEY,
    role_id          VARCHAR(255),
    user_name        VARCHAR(255),
    user_description VARCHAR(255),
    update_date      DATETIME DEFAULT CURRENT_TIMESTAMP
);
docker exec -it mysql-sink bin/bash
USE mysql;
-- Check whether mysqluser has already been added
SELECT host, user FROM user;
-- Create mysqluser only when it does not exist yet
-- (a plain CREATE USER fails if the account is already present)
CREATE USER IF NOT EXISTS 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
-- Grant privileges to mysqluser
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
FLUSH PRIVILEGES;
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
위 링크에서 다운로드한 후, Kafka 컨테이너로 업로드한다.
# Upload the connector archive into the kafka container
docker cp confluentinc-kafka-connect-jdbc-10.7.0.zip kafka:/opt/kafka_2.13-2.8.1/connectors/
# NOTE(review): the cd/unzip below presumably run in a shell inside the
# kafka container rather than on the host — confirm.
cd /opt/kafka_2.13-2.8.1/connectors
unzip confluentinc-kafka-connect-jdbc-10.7.0.zip
source connector를 설치할 때 이미 /opt/kafka/config/connect-distributed.properties
파일의 plugin 경로를 수정해두었다.
connect-distributed.sh /opt/kafka/config/connect-distributed.properties
worker, version, commit 및 Kafka 클러스터 ID에 대한 kafka Connect 클러스터 정보를 확인해보자.
curl http://localhost:8083/
curl --location --request GET 'localhost:8083/connector-plugins'
io.confluent.connect.jdbc.JdbcSinkConnector, io.confluent.connect.jdbc.JdbcSourceConnector 가 있어야 한다.
# Register the JDBC sink connector through the Kafka Connect REST API.
# The request body must be strict JSON (no comments), so the notes live here:
#   - To sink a single topic, replace "topics.regex" with
#       "topics": "dbserver1.testdb.accounts"
#   - Alternative unwrap settings that were left disabled:
#       "transforms.unwrap.drop.tombstones": "false"
#       "transforms.unwrap.delete.handling.mode": "rewrite"
# NOTE(review): with drop.tombstones=true the sink presumably never sees
# tombstone records, so "delete.enabled" may have no effect — confirm.
curl --location --request POST 'http://localhost:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "sink-test-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql-sink:3306/sinkdb?user=mysqluser&password=mysqlpw",
    "auto.create": "false",
    "auto.evolve": "false",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "table.name.format": "${topic}",
    "tombstones.on.delete": "true",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    "topics.regex": "dbserver1.testdb.(.*)",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap, route, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "update_date"
  }
}'
# List registered connectors
curl --location --request GET 'http://localhost:8083/connectors'
# Show the connector's configuration
# (the stray trailing space inside the quoted URL has been removed)
curl --location --request GET 'http://localhost:8083/connectors/sink-test-connector/config' \
  --header 'Content-Type: application/json'
# Delete the connector
curl --location --request DELETE 'http://localhost:8083/connectors/sink-test-connector'
이미 카프카 상에 생성되어있던 토픽으로 sinkDB가 업데이트 되어있는 것을 확인할 수 있다.
-- Sample rows used to exercise the pipeline (insert -> update -> delete).
INSERT INTO accounts VALUES ('111111', '111', 'Jimin', 'ADD', '2021-08-16 10:11:12');
INSERT INTO accounts VALUES ('222222', '222', 'Lee', 'FROM', '2021-08-16 11:12:13');
INSERT INTO accounts VALUES ('333333', '333', 'Test', 'SOURCE', '2021-08-16 12:13:14');
-- account_id is VARCHAR, so compare against a string literal; the statement
-- is now terminated so it no longer runs into the DELETE below.
UPDATE accounts SET user_name = 'UPDATE!!!' WHERE account_id = '111111';
-- NOTE(review): the original predicate was `account_id = 0`, which matches
-- none of the rows inserted above; adjust the id to the row you want removed.
DELETE FROM accounts WHERE account_id = '111111';
mysql -u root -p
-- Create the testdb schema and switch to it.
CREATE DATABASE testdb;
USE testdb;

-- CUSTOM_TABLE keyed by id.
CREATE TABLE CUSTOM_TABLE (
    title VARCHAR(255),
    name  VARCHAR(255),
    id    VARCHAR(255) PRIMARY KEY
);
mysql -u root -p
-- Privilege setup
USE mysql;
-- Check whether mysqluser has already been added
SELECT host, user FROM user;
-- Create mysqluser only when it does not exist yet
-- (a plain CREATE USER fails if the account is already present)
CREATE USER IF NOT EXISTS 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
-- Grant privileges to mysqluser
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
FLUSH PRIVILEGES;
-- Create the database and table
CREATE DATABASE sinkdb2;
USE sinkdb2;
CREATE TABLE CUSTOM_TABLE (
    title VARCHAR(255),
    name VARCHAR(255),
    id VARCHAR(255),
    PRIMARY KEY (id)
);
중요한 것은 topics에서 CUSTOM_TABLE만 받아오도록 설정하는 것.
# Register the second JDBC sink connector, limited to the CUSTOM_TABLE topic.
# Settings that differ from the first connector: "auto.evolve" is "true" and
# "topics" names exactly one topic. The request body must be strict JSON, so
# no emphasis markers or duplicate keys are allowed inside it.
# "transforms.unwrap.drop.tombstones" was listed twice ("true", then "false");
# only the later "false" entry is kept, which is the value most JSON parsers
# would have used.
# NOTE(review): CUSTOM_TABLE as created in this guide has no update_date
# column — confirm the TimestampConverter transform is still needed.
curl --location --request POST 'http://localhost:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data-raw '{
  "name": "sink-test2-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql-sink2:3306/sinkdb2?user=mysqluser&password=mysqlpw",
    "auto.create": "false",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "table.name.format": "${topic}",
    "tombstones.on.delete": "true",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    "topics": "dbserver1.testdb.CUSTOM_TABLE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap, route, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "update_date"
  }
}'