Kafka: Configuring a MySQL Sink Connector

60jong · September 18, 2025

curl --location --request POST 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "sink-test-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql-sink:3308/sinkdb?user=60jong&password=991911",
    "auto.create": "false",
    "auto.evolve": "false",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "table.name.format":"${topic}",
    "tombstones.on.delete": "true",
    "connection.user": "mysqluser",
    "connection.password": "mysqlpw",
    "topics.regex": "dbserver1\\.testdb\\.(.*)",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap, route, TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.field": "update_date"
  }
}'
 
{
  "error_code":400,
  "message":"Connector configuration is invalid and contains the following 2 error(s):
  Invalid value io.debezium.transforms.ExtractNewRecordState for configuration transforms.unwrap.type: Class io.debezium.transforms.ExtractNewRecordState could not be found.
  Invalid value null for configuration transforms.unwrap.type: Not a Transformation
  You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
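
The 400 response above says the worker could not load `io.debezium.transforms.ExtractNewRecordState`, which usually means the Debezium jars are not on the `plugin.path` of the worker that runs the sink. Below is a minimal sketch for checking this, assuming the worker address `http://localhost:8083` from the request and a Kafka Connect version that accepts the `connectorsOnly=false` query parameter (3.2 or later). `has_plugin`, `list_plugins`, `validate_url`, and `validate_config` are hypothetical helper names, not part of any CLI.

```shell
# Worker address taken from the request above; adjust for your environment.
CONNECT_URL="http://localhost:8083"

# Hypothetical helper: reads a plugin listing (JSON) on stdin and succeeds
# if the quoted class name appears anywhere in it.
has_plugin() {
  grep -qF "\"$1\""
}

# Lists every plugin the worker has loaded, including transformations.
# Assumption: the worker supports ?connectorsOnly=false (Kafka 3.2+).
list_plugins() {
  curl --silent "$CONNECT_URL/connector-plugins?connectorsOnly=false"
}

# Builds the URL of the validate endpoint named in the error message.
validate_url() {
  printf '%s/connector-plugins/%s/config/validate' "$CONNECT_URL" "$1"
}

# Sends a flat config map (stored in the file given as $2) for validation
# without creating the connector.
validate_config() {
  curl --silent --request PUT \
    --header 'Content-Type: application/json' \
    --data "@$2" \
    "$(validate_url "$1")"
}
```

With a worker running, `list_plugins | has_plugin io.debezium.transforms.ExtractNewRecordState && echo loaded` reports whether the class is visible to the worker, and `validate_config JdbcSinkConnector config.json` returns the per-field validation errors. Here `config.json` is an assumed file holding only the inner `config` object of the request.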

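To see what the `route` transform in the request does to a topic name, the same pattern can be tried locally with `sed`. This only approximates `RegexRouter`, which uses Java regular expressions and requires the whole topic name to match. `dbserver1.testdb.users` is an assumed example topic, not one taken from the original setup.

```shell
# Hypothetical helper: mimics transforms.route.regex / replacement ($3 -> \3).
# Names that do not match the three-part pattern pass through unchanged,
# as RegexRouter leaves non-matching topics as they are.
route_topic() {
  printf '%s\n' "$1" | sed -E 's/^([^.]+)\.([^.]+)\.([^.]+)$/\3/'
}

route_topic "dbserver1.testdb.users"   # prints: users
```

Because `table.name.format` is `${topic}`, the routed name is what the sink uses as the target table name.
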
Reference: https://wecandev.tistory.com/110
