이번 프로젝트에서는 MSA 기반 프로젝트를 개발하면서 Kafka Connect를 사용하여 독립적인 서비스 간의 DB를 동기화했습니다. 이 글에서는 Kafka와 Kafka Connect 설치부터, MySQL JDBC Connector 설정 및 Docker Compose를 이용한 실행 과정까지 다루겠습니다.
먼저 Kafka 관련 작업을 위해 kafka
폴더를 생성하고, Confluent Hub 클라이언트를 설치합니다.
mkdir kafka
cd kafka
# Confluent Hub 클라이언트를 이용해 JDBC Connector 다운로드 후 설치
wget http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
tar -xvf confluent-hub-client-latest.tar.gz
EC2에 Java가 설치되지 않았다면, Java를 설치해야 합니다.
sudo apt-get update
sudo apt-get install openjdk-11-jdk
JDBC Connector 설치를 위해 component
와 config
폴더를 생성합니다.
mkdir component
mkdir config
Kafka Connect의 Worker 설정을 위해 빈 worker.properties
파일을 생성합니다.
cd config
vim worker.properties
confluent-hub
명령어를 사용하기 위해서는 환경 변수에 경로를 등록해야 합니다. /etc/bash.bashrc
파일은 읽기 전용이므로 관리자 권한으로 열어야 합니다. pwd
명령어로 현재 경로를 확인한 후, 경로를 환경 변수로 등록합니다.
cd /etc
sudo vim bash.bashrc
export CONFLUENT_HOME='/home/zvyg1023/kafka'
export PATH=$PATH:$CONFLUENT_HOME/bin
이제 Confluent Hub 클라이언트를 이용해 JDBC Connector를 설치합니다.
confluent-hub install confluentinc/kafka-connect-jdbc:latest --component-dir /home/zvyg1023/kafka/component --worker-configs /home/zvyg1023/kafka/config/worker.properties
이번 프로젝트에서는 MySQL 8.0.30 버전을 사용했으며, 버전에 맞는 MySQL Connector를 설치하였습니다. 설치한 MySQL Connector 파일을 kafka-connect-jdbc의 lib 폴더에 넣어야 합니다.
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.30.tar.gz
tar -xvf mysql-connector-java-8.0.30.tar.gz
# mysql connector 파일을 kafka-connect-jdbc의 lib 폴더로 복사
cp mysql-connector-java-8.0.30/mysql-connector-java-8.0.30.jar /home/zvyg1023/kafka/component/confluentinc-kafka-connect-jdbc/lib
Docker Compose 파일을 작성하여 Kafka, Zookeeper, Kafka Connect를 컨테이너로 실행합니다. 이때 Kafka Connect 컨테이너에서 MySQL Connector 파일을 사용하기 위해 volumes 설정을 해줍니다.
docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
kafka-connect:
image: confluentinc/cp-kafka-connect:latest
container_name: kafka-connect
depends_on:
- kafka
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:29092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "ficket"
CONNECT_CONFIG_STORAGE_TOPIC: "ficket-config"
CONNECT_OFFSET_STORAGE_TOPIC: "ficket-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "ficket-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
ports:
- "8083:8083"
volumes:
- ./kafka/component/confluentinc-kafka-connect-jdbc/lib:/etc/kafka-connect/jars
mysql:
image: mysql:8.0.30
container_name: mysql
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: rootpassword
MYSQL_DATABASE: testdb
MYSQL_USER: testuser
MYSQL_PASSWORD: testpassword
volumes:
- mysql-data:/var/lib/mysql
volumes:
mysql-data:
작성 후 해당 파일이 있는 경로에서 Docker Compose 명령어로 실행합니다.
docker-compose up -d
Kafka Connect가 정상적으로 실행되었는지 확인하기 위해 Connector Plugin 목록을 확인합니다.
Connector Plugin 목록 확인
GET http://도메인주소:8083/connector-plugins
200 OK 응답과 함께 jdbc가 포함된 것을 확인할 수 있습니다.
Source Connector를 먼저 만들어 줍니다.
POST http://도메인주소:8083/connectors
{
"name" : "커넥터이름설정",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url" : "jdbc:mysql://컨테이너이름:DB포트번호/사용DB이름",
"connection.user" : "DB사용자이름",
"connection.password" : "DB사용자비밀번호",
"mode": "timestamp+incrementing",
"timestamp.column.name" : "update를 감지할 컬럼 이름",
"incrementing.column.name" : "insert를 감지할 컬럼 이름",
"table.whitelist" : "테이블이름",
"topic.prefix" : "토픽접두사",
"tasks.max" : "3"
}
}
topic.prefix의 값과 table.whitelist의 값을 조합하여 토픽 이름이 됩니다.
Sink Connector를 생성하여 Source Connector로부터 생성된 데이터를 다른 DB로 연동합니다.
POST http://도메인주소:8083/connectors
{
"name":"커넥터이름",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://db컨테이너이름:db포트번호/사용DB이름",
"connection.user":"db사용자이름",
"connection.password":"db비밀번호",
"auto.create":"false",
"auto.evolve":"false",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"토픽이름",
"table.name.format":"db이름.테이블이름",
"pk.mode":"record_value",
"pk.fields":"테이블pk",
"insert.mode":"upsert"
}
}
topics에는 Source Connector로 생성된 토픽 이름을 입력합니다.
Refrence