
JDBC Sink Connector는 Kafka의 메시지를 RDBMS에 적재하기 위한 Sink Connector다.
Kafka Consumer 역할을 수행하면서 Kafka 토픽의 데이터를 읽고, JDBC Driver를 이용해 대상 데이터베이스 테이블에 Insert, Update, Delete를 수행한다.
데이터 파이프라인의 종착지 역할을 하며, Kafka에서 처리된 이벤트를 최종적으로 DB에 저장하거나 상태를 동기화할 때 사용된다.
아래는 일반적으로 사용되는 JDBC Sink Connector 설정 예시다.
{
"name": "jdbc-sink-om",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "mysql_om_customers",
"connection.url": "jdbc:mysql://localhost:3306/om_target",
"connection.user": "connect_dev",
"connection.password": "connect_dev",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.fields": "customer_id",
"delete.enabled": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": true
}
}
connector.class : JDBC Sink Connector 클래스 지정
tasks.max : 생성할 최대 Task 개수
topics : 적재 대상 Kafka Topic
connection.url, connection.user, connection.password : DB 접속 정보
auto.create : 테이블 자동 생성 여부 (비추천)
auto.evolve : 스키마 변화(column 추가 등) 자동 반영 여부
insert.mode : DB 적용 방식 (insert, update, upsert)
pk.mode : Primary Key 추출 방식 (record_key, record_value, none)
pk.fields : Primary Key로 사용할 필드명
delete.enabled : Kafka 메시지 value가 null일 때 Delete 수행 여부
value.converter.schemas.enable : 메시지에 Schema 포함 여부(DB 컬럼 타입 매핑 시 필수)
Update는 Primary Key가 존재해야만 가능하다.
JDBC Sink Connector는 PK를 기준으로 메시지를 DB 행과 매칭하고, 존재하면 Update를 수행한다.
Insert 모드와 관계없이, Update 동작의 기반은 다음 두 가지이다:
가장 많이 사용하는 방식이다.
Kafka 메시지 (update 와 insert시 메세지는 같아보인다.)
{
"payload": {
"customer_id": 1,
"full_name": "testuser_01_updated",
"email_address": "update_email@test.com"
}
}
Delete 처리 방식은 delete.enabled=true 설정과 Value = null 여부에 따라 결정된다.
delete.enabled=true 이어야 한다. 예: Kafka 메시지(Delete 이벤트)
key: {"customer_id": 1}
value: null
JDBC Sink Connector는 Kafka 메시지에 포함된 날짜·시간 타입을 DB에 적재하기 위해 내부적으로 Connect Schema Type → JDBC 타입 변환을 수행한다.
이때 변환은 Unix Epoch(1970-01-01 00:00:00 UTC)를 기준으로 한 값으로 처리된다.
org.apache.kafka.connect.data.Date org.apache.kafka.connect.data.Time org.apache.kafka.connect.data.Timestamp org.apache.kafka.connect.data.Timestamp {
"schema": {
"type": "struct",
"fields": [
{
"field": "system_upd",
"type": "int64",
"name": "org.apache.kafka.connect.data.Timestamp"
}
]
},
"payload": {
"system_upd": 1763018927310
}
}