Kafka를 통해 데이터베이스의 변경 사항을 실시간으로 수신하는 CDC(Change Data Capture) 시스템을 구현할 때, 타겟 테이블에서 발생하는 생성, 수정, 삭제 이벤트가 모두 메시지로 전송됩니다. 그러나 모든 이벤트가 필요한 것은 아니기 때문에 불필요한 메시지 전송이 시스템 리소스를 낭비할 수 있습니다.
이 문제를 해결하기 위해 Debezium의 Message Filtering 기능을 활용하여 원하는 데이터만 메시지로 전송하도록 설정할 수 있습니다.
Debezium에서 Message Filtering을 사용하려면 스크립팅 기능을 활성화해야 합니다. 이를 위해 Docker Compose에서 Kafka Connect를 실행할 때 ENABLE_DEBEZIUM_SCRIPTING: 'true' 속성을 설정해야 합니다.
connect:
image: debezium/connect:latest
container_name: connect
ports:
- "8083:8083"
environment:
GROUP_ID: "1"
CONFIG_STORAGE_TOPIC: "my_connect_configs"
OFFSET_STORAGE_TOPIC: "my_connect_offsets"
STATUS_STORAGE_TOPIC: "my_connect_statuses"
ZOOKEEPER_CONNECT: 'zookeeper:2181'
BOOTSTRAP_SERVERS: 'kafka:9092'
ENABLE_DEBEZIUM_SCRIPTING: 'true' # 스크립팅 활성화
depends_on:
- zookeeper
- kafka
restart: always
networks:
- my_shared_network
스크립팅 기능을 활성화하지 않으면, Debezium 커넥터를 설정할 때 유효성 검사에서 오류가 발생합니다. 이 설정은 커넥터가 사용자 정의 필터링 로직을 적용할 수 있도록 필수적입니다.
Debezium 스크립팅 기능에 대한 자세한 내용은 Debezium 공식 Docker Hub 페이지에서 확인할 수 있습니다.
Debezium은 기본적으로 Filter SMT(Single Message Transform) 기능을 제공하여 원하는 레코드만 처리할 수 있습니다. Filter SMT는 JSR223과 통합된 스크립팅 언어를 지원하는데, 이를 위해 groovy, groovy-jsr223 등의 플러그인이 필요합니다.
그러나 보안상의 이유로 Filter SMT는 기본 Debezium 커넥터 아카이브에 포함되지 않으며, 별도로 플러그인을 추가해야 합니다. 필요한 플러그인을 다운로드하고, Docker 컨테이너에서 사용할 수 있도록 설정해야 합니다.
필요한 플러그인 목록
- debezium-connector-sqlserver-2.7.0.Final.jar
- debezium-scripting-2.7.0.Final.jar
- debezium-storage-file-2.7.0.Final.jar
- debezium-storage-kafka-2.7.0.Final.jar
- debezium-core-2.7.0.Final.jar
- debezium-api-2.7.0.Final.jar
- mssql-jdbc-12.4.2.jre8.jar
- groovy-3.0.6.jar
- groovy-json-3.0.6.jar
- groovy-jsr223-3.0.6.jar
docker를 사용하여 connect를 실행하기에 필요한 jar파일을 미리 download받아 놓고 볼륨설정으로 필요한 플러그인을 추가하였습니다.
filter SMT에 필요한 플러그인 이외에도 기존에 가지고 있던 플러그인도 받아햐 볼륨 설정시 정상적으로 동작하기 때문에 다른 플러그인도 포함되어 있습니다.
connect:
image: debezium/connect:2.7
container_name: connect
ports:
- "8083:8083"
environment:
GROUP_ID: "1"
CONFIG_STORAGE_TOPIC: "my_connect_configs"
OFFSET_STORAGE_TOPIC: "my_connect_offsets"
STATUS_STORAGE_TOPIC: "my_connect_statuses"
ZOOKEEPER_CONNECT: 'zookeeper:2181'
BOOTSTRAP_SERVERS: 'kafka:9092'
ENABLE_DEBEZIUM_SCRIPTING: "true"
volumes:
- ./debezium-connector-sqlserver:/kafka/connect/debezium-connector-sqlserver # 플러그인 볼륨설정
depends_on:
- zookeeper
- kafka
deploy:
restart_policy:
condition: on-failure
delay: 10s
networks:
- my_shared_network
프로젝트에서 mssql을 사용하여 debezium-connector-sqlserver 디렉터리와 볼륨설정을 하여 플러그인을 추가하였습니다. mysql이나 다른 데이터베이스를 사용할 경우 해당 디렉터리와 볼륨설정을 하면됩니다.
플러그인을 추가해야 한다는 것은 알고 있었지만, 처음에는 정확히 어느 경로에 추가해야 하는지 혼란스러웠습니다. 다행히도 Medium에서 Okan YILDIRIM 님이 작성한 블로그를 통해 connect/debezium-connector-sqlserver/
경로에 플러그인 볼륨을 설정해야 한다는 중요한 정보를 얻을 수 있었습니다. 이 경로에 플러그인을 추가하면 Debezium이 정상적으로 Groovy 스크립팅 기능을 사용할 수 있게 됩니다.
출처 : https://medium.com/trendyol-tech/debezium-with-simple-message-transformation-smt-4f5a80c85358
{
"name": "debezium-connector",
"config": {
// 생략...
"transforms":"unwrap,filter", # filter 추가
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":false,
"transforms.unwrap.add.fields.prefix":"cdc_meta_",
"transforms.unwrap.add.fields":"op,table,lsn,source.ts_ms",
"transforms.unwrap.delete.handling.mode":"rewrite",
# filter SMT 를 위한 속성
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.is_closed == true && value.cdc_meta_op == 'u'",
}
}
- transforms.filter.type: 이 속성은 Filter SMT를 지정합니다. 이 필터는 Groovy와 같은 스크립팅 언어를 사용하여 조건을 정의하고, 그 조건에 따라 메시지를 필터링합니다.
- transforms.filter.language: 필터링에 사용할 스크립팅 언어를 지정합니다. 여기서는 Groovy 언어를 사용합니다 (jsr223.groovy).
- transforms.filter.condition: 필터링 조건을 정의하는 스크립트입니다. 이 예제에서는 value.is_closed == true && value.cdc_meta_op == 'u' 조건을 만족하는 메시지들만 처리됩니다. 이 조건에 따라, is_closed 필드가 true이고, cdc_meta_op 필드가 u(업데이트)인 레코드만 메시지로 전달됩니다. 이로 인해 불필요한 메시지 전송을 줄일 수 있습니다.
Connector configuration is invalid and contains the following 1 error(s):\nInvalid value io.debezium.transforms.Filter for configuration transforms.filter.type: Class io.debezium.transforms.Filter could not be found.\nYou can also find the above list of errors at the endpoint
/connector-plugins/{connectorType}/config/validate
filter SMT에 필요한 플러그인을 볼륨설정으로 추가해주었는데도 발생한 에러로 docker-compose.yml에 ENABLE_DEBEZIUM_SCRIPTING: 'true'
속성을 추가하여 해결할 수 있었습니다.
이는 Debeezium이 기본적으로 보안을 강화하기 위해 일부 고급 기능을 비활성화한 상태로 배포되기 때문에 속성을 명시적으로 추가해줘야합니다.
filter SMT를 위한 플러그인을 추가하지 않아서 발생한 에러로 볼륨설정을 통해 필요한 라이브러리를 추가하여 해결할 수 있었습니다.
"trace": "org.apache.kafka.connect.errors.ConnectException: io.debezium.DebeziumException: Failed to parse expression 'value.is_closed == true'\n\tat
// 생략...
org.apache.kafka.connect.runtime.ConnectorConfig.transformationStages(ConnectorConfig.java:288)\n\t... 10 more\nCaused by: io.debezium.DebeziumException: Implementation of language 'groovy' not found on the classpath
이 에러는 Debezium에서 Groovy 스크립트를 사용하여 필터링 로직을 구현하려고 할 때, Groovy 언어의 구현체가 classpath에 존재하지 않아 발생하는 문제입니다.
처음에 debezium-scripting 관련 플러그인만 추가하면 필터링 로직을 구현할 수 있을 것으로 생각했지만, Debezium에서 Groovy 스크립트를 실행하려면 groovy, groovy-jsr223, groovy-json과 같은 Groovy 관련 플러그인들도 함께 classpath에 추가되어야 합니다.
이번 프로젝트를 진행하면서 Debezium과 Kafka를 활용한 CDC 시스템을 구현하는 과정에서 많은 것을 배울 수 있었습니다. 공식 문서를 통해 Debezium의 다양한 기능과 설정 방법을 익힐 수 있었고, 특히 필터링 기능을 통해 불필요한 메시지 전송을 최소화하는 방법에 대해 깊이 있게 이해하게 되었습니다.
물론, 과정 중에 예상치 못한 여러 가지 트러블이 발생했습니다. 특히, 필수 플러그인 설정과 관련된 문제나, Groovy 스크립팅 기능을 사용하기 위한 환경 설정에서 여러 차례 어려움을 겪었지만, 결국에는 문제를 하나씩 해결하면서 더욱 단단한 경험을 쌓을 수 있었습니다.