https://velog.io/@ayeah77/Apache-Kafka-Debezium-CDC
Zookeeper 실행 ( Kafka 파일 경로 )
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Kafka 실행 ( Kafka 파일 경로 )
./bin/kafka-server-start.sh ./config/server.properties
Kafka Connect Worker 실행 ( Kafka 파일 경로 )
./bin/connect-distributed.sh config/connect-distributed.properties
Debezium Connector 실행
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "event-push-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "tiffndla0423",
"database.server.id": "184054",
"database.server.name": "queue_system_db",
"database.allowPublicKeyRetrieval": "true",
"database.include.list": "queue_system_db",
"table.include.list": "queue_system_db.outbox",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "dbhistory.queueing_system_db",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex": "(.*)",
"transforms.addTopicPrefix.replacement": "$1"
}
}'
[ 커넥터 목록 확인 ]
curl http://localhost:8083/connectors
[ 등록된 커넥터 삭제 ]
curl -X DELETE http://localhost:8083/connectors/event-push-connector
MySQL로 Outbox 테이블에 임의로 변경 사항 발생
INSERT INTO outbox
( aggregate_type, aggregate_id, event_type, queue_type, user_id)
VALUES
('test01', 01, 'QUEUE_REGISTERED', 'test01','test_user_01');
카프카 명령어로 확인 ( 성공적으로 발행 성공 )
./bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic queue_system_db.queue_system_db.outbox \
--from-beginning

{
"schema": {
...
},
"payload": {
"id": 6,
"aggregate_id": 1,
"aggregate_type": "test01",
"event_type": "QUEUE_REGISTERED",
"queue_type": "test01",
"user_id": "test_user_01"
}
}
[ Debezium이 테이블 변경 사항을 Kafka로 produce 했을 때 consume 되는 메세지 형식 ]
{
"schema": {...},
"payload": {
"id": 4,
"aggregate_id": ###,
"aggregate_type": ###,
"event_type": ###,
"queue_type": ###,
"user_id": ###
}
}
@KafkaListener(topics = "queue_system_db.queue_system_db.outbox")
public void kafkaConsume(String message) {
try {
JsonNode node = objectMapper.readTree(message);
JsonNode payload = node.path("payload");
String queueType = payload.get("queue_type").asText();
String userId = payload.get("user_id").asText();
log.info("consume - queueType : {} , userId : {}", queueType, userId);
SseEventService.sink.tryEmitNext(queueType);
} catch (Exception e) {
log.error("Kafka 메시지 소비 실패", e);
}
}
// 정상적으로 consume
...KafkaConsumerService : consume - queueType : test01 , userId : test_user_01