MSA 구축 (8) - Kafka와 Kafka Connect 설치하기

오형상·2024년 11월 16일
0

Ficket

목록 보기
8/27
post-thumbnail

이번 프로젝트에서는 MSA 기반 프로젝트를 개발하면서 Kafka Connect를 사용하여 독립적인 서비스 간의 DB를 동기화했습니다. 이 글에서는 Kafka와 Kafka Connect 설치부터, MySQL JDBC Connector 설정 및 Docker Compose를 이용한 실행 과정까지 다루겠습니다.

1. JDBC Connector 설치

1-1. JDBC Connector 설치를 위해 Confluent Hub 클라이언트 설치

먼저 Kafka 관련 작업을 위해 kafka 폴더를 생성하고, Confluent Hub 클라이언트를 설치합니다.

mkdir kafka
cd kafka

# Confluent Hub 클라이언트를 이용해 JDBC Connector 다운로드 후 설치
wget http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
tar -xvf confluent-hub-client-latest.tar.gz

1-2. Java 설치

EC2에 Java가 설치되지 않았다면, Java를 설치해야 합니다.

sudo apt-get update
sudo apt-get install openjdk-11-jdk

1-3. 필요한 폴더 생성

JDBC Connector 설치를 위해 componentconfig 폴더를 생성합니다.

mkdir component
mkdir config

1-4. Worker 설정 파일 생성

Kafka Connect의 Worker 설정을 위해 빈 worker.properties 파일을 생성합니다.

cd config
vim worker.properties

1-5. Confluent 명령어 사용을 위한 환경 변수 설정

confluent-hub 명령어를 사용하기 위해서는 환경 변수에 경로를 등록해야 합니다. /etc/bash.bashrc 파일은 읽기 전용이므로 관리자 권한으로 열어야 합니다. pwd 명령어로 현재 경로를 확인한 후, 경로를 환경 변수로 등록합니다.

cd /etc
sudo vim bash.bashrc
export CONFLUENT_HOME='/home/zvyg1023/kafka'
export PATH=$PATH:$CONFLUENT_HOME/bin

1-6. JDBC Connector 설치

이제 Confluent Hub 클라이언트를 이용해 JDBC Connector를 설치합니다.

confluent-hub install confluentinc/kafka-connect-jdbc:latest --component-dir /home/zvyg1023/kafka/component --worker-configs /home/zvyg1023/kafka/config/worker.properties

2. MySQL Connector 설치

이번 프로젝트에서는 MySQL 8.0.30 버전을 사용했으며, 버전에 맞는 MySQL Connector를 설치하였습니다. 설치한 MySQL Connector 파일을 kafka-connect-jdbc의 lib 폴더에 넣어야 합니다.

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.30.tar.gz
tar -xvf mysql-connector-java-8.0.30.tar.gz

# mysql connector 파일을 kafka-connect-jdbc의 lib 폴더로 복사
cp mysql-connector-java-8.0.30/mysql-connector-java-8.0.30.jar /home/zvyg1023/kafka/component/confluentinc-kafka-connect-jdbc/lib

3. Kafka, Zookeeper, Kafka Connect Docker로 실행하기

Docker Compose 파일을 작성하여 Kafka, Zookeeper, Kafka Connect를 컨테이너로 실행합니다. 이때 Kafka Connect 컨테이너에서 MySQL Connector 파일을 사용하기 위해 volumes 설정을 해줍니다.

docker-compose.yml

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect
    depends_on: 
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "ficket"
      CONNECT_CONFIG_STORAGE_TOPIC: "ficket-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "ficket-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "ficket-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    ports:
      - "8083:8083"
    volumes:
      - ./kafka/component/confluentinc-kafka-connect-jdbc/lib:/etc/kafka-connect/jars
  mysql:
    image: mysql:8.0.30
    container_name: mysql
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: rootpassword
      MYSQL_DATABASE: testdb
      MYSQL_USER: testuser
      MYSQL_PASSWORD: testpassword
    volumes:
      - mysql-data:/var/lib/mysql
volumes:
  mysql-data:

작성 후 해당 파일이 있는 경로에서 Docker Compose 명령어로 실행합니다.

docker-compose up -d

4. Kafka Connect 사용

Kafka Connect가 정상적으로 실행되었는지 확인하기 위해 Connector Plugin 목록을 확인합니다.

Connector Plugin 목록 확인

GET http://도메인주소:8083/connector-plugins

200 OK 응답과 함께 jdbc가 포함된 것을 확인할 수 있습니다.

Connector Plugin 확인

Source Connector 생성

Source Connector를 먼저 만들어 줍니다.

POST http://도메인주소:8083/connectors
{
  "name" : "커넥터이름설정",
  "config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:mysql://컨테이너이름:DB포트번호/사용DB이름",
    "connection.user" : "DB사용자이름",
    "connection.password" : "DB사용자비밀번호",
    "mode": "timestamp+incrementing",
    "timestamp.column.name" : "update를 감지할 컬럼 이름",
    "incrementing.column.name" : "insert를 감지할 컬럼 이름",
    "table.whitelist" : "테이블이름",
    "topic.prefix" : "토픽접두사",
    "tasks.max" : "3"
  }
}

topic.prefix의 값과 table.whitelist의 값을 조합하여 토픽 이름이 됩니다.

Sink Connector 생성

Sink Connector를 생성하여 Source Connector로부터 생성된 데이터를 다른 DB로 연동합니다.

POST http://도메인주소:8083/connectors
{
  "name":"커넥터이름",
  "config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url":"jdbc:mysql://db컨테이너이름:db포트번호/사용DB이름",
    "connection.user":"db사용자이름",
    "connection.password":"db비밀번호",
    "auto.create":"false",
    "auto.evolve":"false",
    "delete.enabled":"false",
    "tasks.max":"1",
    "topics":"토픽이름",
    "table.name.format":"db이름.테이블이름",
    "pk.mode":"record_value",
    "pk.fields":"테이블pk",
    "insert.mode":"upsert"
  }
}

topics에는 Source Connector로 생성된 토픽 이름을 입력합니다.


Refrence

0개의 댓글