
JDBC Source Connector는 RDBMS의 데이터를 Kafka로 전송하기 위해 JDBC Driver를 활용하여 DB에 접속하고 데이터를 추출한 뒤 Kafka로 메시지를 생성하는 Source Connector다.
Kafka Connect 내부에서 Transform → Converter → Kafka 메시지 전송 순서로 처리되며, RDBMS에서 조회한 한 행을 Kafka의 레코드로 변환하여 전송한다.
DBMS마다 JDBC Driver가 별도로 설치되어 있어야 하며, Connector는 JDBC 쿼리를 기반으로 데이터를 주기적으로 폴링하여 Kafka로 전달한다.
SELECT * FROM table WHERE id > last_offset 형태로 동작한다.Source 데이터의 변경 이벤트가 모두 Kafka에 전송되었다고 보장하기 어렵다.
Delete 이벤트는 JDBC Source Connector가 감지하지 못한다.
Polling 방식이므로 대용량 테이블에서 부하가 발생할 수 있다.
{
"name": "mysql_jdbc_om_source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/om",
"connection.user": "connect_dev",
"connection.password": "connect_dev",
"topic.prefix": "mysql_",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1,
"catalog.pattern": "om",
"table.whitelist": "om.customers",
"poll.interval.ms": 10000,
"mode": "incrementing",
"incrementing.column.name": "customer_id"
}
}
connector.class : JDBC Source Connector 클래스 지정
tasks.max : 생성할 최대 Task 수
connection.url, user, password : JDBC 접속 정보
topic.prefix : 생성되는 Kafka Topic 이름의 Prefix
topic.creation.enable : 토픽 자동 생성 여부
table.whitelist : 조회 대상 테이블 목록
poll.interval.ms : 주기적 조회 주기
mode : 데이터 조회 방식 지정
incrementing.column.name : Incrementing 모드에서 증가값 기준 컬럼
timestamp.column.name : Timestamp 모드 기준 컬럼
토픽명은 자동으로 topic.prefix + table명 형태로 생성된다.
{
"topic": "mysql_om_customers",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1763018927310,
"key": null,
"payload": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"customer_id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email_address\"},{\"type\":\"string\",\"optional\":false,\"field\":\"full_name\"},{\"type\":\"int64\",\"optional\":false,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"system_upd\"}],\"optional\":false,\"name\":\"customers\"},\"payload\":{\"customer_id\":1,\"email_address\":\"testaddress_01@testdomain\",\"full_name\":\"testuser_01\",\"system_upd\":1668444059000}}"
}
Kafka Connect는 JDBC Source Connector의 오프셋을 내부 토픽인 connect-offsets에 저장한다.
Key: ["mysql_jdbc_om_source_00", {"table":"om.customers"}]
Value: {"incrementing": 2}
쿼리 예시:
SELECT * FROM om.customers
WHERE om.customers.customer_id > ?
ORDER BY om.customers.customer_id ASC
Key: ["mysql_jdbc_om_source_01", {"table":"om.customers"}]
Value: {"timestamp_nanos": 0, "timestamp": 1763018927310}
쿼리 예시:
SELECT * FROM om.customers
WHERE system_upd > ? AND system_upd < ?
ORDER BY system_upd ASC
JDBC Source Connector는 Key 값을 자동 생성하지 않는다.
Kafka에서 파티셔닝 처리나 Upsert 처리 등을 위해서는 PK를 메시지 Key로 변환해줘야 한다.
이를 위해 ValueToKey → ExtractField$Key 조합을 사용한다.
{
"name": "mysql_jdbc_om_source_03",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"transforms": "create_key, extract_key",
"transforms.create_key.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.create_key.fields": "customer_id",
"transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract_key.field": "customer_id"
}
}
customer_id를 Key로 변환한다.필요할 경우 SMT의 RegexRouter를 통해 토픽 이름을 변경할 수 있다.
예시:
"transforms.topicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRename.regex": "mysql_(.*)",
"transforms.topicRename.replacement": "source_$1"
결과:
mysql_om_customers → source_om_customers