이번 프로젝트에서 MSA 기반 프로젝트를 개발하면서 독립적인 서비스 구축을 위해 Kafka Connect를 사용해 다른 서비스들간의 DB를 동기화 했다.
처음에 EC2 서버에 kafka와 zookeeper를 Docker로 설치하고, Kafka 컨테이너 안에서 Kafka Connect를 설치했다.
온갖 설정파일을 만져도 kafka connect가 broker 서버와 연결되지 않아 docker compose로 kafka와 kafka connect를 한번에 올리는 방법으로 해결했다.
1-1. JDBC Connector 설치를 위해 Confluent client 설치
kafka
폴더를 따로 만들고 그 안에 설치했다.
mkdir kafka
cd kafka
//Confluent Hub 이용해 jdbc-connector 다운 후 설치
wget http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
tar -xvf confluent-hub-client-latest.tar.gz
1-2. EC2에 java가 설치가 안되어있다면 설치해야한다.
sudo apt-get install openjdk-11-jdk
1-3. component와 config 폴더를 생성해준다.
mkdir component
mkdir config
1-4. worker.properties
파일을 지정해줘야한다.
빈 파일로만 만들어주면 된다.
cd config
vi worker.properties
1-5. confluent 명령어를 사용하기 위해서 환경변수에 PATH 경로를 등록해줘야한다.
cd /etc
vi bash.bashrc
export CONFLUENT_HOME='/경로/kafka'
export PATH=$PATH:$CONFLUENT_HOME/bin
이 두 줄을 추가해준다.
1-6. jdbc-connector 설치
confluent-hub 명령어로 jdbc connector를 설치해준다.
confluent-hub install confluentinc/kafka-connect-jdbc:latest --component-dir /경로/kafka/component --worker-configs /경로/kafka/config/worker.properties
나는 MySQL을 사용해서 MySQL Connector를 설치해줬다.
그리고 kafka-connect-jdbc의 lib 폴더 안에 MySQL Connector를 넣어줬다.
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz
tar -xvf mysql-connector-java-8.0.27.tar.gz
//mysql connector 넣어주기
cp mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar /경로/kafka/component/confluentinc-kafka-connect-jdbc/lib
https://github.com/confluentinc/cp-all-in-one/blob/7.3.0-post/cp-all-in-one/docker-compose.yml
나는 confluent 플랫폼에서 제공하는 docker-compose.yml 을 이용했다.
docker-compose.yml 파일을 보면, zookeeper, kafka, kafka connect 뿐만 아니라 ksqldb, rest proxy 서버 등 부가적인 것들도 같이 설치하고 있다.
나는 kafka connect만 이용하는 것이 목적이기 때문에 다음과 같이 수정해 사용했다.
connect 컨테이너를 띄울 때 volumes 설정을 해주면 된다.
docker-compose.yml
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- 네트워크이름
broker:
image: confluentinc/cp-kafka:7.3.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://도메인주소:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
networks:
- 네트워크이름
connect:
image: confluentinc/cp-kafka-connect:7.0.1
ports:
- 8083:8083
container_name: connect
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "quickstart-avro"
CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_LOG4J_ROOT_LOGLEVEL: WARN
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
volumes:
- /경로/kafka/component/confluentinc-kafka-connect-jdbc/lib:/etc/kafka-connect/jars
networks:
- 네트워크이름
networks:
네트워크이름:
external: true
작성 후 docker-compose.yml
파일이 있는 곳에서 실행해준다.
docker-compose up -d
실행이 잘 되었는지 확인하기 위해 Connector 목록을 확인해보자.
Connector 목록 확인
200 OK가 나오는지 확인한다.
GET http://도메인주소:8083/connectors
Source Connector 생성
Source Connector를 먼저 만들어준다.
POST http://도메인주소:8083/connectors
{ "name" : "커넥터이름설정", "config" : { "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url" : "jdbc:mysql://도메인주소:DB포트번호/사용DB이름", "connection.user" : "DB사용자이름", "connection.password" : "DB사용자비밀번호", "mode": "timestamp+incrementing", //insert와 update 감지 "timestamp.column.name" : "update를 감지할 컬럼 이름", "incrementing.column.name" : "insert를 감지할 컬럼 이름", "table.whitelist" : "테이블이름", "topic.prefix" : "토픽접두사", "tasks.max" : "3" } }
- topic.prefix의 값 + table.whitelist의 값 = 토픽이름이 된다.
등록한 Source Connector 확인
다시 GET http://도메인주소:8083/connectors
로 확인해보면 등록한 Source Connector의 이름이 보인다.
Sink Connector 생성
Source Connector에 등록해준 DB 테이블을 연동받을 DB 정보를 입력해주면 된다.
POST http://도메인주소:8083/connectors
{ "name":"커넥터이름", "config":{ "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url":"jdbc:mysql://db호스트:db포트번호/사용DB이름", "connection.user":"db사용자이름", "connection.password":"db비밀번호", "auto.create":"false", "auto.evolve":"false", "delete.enabled":"false", "tasks.max":"1", "topics":"토픽이름", "table.name.format":"db이름.테이블이름", "pk.mode":"record_value", "pk.fields":"테이블pk", "insert.mode":"upsert" } }
- topics 에는 Source Connector를 생성할 때 생성된 토픽 이름을 입력해주면 된다.