[Kafka] kafka 설치부터 세팅까지

정명진·2023년 10월 5일
0

MSA 구성을 하면서 kafka 세팅이 필요하였다.

kafka 설치는 https://kafka.apache.org/downloads 여기서 가능하다.

설치후 압축을 풀면 다음과 같은 구조를 갖는 폴더가 나올것이다.

.
├── bin
│   └── windows
├── config
│   └── kraft
├── libs
├── licenses
├── logs
└── site-docs

shell 파일이 있는 bin을 사용하게 될 것이고 설정 파일은 config에 존재한다.

kafka 를 실행하려면 다음과 같은 순서로 실행해야한다.

  1. zookeeper
  2. kafka
sh zookeeper-server-start.sh ../config/zookeeper.properties
sh kafka-server-start.sh ../config/server.properties
# topic create test
sh kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
# topic이 만들어졌나 확인
sh kafka-topics.sh --bootstrap-server localhost:9092 --list

이제 kafka와 RDB를 연동하려면 kafka connect와 db 종류에 맞는 connector가 필요하다.

해당 파일들을 다운로드 해주자

wget https://packages.confluent.io/archive/6.1/confluent-6.1.0.tar.gz
tar xvf confluent-6.1.0.tar.gz

# run kafka-connect 
./bin/connect-distributed -daemon ./etc/kafka/connect-distributed.properties
# check topic
sh kafka-topics.sh --bootstrap-server localhost:9092 --list

# result
__consumer_offsets
connect-configs
connect-offsets
connect-status
my_topic_users
quickstart-events

kafka-connect 실행후 정상적으로 동작한다면 다음과 같이 connect가 생긴걸 확인할 수 있다.

이제 jdbc-connector를 다운로드 받아주자. https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

그리고 나서 kafka-connect에서 plugin path를 수정해줘야한다.

vi etc/kafka/connect-distributed.properties

# 다음 부분 수정
plugin.path=${jdbc-connector lib 가 있는 경로}
# example
plugin.path=/Users/myid/confluent-6.1.0/confluentinc-kafka-connect-jdbc-10.7.4/lib

이제 kafka-connect의 /share/java/kafka 경로에 spring boot에서 사용중인 jdbc-connector.jar 파일을 넣어줘야 한다. 각자 사용중인 db종류-connector.jar 파일을 gradle 또는 maven 경로에서 복사하여 넣어주면 된다.

그리고 나서 connector 등록을 해주자

POST http://localhost:8083/connectors

RequestBody
{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql: //localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1"
    }
}

Response
{
    "name": "my-source-connect",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql: //localhost:3306/mydb",
        "connection.user": "root",
        "connection.password": "",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "table.whitelist": "users",
        "topic.prefix": "my_topic_",
        "tasks.max": "1",
        "name": "my-source-connect"
    },
    "tasks": [],
    "type": "source"
}

그리고 실제 상태를 확인해보면

GET http://localhost:8083/connectors/my-source-connect/status

Respone
{
    "name": "my-source-connect",
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.0.1:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "127.0.0.1:8083"
        }
    ],
    "type": "source"
}

다음과 같이 정상 작동함을 알 수 있다.

이제 실제 mysql에서 데이터가 생겼을때 kafka가 작동하는지 확인해보자.

본인이 지정했던 토픽 정보로 kafka-console을 실행해 확인해보자.

sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

db에 데이터를 넣어주자.

insert into users(user_id, pwd, name) values('user1', 'test123', 'tester');

그러면 kafka-console 에서 다음과 같이 출력될 것이다.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 1,
    "user_id": "user1",
    "pwd": "test123",
    "name": "tester",
    "created_at": 1696499658000
  }
}

여기까지 하면 기초 연동 과정은 끝났다.

profile
개발자로 입사했지만 정체성을 잃어가는중... 다시 준비 시작이다..

0개의 댓글