version: '3'
services:
mysql:
image: mysql:8.0
container_name: mysql
ports:
- 3306:3306
environment:
**MYSQL_ROOT_PASSWORD: sasd**
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
**volumes:
- /Users/jiminlee/TempCodes/CDC/mysql/data:/var/lib/mysql**
zookeeper:
**platform: linux/amd64/v8 //platform: linux/amd64/**
container_name: zookeeper
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
container_name: kafka
image: wurstmeister/kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose 파일 실행, 컨테이너 실행
docker-compose -f docker-compose.yml up -d
진행하다 위와 같이 에러가 나서 compose 파일을 수정해줬다 - #1 확인
설치된 컨테이너 확인 : docker ps -a
mysql -u root -p
create database testdb;
use testdb;
CREATE TABLE accounts (
account_id VARCHAR(255),
role_id VARCHAR(255),
user_name VARCHAR(255),
user_description VARCHAR(255),
update_date DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (account_id)
);
MySQL Container 접속 후 - docker exec -it mysql /bin/bash
-it
는 표준입출력을 열고 tty를 통해 접속하겠다는 의미bash
가 표준이기에 bash 를 사용SQL문으로 TABLE을 생성해준다.
mysqluser에게 모든 권한을 부여하는 과정이다.
use mysql;
// mysqluser 가 추가 되어 있는지 확인
select host, user from user;
// mysqluser 없으면 생성
CREATE USER 'mysqluser'@'%' IDENTIFIED BY 'mysqlpw';
// mysqluser 에게 권한 부여
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
FLUSH PRIVILEGES;
https://debezium.io/releases/1.5/ → debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
을 다운로드 받는다.
/opt/kafka_2.13-2.8.1/
경로에 미리 connectors
폴더를 만들자.
로컬 컴퓨터에서 kafka 컨테이너로 debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
를 업로드한다.
docker cp debezium-connector-mysql-1.5.4.Final-plugin.tar.gz kafka:/opt/kafka_2.13-2.8.1/connectors/debezium-connector-mysql-2.1.4.Final-plugin.tar.gz
파일 압축을 푼다
cd /opt/kafka_2.13-2.8.1/connectors
tar -zxvf debezium-connector-mysql-1.5.4.Final-plugin.tar.gz
카프카 컨테이너에 접속하여 /opt/kafka/config/connect-distributed.properties
파일을 수정한다. 파일을 수정한 뒤에는 카프카 컨테이너를 재시작해야 플러그인 경로가 정상 반영된다.
// 원래 경로
#plugin.path=
// 수정 경로
plugin.path=/opt/kafka_2.13-2.8.1/connectors
분산모드(distributed) 카프카 커넥트를 실행한다. 분산모드는 2개 이상의 커넥트를 한 개의 클러스터를 묶어서 운영한다.
connect-distributed.sh /opt/kafka/config/connect-distributed.properties
// 정상 실행 시 INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:57) 확인 가능
다른 탭을 새로 열어 카프카에 접속 후 플러그인 목록을 조회한다. io.debezium.connector.mysql.MySqlConnector
가 있어야 한다.
curl --location --request GET 'localhost:8083/connector-plugins'
rest api 를 호출하여 connector 를 생성하자.
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "source-test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "testdb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testdb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}'
1.5.1
curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
"name": "source-test-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "mysqluser",
"database.password": "mysqlpw",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "testdb",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "dbhistory.testdb",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1",
"topic.prefix" : "testdb",
"schema.history.internal.kafka.topic": "schemahistory.fullfillment",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"include.schema.changes": "true"
}
}'
https://debezium.io/documentation/reference/2.1/connectors/mysql.html#mysql-delete-events 2.1
이러한 설정 중 일부는 일반적인 설정으로서, 모든 커넥터에 대해 지정해야 합니다. 예를 들면 다음과 같습니다.
connector.class
는 커넥터의 Java 클래스입니다.tasks.max
는 이 커넥터에 대해 생성되어야 할 태스크의 최대 수입니다.다른 설정은 Debezium MySQL 커넥터에만 해당됩니다.
database.hostname
은 Aurora 데이터베이스의 작성자 인스턴스 엔드포인트를 포함합니다.database.server.name
은 데이터베이스 서비의 논리적 이름입니다. 이 설정은 이름은 Debezium에서 생성한 Kafka 주제의 이름에 사용됩니다.database.include.list
는 지정한 서버에서 호스팅하는 데이터베이스의 목록을 포함합니다.database.history.kafka.topic
은 데이터베이스 스키마 변경을 추적하기 위해 Debezium에서 내부적으로 사용하는 Kafka 주제입니다.database.history.kafka.bootstrap.servers
는 MSK 클러스터의 부트스트랩 서버를 포함합니다.database.history.consumer.*
및 database.history.producer.*
)은 데이터베이스 기록 주제를 액세스하기 위한 IAM 인증을 활성화합니다. Amazon MSK Connect – Apache Kafka 클러스터로 데이터 전달 서비스 출시 | Amazon Web Services # 목록
curl --location --request GET 'http://localhost:8083/connectors'
# 상세정보
curl --location --request GET 'http://localhost:8083/connectors/{connector-name}/config ' \
--header 'Content-Type: application/json'
curl --location --request GET 'http://localhost:8083/connectors/source-test-connector/config ' \
--header 'Content-Type: application/json'
#삭제
curl --location --request DELETE 'http://localhost:8083/connectors/source-test-connector'
https://docs.confluent.io/platform/current/connect/references/restapi.html
https://developer.confluent.io/learn-kafka/kafka-connect/rest-api/
kafka-topics.sh --list --bootstrap-server localhost:9092
INSERT INTO accounts VALUES ("123456", "111", "Susan Cooper", "God", "2021-08-16 10:11:12");
INSERT INTO accounts VALUES ("123457", "111", "Rick Ford", "mistakes", "2021-08-16 11:12:13");
INSERT INTO accounts VALUES ("123458", "999", "Bradley Fine", "face", "2021-08-16 12:13:14");
kafka-console-consumer.sh --topic dbserver1.testdb.accounts --bootstrap-server localhost:9092 --from-beginning
새로운 탭을 열어 카프카에서 데이터를 어떻게 받아들이고 있는지 확인해보자.
위 처럼 MySQL에서 변화가 생기는 것을 Kafka가 topic의 형태로 잘 감지하고 저장하고 있음을 확인 할 수 있다.
(2)에서 계속..