Debezium MySQL CDC Source Connector (2): Creating the Connector and Linking It to a Sink

이동명 · November 4, 2023


Creating a CDC Source Connector

Creating it without the ExtractNewRecordState SMT

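If you create the connector without ExtractNewRecordState, it emits events in Debezium's default before/after format, built from the database's change log (the binlog, for MySQL).

The catch is that the JDBC sink connector cannot consume this format as-is.

Even so, let's create the connector this way first to see what the raw data looks like.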


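The DB was set up in the previous post,

so write the connector config and put it in the same config path used for the spooldir exercise. In my case, I'm working inside the kafka container and referencing the mysql container.

Don't forget that the kafka container can only reach the mysql container if both are on the same network.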

docker run --name my-mysql -d -p 3307:3306 --network docker_kafka-net -e MYSQL_ROOT_PASSWORD=1234 -e MYSQL_DATABASE=oc mysql

{
    "name": "mysql_cdc_oc_source_test01",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "my-mysql",
        "database.port": "3306",
        "database.user": "connect_dev",
        "database.password": "connect_dev",
        "": "10000",
        "": "test01",
        "database.include.list": "oc",
        "database.allowPublicKeyRetrieval": "true",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes.mysql.oc",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Once the config is submitted, the connector comes up.
A topic named test01 should also have been created.
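The post does not show the command used to submit the source connector config. As a minimal sketch, assuming the Kafka Connect REST endpoint is the same http://localhost:8083 used for the sink later on, the registration request could be built like this. The helper name `build_register_request` is hypothetical, and the config is trimmed for illustration; a real registration needs the full config shown above.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API listens here (same URL the post uses for the sink).
CONNECT_URL = "http://localhost:8083"


def build_register_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the POST /connectors request for one connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed config for illustration only.
source_config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "my-mysql",
    "database.port": "3306",
    "database.include.list": "oc",
}

request = build_register_request(CONNECT_URL, "mysql_cdc_oc_source_test01", source_config)
print(request.get_method(), request.full_url)

# To actually submit it (requires a running Connect worker):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it makes it easy to inspect the JSON body before anything reaches the worker.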


Now that the connector is up and connected, let's insert some rows into the DB.

use oc;

insert into customers values (1, 'testaddress_01@testdomain', 'testuser_01');
insert into customers values (2, 'testaddress_02@testdomain', 'testuser_02');
insert into orders values(1, now(), 1, 'delivered', 1);
insert into products values(1, 'testproduct', 'testcategory', 100);
insert into order_items values(1, 1, 1, 100, 1);

The topics have now been created.

Let's crack open one of them.

kafkacat -b localhost:9092 -t test01.oc.customers -C -u -q | jq '.'

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "customer_id" },
          { "type": "string", "optional": false, "field": "email_address" },
          { "type": "string", "optional": false, "field": "full_name" }
        ],
        "optional": true,
        "name": "test01.oc.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "customer_id" },
          { "type": "string", "optional": false, "field": "email_address" },
          { "type": "string", "optional": false, "field": "full_name" }
        ],
        "optional": true,
        "name": "test01.oc.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          { "type": "string", "optional": true, "name": "", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": true, "field": "table" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "query" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "test01.oc.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "customer_id": 1,
      "email_address": "testaddress_01@testdomain",
      "full_name": "testuser_01"
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "test01",
      "ts_ms": 1699071898000,
      "snapshot": "false",
      "db": "oc",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 3972,
      "row": 0,
      "thread": 9,
      "query": null
    },
    "op": "c",
    "ts_ms": 1699071898184,
    "transaction": null
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "customer_id" },
          { "type": "string", "optional": false, "field": "email_address" },
          { "type": "string", "optional": false, "field": "full_name" }
        ],
        "optional": true,
        "name": "test01.oc.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "customer_id" },
          { "type": "string", "optional": false, "field": "email_address" },
          { "type": "string", "optional": false, "field": "full_name" }
        ],
        "optional": true,
        "name": "test01.oc.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          { "type": "string", "optional": true, "name": "", "version": 1, "parameters": { "allowed": "true,last,false,incremental" }, "default": "false", "field": "snapshot" },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": true, "field": "table" },
          { "type": "int64", "optional": false, "field": "server_id" },
          { "type": "string", "optional": true, "field": "gtid" },
          { "type": "string", "optional": false, "field": "file" },
          { "type": "int64", "optional": false, "field": "pos" },
          { "type": "int32", "optional": false, "field": "row" },
          { "type": "int64", "optional": true, "field": "thread" },
          { "type": "string", "optional": true, "field": "query" }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "test01.oc.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "customer_id": 2,
      "email_address": "testaddress_02@testdomain",
      "full_name": "testuser_02"
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "test01",
      "ts_ms": 1699071898000,
      "snapshot": "false",
      "db": "oc",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 4299,
      "row": 0,
      "thread": 9,
      "query": null
    },
    "op": "c",
    "ts_ms": 1699071898188,
    "transaction": null
  }
}

The output includes keys, values, and plenty of other detail, but the part to pay the most attention to is payload.
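To make that concrete, here is a minimal sketch of pulling just the payload out of one message. It assumes each message is a JSON document with top-level "schema" and "payload" keys, as in the kafkacat output; `raw_message` is a shortened stand-in (schema trimmed, source metadata omitted) and `extract_payload` is a hypothetical helper name.

```python
import json

# Shortened stand-in for one line of kafkacat output (schema and source metadata trimmed).
raw_message = json.dumps({
    "schema": {"type": "struct", "name": "test01.oc.customers.Envelope"},
    "payload": {
        "before": None,
        "after": {
            "customer_id": 2,
            "email_address": "testaddress_02@testdomain",
            "full_name": "testuser_02",
        },
        "op": "c",
    },
})


def extract_payload(message: str) -> dict:
    """Return only the payload part of a schema+payload JSON message."""
    return json.loads(message)["payload"]


payload = extract_payload(raw_message)
print(payload["before"])              # None: an insert has no previous row state
print(payload["after"]["full_name"])  # testuser_02
```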

For an insert, the event carries both before and after, with before set to null. The problem is that the JDBC sink connector cannot accept this format.

"payload": {
    "before": null,
    "after": {
      "customer_id": 2,
      "email_address": "testaddress_02@testdomain",
      "full_name": "testuser_02"
    },
    ...
}


update customers set full_name='updateduser_01' where customer_id = 2;

"payload": {
    "before": {
      "customer_id": 2,
      "email_address": "testaddress_02@testdomain",
      "full_name": "testuser_02"
    },
    "after": {
      "customer_id": 2,
      "email_address": "testaddress_02@testdomain",
      "full_name": "updateduser_01"
    },
    "source": {
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "test01",
      "ts_ms": 1699074509000,
      "snapshot": "false",
      "db": "oc",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 6226,
      "row": 0,
      "thread": 14,
      "query": null
    },
    "op": "u",
    "ts_ms": 1699074509162,
    "transaction": null
}


Let's try a delete as well. This time after is simply null.

"payload": {
    "before": {
      "customer_id": 2,
      "email_address": "testaddress_02@testdomain",
      "full_name": "updateduser_01"
    },
    "after": null,
    "source": {
      "version": "1.9.7.Final",
      "connector": "mysql",
      "name": "test01",
      "ts_ms": 1699074584000,
      "snapshot": "false",
      "db": "oc",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000002",
      "pos": 6602,
      "row": 0,
      "thread": 14,
      "query": null
    },
    "op": "d",
    "ts_ms": 1699074584724,
    "transaction": null
}

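The three payloads differ only in which of before/after is null and in the op code ("c", "u", "d"). As a rough sketch of how a consumer might read them, the helpers below (hypothetical names `describe_change` and `changed_columns`) summarize an event and diff an update. The sample events reuse the values from the payloads above with the source metadata left out.

```python
# Debezium op codes: c = create (insert), u = update, d = delete, r = snapshot read.
OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(payload: dict) -> str:
    """Summarize one change event as '<operation> on customer_id=<id>'."""
    operation = OP_NAMES.get(payload["op"], "unknown")
    # Deletes carry the row only in `before`; inserts carry it only in `after`.
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return f"{operation} on customer_id={row['customer_id']}"


def changed_columns(payload: dict) -> dict:
    """For an update, map each changed column to its (old, new) values."""
    before, after = payload["before"], payload["after"]
    if before is None or after is None:
        return {}  # inserts and deletes have nothing to diff
    return {col: (before[col], after[col]) for col in after if before.get(col) != after[col]}


update_event = {
    "before": {"customer_id": 2, "email_address": "testaddress_02@testdomain", "full_name": "testuser_02"},
    "after": {"customer_id": 2, "email_address": "testaddress_02@testdomain", "full_name": "updateduser_01"},
    "op": "u",
}
delete_event = {"before": update_event["after"], "after": None, "op": "d"}

print(describe_change(update_event))
print(changed_columns(update_event))
print(describe_change(delete_event))
```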
Now let's hand the data over to a sink.

-- DB setup
create database oc_sink;
grant all privileges on oc_sink.* to 'connect_dev'@'%' with grant option;

use oc_sink;

drop table if exists customers_sink;
drop table if exists products_sink;
drop table if exists orders_sink;
drop table if exists order_items_sink;

-- Run the CREATE TABLE scripts below.
CREATE TABLE customers_sink (
	customer_id int NOT NULL PRIMARY KEY,
	email_address varchar(255) NOT NULL,
	full_name varchar(255) NOT NULL
);

CREATE TABLE products_sink (
	product_id int NOT NULL PRIMARY KEY,
	product_name varchar(100) NULL,
	product_category varchar(200) NULL,
	unit_price decimal(10, 0) NULL
);

CREATE TABLE orders_sink (
	order_id int NOT NULL PRIMARY KEY,
	order_datetime timestamp NOT NULL,
	customer_id int NOT NULL,
	order_status varchar(10) NOT NULL,
	store_id int NOT NULL
);

CREATE TABLE order_items_sink (
	order_id int NOT NULL,
	line_item_id int NOT NULL,
	product_id int NOT NULL,
	unit_price decimal(10, 2) NOT NULL,
	quantity int NOT NULL,
	primary key (order_id, line_item_id)
);

select * from customers_sink;
select * from products_sink;
select * from orders_sink;
select * from order_items_sink;

With the DB set up, write the sink connector config:

{
    "name": "mysql_jdbc_oc_sink_customers_test01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "test01.oc.customers",
        "connection.url": "jdbc:mysql://my-mysql:3306/oc_sink",
        "connection.user": "connect_dev",
        "connection.password": "connect_dev",
        "": "oc_sink.customers_sink",
        "insert.mode": "upsert",
        "pk.fields": "customer_id",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Then register the connector:
http POST http://localhost:8083/connectors @mysql_cdc_oc_sink_test01.json

When I did that, the task failed with an error, complaining roughly that the record's fields don't line up with the sink table.

ERROR [mysql_jdbc_oc_sink_customers_test01|task-0] WorkerSinkTask{id=mysql_jdbc_oc_sink_customers_test01-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "oc_sink"."customers_sink" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.mysql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask:622)...
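The error message can be read against the schema dumped earlier. The sketch below is a simplified model of the check the message implies, not the JDBC sink connector's actual code: it lists the top-level fields of the envelope (with the optional flags taken from the kafkacat output), compares them with the columns of customers_sink, and reports the fields that are missing from the table yet are neither optional nor defaulted. The `Field` class and `blocking_fields` function are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    name: str
    optional: bool
    has_default: bool = False


# Top-level fields of test01.oc.customers.Envelope, as shown in the schema dump.
ENVELOPE_FIELDS = [
    Field("before", optional=True),
    Field("after", optional=True),
    Field("source", optional=False),
    Field("op", optional=False),
    Field("ts_ms", optional=True),
    Field("transaction", optional=True),
]

# Columns of oc_sink.customers_sink from the CREATE TABLE script.
TABLE_COLUMNS = {"customer_id", "email_address", "full_name"}


def blocking_fields(fields, columns):
    """Fields absent from the table that could not be added as nullable columns."""
    return [
        f.name
        for f in fields
        if f.name not in columns and not f.optional and not f.has_default
    ]


print(blocking_fields(ENVELOPE_FIELDS, TABLE_COLUMNS))  # ['source', 'op']
```

None of the envelope's top-level fields matches a table column, and `source` is the first required one, which is the field named in the error.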

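This happens because Debezium's default before/after format does not match the shape of the sink table. In a case like this, an SMT is needed to transform the records into a format the sink can accept.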
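As a rough illustration of the kind of reshaping involved, the sketch below flattens a before/after payload into a plain row. It is only an emulation of the idea written for this post, not Debezium's implementation; the real ExtractNewRecordState SMT has further options (for example around how deletes are handled), and the function name `unwrap` is an assumption.

```python
from typing import Optional


def unwrap(payload: dict) -> Optional[dict]:
    """Reduce a before/after envelope to a flat row, or None for a delete."""
    if payload["op"] == "d":
        return None  # a delete has no new row state
    return dict(payload["after"])


insert_event = {
    "before": None,
    "after": {
        "customer_id": 1,
        "email_address": "testaddress_01@testdomain",
        "full_name": "testuser_01",
    },
    "op": "c",
}

row = unwrap(insert_event)
print(sorted(row))  # ['customer_id', 'email_address', 'full_name']
```

The flattened row has exactly the three columns that customers_sink defines, which is the shape the sink table expects.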

I'll cover that in the next post.
