참고
Docker의 MySQL 8 Kafka Connect 자습서
kafka connect tutorial
kafka docker quickstart
이전의 내용을 토대로 이번엔 mysql connect를 사용하여 db와 함께 사용해보자.
connect 폴더를 하나 더 만들고 그 안에 jars 폴더와 docker-compose 파일을 작성해야한다.
폴더를 생성하고
폴더 내에 jars와 docker-compose 파일을 생성한다. jars에는 우리가 연결할 db에 대한 connector와 confluent-connector jar 파일들을 모두 몰아 넣어줄 디렉터리다.
Confluent connector 설치 방법은 해당 글의 Kafka Connect JDBC 설치 부분을 참고하면 금방 설치할 수 있다.
다음과 같이 설치된 압축파일의 압축을 풀고
해당 파일 내 lib 폴더 내 jar 파일들을 모두 docker volume으로 잡은 jars 폴더 내로 옮겨준다.
mysql이나 다른 db connector들은 maven repository에 검색하면 다 나온다. 나의 경우에는 mysql이여서
maven repository mysql connector로 설치했다.
설치된 jar 파일 또한 docker volume으로 잡아둔 jars에 넣어주자.
---
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.0.1
container_name: broker
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
networks:
default:
external:
name: kafka
우선 이전에 작성한 kafka server와 zookeeper server에 대한 docker-compose 파일이고
---
version: '3'
services:
connector:
image: confluentinc/cp-kafka-connect:7.0.1
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: broker:29092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "quickstart-avro"
CONNECT_CONFIG_STORAGE_TOPIC: "quickstart-avro-config"
CONNECT_OFFSET_STORAGE_TOPIC: "quickstart-avro-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "quickstart-avro-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
volumes:
- ./jars:/etc/kafka-connect/jars
networks:
default:
external:
name: kafka
connector로 생성될 container에 대한 docker-compose.yml 파일 내용이다. 해당 부분은 각자 환경에 맞게 설정을 변경해서 사용하면 된다.
아마 quickstart-avro 이 설정들은 avro 설정에 대한 값인거 같은데 나중에는 빼보자.
그 후에 나는 혹시 몰라서 kafka와 connector를 분리하여 실행해서
다음과 같이 두개의 컨테이너가 실행됨을 확인했고 connect의 경우 시간이 좀 걸리니 조금 기다려주자.
connector에는 sink와 source 두가지가 있는데, 우선 sink는 한 db에서 데이터의 변동이 생겼을 경우 다른 어플리케이션(ex db)에서도 데이터를 일정하게 맞추기 위해 사용되는 connector==consumer
라고 생각하면 좋고. source의 경우 한 db에서 데이터의 변동이 생겼을 경우 producer
로써 역할을 한다고 생각하면 쉬울거 같다. 일단 내 업무에서 sink보다는 source가 급한 일이라 source로만 해보자.
curl로 확인해도 좋으나 포스트맨으로 실습하는게 더 쉽고 내용도 기록해둘 수 있어 포스트맨으로 진행했다.
connector가 실행되었다면 api를 요청하여 상태 확인, 등록을 실행할 수 있는데
GET http://127.0.0.1:8083/connectors //connector 상태확인
connector가 등록이 되었다면 다음과 같이 확인이 가능하고 안되어 있다면 []
로 표시되어진다.
POST http://127.0.0.1:8083/connectors
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://ip:3306/스키마",
"connection.user":"root",
"connection.password":"db 비밀번호",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"table 명",
"topic.prefix" : "my_topic_",
"poll.interval.ms" : 2000,
"tasks.max" : "1"
}
}
다음과 같이 등록시
정상 등록되었을 경우 다음과 같이 반환받으며 내용은 요청한 config 내용이 반환된다.
GET http://127.0.0.1:8083/connectors/my-source-connect/status
요청시 connector 별로 상태를 확인할 수 있다.
다음과 같이 state가 running 상태여야 정상 작동중인 상태이다.
DELETE http://127.0.0.1:8083/connectors/my-source-connect
혹시라도 테스트 중에 잘못 생성했거나 지우고자 하는 connector가 있다면 delete로 요청하면 삭제된다.
모두 정상 실행되었다면 연결된 db에서 insert가 일어났을 때 데이터를 잘 가져오는지 확인해 봐야한다.
우리가 지금까지 설정해둔 connector는 mode가 isnert로 인해 증가되어야 날라오므로 update나 select 등은 기록되지 않는다.
테스트 하느라 중구난방이지만 insert를 통해 2~6까지 ROE_USER라는 데이터를 insert했다.
docker exec -it broker kafka-console-consumer --bootstrap-server broker:9092 --topic my_topic_game_role --from-beginning
그 후 cmd에서 다음과 같이 consumer를 docker로 실행하면
지금까지 db에 insert 되었던 결과들이 모두 나오며 이후 insert를 실행해도 잘 기록이 된다.
참고로 window cmd의 경우 cmd창에서 엔터를 입력해줘야 insert된 데이터가 정상적으로 노출되더라... window만 그런건지는 모르겠지만 이 방법을 몰라서 한참 헤맸다.
막상 정리해놓으니 어려운게 없어 보이는데... 엄청 헤매면서 했다. 인터넷 상에도 내용이 모두 로컬에서 직접 kafka를 만들고 mysql 서버까지 만들어서 하는 경우나 아니면 아예 한번에 모두 docker-compose로 한번에 띄우는 방법만 설명되어 있어 나처럼 원래 있던 db에 연결하는 kafka server를 docker를 통해 생성하고자 하는 분들에게 도움이 되는 글이 되었으면 좋겠다!
이 후 컨슈머는 자신이 사용하고자 하는 어플리케이션에서 작성해주면 된다!