Apache Kafka & Debezium CDC 구현

박태현·2025년 5월 17일

Apache Kafka

https://velog.io/@ayeah77/Apache-Kafka-Debezium-CDC


Zookeeper 실행 ( Kafka 파일 경로 )

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

Kafka 실행 ( Kafka 파일 경로 )

./bin/kafka-server-start.sh ./config/server.properties

Kafka Connect Worker 실행 ( Kafka 파일 경로 )

./bin/connect-distributed.sh config/connect-distributed.properties

Debezium Connector 실행

curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
  "name": "event-push-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "tiffndla0423",
    "database.server.id": "184054",
    "database.server.name": "queue_system_db",
    "database.allowPublicKeyRetrieval": "true",
    "database.include.list": "queue_system_db",
    "table.include.list": "queue_system_db.outbox",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.queueing_system_db",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex": "(.*)",
    "transforms.addTopicPrefix.replacement": "$1"
  }
}'

[ 커넥터 목록 확인 ]

curl http://localhost:8083/connectors

[ 등록된 커넥터 삭제 ]

curl -X DELETE http://localhost:8083/connectors/event-push-connector

Debezium CDC 동작 테스트


MySQL로 Outbox 테이블에 임의로 변경 사항 발생

INSERT INTO outbox 
	( aggregate_type, aggregate_id, event_type, queue_type, user_id) 
VALUES 
	('test01', 01, 'QUEUE_REGISTERED', 'test01','test_user_01');

카프카 명령어로 확인 ( 성공적으로 발행 성공 )

./bin/kafka-console-consumer.sh \
   --bootstrap-server localhost:9092 \
   --topic queue_system_db.queue_system_db.outbox \
   --from-beginning

{
  "schema": {
    ...
  },
  "payload": {
    "id": 6,
    "aggregate_id": 1,
    "aggregate_type": "test01",
    "event_type": "QUEUE_REGISTERED",
    "queue_type": "test01",
    "user_id": "test_user_01"
  }
}

[ Debezium이 테이블 변경 사항을 Kafka로 produce 했을 때 consume 되는 메세지 형식 ]

{
  "schema": {...},
  "payload": {
    "id": 4,
    "aggregate_id": ###,
    "aggregate_type": ###,
    "event_type": ###,
    "queue_type": ###,
    "user_id": ###
  }
}
@KafkaListener(topics = "queue_system_db.queue_system_db.outbox")
public void kafkaConsume(String message) {
	try {

		JsonNode node = objectMapper.readTree(message);
		JsonNode payload = node.path("payload");

		String queueType = payload.get("queue_type").asText();
		String userId = payload.get("user_id").asText();

		log.info("consume - queueType : {} , userId : {}", queueType, userId);

		SseEventService.sink.tryEmitNext(queueType);

	} catch (Exception e) {
		log.error("Kafka 메시지 소비 실패", e);
	}
}
// 정상적으로 consume
...KafkaConsumerService   : consume - queueType : test01 , userId : test_user_01
profile
꾸준하게

0개의 댓글