Kafka Connect (Source, Sink)

์ตœ์ค€ํ˜ธยท2022๋…„ 3์›” 17์ผ
1

Microservice Architecture

๋ชฉ๋ก ๋ณด๊ธฐ
26/32
post-thumbnail

๐Ÿ”จMaria DB ์„ค์ •

create databse mydb;

์„ ํ†ตํ•ด Maria DB์— mydb๋ผ๋Š” ํ…Œ์ด๋ธ”์„ ํ•˜๋‚˜ ๋งŒ๋“ค์–ด๋‘์ž.

order-service์— mariaDB driver๋ฅผ ์ถ”๊ฐ€ํ•ด์ฃผ์ž

<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.7.2</version>
</dependency>

3.x๊ฐ€ ๋‚˜์™”์ง€๋งŒ ์ž˜ ๋ชจ๋ฅด๋‹ˆ๊นŒ 2.7.2๋กœ ์„ค์น˜ํ–ˆ๋‹ค.

create table users(
    id int auto_increment primary key,
    user_id varchar(20),
    pwd varchar(20),
    name varchar(20),
    created_at datetime default NOW()
);
create table orders (
    id int auto_increment primary key,
    product_id varchar(20) not null,
    qty int default 0,
    unit_price int default 0,
    total_price int default 0,
    user_id varchar(50) not null,
    order_id varchar(50) not null,
    created_at datetime default NOW()
);

๊ทธํ›„์— 2๊ฐœ์˜ ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑํ•ด์ค€๋‹ค.

๐Ÿ”จKafka Connect

๐Ÿ‘‰Kafka Connect ์„ค์น˜

http://packages.confluent.io/archive

curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz

url๋กœ ์ ‘๊ทผํ•˜์—ฌ connect ์••์ถ•ํŒŒ์ผ์„ ๋‹ค์šด๋ฐ›๋Š”๋‹ค. ํ˜น์€ curl ๋ช…๋ น์–ด๋กœ ์„ค์น˜ํ•ด๋„ ๋œ๋‹ค. ๋‚˜๋Š” 7.0์œผ๋กœ ์„ค์น˜ํ–ˆ๋‹ค.

./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties

๋ช…๋ น์–ด๋ฅผ ์ž…๋ ฅํ•˜์—ฌ kafka connect๋ฅผ ์‹คํ–‰์‹œ์ผœ๋ณด์ž.

์ฐธ๊ณ ๋กœ connect ์‹คํ–‰ ์ „ zookeaper์™€ kafka server๋ฅผ ์‹คํ–‰์‹œ์ผœ ๋‘๊ณ  ํ•ด์•ผํ•œ๋‹ค.

window์˜ ๊ฒฝ์šฐ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.

./bin/windows/kafka-run-class.bat ํŒŒ์ผ์—์„œ

rem Claaspath addition for core
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
	call:concat %BASE_DIR%\share\java\kafka\*
)

๋ถ€๋ถ„์„ ์ฐพ์•„์„œ ์œ„์— ์ฝ”๋“œ๋ฅผ ์‚ฝ์ž…ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค.

๋‹ค์Œ๊ณผ ๊ฐ™์ด ์„ค์ •ํ•ด์ฃผ์–ด์•ผํ•œ๋‹ค.

๊ทธ๋Ÿผ ์ด์ œ ์ด๋Ÿฐ ์˜ค๋ฅ˜๊ฐ€ ๋˜ ๋‚˜์˜จ๋‹ค ใ…Žใ…Ž

./bin/windows/connect-distributed.bat ํŒŒ์ผ์˜ 30๋ฒˆ์งธ๋ฅผ ๋ณด๋ฉด

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties
)

์ด ๋ถ€๋ถ„์„

rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
	set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/tools-log4j.properties
)

๋กœ ์ˆ˜์ •ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค

/config/๋ฅผ /etc/kafka/๋กœ ์ˆ˜์ •ํ•œ๊ฒƒ์ด๋‹ค.

๋‚˜๋Š” ์˜ค๋ฅ˜๊ฐ€ ๋– ์„œ ์ง„ํ–‰ํ•  ์ˆ˜ ์—†์–ด์„œ 7.0์„ ์„ค์น˜ํ•˜์—ฌ ์œ„ ๋‚ด์šฉ๋Œ€๋กœ ์ง„ํ–‰ํ•˜๋‹ˆ ์šฐ์„  ์‹คํ–‰์ด ๋˜์—ˆ๋‹ค.

๊ทธ๋ฆฌ๊ณ  ๋‚˜์„œ ๋˜

could not be established. Broker may not be available. ์˜ค๋ฅ˜๋ฅผ ๋ฟœ์–ด๋‚ด๊ณ  ์žˆ๋‹ค.

