MSA 구성을 하면서 kafka 세팅이 필요하였다.
kafka 설치는 https://kafka.apache.org/downloads 여기서 가능하다.
설치후 압축을 풀면 다음과 같은 구조를 갖는 폴더가 나올것이다.
.
├── bin
│ └── windows
├── config
│ └── kraft
├── libs
├── licenses
├── logs
└── site-docs
shell 파일이 있는 bin을 사용하게 될 것이고 설정 파일은 config에 존재한다.
kafka 를 실행하려면 다음과 같은 순서로 실행해야한다.
sh zookeeper-server-start.sh ../config/zookeeper.properties
sh kafka-server-start.sh ../config/server.properties
# topic create test
sh kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
# topic이 만들어졌나 확인
sh kafka-topics.sh --bootstrap-server localhost:9092 --list
이제 kafka와 RDB를 연동하려면 kafka connect와 db 종류에 맞는 connector가 필요하다.
해당 파일들을 다운로드 해주자
wget https://packages.confluent.io/archive/6.1/confluent-6.1.0.tar.gz
tar xvf confluent-6.1.0.tar.gz
# run kafka-connect
./bin/connect-distributed -daemon ./etc/kafka/connect-distributed.properties
# check topic
sh kafka-topics.sh --bootstrap-server localhost:9092 --list
# result
__consumer_offsets
connect-configs
connect-offsets
connect-status
my_topic_users
quickstart-events
kafka-connect 실행후 정상적으로 동작한다면 다음과 같이 connect가 생긴걸 확인할 수 있다.
이제 jdbc-connector를 다운로드 받아주자. https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
그리고 나서 kafka-connect에서 plugin path를 수정해줘야한다.
vi etc/kafka/connect-distributed.properties
# 다음 부분 수정
plugin.path=${jdbc-connector lib 가 있는 경로}
# example
plugin.path=/Users/myid/confluent-6.1.0/confluentinc-kafka-connect-jdbc-10.7.4/lib
이제 kafka-connect의 /share/java/kafka 경로에 spring boot에서 사용중인 jdbc-connector.jar 파일을 넣어줘야 한다. 각자 사용중인 db종류-connector.jar 파일을 gradle 또는 maven 경로에서 복사하여 넣어주면 된다.
그리고 나서 connector 등록을 해주자
POST http://localhost:8083/connectors
RequestBody
{
"name": "my-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql: //localhost:3306/mydb",
"connection.user": "root",
"connection.password": "",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "users",
"topic.prefix": "my_topic_",
"tasks.max": "1"
}
}
Response
{
"name": "my-source-connect",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql: //localhost:3306/mydb",
"connection.user": "root",
"connection.password": "",
"mode": "incrementing",
"incrementing.column.name": "id",
"table.whitelist": "users",
"topic.prefix": "my_topic_",
"tasks.max": "1",
"name": "my-source-connect"
},
"tasks": [],
"type": "source"
}
그리고 실제 상태를 확인해보면
GET http://localhost:8083/connectors/my-source-connect/status
Respone
{
"name": "my-source-connect",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
다음과 같이 정상 작동함을 알 수 있다.
이제 실제 mysql에서 데이터가 생겼을때 kafka가 작동하는지 확인해보자.
본인이 지정했던 토픽 정보로 kafka-console을 실행해 확인해보자.
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
db에 데이터를 넣어주자.
insert into users(user_id, pwd, name) values('user1', 'test123', 'tester');
그러면 kafka-console 에서 다음과 같이 출력될 것이다.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 1,
"user_id": "user1",
"pwd": "test123",
"name": "tester",
"created_at": 1696499658000
}
}
여기까지 하면 기초 연동 과정은 끝났다.