오늘 포스팅은 기존 Saga 패턴 이후의 추가 장애 대응 편의 마지막 포스팅이고, 다루게 될 내용은 CDC
와 Transactional Outbox Pattern
을 통해 메시지 유실 없이 보다 더 신뢰성 있는 MSA 시스템을 구축해 보는 과정을 알아보도록 하겠습니다.
👉👉기존 SAGA 패턴 적용 포스팅
👉👉Github 참고
👉👉Saga 패턴 이후의 추가 장애 대응 1편
👉👉Saga 패턴 이후의 추가 장애 대응 2편
쿠폰 사용+결제 프로세스는 SAGA 패턴을 통해 결과적 일관성을 확보하여 분산 트랜잭션을 제어했습니다. 위 그림은 정상적인 쿠폰 적용 후 결제 진행 및 문제 발생 시 SAGA 패턴의 흐름을 보여줍니다.
SAGA 패턴을 사용하여:
하지만, 큰 문제가 있었습니다:
결제 요청 이벤트나 보상 트랜잭션 이벤트를 발행하기 직전에 서비스가 중단된다면?
👉 발행되어야 할 이벤트가 손실되는 심각한 문제 발생
장점:
CDC는 데이터베이스의 변경 사항을 실시간으로 감지하고 다른 시스템으로 전파하는 기술입니다.
CDC도 여러가지가 있지만 그 중에서 Debezium MySQL conenctor
를 사용하도록 하겠습니다.
Q: Outbox 패턴을 사용 하지 않고 SAGA 패턴만으로는 충분하지 않은가?
Q: Outbox 패턴만 사용하면 되지 않을까?
Q: CDC 대신 배치 보상 프로세스를 사용하면?
Q: 상태 변경 후 이벤트를 발생시키는 프로듀서에 문제 발생 시?
Q: CDC CONNECTOR가 다운되버리는 상황이면 이벤트 유실이 발생하지 않나?
Q: 컨슈머가 다운되버리는 상황이면 이벤트 유실이 발생하지 않나?
이제 의사 결정과 추가 장애 대응 분석을 마치고
직접 Transactional Outbox Pattern
+Debezium MySQL Connector
을 설정해보겠습니다.
쿠폰 서비스:
결제 서비스:
Kafka Connect (Debezium):
이 설계로 서비스 간 데이터 일관성을 보장하고, 메시지 손실 위험을 최소화할 수 있습니다.
@Entity
@Getter
@Table(name = "outbox_events")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class OutboxEvent {
@Id
private String id;
@Column(name = "aggregate_id", nullable = false)
private String aggregateId;
@Column(name = "aggregate_type", nullable = false)
private String aggregateType;
@Column(name = "event_type", nullable = false)
private String eventType;
@Column(name = "payload", nullable = false, columnDefinition = "json")
private String payload;
@Column(name = "created_at", nullable = false)
private LocalDateTime createdAt;
public OutboxEvent(String aggregateId, String aggregateType, String eventType, String payload) {
this.id = UUID.randomUUID().toString();
this.aggregateId = aggregateId;
this.aggregateType = aggregateType;
this.eventType = eventType;
this.payload = payload;
this.createdAt = LocalDateTime.now(ZoneId.of("Asia/Seoul"));
}
}
@Transactional
public void useCouponAndPayment(PaymentRequestDto paymentRequestDto) throws JsonProcessingException {
// Transactional outbox pattern + CDC 적용
OutboxEvent outboxEvent = parsingEvent(paymentRequestDto);
if (paymentRequestDto.getCouponStatus() == CouponStatus.쿠폰미사용) {
log.info("======couponUseConsumer Data :{}======", paymentRequestDto);
outboxEventRepository.save(outboxEvent);
return;
}
Coupon coupon = couponRepository.findByMemberNoAndCouponNo(paymentRequestDto.getMemberNo(), paymentRequestDto.getCouponNo());
//사용완료 및 변경감지
coupon.useCoupon();
log.info("======couponUseConsumer Data :{}======", paymentRequestDto);
//쿠폰 적용완료후 결제 요청
outboxEventRepository.save(outboxEvent);
}
@KafkaListener(topics = "Coupon.events", containerFactory = "paymentListenerContainerFactory")
public void handlePayment(String eventPayload) throws JsonProcessingException {
OutboxEvent outboxEvent = objectMapper.readValue(eventPayload, OutboxEvent.class);
PaymentRequestDto paymentRequestDto = objectMapper.readValue(outboxEvent.getPayload(), PaymentRequestDto.class);
log.info("결제 요청 시작 ==> 결제 정보 : {}", paymentRequestDto);
paymentService.tradePayMoney(paymentRequestDto);
}
@Transactional
public void tradePayMoney(PaymentRequestDto paymentRequestDto) throws JsonProcessingException {
OutboxEvent outboxEvent = parsingEvent(paymentRequestDto);
try {
Payment buyerPayment = paymentRepository.findByMemberNo(paymentRequestDto.getMemberNo());
Integer buyerPayMoney = paymentRequestDto.getPayMoney();
if (paymentRequestDto.getCouponStatus() == CouponStatus.쿠폰사용) {
buyerPayMoney += paymentRequestDto.getDiscountPrice();
}
paymentRepository.tradePayMoney(paymentRequestDto.getSellerNo(), buyerPayment.getMemberNo(), paymentRequestDto, buyerPayMoney);
} catch (Exception e) {
log.error("===[결제 요청 오류] -> coupon-rollback , 쿠폰 번호 :{} / {}====", paymentRequestDto.getCouponNo(), e.getMessage());
outboxEventRepository.save(outboxEvent);
}
}
@KafkaListener(topics = "Payment.events", containerFactory = "paymentFailedListenerContainerFactory")
public void handleCouponRollbackEvent(String eventPayload) throws JsonProcessingException {
OutboxEvent outboxEvent = objectMapper.readValue(eventPayload, OutboxEvent.class);
PaymentRequestDto paymentRequestDto = objectMapper.readValue(outboxEvent.getPayload(), PaymentRequestDto.class);
log.info("======보상 트랜잭션 동작 , 쿠폰 번호 :{} =====", paymentRequestDto.getCouponNo());
couponService.revertCouponStatus(paymentRequestDto);
}
## Kafka 컨테이너에 접속하여 플러그인이 위치할 connectors 폴더를 생성
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1
mkdir connectors
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
## 압축 풀기!
tar -zxvf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
cd /opt/kafka_2.13-2.8.1/config
vim connect-distributed.properties
# 설정 파일에 아래 라인 추가!
plugin.path=/opt/kafka_2.13-2.8.1/connectors
## Kafka Connect를 distributed 모드로 실행해줍니다.
connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties
# Kafka Connect 확인
curl http://localhost:8083/
## 결과
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "2.7.0.Final"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "2.8.1"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "2.8.1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{
"name": "coupon-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "<MYSQL_HOST>",
"database.port": "<MYSQL_PORT>",
"database.user": "<MYSQL_USER>",
"database.password": "<MYSQL_PASSWORD>",
"database.server.id": "1",
"database.include.list": "coupon",
"table.include.list": "coupon.outbox_events",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "${routedByValue}.events",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.timestamp": "created_at",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.type": "event_type",
"database.history.kafka.bootstrap.servers": "<KAFKA_BROKER_1>:<PORT>,<KAFKA_BROKER_2>:<PORT>,<KAFKA_BROKER_3>:<PORT>",
"schema.history.internal.kafka.bootstrap.servers": "<KAFKA_BROKER_1>:<PORT>,<KAFKA_BROKER_2>:<PORT>,<KAFKA_BROKER_3>:<PORT>",
"schema.history.internal.kafka.topic": "schema-changes.coupon",
"topic.prefix": "coupon.outbox"
}
}'
이 설정으로 Debezium은 outbox_events
테이블의 변경 사항을 감지하고, 이를 지정된 Kafka 토픽으로 전송합니다.
curl -X GET http://localhost:8083/connectors/coupon-outbox-connector/status
curl -X GET http://localhost:8083/connectors/payment-outbox-connector/status
## 결과
{"name":"coupon-outbox-connector","connector":{"state":"RUNNING","worker_id":"172.18.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.18.0.3:8083"}],"type":"source"}
{"name":"payment-outbox-connector","connector":{"state":"RUNNING","worker_id":"172.18.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.18.0.3:8083"}],"type":"source"}
이 과정을 통해 Outbox 패턴과 CDC를 결합한 이벤트 발행 시스템을 구축할 수 있습니다. 이 시스템은 데이터베이스 트랜잭션과 이벤트 발행의 일관성을 보장하며, 장애 상황에서도 안정적으로 동작할 수 있습니다.
outbox_events 테이블에 넣을 데이터를 준비해줍니다!
INSERT INTO outbox_events (id, aggregate_id, aggregate_type, created_at, event_type, payload)
VALUES (
'3x2clq32-31xx-4743-b93d-5d84ed8a5236',
‘4’,
'Coupon',
'2024-07-17 18:19:59.236696',
'payment',
'{"couponNo": 4, "memberNo": 1, "payMoney": 20300, "sellerNo": 2, "couponStatus": "쿠폰사용", "discountPrice": 3000}'
);
Debezium Kafka Connect가 해당 테이블의 로그를 통해 변경 사항을 감지합니다
Debezium MySQL Connector
를 pause 시켜놓고 이벤트가 유실이 안되는지 확인해보도록 하겠습니다.
curl -X PUT http://localhost:8083/connectors/[connector 이름]/pause
멈춰놓고 다시 outbox_event 테이블에 데이터를 insert를 해주고, 다시 커넥터를 재시작하고 로그를 확인해보면 정상적으로 이벤트를 전달하는 것을 확인할 수 있습니다!
오늘 포스팅을 끝으로 SAGA 패턴을 적용한 쿠폰 서비스 + 결제 서비스
프로세스를 개선하는 과정을 알아봤습니다. 그전까지 진행했던 과정을 정리해보도록 하겠습니다!
가용성
과 확장성
을 크게 향상시켰습니다.polling
방식이 아닌 Debezium을 활용한 CDC
로 데이터베이스 변경 사항을 감지하기 때문에 결제 프로세스에 실시간성을 향상시켰습니다긴 포스팅 읽어주셔서 감사합니다!! 잘못된 점이나 궁금하신 점은 편하게 피드백 주시면 감사하겠습니다 🙏🙏
https://techblog.woowahan.com/10000/
https://techblog.uplus.co.kr/debezium%EC%9C%BC%EB%A1%9C-db-synchronization-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B0-1b6fba73010f
https://velog.io/@jm94318/3%ED%8E%B8-MSA-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EB%B0%9C%ED%96%89-Debezium-Transformations
https://blog.gangnamunni.com/post/transactional-outbox/