/{kafka ํด๋”}/config/server.properties ํŒŒ์ผ์˜

ํฌํŠธ ๋ถ€๋ถ„์˜ ์ฃผ์„์„ ํ’€์–ด์ฃผ๊ณ  kafka์™€ zookeaper๋ฅผ ๋‹ค์‹œ ์‹คํ–‰ํ•ด๋ณด์ž

๋“œ๋””์–ด ์ •์ƒ ์‹คํ–‰๋˜์—ˆ๋‹ค.

./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list

kfaka์—์„œ ํ•ด๋‹น ๋ช…๋ น์–ด๋ฅผ ์ž…๋ ฅํ•ด์ฃผ๋ฉด

coonect topic๋“ค์ด ์ถ”๊ฐ€๋œ๊ฑธ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

๐Ÿ‘‰Kafka Connect JDBC ์„ค์น˜

https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html

๊ทธ ํ›„์— ์œ„ url๋กœ ์ ‘๊ทผํ•˜์—ฌ JDBC Connector๋ฅผ ์„ค์น˜ํ•ด์•ผํ•˜๋Š”๋ฐ

๋ˆ„๋ฅด๊ณ 

๋ˆŒ๋Ÿฌ์„œ ์••์ถ•ํŒŒ์ผ์„ ๋‹ค์šด ๋ฐ›๋Š”๋‹ค.


์••์ถ•์„ ํ’€์–ด์„œ lib ํด๋” ๋‚ด๋ถ€์— ๋‹ค์Œ jar ํŒŒ์ผ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ ํ•ด๋‹น ํŒŒ์ผ์˜ ๊ฒฝ๋กœ๋ฅผ ์„ค์ • ํŒŒ์ผ์— ์ถ”๊ฐ€ํ•ด์ฃผ์–ด์•ผํ•œ๋‹ค.

C:\2022\kafka-connect\etc\kafka\connect-distributed.properties ํŒŒ์ผ์„ ์—ด์–ด์„œ

window์˜ ๊ฒฝ์šฐ ๋‹ค์Œ๊ณผ ๊ฐ™์ด plugin์„ ์ถ”๊ฐ€ํ•ด์ค€๋‹ค.

C:\{HomeDirectory}\.m2\repository\org\mariadb\jdbc\mariadb-java-client\2.7.2์˜

์ด ํŒŒ์ผ์„ ๋ณต์‚ฌํ•ด์„œ

{kafka-connectํด๋”}\share\java\kafka์— ๋ถ™์—ฌ ๋„ฃ๋Š”๋‹ค.

๐Ÿ”จKafka Source Connect ๋“ฑ๋ก

ํฌ์ŠคํŠธ๋งจ์„ ์‹คํ–‰์‹œ์ผœ์„œ

http://127.0.0.1:8083/connectors url์— post๋กœ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์š”์ฒญํ•˜๋ฉด

{
    "name" : "my-source-connect",
    "config" : {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"db ๋น„๋ฐ€๋ฒˆํ˜ธ",
        "mode": "incrementing",
        "incrementing.column.name" : "id",
        "table.whitelist":"users",
        "topic.prefix" : "my_topic_",
        "tasks.max" : "1"
    }
}

๊ฒฐ๊ณผ๋ฅผ ์–ป์„ ์ˆ˜ ์žˆ๋‹ค. ์ด์ œ Maria DB์— Kafka JDBC๋กœ ์—ฐ๊ฒฐ๋œ ์ƒํƒœ๊ฐ€ ๋˜์—ˆ๋‹ค.

http://127.0.0.1:8083/connectors ์š”์ฒญ์‹œ

๋‚ด๊ฐ€ ๋“ฑ๋กํ•œ connect์˜ ๋ฆฌ์ŠคํŠธ๋ฅผ ๋ณผ์ˆ˜ ์žˆ๊ณ 

http://127.0.0.1:8083/connectors/my-source-connect/status ์š”์ฒญ์‹œ

connect์˜ ์ƒํƒœ๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

์•„์ง ์•„๋ฌด๋Ÿฐ ๋ฐ์ดํ„ฐ์˜ ๋ณ€๊ฒฝ์‚ฌํ•ญ์ด ์—†์„ ๋•Œ topic list์ด๋‹ค.

insert into users(user_id, pwd, name) values ('user1', 'test1234', 'user1');

๋ช…๋ น์–ด๋ฅผ ํ†ตํ•ด

db์— ๋ฐ์ดํ„ฐ๊ฐ€ ์ถ”๊ฐ€๋˜์—ˆ๊ณ 

