Kafka Connect 사용하기

윤미·2023년 6월 3일
0

Kafka

목록 보기
1/1

Kafka와 Kafka Connect 설치하기

이번 프로젝트에서 MSA 기반 프로젝트를 개발하면서 독립적인 서비스 구축을 위해 Kafka Connect를 사용해 다른 서비스들간의 DB를 동기화 했다.

처음에 EC2 서버에 kafka와 zookeeper를 Docker로 설치하고, Kafka 컨테이너 안에서 Kafka Connect를 설치했다.

온갖 설정파일을 만져도 kafka connect가 broker 서버와 연결되지 않아 docker compose로 kafka와 kafka connect를 한번에 올리는 방법으로 해결했다.


1. JDBC Connector 설치

1-1. JDBC Connector 설치를 위해 Confluent client 설치

kafka 폴더를 따로 만들고 그 안에 설치했다.

mkdir kafka
cd kafka

//Confluent Hub 이용해 jdbc-connector 다운 후 설치
wget http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
tar -xvf confluent-hub-client-latest.tar.gz

1-2. EC2에 java가 설치가 안되어있다면 설치해야한다.

sudo apt-get install openjdk-11-jdk


1-3. component와 config 폴더를 생성해준다.

mkdir component
mkdir config

1-4. worker.properties 파일을 지정해줘야한다.

빈 파일로만 만들어주면 된다.

cd config
vi worker.properties

1-5. confluent 명령어를 사용하기 위해서 환경변수에 PATH 경로를 등록해줘야한다.

cd /etc
vi bash.bashrc

export CONFLUENT_HOME='/경로/kafka'
export PATH=$PATH:$CONFLUENT_HOME/bin

이 두 줄을 추가해준다.


1-6. jdbc-connector 설치

confluent-hub 명령어로 jdbc connector를 설치해준다.

confluent-hub install confluentinc/kafka-connect-jdbc:latest --component-dir /경로/kafka/component --worker-configs /경로/kafka/config/worker.properties


2. MySQL Connector 설치

나는 MySQL을 사용해서 MySQL Connector를 설치해줬다.
그리고 kafka-connect-jdbc의 lib 폴더 안에 MySQL Connector를 넣어줬다.

wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.27.tar.gz
tar -xvf mysql-connector-java-8.0.27.tar.gz

//mysql connector 넣어주기
cp mysql-connector-java-8.0.27/mysql-connector-java-8.0.27.jar /경로/kafka/component/confluentinc-kafka-connect-jdbc/lib

3. zookeeper, kafka, kafka connect를 docker로 띄우기

https://github.com/confluentinc/cp-all-in-one/blob/7.3.0-post/cp-all-in-one/docker-compose.yml

나는 confluent 플랫폼에서 제공하는 docker-compose.yml 을 이용했다.

docker-compose.yml 파일을 보면, zookeeper, kafka, kafka connect 뿐만 아니라 ksqldb, rest proxy 서버 등 부가적인 것들도 같이 설치하고 있다.

나는 kafka connect만 이용하는 것이 목적이기 때문에 다음과 같이 수정해 사용했다.


docker-compose.yml 작성 후 실행

connect 컨테이너를 띄울 때 volumes 설정을 해주면 된다.

docker-compose.yml

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - 네트워크이름

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://도메인주소:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
    networks:
      - 네트워크이름
  connect:
    image: confluentinc/cp-kafka-connect:7.0.1
    ports:
      - 8083:8083
    container_name: connect
    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: WARN
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - /경로/kafka/component/confluentinc-kafka-connect-jdbc/lib:/etc/kafka-connect/jars
    networks:
      - 네트워크이름
networks:
  네트워크이름:
    external: true

작성 후 docker-compose.yml 파일이 있는 곳에서 실행해준다.

docker-compose up -d


4. Kafka Connect 사용

실행이 잘 되었는지 확인하기 위해 Connector 목록을 확인해보자.

Connector 목록 확인

200 OK가 나오는지 확인한다.

GET http://도메인주소:8083/connectors


Source Connector 생성

Source Connector를 먼저 만들어준다.

POST http://도메인주소:8083/connectors


{
  "name" : "커넥터이름설정",
  "config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url" : "jdbc:mysql://도메인주소:DB포트번호/사용DB이름",
    "connection.user" : "DB사용자이름",
    "connection.password" : "DB사용자비밀번호",
    "mode": "timestamp+incrementing", //insert와 update 감지
    "timestamp.column.name" : "update를 감지할 컬럼 이름",
    "incrementing.column.name" : "insert를 감지할 컬럼 이름",
    "table.whitelist" : "테이블이름",
    "topic.prefix" : "토픽접두사",
    "tasks.max" : "3"
  }
}
  • topic.prefix의 값 + table.whitelist의 값 = 토픽이름이 된다.

등록한 Source Connector 확인

다시 GET http://도메인주소:8083/connectors 로 확인해보면 등록한 Source Connector의 이름이 보인다.


Sink Connector 생성

Source Connector에 등록해준 DB 테이블을 연동받을 DB 정보를 입력해주면 된다.

POST http://도메인주소:8083/connectors


{
    "name":"커넥터이름",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://db호스트:db포트번호/사용DB이름",
        "connection.user":"db사용자이름",
        "connection.password":"db비밀번호",
        "auto.create":"false",
        "auto.evolve":"false",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"토픽이름",
        "table.name.format":"db이름.테이블이름",
        "pk.mode":"record_value",
	    "pk.fields":"테이블pk",
	    "insert.mode":"upsert"
    }
}
  • topics 에는 Source Connector를 생성할 때 생성된 토픽 이름을 입력해주면 된다.
profile
luff

0개의 댓글