kafka db connector 연동 (feat. mysql)

최준호·2022년 5월 25일
2

kafka

목록 보기
4/4
post-thumbnail

참고
Docker의 MySQL 8 Kafka Connect 자습서
kafka connect tutorial
kafka docker quickstart

이전의 내용을 토대로 이번엔 mysql connect를 사용하여 db와 함께 사용해보자.

🔨Connect 컨테이너 설정

connect 폴더를 하나 더 만들고 그 안에 jars 폴더와 docker-compose 파일을 작성해야한다.


폴더를 생성하고

폴더 내에 jars와 docker-compose 파일을 생성한다. jars에는 우리가 연결할 db에 대한 connector와 confluent-connector jar 파일들을 모두 몰아 넣어줄 디렉터리다.

Confluent connector 설치 방법은 해당 글의 Kafka Connect JDBC 설치 부분을 참고하면 금방 설치할 수 있다.

confluent-connector 설정

다음과 같이 설치된 압축파일의 압축을 풀고

해당 파일 내 lib 폴더 내 jar 파일들을 모두 docker volume으로 잡은 jars 폴더 내로 옮겨준다.

mysql-connector

mysql이나 다른 db connector들은 maven repository에 검색하면 다 나온다. 나의 경우에는 mysql이여서

maven repository mysql connector로 설치했다.

설치된 jar 파일 또한 docker volume으로 잡아둔 jars에 넣어주자.

✍docker-compose.yml 작성

---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.1
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

networks:
  default:
    external:
      name: kafka

우선 이전에 작성한 kafka server와 zookeeper server에 대한 docker-compose 파일이고

---
version: '3'
services:
  connector:
    image: confluentinc/cp-kafka-connect:7.0.1
    ports:
      - 8083:8083

    environment:
      CONNECT_BOOTSTRAP_SERVERS: broker:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "quickstart-avro"
      CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
      CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
      CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
    volumes:
      - ./jars:/etc/kafka-connect/jars

networks:
  default:
    external:
      name: kafka

connector로 생성될 container에 대한 docker-compose.yml 파일 내용이다. 해당 부분은 각자 환경에 맞게 설정을 변경해서 사용하면 된다.

아마 quickstart-avro 이 설정들은 avro 설정에 대한 값인거 같은데 나중에는 빼보자.

그 후에 나는 혹시 몰라서 kafka와 connector를 분리하여 실행해서

다음과 같이 두개의 컨테이너가 실행됨을 확인했고 connect의 경우 시간이 좀 걸리니 조금 기다려주자.

🔨source-connect 등록

connector에는 sink와 source 두가지가 있는데, 우선 sink는 한 db에서 데이터의 변동이 생겼을 경우 다른 어플리케이션(ex db)에서도 데이터를 일정하게 맞추기 위해 사용되는 connector==consumer라고 생각하면 좋고. source의 경우 한 db에서 데이터의 변동이 생겼을 경우 producer로써 역할을 한다고 생각하면 쉬울거 같다. 일단 내 업무에서 sink보다는 source가 급한 일이라 source로만 해보자.

curl로 확인해도 좋으나 포스트맨으로 실습하는게 더 쉽고 내용도 기록해둘 수 있어 포스트맨으로 진행했다.

api로 확인하기

connector가 실행되었다면 api를 요청하여 상태 확인, 등록을 실행할 수 있는데

상태 확인

GET http://127.0.0.1:8083/connectors	//connector 상태확인

connector가 등록이 되었다면 다음과 같이 확인이 가능하고 안되어 있다면 []로 표시되어진다.

등록

POST http://127.0.0.1:8083/connectors


{
  "name" : "my-source-connect",
  "config" : {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://ip:3306/스키마",
    "connection.user":"root",
    "connection.password":"db 비밀번호",
    "mode": "incrementing",
    "incrementing.column.name" : "id",
    "table.whitelist":"table 명",
    "topic.prefix" : "my_topic_",
    "poll.interval.ms" : 2000,
    "tasks.max" : "1"
  }
}

다음과 같이 등록시

정상 등록되었을 경우 다음과 같이 반환받으며 내용은 요청한 config 내용이 반환된다.

connector 상태확인

GET http://127.0.0.1:8083/connectors/my-source-connect/status

요청시 connector 별로 상태를 확인할 수 있다.

다음과 같이 state가 running 상태여야 정상 작동중인 상태이다.

connector delete

DELETE http://127.0.0.1:8083/connectors/my-source-connect

혹시라도 테스트 중에 잘못 생성했거나 지우고자 하는 connector가 있다면 delete로 요청하면 삭제된다.

👏테스트

모두 정상 실행되었다면 연결된 db에서 insert가 일어났을 때 데이터를 잘 가져오는지 확인해 봐야한다.

우리가 지금까지 설정해둔 connector는 mode가 isnert로 인해 증가되어야 날라오므로 update나 select 등은 기록되지 않는다.

테스트 하느라 중구난방이지만 insert를 통해 2~6까지 ROE_USER라는 데이터를 insert했다.

docker exec -it broker kafka-console-consumer --bootstrap-server broker:9092 --topic my_topic_game_role --from-beginning

그 후 cmd에서 다음과 같이 consumer를 docker로 실행하면

지금까지 db에 insert 되었던 결과들이 모두 나오며 이후 insert를 실행해도 잘 기록이 된다.

참고로 window cmd의 경우 cmd창에서 엔터를 입력해줘야 insert된 데이터가 정상적으로 노출되더라... window만 그런건지는 모르겠지만 이 방법을 몰라서 한참 헤맸다.

막상 정리해놓으니 어려운게 없어 보이는데... 엄청 헤매면서 했다. 인터넷 상에도 내용이 모두 로컬에서 직접 kafka를 만들고 mysql 서버까지 만들어서 하는 경우나 아니면 아예 한번에 모두 docker-compose로 한번에 띄우는 방법만 설명되어 있어 나처럼 원래 있던 db에 연결하는 kafka server를 docker를 통해 생성하고자 하는 분들에게 도움이 되는 글이 되었으면 좋겠다!

이 후 컨슈머는 자신이 사용하고자 하는 어플리케이션에서 작성해주면 된다!

profile
코딩을 깔끔하게 하고 싶어하는 초보 개발자 (편하게 글을 쓰기위해 반말체를 사용하고 있습니다! 양해 부탁드려요!) 현재 KakaoVX 근무중입니다!

0개의 댓글