지난 포스팅에서 before/after default type을 JDBC sink connector에서 받을 수 없어서 메시지 변환이 필요했었다..
이번 포스팅에서 알아보자..
아래와 같은 default 형식을 가짐..
{
"name": "mysql_cdc_oc_source_01",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "my-mysql",
"database.port": "3306",
"database.user": "connect_dev",
"database.password": "connect_dev",
// 다른 커넥터랑 겹치지 않게 10001로..
"database.server.id": "10001",
"database.server.name": "mysql01",
"database.include.list": "oc",
"table.include.list": "oc.customers, oc.products, oc.orders, oc.order_items",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.mysql.oc",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}
// 커넥터 등록..
http POST http://localhost:8083/connectors @mysql_cdc_oc_source_01.json
등록이 정상적으로 완료되고..
oc 테이블에 데이터는 다음과 같다.
이 데이터로 토픽이 만들어졌을까 ?
=server name 을 mysql01 이라고 줬으니.. 토픽이 만들어져야 한다.
이런식으로 커넥터를 기동하자마자 토픽이 올라옴
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
}
],
"optional": false,
"name": "mysql01.oc.customers.Value"
},
"payload": {
"customer_id": 1,
"email_address": "testaddress_01@testdomain",
"full_name": "testuser_01"
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
}
],
"optional": false,
"name": "mysql01.oc.customers.Value"
},
"payload": {
"customer_id": 3,
"email_address": "testaddress_01@testdomain",
"full_name": "testuser_01"
}
}
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "customer_id"
},
{
"type": "string",
"optional": false,
"field": "email_address"
},
{
"type": "string",
"optional": false,
"field": "full_name"
}
],
"optional": false,
"name": "mysql01.oc.customers.Value"
},
"payload": {
"customer_id": 4,
"email_address": "testaddress_02@testdomain",
"full_name": "testuser_02"
}
}
payload 에 after만 있는 모습을 볼 수 있다.
이 형식으로 메시지를 전송할거고 메시지를 받을 sink connector도 다시 하나 config 부터 생성해보자.
{
"name": "mysql_jdbc_oc_sink_products_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql01.oc.products",
"connection.url": "jdbc:mysql://my-mysql:3306/oc_sink",
"connection.user": "connect_dev",
"connection.password": "connect_dev",
"table.name.format": "products_sink",
"insert.mode": "upsert",
"pk.fields": "product_id",
"pk.mode": "record_key",
"delete.enabled": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
// 커넥터 등록
http POST http://localhost:8083/connectors @mysql_jdbc_oc_sink_customers_01.json
source db.table 의 데이터이다.
row 2번은 delete 되었고..
full_name 은 update 된 필드가 하나 있다..
여기서 insert를 아무거나 해서.. 한번 sink쪽 db를 보니까.. 동기화가 되는모습..
그런데 단점이 테이블당 하나를 만들어야..되는듯..? 이게 너무 많아지면 cpu core 가 가져가야되는 부담이 꽤 크다고 한다..
반대로 SMT 를 sink 쪽에서 넣는것도 가능함!!