kafka์—๋„ connect๋กœ ๋“ฑ๋กํ•ด๋†จ๋˜ topic์ด ์ถ”๊ฐ€๋œ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ทธ๋ฆฌ๊ณ  consumer๋ฅผ ์‚ฌ์šฉํ•ด๋ณด์ž

./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning

๋ช…๋ น์–ด๋ฅผ ์‹คํ–‰ํ•˜์—ฌ consumer์˜ ๋‚ด์šฉ์„ ํ™•์ธํ•ด๋ณด๋ฉด

์šฐ๋ฆฌ๊ฐ€ ๋ฐฉ๊ธˆ ์ถ”๊ฐ€ํ•œ user1์˜ ์ถ”๊ฐ€๋œ ์ •๋ณด๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

์ •๋ง๋กœ ์ด๊ฒŒ ์ถ”๊ฐ€๋œ๊ฒŒ ๋งž๋Š”์ง€ ๋˜ ํ™•์ธํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด ํ•œ๋ฒˆ ๋” db์— ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”๊ฐ€ํ•ด๋ณด์ž!

์ž…๋ ฅํ–ˆ๊ณ 

consumer์—๋„ ์ •์ƒ์ ์œผ๋กœ ์ถ”๊ฐ€๋˜์—ˆ๋‹ค.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 1,
    "user_id": "user1",
    "pwd": "test1234",
    "name": "user1",
    "created_at": 1647466531000
  }
}
/////
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "user_id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "pwd"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "int64",
        "optional": true,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "created_at"
      }
    ],
    "optional": false,
    "name": "users"
  },
  "payload": {
    "id": 2,
    "user_id": "user2",
    "pwd": "test1234",
    "name": "user2",
    "created_at": 1647466806000
  }
}

์ด ๋ฐ์ดํ„ฐ๋ฅผ json ํ˜•ํƒœ๋กœ ์ž˜ ์ •๋ฆฌํ•˜๋ฉด ์œ„ ๋‚ด์šฉ๊ณผ ๊ฐ™๋‹ค.

๐Ÿ”จKafka Sink Connect ๋“ฑ๋ก

{
    "name":"my-sink-connect",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://localhost:3306/mydb",
        "connection.user":"root",
        "connection.password":"root ๋น„๋ฐ€๋ฒˆํ˜ธ",
        "auto.create":"true",
        "auto.evolve":"true",
        "delete.enabled":"false",
        "tasks.max":"1",
        "topics":"my_topic_users"
    }
}

Source Connect๋ฅผ ๋“ฑ๋กํ• ๋•Œ์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ topic์„ ๋จผ์ € ๋“ฑ๋กํ•œ๋‹ค.

์ •์ƒ์ ์œผ๋กœ ๋“ฑ๋ก๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

db์—๋„ my_topic_users๋ผ๋Š” ์ด๋ฆ„์œผ๋กœ ํ…Œ์ด๋ธ”์ด ์ƒ์„ฑ๋˜์—ˆ๊ณ 

์•ˆ์— ๋ฐ์ดํ„ฐ๋„ ๋™์ผํ•˜๊ฒŒ ๋“ค์–ด๊ฐ€์žˆ๋Š” ๊ฒƒ๋„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

insert into users(user_id, pwd, name) values ('user3', 'test123412', 'user3');
insert into users(user_id, pwd, name) values ('admin', 'admin1234', 'admin');

์ด์ œ 2๊ฐœ์˜ insert query๋ฅผ ๋‚ ๋ ค์„œ ์œ ์ €๋ฅผ users ํ…Œ์ด๋ธ”์— ๋ฐ์ดํ„ฐ๋ฅผ ๋“ฑ๋กํ•ด๋ดค๋‹ค.

๋‘ ํ…Œ์ด๋ธ” ๋ชจ๋‘ ๋™์ธํ•˜๊ฒŒ ๋ฐ์ดํ„ฐ๊ฐ€ ๋“ค์–ด๊ฐ€์žˆ๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

profile
์ฝ”๋”ฉ์„ ๊น”๋”ํ•˜๊ฒŒ ํ•˜๊ณ  ์‹ถ์–ดํ•˜๋Š” ์ดˆ๋ณด ๊ฐœ๋ฐœ์ž (ํŽธํ•˜๊ฒŒ ๊ธ€์„ ์“ฐ๊ธฐ์œ„ํ•ด ๋ฐ˜๋ง์ฒด๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค! ์–‘ํ•ด ๋ถ€ํƒ๋“œ๋ ค์š”!) ํ˜„์žฌ KakaoVX ๊ทผ๋ฌด์ค‘์ž…๋‹ˆ๋‹ค!

0๊ฐœ์˜ ๋Œ“๊ธ€