debezium튜토리얼을 보고 작성한 글입니다.
docker-compose.yml 파일 출처
해당 리포지터리에서는 mysql이 아닌 다른 데이터베이스 예시도 있습니다.
위의 내용에 더해서 message를 더 편하게 보기 위해 kafka-ui를 추가했다.
(튜토리얼에서는 watch-topic을 사용했지만 그러면 json을 보기 힘들다...)
version: '3'
services:
zookeeper:
image: quay.io/debezium/zookeeper:latest
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
image: quay.io/debezium/kafka:latest
ports:
- 9092:9092
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
mysql:
image: quay.io/debezium/example-mysql:latest
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw
connect:
image: quay.io/debezium/connect:latest
ports:
- 8083:8083
links:
- kafka
- mysql
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
sink_db:
image: 'mysql:latest'
environment:
- 'MYSQL_DATABASE=mydb'
- 'MYSQL_USER=admin'
- 'MYSQL_PASSWORD=admin'
- 'MYSQL_ROOT_PASSWORD=root'
ports:
- 13306:3306
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- 8084:8080
environment:
- DYNAMIC_CONFIG_ENABLED=true
- KAFKA_CLUSTERS_0_NAME=peters_kafka
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
{
/* 커넥터의 이름 */
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
/* 데이터베이스의 도메인과 포트 */
"database.hostname": "mysql",
"database.port": "3306",
/* db 유저와 비밀번호 */
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
/* topic의 prefix */
"topic.prefix": "dbserver1",
/* 어떤 데이터베이스를 감지할 것인지 */
"database.include.list": "inventory",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
/*kafka host*/
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
}
}
커넥터 호스트에 /connectors에 POST요청으로 저 json을 날려주면 커넥터가 생성된다.
위의 docker-compose를 사용했다면 localhost:8083/connectors로 POST 요청을 보내야한다.
실제로 생성된 토픽은 아래와 같다.
(schema-changes.inventory도 생성된다)
debezium 튜토리얼에선 example-mysql 컨테이너를 사용한다.
해당 컨테이너는 예시 데이터와 debezium 유저가 포함되어 있다.
create database test_db;
use test_db;
CREATE TABLE Users (
UserID INT PRIMARY KEY,
UserName VARCHAR(50) NOT NULL,
Email VARCHAR(100) UNIQUE,
Age INT,
Gender VARCHAR(10)
);
CREATE TABLE Orders (
OrderID INT PRIMARY KEY,
UserID INT,
OrderDate DATE,
TotalAmount DECIMAL(10, 2),
FOREIGN KEY (UserID) REFERENCES Users(UserID)
);
INSERT INTO Users (UserID, UserName, Email, Age, Gender) VALUES
(1, 'Alice', 'alice@example.com', 30, 'Female'),
(2, 'Bob', 'bob@example.com', 28, 'Male'),
(3, 'Charlie', 'charlie@example.com', 35, 'Male'),
(4, 'Diana', 'diana@example.com', 25, 'Female');
INSERT INTO Orders (OrderID, UserID, OrderDate, TotalAmount) VALUES
(101, 1, '2024-07-10', 150.00),
(102, 2, '2024-07-10', 220.50),
(103, 1, '2024-07-11', 75.25),
(104, 3, '2024-07-11', 320.00),
(105, 4, '2024-07-11', 100.75);
chat gpt를 사용하여 예제 데이터를 생성해보았다.
CREATE USER 'test_user'@'%' IDENTIFIED BY 'testpw';
GRANT SELECT, RELOAD,LOCK TABLES, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test_user'@'%';
show grants for 'test_user'@'%';
튜토리얼의 debezium유저를 참조하여 새 유저를 만들고 권한을 부여해보자.
mysql 커넥터의 경우
LOCK TABLES권한이 추가로 필요하다.
참고 : debezium-document-connectors-mysql
{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "test_user",
"database.password": "testpw",
"database.server.id": "184054",
"topic.prefix": "test_db",
"database.include.list": "test_db",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.test_db"
}
}
localhost:8083/connectors에 GET 요청을 보내면 생성된 커넥터 목록을 받을 수 있다.
"payload": {
/* 생성인 경우 before이 null이다
삭제인 경우 after가 null이 된다. */
"before": null,
"after": {
"OrderID": 101,
"UserID": 1,
"OrderDate": 19914,
"TotalAmount": "Opg="
},
"source": {
"version": "2.6.2.Final",
"connector": "mysql",
"name": "test_db",
"ts_ms": 1720704150000,
"snapshot": "first",
"db": "test_db",
"sequence": null,
"ts_us": 1720704150000000,
"ts_ns": 1720704150000000000,
"table": "Orders",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 3554,
"row": 0,
"thread": null,
"query": null
},
/*
어떤 연산인가
생성은 "r", update는 "u"
삭제인 경우 "d"로 표시된다.
*/
"op": "r",
/* 언제 일어난 연산인가 */
"ts_ms": 1720704150195,
"ts_us": 1720704150195803,
"ts_ns": 1720704150195803000,
"transaction": null
}
after를 잘 보면 TotalAmount부분이 이상한 Opg=로 보이는 데 이는 커넥터의 decimal-handling-mode에 의해 생기는 문제이다.
커넥터 설정할 때 "decimal.handling.mode":"double"를 추가해주면 실수로 표기된다. (공식문서에서는 정밀도 손실이 있을 수 있다고 주의해준다.)
update Users set UserName = 'Bob Update Name' where UserID = 2;
"payload": {
"before": {
"UserID": 2,
"UserName": "Bob",
"Email": "bob@example.com",
"Age": 28,
"Gender": "Male"
},
"after": {
"UserID": 2,
"UserName": "Bob Update Name",
"Email": "bob@example.com",
"Age": 28,
"Gender": "Male"
},
"source": {
"version": "2.6.2.Final",
"connector": "mysql",
"name": "test_db",
"ts_ms": 1720704844000,
"snapshot": "false",
"db": "test_db",
"sequence": null,
"ts_us": 1720704844000000,
"ts_ns": 1720704844000000000,
"table": "Users",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 3791,
"row": 0,
"thread": 8,
"query": null
},
"op": "u",
"ts_ms": 1720704844229,
"ts_us": 1720704844229240,
"ts_ns": 1720704844229240000,
"transaction": null
}
update의 경우 "op"는 "u"이다.