Kafka - JDBC Source Connector

develkkm·2025년 11월 13일

0부터 시작하는 Kafka

목록 보기
10/18

JDBC Source Connector는 RDBMS의 데이터를 Kafka로 전송하기 위해 JDBC Driver를 활용하여 DB에 접속하고 데이터를 추출한 뒤 Kafka로 메시지를 생성하는 Source Connector다.

Kafka Connect 내부에서 Transform → Converter → Kafka 메시지 전송 순서로 처리되며, RDBMS에서 조회한 한 행을 Kafka의 레코드로 변환하여 전송한다.

DBMS마다 JDBC Driver가 별도로 설치되어 있어야 하며, Connector는 JDBC 쿼리를 기반으로 데이터를 주기적으로 폴링하여 Kafka로 전달한다.


Mode

Bulk Mode

  • 전체 데이터를 매번 모두 읽는다.
  • 데이터량이 많으면 비효율적이며 중복 가능성이 있다.

Incrementing Mode

  • 증가하는 정수형 컬럼(PK 또는 증가값 컬럼)을 기준으로 이전 이후의 데이터를 조회한다.
  • SELECT * FROM table WHERE id > last_offset 형태로 동작한다.
  • Update 이벤트는 감지하지 못한다.

Timestamp Mode

  • 타임스탬프 컬럼을 기준으로 최근 이후의 데이터를 조회한다.
  • Update까지 감지할 수 있다.

Timestamp + Incrementing

  • Timestamp와 Incrementing 컬럼을 함께 사용한다.
  • 신규 데이터와 변경 데이터를 모두 감지하는 데 가장 적합하다.

문제점

  • Source 데이터의 변경 이벤트가 모두 Kafka에 전송되었다고 보장하기 어렵다.

  • Delete 이벤트는 JDBC Source Connector가 감지하지 못한다.

  • Polling 방식이므로 대용량 테이블에서 부하가 발생할 수 있다.


Config

{
  "name": "mysql_jdbc_om_source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/om",
    "connection.user": "connect_dev",
    "connection.password": "connect_dev",
    "topic.prefix": "mysql_",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1,
    "catalog.pattern": "om",
    "table.whitelist": "om.customers",
    "poll.interval.ms": 10000,
    "mode": "incrementing",
    "incrementing.column.name": "customer_id"
  }
}

주요 파라미터 설명

  • connector.class : JDBC Source Connector 클래스 지정

  • tasks.max : 생성할 최대 Task 수

  • connection.url, user, password : JDBC 접속 정보

  • topic.prefix : 생성되는 Kafka Topic 이름의 Prefix

  • topic.creation.enable : 토픽 자동 생성 여부

  • table.whitelist : 조회 대상 테이블 목록

  • poll.interval.ms : 주기적 조회 주기

  • mode : 데이터 조회 방식 지정

  • incrementing.column.name : Incrementing 모드에서 증가값 기준 컬럼

  • timestamp.column.name : Timestamp 모드 기준 컬럼

토픽명은 자동으로 topic.prefix + table명 형태로 생성된다.


Message

{
  "topic": "mysql_om_customers",
  "partition": 0,
  "offset": 0,
  "tstype": "create",
  "ts": 1763018927310,
  "key": null,
  "payload": "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"customer_id\"},{\"type\":\"string\",\"optional\":false,\"field\":\"email_address\"},{\"type\":\"string\",\"optional\":false,\"field\":\"full_name\"},{\"type\":\"int64\",\"optional\":false,\"name\":\"org.apache.kafka.connect.data.Timestamp\",\"version\":1,\"field\":\"system_upd\"}],\"optional\":false,\"name\":\"customers\"},\"payload\":{\"customer_id\":1,\"email_address\":\"testaddress_01@testdomain\",\"full_name\":\"testuser_01\",\"system_upd\":1668444059000}}"
}

메시지 설명

  • schema : 메시지의 구조 정의
  • payload : 실제 레코드 데이터
  • JDBC Source Connector는 기본적으로 key를 생성하지 않는다.
  • 메시지 형식은 schema + payload 구조로 구성된다.

오프셋 관리 방식

Kafka Connect는 JDBC Source Connector의 오프셋을 내부 토픽인 connect-offsets에 저장한다.

Incrementing

Key: ["mysql_jdbc_om_source_00", {"table":"om.customers"}]
Value: {"incrementing": 2}

쿼리 예시:

SELECT * FROM om.customers 
WHERE om.customers.customer_id > ?
ORDER BY om.customers.customer_id ASC

Timestamp

Key: ["mysql_jdbc_om_source_01", {"table":"om.customers"}]
Value: {"timestamp_nanos": 0, "timestamp": 1763018927310}

쿼리 예시:

SELECT * FROM om.customers 
WHERE system_upd > ? AND system_upd < ?
ORDER BY system_upd ASC

메시지 특징

  • 토픽명은 기본적으로 테이블명을 따른다.
  • Key는 자동 생성되지 않기 때문에 SMT를 통해 Key를 수동 생성해야 한다.
  • 메시지는 항상 schema + payload 구조로 생성된다.
  • schema 정보에는 필드 타입, 이름, optional 여부가 포함된다.

SMT

JDBC Source Connector는 Key 값을 자동 생성하지 않는다.
Kafka에서 파티셔닝 처리나 Upsert 처리 등을 위해서는 PK를 메시지 Key로 변환해줘야 한다.
이를 위해 ValueToKey → ExtractField$Key 조합을 사용한다.

{
  "name": "mysql_jdbc_om_source_03",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "transforms": "create_key, extract_key",
    "transforms.create_key.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.create_key.fields": "customer_id",
    "transforms.extract_key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract_key.field": "customer_id"
  }
}

ValueToKey

  • Value의 특정 필드(PK 포함)를 Key 영역으로 복사한다.
  • 예: Value 안의 customer_id를 Key로 변환한다.

ExtractField$Key

  • Key에 복사된 데이터 중 특정 필드만 추출한다.
  • 복잡한 구조의 Key를 단일 필드로 단순화하는 데 사용된다.

RegexRouter

필요할 경우 SMT의 RegexRouter를 통해 토픽 이름을 변경할 수 있다.

예시:

"transforms.topicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRename.regex": "mysql_(.*)",
"transforms.topicRename.replacement": "source_$1"

결과:

  • mysql_om_customerssource_om_customers
profile
알던것을 더 확실하게

0개의 댓글