create databse mydb;
์ ํตํด Maria DB์ mydb๋ผ๋ ํ ์ด๋ธ์ ํ๋ ๋ง๋ค์ด๋์.
order-service
์ mariaDB driver๋ฅผ ์ถ๊ฐํด์ฃผ์
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<version>2.7.2</version>
</dependency>
3.x๊ฐ ๋์์ง๋ง ์ ๋ชจ๋ฅด๋๊น 2.7.2๋ก ์ค์นํ๋ค.
create table users(
id int auto_increment primary key,
user_id varchar(20),
pwd varchar(20),
name varchar(20),
created_at datetime default NOW()
);
create table orders (
id int auto_increment primary key,
product_id varchar(20) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
user_id varchar(50) not null,
order_id varchar(50) not null,
created_at datetime default NOW()
);
๊ทธํ์ 2๊ฐ์ ํ ์ด๋ธ์ ์์ฑํด์ค๋ค.
http://packages.confluent.io/archive
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
url๋ก ์ ๊ทผํ์ฌ connect ์์ถํ์ผ์ ๋ค์ด๋ฐ๋๋ค. ํน์ curl ๋ช ๋ น์ด๋ก ์ค์นํด๋ ๋๋ค. ๋๋ 7.0์ผ๋ก ์ค์นํ๋ค.
./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ์ฌ kafka connect๋ฅผ ์คํ์์ผ๋ณด์.
์ฐธ๊ณ ๋ก connect ์คํ ์ zookeaper์ kafka server๋ฅผ ์คํ์์ผ ๋๊ณ ํด์ผํ๋ค.
window์ ๊ฒฝ์ฐ ๋ค์๊ณผ ๊ฐ์ ์ค๋ฅ๊ฐ ๋ฐ์ํ๋ค.
./bin/windows/kafka-run-class.bat
ํ์ผ์์
rem Claaspath addition for core
rem classpath addition for LSB style path
if exist %BASE_DIR%\share\java\kafka\* (
call:concat %BASE_DIR%\share\java\kafka\*
)
๋ถ๋ถ์ ์ฐพ์์ ์์ ์ฝ๋๋ฅผ ์ฝ์ ํด์ฃผ์ด์ผ ํ๋ค.
๋ค์๊ณผ ๊ฐ์ด ์ค์ ํด์ฃผ์ด์ผํ๋ค.
๊ทธ๋ผ ์ด์ ์ด๋ฐ ์ค๋ฅ๊ฐ ๋ ๋์จ๋ค ใ ใ
./bin/windows/connect-distributed.bat
ํ์ผ์ 30๋ฒ์งธ๋ฅผ ๋ณด๋ฉด
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/config/tools-log4j.properties
)
์ด ๋ถ๋ถ์
rem Log4j settings
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%/etc/kafka/tools-log4j.properties
)
๋ก ์์ ํด์ฃผ์ด์ผ ํ๋ค
/config/
๋ฅผ /etc/kafka/
๋ก ์์ ํ๊ฒ์ด๋ค.
๋๋ ์ค๋ฅ๊ฐ ๋ ์ ์งํํ ์ ์์ด์ 7.0์ ์ค์นํ์ฌ ์ ๋ด์ฉ๋๋ก ์งํํ๋ ์ฐ์ ์คํ์ด ๋์๋ค.
๊ทธ๋ฆฌ๊ณ ๋์ ๋
could not be established. Broker may not be available.
์ค๋ฅ๋ฅผ ๋ฟ์ด๋ด๊ณ ์๋ค.
/{kafka ํด๋}/config/server.properties
ํ์ผ์
ํฌํธ ๋ถ๋ถ์ ์ฃผ์์ ํ์ด์ฃผ๊ณ kafka์ zookeaper๋ฅผ ๋ค์ ์คํํด๋ณด์
๋๋์ด ์ ์ ์คํ๋์๋ค.
./bin/windows/kafka-topics.bat --bootstrap-server localhost:9092 --list
kfaka์์ ํด๋น ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํด์ฃผ๋ฉด
coonect topic๋ค์ด ์ถ๊ฐ๋๊ฑธ ํ์ธํ ์ ์๋ค.
https://docs.confluent.io/5.5.1/connect/kafka-connect-jdbc/index.html
๊ทธ ํ์ ์ url๋ก ์ ๊ทผํ์ฌ JDBC Connector๋ฅผ ์ค์นํด์ผํ๋๋ฐ
๋๋ฅด๊ณ
๋๋ฌ์ ์์ถํ์ผ์ ๋ค์ด ๋ฐ๋๋ค.
์์ถ์ ํ์ด์ lib ํด๋ ๋ด๋ถ์ ๋ค์ jar ํ์ผ์ ํ์ธํ ์ ์๋๋ฐ ํด๋น ํ์ผ์ ๊ฒฝ๋ก๋ฅผ ์ค์ ํ์ผ์ ์ถ๊ฐํด์ฃผ์ด์ผํ๋ค.
C:\2022\kafka-connect\etc\kafka\connect-distributed.properties
ํ์ผ์ ์ด์ด์
window์ ๊ฒฝ์ฐ ๋ค์๊ณผ ๊ฐ์ด plugin์ ์ถ๊ฐํด์ค๋ค.
C:\{HomeDirectory}\.m2\repository\org\mariadb\jdbc\mariadb-java-client\2.7.2
์
์ด ํ์ผ์ ๋ณต์ฌํด์
{kafka-connectํด๋}\share\java\kafka
์ ๋ถ์ฌ ๋ฃ๋๋ค.
ํฌ์คํธ๋งจ์ ์คํ์์ผ์
http://127.0.0.1:8083/connectors url์ post๋ก ๋ค์๊ณผ ๊ฐ์ด ์์ฒญํ๋ฉด
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"db ๋น๋ฐ๋ฒํธ",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
๊ฒฐ๊ณผ๋ฅผ ์ป์ ์ ์๋ค. ์ด์ Maria DB์ Kafka JDBC๋ก ์ฐ๊ฒฐ๋ ์ํ๊ฐ ๋์๋ค.
http://127.0.0.1:8083/connectors ์์ฒญ์
๋ด๊ฐ ๋ฑ๋กํ connect์ ๋ฆฌ์คํธ๋ฅผ ๋ณผ์ ์๊ณ
http://127.0.0.1:8083/connectors/my-source-connect/status ์์ฒญ์
connect์ ์ํ๋ฅผ ํ์ธํ ์ ์๋ค.
์์ง ์๋ฌด๋ฐ ๋ฐ์ดํฐ์ ๋ณ๊ฒฝ์ฌํญ์ด ์์ ๋ topic list์ด๋ค.
insert into users(user_id, pwd, name) values ('user1', 'test1234', 'user1');
๋ช
๋ น์ด๋ฅผ ํตํด
db์ ๋ฐ์ดํฐ๊ฐ ์ถ๊ฐ๋์๊ณ
kafka์๋ connect๋ก ๋ฑ๋กํด๋จ๋ topic์ด ์ถ๊ฐ๋๊ฒ์ ํ์ธํ ์ ์๋ค.
๊ทธ๋ฆฌ๊ณ consumer๋ฅผ ์ฌ์ฉํด๋ณด์
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
๋ช ๋ น์ด๋ฅผ ์คํํ์ฌ consumer์ ๋ด์ฉ์ ํ์ธํด๋ณด๋ฉด
์ฐ๋ฆฌ๊ฐ ๋ฐฉ๊ธ ์ถ๊ฐํ user1์ ์ถ๊ฐ๋ ์ ๋ณด๋ฅผ ํ์ธํ ์ ์๋ค.
์ ๋ง๋ก ์ด๊ฒ ์ถ๊ฐ๋๊ฒ ๋ง๋์ง ๋ ํ์ธํ๊ณ ์ถ๋ค๋ฉด ํ๋ฒ ๋ db์ ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐํด๋ณด์!
์ ๋ ฅํ๊ณ
consumer์๋ ์ ์์ ์ผ๋ก ์ถ๊ฐ๋์๋ค.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 1,
"user_id": "user1",
"pwd": "test1234",
"name": "user1",
"created_at": 1647466531000
}
}
/////
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 2,
"user_id": "user2",
"pwd": "test1234",
"name": "user2",
"created_at": 1647466806000
}
}
์ด ๋ฐ์ดํฐ๋ฅผ json ํํ๋ก ์ ์ ๋ฆฌํ๋ฉด ์ ๋ด์ฉ๊ณผ ๊ฐ๋ค.
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"root ๋น๋ฐ๋ฒํธ",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
Source Connect๋ฅผ ๋ฑ๋กํ ๋์ ๋ง์ฐฌ๊ฐ์ง๋ก topic์ ๋จผ์ ๋ฑ๋กํ๋ค.
์ ์์ ์ผ๋ก ๋ฑ๋ก๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
db์๋ my_topic_users๋ผ๋ ์ด๋ฆ์ผ๋ก ํ ์ด๋ธ์ด ์์ฑ๋์๊ณ
์์ ๋ฐ์ดํฐ๋ ๋์ผํ๊ฒ ๋ค์ด๊ฐ์๋ ๊ฒ๋ ํ์ธํ ์ ์๋ค.
insert into users(user_id, pwd, name) values ('user3', 'test123412', 'user3');
insert into users(user_id, pwd, name) values ('admin', 'admin1234', 'admin');
์ด์ 2๊ฐ์ insert query๋ฅผ ๋ ๋ ค์ ์ ์ ๋ฅผ users ํ
์ด๋ธ์ ๋ฐ์ดํฐ๋ฅผ ๋ฑ๋กํด๋ดค๋ค.
๋ ํ ์ด๋ธ ๋ชจ๋ ๋์ธํ๊ฒ ๋ฐ์ดํฐ๊ฐ ๋ค์ด๊ฐ์๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.