Debezium MySQL CDC Source Connector (3) - SMT 메세지 변환

이동명·2023년 11월 4일
0

kafkaConnect

목록 보기
7/9
post-thumbnail

지난 포스팅에서 before/after default type을 JDBC sink connector에서 받을 수 없어서 메시지 변환이 필요했었다..

이번 포스팅에서 알아보자..

Debezium Source Connector 생성 메시지 포맷

아래와 같은 default 형식을 가짐..

ExtractNewRecordState SMT 적용하여 After 메시지만 생성

config

{
    "name": "mysql_cdc_oc_source_01",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "my-mysql",
        "database.port": "3306",
        "database.user": "connect_dev",
        "database.password": "connect_dev",
        // 다른 커넥터랑 겹치지 않게 10001로..
        "database.server.id": "10001",
        "database.server.name": "mysql01",
        "database.include.list": "oc",
        "table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items", 
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes.mysql.oc",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false"
    }
}

// 커넥터 등록..
http POST http://localhost:8083/connectors @mysql_cdc_oc_source_01.json

등록이 정상적으로 완료되고..
oc 테이블에 데이터는 다음과 같다.

이 데이터로 토픽이 만들어졌을까 ?

=server name 을 mysql01 이라고 줬으니.. 토픽이 만들어져야 한다.

이런식으로 커넥터를 기동하자마자 토픽이 올라옴

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "customer_id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email_address"
      },
      {
        "type": "string",
        "optional": false,
        "field": "full_name"
      }
    ],
    "optional": false,
    "name": "mysql01.oc.customers.Value"
  },
  "payload": {
    "customer_id": 1,
    "email_address": "testaddress_01@testdomain",
    "full_name": "testuser_01"
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "customer_id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email_address"
      },
      {
        "type": "string",
        "optional": false,
        "field": "full_name"
      }
    ],
    "optional": false,
    "name": "mysql01.oc.customers.Value"
  },
  "payload": {
    "customer_id": 3,
    "email_address": "testaddress_01@testdomain",
    "full_name": "testuser_01"
  }
}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "customer_id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "email_address"
      },
      {
        "type": "string",
        "optional": false,
        "field": "full_name"
      }
    ],
    "optional": false,
    "name": "mysql01.oc.customers.Value"
  },
  "payload": {
    "customer_id": 4,
    "email_address": "testaddress_02@testdomain",
    "full_name": "testuser_02"
  }
}

payload 에 after만 있는 모습을 볼 수 있다.

이 형식으로 메시지를 전송할거고 메시지를 받을 sink connector도 다시 하나 config 부터 생성해보자.


{
    "name": "mysql_jdbc_oc_sink_products_01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "mysql01.oc.products",
        "connection.url": "jdbc:mysql://my-mysql:3306/oc_sink",
        "connection.user": "connect_dev",
        "connection.password": "connect_dev",
        "table.name.format": "products_sink",
        "insert.mode": "upsert",
        "pk.fields": "product_id",
        "pk.mode": "record_key",
        "delete.enabled": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

// 커넥터 등록
http POST http://localhost:8083/connectors @mysql_jdbc_oc_sink_customers_01.json

source db.table 의 데이터이다.
row 2번은 delete 되었고..
full_name 은 update 된 필드가 하나 있다..

여기서 insert를 아무거나 해서.. 한번 sink쪽 db를 보니까.. 동기화가 되는모습..

그런데 단점이 테이블당 하나를 만들어야..되는듯..? 이게 너무 많아지면 cpu core 가 가져가야되는 부담이 꽤 크다고 한다..

Sink Connector 에서 ExtractNewRecordState SMT 적용하여 After 메시지만 변환한 뒤 DB 입력

반대로 SMT 를 sink 쪽에서 넣는것도 가능함!!

profile
Web Developer

0개의 댓글