Polling-Publisher
모델은 지속적으로 데이터베이스에 조회 쿼리를 실행해서, 처리되지 않은 메시지를 가져옵니다.
그로 인해 데이터베이스
에 많은 부하가 발생하며, 부가적으로 메시지 처리 후 Update 쿼리
를 실행시키는 등의 작업으로 추가적인 부하도 일어납니다.
DB
의 부하를 줄이고, 데이터의 생성 뿐만 아니라 업데이트, 삭제 등의 이벤트도 감지할 수 있는 CDC
를 활용하여 마지막 개선을 해보겠습니다.
CDC(Change Data Capture)
는 데이터가 변경되는 시점과 변경 항목을 추적하고 이러한 변경이 전파되어야 하는 다른 시스템이나 서비스에 알림을 전송하는 패턴입니다.
CDC 솔루션
도 여러가지 제품이 있지만 그 중에서 Debezium MySQL conenctor를 사용하도록 하겠습니다.
이 그림을 보면 주문 서비스
에서 주문이 발생하고 데이터의 변경(저장)이 일어나서 Debezium Event Router
를 통해 Kafka
로 메시지가 발행되고 토픽
을 구독
하는 배송, 회원 서비스가 메시지를 수신합니다.
이제 직접 Debezium MySQL Connector
를 설정해보겠습니다.
저는 MySQL
, Kafka
, Zookeeper
, Spring Service
, Debezium Connector
를 docker-compose
로 구성했습니다.
version: '3.7'
services:
mysql1:
image: mysql:8.0
platform: linux/x86_64
container_name: mysql1
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: post_service_db
volumes:
- mysql1_data:/var/lib/mysql
- ./mysql/my1.cnf:/etc/mysql/my.cnf
ports:
- "3306:3306"
networks:
my-network:
ipv4_address: 172.20.0.2
mysql2:
image: mysql:8.0
platform: linux/x86_64
container_name: mysql2
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: subscribe_service_db
volumes:
- mysql2_data:/var/lib/mysql
- ./mysql/my2.cnf:/etc/mysql/my.cnf
ports:
- "3307:3306"
networks:
my-network:
ipv4_address: 172.20.0.3
zookeeper:
image: debezium/zookeeper:2.7
container_name: zookeeper
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafka:
image: debezium/kafka:2.7
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
connect:
image: debezium/connect:2.7
container_name: connect
depends_on:
- kafka
- mysql1
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
subscribe-service:
build: ../v3/subscribe-service
container_name: subscribe-service
ports:
- "8082:8082"
depends_on:
- mysql2
- kafka
environment:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql2:3306/subscribe_service_db?useSSL=false&serverTimezone=UTC
SPRING_DATASOURCE_USERNAME: root
SPRING_DATASOURCE_PASSWORD: root
SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
restart: always
post-service:
build: ../v3/post-service
container_name: post-service
ports:
- "8081:8081"
depends_on:
- mysql1
environment:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql1:3306/post_service_db?useSSL=false&serverTimezone=UTC
SPRING_DATASOURCE_USERNAME: root
SPRING_DATASOURCE_PASSWORD: root
restart: always
volumes:
mysql1_data:
mysql2_data:
MySQL
에서 사용되는 my.cnf
파일은 아래와 같습니다.
[client]
default-character-set = utf8mb4
[mysql]
default-character-set = utf8mb4
# MySQL 서버(daemon)의 설정을 지정한다.
[mysqld]
character-set-client-handshake = FALSE
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
default-time-zone='+9:00'
# 이진 로그를 사용해서 MySQL 서버에서 발행하는 변경 사항을 기록한다. mysql-bin은 이진로그의 파일 이름을 지정한다.
log-bin = mysql-bin
# MySQL 사용자의 인증 플러그인을 mysql_native_password로 지정한다 -> 5.7 이전 버전과의 호환성을 유지하기 위해 사용되는 매커니즘이다.
default_authentication_plugin=mysql_native_password
여기서 가장 중요한 부분은 이진 로그
를 사용하는 설정을 하는 구간입니다.
Debezium MySQL Connector
는 이진 로그를 활용하여 데이터의 변경을 추적하기 때문입니다.
이제 순서대로 Debezium Connector
를 설정해보겠습니다.
Debezium Connector
에 새로운 Connector
설정하기{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql1",
"database.port": "3306",
"database.user": "root",
"database.password": "root",
"database.server.id": "184054",
"topic.prefix": "post-create-event",
"table.include.list": "post_service_db.post_event_outbox",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "post-create-event"
}
}
위의 JSON
을 Body
에 담아서 POST http://localhost:8083/connectors 요청을 보냅니다.
각 필드에 대해서 간단히 설명을 드리면 아래와 같습니다.
database.hostname
: DB 접속 IP (같은 네트워크를 사용하므로 컨테이너 이름을 지정)database.server.id
: 데이터베이스 클라이언트의 숫자 ID로, MySQL 클러스터에서 현재 실행 중인 모든 데이터베이스 프로세스에서 고유해야 합니다.topic.prefix
: 모든 Kafka 토픽의 접두사로 사용됩니다.table.include.list
: database.tableName 형식으로 변경 사항을 캡쳐하려는 테이블을 지정합니다.요청을 보내면 아래와 같은 응답이 오고, Connector
가 생성됩니다.
더 자세한 내용은 Debezium Tutorial 공식 문서를 확인해주세요:)
이러면 설정이 잘 됬는지 궁금해지실겁니다.
제가 확인한 방법은 Kafka 토픽
을 조회해서 Debezium Connector
가 토픽을 생성했는지 확인하는 방식입니다.
$ kafka-topics.sh --bootstrap-server=kafka:9092 --list
이 명령어로 Kafka
에 생성된 토픽을 조회하면 커넥터
가 연결되었다는 것을 알 수 있습니다.
그렇다면 post-create-event
에는 어떤 메시지가 있는지 보겠습니다.
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic post-create-event --from-beginning
이 명령어
로 토픽에 생성된 메시지를 볼 수 있습니다.
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false,incremental"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"int64",
"optional":true,
"field":"ts_us"
},
{
"type":"int64",
"optional":true,
"field":"ts_ns"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"int64",
"optional":false,
"field":"server_id"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"int64",
"optional":true,
"field":"thread"
},
{
"type":"string",
"optional":true,
"field":"query"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.Source",
"field":"source"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"field":"databaseName"
},
{
"type":"string",
"optional":true,
"field":"schemaName"
},
{
"type":"string",
"optional":true,
"field":"ddl"
},
{
"type":"array",
"items":{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"type"
},
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":true,
"field":"defaultCharsetName"
},
{
"type":"array",
"items":{
"type":"string",
"optional":false
},
"optional":true,
"field":"primaryKeyColumnNames"
},
{
"type":"array",
"items":{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int32",
"optional":false,
"field":"jdbcType"
},
{
"type":"int32",
"optional":true,
"field":"nativeType"
},
{
"type":"string",
"optional":false,
"field":"typeName"
},
{
"type":"string",
"optional":true,
"field":"typeExpression"
},
{
"type":"string",
"optional":true,
"field":"charsetName"
},
{
"type":"int32",
"optional":true,
"field":"length"
},
{
"type":"int32",
"optional":true,
"field":"scale"
},
{
"type":"int32",
"optional":false,
"field":"position"
},
{
"type":"boolean",
"optional":true,
"field":"optional"
},
{
"type":"boolean",
"optional":true,
"field":"autoIncremented"
},
{
"type":"boolean",
"optional":true,
"field":"generated"
},
{
"type":"string",
"optional":true,
"field":"comment"
},
{
"type":"string",
"optional":true,
"field":"defaultValueExpression"
},
{
"type":"array",
"items":{
"type":"string",
"optional":false
},
"optional":true,
"field":"enumValues"
}
],
"optional":false,
"name":"io.debezium.connector.schema.Column",
"version":1
},
"optional":false,
"field":"columns"
},
{
"type":"string",
"optional":true,
"field":"comment"
}
],
"optional":true,
"name":"io.debezium.connector.schema.Table",
"version":1,
"field":"table"
}
],
"optional":false,
"name":"io.debezium.connector.schema.Change",
"version":1
},
"optional":false,
"field":"tableChanges"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.SchemaChangeValue",
"version":1
},
"payload":{
"source":{
"version":"2.7.0.Final",
"connector":"mysql",
"name":"post-create-event",
"ts_ms":1719795356101,
"snapshot":"true",
"db":"post_service_db",
"sequence":null,
"ts_us":1719795356101473,
"ts_ns":1719795356101473000,
"table":"post_event_outbox",
"server_id":0,
"gtid":null,
"file":"mysql-bin.000003",
"pos":3205,
"row":0,
"thread":null,
"query":null
},
"ts_ms":1719795356103,
"databaseName":"post_service_db",
"schemaName":null,
"ddl":"CREATE TABLE `post_event_outbox` (\n `author_id` bigint NOT NULL,\n `created_at` timestamp NOT NULL,\n `post_id` bigint NOT NULL,\n `updated_at` timestamp NOT NULL,\n `event_id` binary(16) NOT NULL,\n `title` text COLLATE utf8mb4_unicode_ci NOT NULL,\n PRIMARY KEY (`event_id`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci",
"tableChanges":[
{
"type":"CREATE",
"id":"\"post_service_db\".\"post_event_outbox\"",
"table":{
"defaultCharsetName":"utf8mb4",
"primaryKeyColumnNames":[
"event_id"
],
"columns":[
{
"name":"author_id",
"jdbcType":-5,
"nativeType":null,
"typeName":"BIGINT",
"typeExpression":"BIGINT",
"charsetName":null,
"length":null,
"scale":null,
"position":1,
"optional":false,
"autoIncremented":false,
"generated":false,
"comment":null,
"defaultValueExpression":null,
"enumValues":null
},
{
"name":"created_at",
"jdbcType":2014,
"nativeType":null,
"typeName":"TIMESTAMP",
"typeExpression":"TIMESTAMP",
"charsetName":null,
"length":null,
"scale":null,
"position":2,
"optional":false,
"autoIncremented":false,
"generated":false,
"comment":null,
"defaultValueExpression":null,
"enumValues":null
},
{
"name":"post_id",
"jdbcType":-5,
"nativeType":null,
"typeName":"BIGINT",
"typeExpression":"BIGINT",
"charsetName":null,
"length":null,
"scale":null,
"position":3,
"optional":false,
"autoIncremented":false,
"generated":false,
"comment":null,
"defaultValueExpression":null,
"enumValues":null
},
{
"name":"updated_at",
"jdbcType":2014,
"nativeType":null,
"typeName":"TIMESTAMP",
"typeExpression":"TIMESTAMP",
"charsetName":null,
"length":null,
"scale":null,
"position":4,
"optional":false,
"autoIncremented":false,
"generated":false,
"comment":null,
"defaultValueExpression":null,
"enumValues":null
},
{
"name":"event_id",
"jdbcType":-2,
"nativeType":null,
"typeName":"BINARY",
"typeExpression":"BINARY",
"charsetName":null,
"length":16,
"scale":null,
"position":5,
"optional":false,
"autoIncremented":false,
"generated":false,
"comment":null,
"defaultValueExpression":null,
"enumValues":null
},
{
"name":"title",
"jdbcType":12,
"nativeType":null,
"typeName":"TEXT",
"typeExpression":"TEXT",
"charsetName":"utf8mb4",
"length":null,
"scale":null,
"position":6,
"optional":false,
"autoIncremented":false,
"generated":false,
"comment":null,
"defaultValueExpression":null,
"enumValues":null
}
],
"comment":null
}
}
]
}
}
이 메시지는 여러가지 메타데이터
설정 및 post_event_outbox
테이블의 DDL이 실행된 히스토리 정보임을 알 수 있습니다.
그렇다면 실제 데이터
가 변경 되었을때 기록은 어떤 토픽
에 있을까요?
게시글 작성 요청
을 한 번 보내보겠습니다.
요청을 보낸 뒤, 토픽 리스트
를 확인해보니 새로운 토픽이 생겼습니다.
이 토픽
의 메시지
를 확인해보면 아래와 같습니다.
{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"author_id"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.time.ZonedTimestamp",
"version":1,
"field":"created_at"
},
{
"type":"int64",
"optional":false,
"field":"post_id"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.time.ZonedTimestamp",
"version":1,
"field":"updated_at"
},
{
"type":"bytes",
"optional":false,
"field":"event_id"
},
{
"type":"string",
"optional":false,
"field":"title"
}
],
"optional":true,
"name":"post-create-event.post_service_db.post_event_outbox.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"author_id"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.time.ZonedTimestamp",
"version":1,
"field":"created_at"
},
{
"type":"int64",
"optional":false,
"field":"post_id"
},
{
"type":"string",
"optional":false,
"name":"io.debezium.time.ZonedTimestamp",
"version":1,
"field":"updated_at"
},
{
"type":"bytes",
"optional":false,
"field":"event_id"
},
{
"type":"string",
"optional":false,
"field":"title"
}
],
"optional":true,
"name":"post-create-event.post_service_db.post_event_outbox.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false,incremental"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":true,
"field":"sequence"
},
{
"type":"int64",
"optional":true,
"field":"ts_us"
},
{
"type":"int64",
"optional":true,
"field":"ts_ns"
},
{
"type":"string",
"optional":true,
"field":"table"
},
{
"type":"int64",
"optional":false,
"field":"server_id"
},
{
"type":"string",
"optional":true,
"field":"gtid"
},
{
"type":"string",
"optional":false,
"field":"file"
},
{
"type":"int64",
"optional":false,
"field":"pos"
},
{
"type":"int32",
"optional":false,
"field":"row"
},
{
"type":"int64",
"optional":true,
"field":"thread"
},
{
"type":"string",
"optional":true,
"field":"query"
}
],
"optional":false,
"name":"io.debezium.connector.mysql.Source",
"field":"source"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"name":"event.block",
"version":1,
"field":"transaction"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"int64",
"optional":true,
"field":"ts_us"
},
{
"type":"int64",
"optional":true,
"field":"ts_ns"
}
],
"optional":false,
"name":"post-create-event.post_service_db.post_event_outbox.Envelope",
"version":2
},
"payload":{
"before":null,
"after":{
"author_id":1,
"created_at":"2024-06-30T19:40:00Z",
"post_id":8,
"updated_at":"2024-06-30T19:40:00Z",
"event_id":"C2bcliDMQGWvIjM7UmDfnQ==",
"title":"Rex Test - 1"
},
"source":{
"version":"2.7.0.Final",
"connector":"mysql",
"name":"post-create-event",
"ts_ms":1719808800000,
"snapshot":"false",
"db":"post_service_db",
"sequence":null,
"ts_us":1719808800000000,
"ts_ns":1719808800000000000,
"table":"post_event_outbox",
"server_id":1,
"gtid":null,
"file":"mysql-bin.000003",
"pos":5958,
"row":0,
"thread":103,
"query":null
},
"transaction":null,
"op":"c",
"ts_ms":1719808800201,
"ts_us":1719808800201982,
"ts_ns":1719808800201982000
}
}
payload.after
를 보면 데이터가 어떻게 저장되어 있는지 알 수 있습니다.
여기서 알 수 있는 점은 데이터 변경
시 메시지가 쌓이는 토픽은 {topic.prefix}.{database}.{table} 규칙으로 생성
된다는 것입니다.
이제 V2의 코드를 약간 수정하여 적용시켜 보겠습니다.
Post-Service
의 변경 포인트는 스케줄러 삭제와 Kafka 의존성 제거입니다.
Subscribe-Service
는 Kafka Listener에 변경이 있어, 이 부분을 수정해보겠습니다.
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
private final KafkaConsumerProperties kafkaConsumerProperties;
private final CreatePostEventDeserializer createPostEventDeserializer;
@Bean
ConcurrentKafkaListenerContainerFactory<String, CreatePostEvent> concurrentKafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, CreatePostEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
ConsumerFactory<String, CreatePostEvent> consumerFactory() {
Map<String, Object> consumerConfigs = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId(),
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CreatePostEventDeserializer.class,
JsonDeserializer.TRUSTED_PACKAGES, "*",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"
);
return new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(), new ErrorHandlingDeserializer<>(createPostEventDeserializer));
}
}
Consumer Factory
를 직렬화와 역직렬화를 모두 문자열 기반으로 하도록 수정했습니다.
schema.payload.after
데이터를 사용해야 하는데 하나의 비즈니스 처리를 위해 세 개의 객체를 중첩해서 쌓는 것은 좋지 않다고 판단
했습니다.
따라서 CustomDeserializer
를 작성하여 Config Class
에 넣어주었습니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class CreatePostEventDeserializer implements Deserializer<CreatePostEvent> {
private final ObjectMapper objectMapper;
@Override
public CreatePostEvent deserialize(String topic, byte[] data) {
try {
log.info("이벤트 수신");
JsonNode extractedData = objectMapper.readTree(data)
.get("payload")
.get("after");
log.error("새로운 이벤트 = [{}]", extractedData.toPrettyString());
return objectMapper.treeToValue(extractedData, CreatePostEvent.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
KafkaListener
는 기존과 거의 동일합니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class CreatePostListener {
private final SendNotificationUseCase sendNotificationUseCase;
@KafkaListener(topics = "post-create-event.post_service_db.post_event_outbox", groupId = "post-group", containerFactory = "concurrentKafkaListenerContainerFactory")
void consumeEvent(CreatePostEvent event) {
log.info("Create Post Event 수신 Request Data = [{}], 수신 날짜 = [{}]", event, LocalDateTime.now());
final SendNotificationServiceRequest serviceRequest = SendNotificationServiceRequest.of(event.eventId(), event.postId(),event.authorId(), event.title());
sendNotificationUseCase.sendNotification(serviceRequest);
}
}
메시지 데이터의 필드와 CreatePostEvent
객체의 필드는 표기법의 차이가 있습니다.
따라서, @JsonProperty
애너테이션을 이용해서 맞춰주겠습니다.
public record CreatePostEvent(
@JsonProperty("event_id")
UUID eventId,
@JsonProperty("post_id")
Long postId,
@JsonProperty("author_id")
Long authorId,
String title) {
}
이제 테스트
를 통해 정상 동작하는지 확인해보겠습니다.
테스트 방식은 V2
와 같이 Jmeter
로 1초에 10번 게시글 작성 요청을 보냅니다.
메시지
가 구독자들에게 정상적으로 도착한 것을 확인할 수 있습니다.
이번에는 Zookeeper
와 Kafka
, Debezium Connector
를 모두 끄고 테스트 해보겠습니다.
Jmeter
로 1초에 10번 게시글 작성 요청을 보냅니다.
Zookeeper
와 kafka
, Debezium Connector
를 다시 켜줍니다.
메시지
가 정상 수신되는 것을 확인할 수 있습니다. 깨알 Trouble Shooting!
컨테이너
를 종료하고 다시 실행하는 과정에서, 도커 캐시
로 인하여 이전 IP 주소로 연결을 시도해 컨테이너
끼리 연결이 안되는 문제가 발생했습니다.
저는 docker network
를 명시적으로 생성하고, IP 주소를 고정 할당하여 문제를 해결했습니다.
docker network create --driver bridge --subnet 172.20.0.0/16 service-network
version: '3.7'
services:
mysql1:
image: mysql:8.0
platform: linux/x86_64
container_name: mysql1
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: post_service_db
volumes:
- mysql1_data:/var/lib/mysql
- ./mysql/my1.cnf:/etc/mysql/my.cnf
ports:
- "3306:3306"
networks:
my-network:
ipv4_address: 172.20.0.2
mysql2:
image: mysql:8.0
platform: linux/x86_64
container_name: mysql2
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: subscribe_service_db
volumes:
- mysql2_data:/var/lib/mysql
- ./mysql/my2.cnf:/etc/mysql/my.cnf
ports:
- "3307:3306"
networks:
my-network:
ipv4_address: 172.20.0.3
zookeeper:
image: debezium/zookeeper:2.7
container_name: zookeeper
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
networks:
my-network:
ipv4_address: 172.20.0.6
kafka:
image: debezium/kafka:2.7
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
networks:
my-network:
ipv4_address: 172.20.0.7
connect:
image: debezium/connect:2.7
container_name: connect
depends_on:
- kafka
- mysql1
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
networks:
my-network:
ipv4_address: 172.20.0.8
subscribe-service:
build: ../v3/subscribe-service
container_name: subscribe-service
ports:
- "8082:8082"
depends_on:
- mysql2
- kafka
environment:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql2:3306/subscribe_service_db?useSSL=false&serverTimezone=UTC
SPRING_DATASOURCE_USERNAME: root
SPRING_DATASOURCE_PASSWORD: root
SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
networks:
my-network:
ipv4_address: 172.20.0.9
restart: always
post-service:
build: ../v3/post-service
container_name: post-service
ports:
- "8081:8081"
depends_on:
- mysql1
environment:
SPRING_DATASOURCE_URL: jdbc:mysql://mysql1:3306/post_service_db?useSSL=false&serverTimezone=UTC
SPRING_DATASOURCE_USERNAME: root
SPRING_DATASOURCE_PASSWORD: root
networks:
my-network:
ipv4_address: 172.20.0.10
restart: always
networks:
my-network:
external: true
name: service-network
volumes:
mysql1_data:
mysql2_data:
총 3편의 포스팅을 통해 Transactional Outbox Pattern
을 간단히 구현해보고 학습했습니다.
특히 CDC
같은 경우에는 이진 로그 기반
으로 데이터의 변경을 추적하여 이벤트의 순서가 뒤바뀌거나 Polling
처럼 데이터베이스 부하를 일으킬 염려도 적습니다.
Debezium
은 자체적으로 Exactly Once
옵션을 지원하지만 데이터 정합성 문제는 매우 중요하므로 혹시 모를 사태에 대비하기 위해 Consumer
의 멱등한 처리는 유지했습니다.
CDC
는 MySQL
에 제공된 유저의 정보로 로그인하여 이진 로그
를 확인해, 스냅샷
을 만들고 초기 스냅샷 생성 이후로 MySQL의 바이너리 로그를 지속적으로 모니터링 합니다.
이를 통해 실시간성
을 유지할 수 있으며, 이 방식 또한 MySQL
에 지속적으로 연결되어 있어야 하지만 추가적인 쿼리 부하를 발생시키지 않아 효율적
입니다.
지속적인 연결 유지로 인해 일부 리소스
가 사용되지만 데이터베이스
에 직접적으로 부하를 주는 주기적인 폴링 방식보다는 훨씩 적은 부하
를 발생시킵니다.
따라서, 저는 CDC
방식이 Transactional Outbox Pattern
을 구현하는 적합한 방식이라고 생각합니다.
지금까지 이 시리즈를 읽어주셔서 감사합니다.
V3 코드 레포지토리
-> 이동
시리즈 잘 읽었습니다. 그런데 간단히 동기적으로 카프카에 이벤트를 보낸 뒤, two-phase commit을 써서 간단하게 해결할 수도 있었을 거 같은데 db에 이벤트를 저장하시는 방법을 쓰신 이유가 있을까요?