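If you create the connector without the ExtractNewRecordState SMT, Debezium emits change events in its default format, built from the database's change log (the binlog, for MySQL): an envelope with before and after fields.
The catch is that the JDBC sink connector can't consume this format.
Even so, let's first see what the raw data looks like.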
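The DB itself was set up in the previous post.
Write the config file into the same config directory we used in the spooldir example. In my case I'm doing the hands-on work inside the kafka container, and the connector points at the mysql container.
Don't forget that for the kafka container to reach the mysql container, both have to be on the same Docker network (here docker_kafka-net). Inside that network the connector reaches MySQL by its container name on port 3306, which is why the connector config uses my-mysql:3306 rather than the host-mapped port 3307.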
docker run --name my-mysql -d -p 3307:3306 --network docker_kafka-net -e MYSQL_ROOT_PASSWORD=1234 -e MYSQL_DATABASE=oc mysql
{
"name": "mysql_cdc_oc_source_test01",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "my-mysql",
"database.port": "3306",
"database.user": "connect_dev",
"database.password": "connect_dev",
"database.server.id": "10000",
"database.server.name": "test01",
"database.include.list": "oc",
"database.allowPublicKeyRetrieval": "true",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql.oc",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
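Once the config is registered, the connector starts up.
A topic named test01 (the database.server.name, which Debezium uses for schema change events) should also have been created.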
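The topic names follow from the config: in Debezium 1.x the database.server.name acts as the topic prefix, and each table's change events go to server.database.table. The helper below is a hypothetical illustration of that naming rule (the function names are made up for this sketch, not part of any Debezium API), assuming the default topic naming with no routing SMTs.

```python
# Hypothetical helpers illustrating Debezium 1.x default topic naming,
# where database.server.name is used as the topic prefix.

def table_topic(server_name: str, database: str, table: str) -> str:
    """Per-table change events: <server.name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def schema_change_topic(server_name: str) -> str:
    """Schema change events go to a topic named after the logical server."""
    return server_name


if __name__ == "__main__":
    for table in ["customers", "orders", "products", "order_items"]:
        print(table_topic("test01", "oc", table))  # e.g. test01.oc.customers
    print(schema_change_topic("test01"))
```

This is why the topic consumed below is test01.oc.customers.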
The connector has connected successfully, so let's insert some rows into the DB.
use oc;
insert into customers values (1, 'testaddress_01@testdomain', 'testuser_01');
insert into customers values (2, 'testaddress_02@testdomain', 'testuser_02');
insert into orders values(1, now(), 1, 'delivered', 1);
insert into products values(1, 'testproduct', 'testcategory', 100);
insert into order_items values(1, 1, 1, 100, 1);
[Screenshot: the topics that were created]
Let's open up one of the created topics.
kafkacat -b localhost:9092 -t test01.oc.customers -C -u -q | jq '.'
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
}
],
"optional": true,
"name": "test01.oc.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
}
],
"optional": true,
"name": "test01.oc.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "test01.oc.customers.Envelope"
},
"payload": {
"before": null,
"after": {
"customer_id": 1,
"email_address": "testaddress_01@testdomain",
"full_name": "testuser_01"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "test01",
"ts_ms": 1699071898000,
"snapshot": "false",
"db": "oc",
"sequence": null,
"table": "customers",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 3972,
"row": 0,
"thread": 9,
"query": null
},
"op": "c",
"ts_ms": 1699071898184,
"transaction": null
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
}
],
"optional": true,
"name": "test01.oc.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
}
],
"optional": true,
"name": "test01.oc.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "test01.oc.customers.Envelope"
},
"payload": {
"before": null,
"after": {
"customer_id": 2,
"email_address": "testaddress_02@testdomain",
"full_name": "testuser_02"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "test01",
"ts_ms": 1699071898000,
"snapshot": "false",
"db": "oc",
"sequence": null,
"table": "customers",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 4299,
"row": 0,
"thread": 9,
"query": null
},
"op": "c",
"ts_ms": 1699071898188,
"transaction": null
}
}
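You get the key, the value, and a lot of other fields, but the part to focus on is the payload.
For an insert, the payload carries before and after; before is null because the row didn't exist yet. The problem is that the JDBC sink connector can't accept this format.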
"payload": {
"before": null,
"after": {
"customer_id": 2,
"email_address": "testaddress_02@testdomain",
"full_name": "testuser_02"
},
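To make the payload structure concrete, here is a minimal sketch that pulls op, table, before and after out of one message value as printed by kafkacat. It assumes JsonConverter with schemas enabled (the default), so each value is a {"schema": ..., "payload": ...} object; the function name is hypothetical and only the payload is inspected.

```python
import json


def summarize_change_event(raw_value: str) -> dict:
    """Return op, table, before and after from one Debezium change-event value."""
    message = json.loads(raw_value)
    payload = message["payload"]  # the schema part is ignored in this sketch
    return {
        "op": payload["op"],                  # c = insert, u = update, d = delete, r = snapshot read
        "table": payload["source"]["table"],  # source metadata names the originating table
        "before": payload["before"],          # row image before the change (None for inserts)
        "after": payload["after"],            # row image after the change (None for deletes)
    }


if __name__ == "__main__":
    # Trimmed-down version of the insert event above (schema omitted).
    sample = json.dumps({
        "schema": {},
        "payload": {
            "before": None,
            "after": {"customer_id": 2,
                      "email_address": "testaddress_02@testdomain",
                      "full_name": "testuser_02"},
            "source": {"db": "oc", "table": "customers"},
            "op": "c",
        },
    })
    print(summarize_change_event(sample))
```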
Now let's run an update. This time both before and after are filled in:
update customers set full_name='updateduser_01' where customer_id = 2;
"payload": {
"before": {
"customer_id": 2,
"email_address": "testaddress_02@testdomain",
"full_name": "testuser_02"
},
"after": {
"customer_id": 2,
"email_address": "testaddress_02@testdomain",
"full_name": "updateduser_01"
},
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "test01",
"ts_ms": 1699074509000,
"snapshot": "false",
"db": "oc",
"sequence": null,
"table": "customers",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 6226,
"row": 0,
"thread": 14,
"query": null
},
"op": "u",
"ts_ms": 1699074509162,
"transaction": null
}
Let's try a delete as well. This time after is simply null:
"payload": {
"before": {
"customer_id": 2,
"email_address": "testaddress_02@testdomain",
"full_name": "updateduser_01"
},
"after": null,
"source": {
"version": "1.9.7.Final",
"connector": "mysql",
"name": "test01",
"ts_ms": 1699074584000,
"snapshot": "false",
"db": "oc",
"sequence": null,
"table": "customers",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 6602,
"row": 0,
"thread": 14,
"query": null
},
"op": "d",
"ts_ms": 1699074584724,
"transaction": null
}
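Putting the three cases together, a sink only needs one row per event: the after image for inserts and updates, and the before image (to identify the row) for deletes. The function below is a hand-rolled, hypothetical illustration of that reduction; it is NOT the ExtractNewRecordState SMT, just a sketch of the idea it implements, and any op it doesn't recognize raises an error.

```python
from typing import Optional, Tuple


def row_action(payload: dict) -> Tuple[str, Optional[dict]]:
    """Reduce a Debezium envelope payload to the single row a sink table needs."""
    op = payload["op"]
    if op in ("c", "r", "u"):
        # insert, snapshot read, update: the new row state lives in "after"
        return ("upsert", payload["after"])
    if op == "d":
        # delete: "after" is null, so the row to remove comes from "before"
        return ("delete", payload["before"])
    raise ValueError(f"unexpected op: {op!r}")


if __name__ == "__main__":
    update = {"op": "u",
              "before": {"customer_id": 2, "full_name": "testuser_02"},
              "after": {"customer_id": 2, "full_name": "updateduser_01"}}
    delete = {"op": "d",
              "before": {"customer_id": 2, "full_name": "updateduser_01"},
              "after": None}
    print(row_action(update))
    print(row_action(delete))
```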
Next, set up the sink database:
create database oc_sink;
grant all privileges on oc_sink.* to 'connect_dev'@'%' with grant option;
use oc_sink;
drop table if exists customers_sink;
drop table if exists products_sink;
drop table if exists orders_sink;
drop table if exists order_items_sink;
-- Run the CREATE TABLE scripts below.
CREATE TABLE customers_sink (
customer_id int NOT NULL PRIMARY KEY,
email_address varchar(255) NOT NULL,
full_name varchar(255) NOT NULL
) ENGINE=InnoDB ;
CREATE TABLE products_sink (
product_id int NOT NULL PRIMARY KEY,
product_name varchar(100) NULL,
product_category varchar(200) NULL,
unit_price decimal(10, 0) NULL
) ENGINE=InnoDB ;
CREATE TABLE orders_sink (
order_id int NOT NULL PRIMARY KEY,
order_datetime timestamp NOT NULL,
customer_id int NOT NULL,
order_status varchar(10) NOT NULL,
store_id int NOT NULL
) ENGINE=InnoDB ;
CREATE TABLE order_items_sink (
order_id int NOT NULL,
line_item_id int NOT NULL,
product_id int NOT NULL,
unit_price decimal(10, 2) NOT NULL,
quantity int NOT NULL,
primary key (order_id, line_item_id)
) ENGINE=InnoDB;
select * from customers_sink;
select * from products_sink;
select * from orders_sink;
select * from order_items_sink;
With the DB ready, write the sink connector config:
{
"name": "mysql_jdbc_oc_sink_customers_test01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "test01.oc.customers",
"connection.url": "jdbc:mysql://my-mysql:3306/oc_sink",
"connection.user": "connect_dev",
"connection.password": "connect_dev",
"table.name.format": "oc_sink.customers_sink",
"insert.mode": "upsert",
"pk.fields": "customer_id",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
Then register the connector!
http POST http://localhost:8083/connectors @mysql_cdc_oc_sink_test01.json
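The httpie command above is just a POST of the JSON file to Kafka Connect's REST endpoint /connectors. If you'd rather do it from a script, here is a minimal sketch using only the Python standard library; the function names are hypothetical, and it assumes Connect is listening on localhost:8083 as in the command above.

```python
import json
import urllib.request


def build_register_request(connect_url: str, connector: dict) -> urllib.request.Request:
    """Build the POST /connectors request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def register_connector(connect_url: str, config_path: str) -> dict:
    """Read a {"name": ..., "config": {...}} file and POST it to Kafka Connect."""
    with open(config_path, encoding="utf-8") as f:
        connector = json.load(f)
    req = build_register_request(connect_url, connector)
    with urllib.request.urlopen(req) as resp:  # raises HTTPError on 4xx/5xx responses
        return json.loads(resp.read())
```

Usage would be register_connector("http://localhost:8083", "mysql_cdc_oc_sink_test01.json"). Splitting out build_register_request keeps the request construction checkable without a running Connect cluster.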
Registering it, however, fails with an error that roughly says some field doesn't match:
ERROR [mysql_jdbc_oc_sink_customers_test01|task-0] WorkerSinkTask{id=mysql_jdbc_oc_sink_customers_test01-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "oc_sink"."customers_sink" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.mysql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:622)...
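The cause is that the messages are still in Debezium's default before/after envelope, which doesn't match the sink table. The JDBC sink treats each top-level field of the record value (before, after, source, op, ts_ms, transaction) as a column, so it tries to add the missing source column to customers_sink and gives up because that field is not optional and has no default value. In a case like this, we need an SMT to transform the message format.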
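A rough, simplified simulation of that failing check may help: the sink compares the value's top-level fields against the table's columns, and any missing field that is neither optional nor defaulted blocks the ALTER. The optional flags below are copied from the Envelope schema printed earlier; the function name is hypothetical and this is not the connector's actual code.

```python
# Top-level fields of test01.oc.customers.Envelope -> is the field optional?
# (taken from the schema printed by kafkacat above; none of them has a default)
ENVELOPE_FIELDS = {
    "before": True,
    "after": True,
    "source": False,
    "op": False,
    "ts_ms": True,
    "transaction": True,
}

# Columns of oc_sink.customers_sink
SINK_COLUMNS = {"customer_id", "email_address", "full_name"}


def blocking_fields(value_fields: dict, table_columns: set) -> list:
    """Fields absent from the table that are neither optional nor defaulted."""
    return sorted(
        name for name, optional in value_fields.items()
        if name not in table_columns and not optional
    )


if __name__ == "__main__":
    print(blocking_fields(ENVELOPE_FIELDS, SINK_COLUMNS))  # ['op', 'source']
    # A flattened record (just the row columns) would pass the check:
    flat = {"customer_id": False, "email_address": False, "full_name": False}
    print(blocking_fields(flat, SINK_COLUMNS))  # []
```

Both source and op block the ALTER; the real connector reports the first one it hits, which in the log above is source.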
We'll handle that in the next post.