[kafka] debezium - 1

rejs·2024년 7월 11일

debezium튜토리얼을 보고 작성한 글입니다.

docker-compose

docker-compose.yml 파일 출처
해당 리포지터리에서는 mysql이 아닌 다른 데이터베이스 예시도 있습니다.

위의 내용에 더해서 message를 더 편하게 보기 위해 kafka-ui를 추가했다.

(튜토리얼에서는 watch-topic을 사용했지만 그러면 json을 보기 힘들다...)

version: '3'

services:
  zookeeper:
    image: quay.io/debezium/zookeeper:latest
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:latest
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:latest
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:latest
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  sink_db:
    image: 'mysql:latest'
    environment:
      - 'MYSQL_DATABASE=mydb'
      - 'MYSQL_USER=admin'
      - 'MYSQL_PASSWORD=admin'
      - 'MYSQL_ROOT_PASSWORD=root'
    ports:
      - 13306:3306
    command:
      - --character-set-server=utf8mb4
      - --collation-server=utf8mb4_unicode_ci

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - 8084:8080
    environment:
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_NAME=peters_kafka
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092

카프카 커넥터 추가하기

{
  /* 커넥터의 이름 */
  "name": "inventory-connector",  
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    /* 데이터베이스의 도메인과 포트 */
    "database.hostname": "mysql",  
    "database.port": "3306",
    
    /* db 유저와 비밀번호 */
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",  
      
      /* topic의 prefix */
    "topic.prefix": "dbserver1",  
      
      /* 어떤 데이터베이스를 감지할 것인지 */
    "database.include.list": "inventory",  
    "schema.history.internal.kafka.topic": "schema-changes.inventory"  
      /*kafka host*/
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",  
  }
}

커넥터 호스트에 /connectors에 POST요청으로 저 json을 날려주면 커넥터가 생성된다.

위의 docker-compose를 사용했다면 localhost:8083/connectors로 POST 요청을 보내야한다.

실제로 생성된 토픽은 아래와 같다.
(schema-changes.inventory도 생성된다)

커넥터에게 필요한 db 권한

debezium 튜토리얼에선 example-mysql 컨테이너를 사용한다.

해당 컨테이너는 예시 데이터와 debezium 유저가 포함되어 있다.

사용해보기


예제 데이터 만들기

create database test_db;
use test_db;

CREATE TABLE Users (
    UserID INT PRIMARY KEY,
    UserName VARCHAR(50) NOT NULL,
    Email VARCHAR(100) UNIQUE,
    Age INT,
    Gender VARCHAR(10)
);

CREATE TABLE Orders (
    OrderID INT PRIMARY KEY,
    UserID INT,
    OrderDate DATE,
    TotalAmount DECIMAL(10, 2),
    FOREIGN KEY (UserID) REFERENCES Users(UserID)
);

INSERT INTO Users (UserID, UserName, Email, Age, Gender) VALUES
(1, 'Alice', 'alice@example.com', 30, 'Female'),
(2, 'Bob', 'bob@example.com', 28, 'Male'),
(3, 'Charlie', 'charlie@example.com', 35, 'Male'),
(4, 'Diana', 'diana@example.com', 25, 'Female');

INSERT INTO Orders (OrderID, UserID, OrderDate, TotalAmount) VALUES
(101, 1, '2024-07-10', 150.00),
(102, 2, '2024-07-10', 220.50),
(103, 1, '2024-07-11', 75.25),
(104, 3, '2024-07-11', 320.00),
(105, 4, '2024-07-11', 100.75);

chat gpt를 사용하여 예제 데이터를 생성해보았다.

user 추가하기

CREATE USER 'test_user'@'%' IDENTIFIED BY 'testpw';
GRANT SELECT, RELOAD,LOCK TABLES, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'test_user'@'%';
show grants for 'test_user'@'%';

튜토리얼의 debezium유저를 참조하여 새 유저를 만들고 권한을 부여해보자.

mysql 커넥터의 경우 LOCK TABLES 권한이 추가로 필요하다.
참고 : debezium-document-connectors-mysql

커넥터 만들기

{
  "name": "test-connector", 
  "config": {  
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",   
    "database.hostname": "mysql",  
    "database.port": "3306",
    "database.user": "test_user",
    "database.password": "testpw",
    "database.server.id": "184054",  
    "topic.prefix": "test_db",  
    "database.include.list": "test_db",  
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",  
    "schema.history.internal.kafka.topic": "schema-changes.test_db"  
  }
}

localhost:8083/connectors에 GET 요청을 보내면 생성된 커넥터 목록을 받을 수 있다.

message 살펴보기

create

	"payload": {
      	/* 생성인 경우 before이 null이다
      	 삭제인 경우 after가 null이 된다. */
		"before": null,
		"after": {
			"OrderID": 101,
			"UserID": 1,
			"OrderDate": 19914,
			"TotalAmount": "Opg="
		},
		"source": {
			"version": "2.6.2.Final",
			"connector": "mysql",
			"name": "test_db",
			"ts_ms": 1720704150000,
			"snapshot": "first",
			"db": "test_db",
			"sequence": null,
			"ts_us": 1720704150000000,
			"ts_ns": 1720704150000000000,
			"table": "Orders",
			"server_id": 0,
			"gtid": null,
			"file": "mysql-bin.000003",
			"pos": 3554,
			"row": 0,
			"thread": null,
			"query": null
		},
		/* 
        	어떤 연산인가 
            생성은 "r", update는 "u"
            삭제인 경우 "d"로 표시된다. 
        */
        "op": "r",
        
        /* 언제 일어난 연산인가 */
		"ts_ms": 1720704150195,
		"ts_us": 1720704150195803,
		"ts_ns": 1720704150195803000,
		"transaction": null
	}

after를 잘 보면 TotalAmount부분이 이상한 Opg=로 보이는 데 이는 커넥터의 decimal-handling-mode에 의해 생기는 문제이다.

debezium documentation - connectors - mysql

커넥터 설정할 때 "decimal.handling.mode":"double"를 추가해주면 실수로 표기된다. (공식문서에서는 정밀도 손실이 있을 수 있다고 주의해준다.)

Update

update Users set UserName = 'Bob Update Name' where UserID = 2;
	"payload": {
		"before": {
			"UserID": 2,
			"UserName": "Bob",
			"Email": "bob@example.com",
			"Age": 28,
			"Gender": "Male"
		},
		"after": {
			"UserID": 2,
			"UserName": "Bob Update Name",
			"Email": "bob@example.com",
			"Age": 28,
			"Gender": "Male"
		},
		"source": {
			"version": "2.6.2.Final",
			"connector": "mysql",
			"name": "test_db",
			"ts_ms": 1720704844000,
			"snapshot": "false",
			"db": "test_db",
			"sequence": null,
			"ts_us": 1720704844000000,
			"ts_ns": 1720704844000000000,
			"table": "Users",
			"server_id": 223344,
			"gtid": null,
			"file": "mysql-bin.000003",
			"pos": 3791,
			"row": 0,
			"thread": 8,
			"query": null
		},
		"op": "u",
		"ts_ms": 1720704844229,
		"ts_us": 1720704844229240,
		"ts_ns": 1720704844229240000,
		"transaction": null
	}

update의 경우 "op"는 "u"이다.

0개의 댓글