

오늘 포스팅은 기존 Saga 패턴 이후의 추가 장애 대응 편의 마지막 포스팅이고, 다루게 될 내용은 CDC와 Transactional Outbox Pattern을 통해 메시지 유실 없이 보다 더 신뢰성 있는 MSA 시스템을 구축해 보는 과정을 알아보도록 하겠습니다.
👉👉기존 SAGA 패턴 적용 포스팅
👉👉Github 참고
👉👉Saga 패턴 이후의 추가 장애 대응 1편
👉👉Saga 패턴 이후의 추가 장애 대응 2편

쿠폰 사용+결제 프로세스는 SAGA 패턴을 통해 결과적 일관성을 확보하여 분산 트랜잭션을 제어했습니다. 위 그림은 정상적인 쿠폰 적용 후 결제 진행 및 문제 발생 시 SAGA 패턴의 흐름을 보여줍니다.
SAGA 패턴을 사용하여:
하지만, 큰 문제가 있었습니다:
결제 요청 이벤트나 보상 트랜잭션 이벤트를 발행하기 직전에 서비스가 중단된다면?
👉 발행되어야 할 이벤트가 손실되는 심각한 문제 발생
장점:
CDC는 데이터베이스의 변경 사항을 실시간으로 감지하고 다른 시스템으로 전파하는 기술입니다.
CDC도 여러가지가 있지만 그 중에서 Debezium MySQL conenctor를 사용하도록 하겠습니다.
Q: Outbox 패턴을 사용 하지 않고 SAGA 패턴만으로는 충분하지 않은가?
Q: Outbox 패턴만 사용하면 되지 않을까?
Q: CDC 대신 배치 보상 프로세스를 사용하면?
Q: 상태 변경 후 이벤트를 발생시키는 프로듀서에 문제 발생 시?
Q: CDC CONNECTOR가 다운되버리는 상황이면 이벤트 유실이 발생하지 않나?
Q: 컨슈머가 다운되버리는 상황이면 이벤트 유실이 발생하지 않나?
이제 의사 결정과 추가 장애 대응 분석을 마치고
직접 Transactional Outbox Pattern +Debezium MySQL Connector을 설정해보겠습니다.


쿠폰 서비스:
결제 서비스:
Kafka Connect (Debezium):
이 설계로 서비스 간 데이터 일관성을 보장하고, 메시지 손실 위험을 최소화할 수 있습니다.
@Entity
@Getter
@Table(name = "outbox_events")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class OutboxEvent {
    @Id
    private String id;
    @Column(name = "aggregate_id", nullable = false)
    private String aggregateId;
    @Column(name = "aggregate_type", nullable = false)
    private String aggregateType;
    @Column(name = "event_type", nullable = false)
    private String eventType;
    @Column(name = "payload", nullable = false, columnDefinition = "json")
    private String payload;
    @Column(name = "created_at", nullable = false)
    private LocalDateTime createdAt;
    public OutboxEvent(String aggregateId, String aggregateType, String eventType, String payload) {
        this.id = UUID.randomUUID().toString();
        this.aggregateId = aggregateId;
        this.aggregateType = aggregateType;
        this.eventType = eventType;
        this.payload = payload;
        this.createdAt = LocalDateTime.now(ZoneId.of("Asia/Seoul"));
    }
}
@Transactional
public void useCouponAndPayment(PaymentRequestDto paymentRequestDto) throws JsonProcessingException {
    // Transactional outbox pattern + CDC 적용
    OutboxEvent outboxEvent = parsingEvent(paymentRequestDto);
    
    if (paymentRequestDto.getCouponStatus() == CouponStatus.쿠폰미사용) {
        log.info("======couponUseConsumer Data :{}======", paymentRequestDto);
        outboxEventRepository.save(outboxEvent);
        return;
    }
    
    Coupon coupon = couponRepository.findByMemberNoAndCouponNo(paymentRequestDto.getMemberNo(), paymentRequestDto.getCouponNo());
    //사용완료 및 변경감지
    coupon.useCoupon();
    log.info("======couponUseConsumer Data :{}======", paymentRequestDto);
    
    //쿠폰 적용완료후 결제 요청
    outboxEventRepository.save(outboxEvent);
}
@KafkaListener(topics = "Coupon.events", containerFactory = "paymentListenerContainerFactory")
public void handlePayment(String eventPayload) throws JsonProcessingException {
    OutboxEvent outboxEvent = objectMapper.readValue(eventPayload, OutboxEvent.class);
    PaymentRequestDto paymentRequestDto = objectMapper.readValue(outboxEvent.getPayload(), PaymentRequestDto.class);
    log.info("결제 요청 시작 ==> 결제 정보 : {}", paymentRequestDto);
    paymentService.tradePayMoney(paymentRequestDto);
}
@Transactional
public void tradePayMoney(PaymentRequestDto paymentRequestDto) throws JsonProcessingException {
    OutboxEvent outboxEvent = parsingEvent(paymentRequestDto);
    try {
        Payment buyerPayment = paymentRepository.findByMemberNo(paymentRequestDto.getMemberNo());
        Integer buyerPayMoney = paymentRequestDto.getPayMoney();
        if (paymentRequestDto.getCouponStatus() == CouponStatus.쿠폰사용) {
            buyerPayMoney += paymentRequestDto.getDiscountPrice();
        }
        paymentRepository.tradePayMoney(paymentRequestDto.getSellerNo(), buyerPayment.getMemberNo(), paymentRequestDto, buyerPayMoney);
    } catch (Exception e) {
        log.error("===[결제 요청 오류] -> coupon-rollback , 쿠폰 번호 :{} / {}====", paymentRequestDto.getCouponNo(), e.getMessage());
        outboxEventRepository.save(outboxEvent);
    }
}
@KafkaListener(topics = "Payment.events", containerFactory = "paymentFailedListenerContainerFactory")
public void handleCouponRollbackEvent(String eventPayload) throws JsonProcessingException {
    OutboxEvent outboxEvent = objectMapper.readValue(eventPayload, OutboxEvent.class);
    PaymentRequestDto paymentRequestDto = objectMapper.readValue(outboxEvent.getPayload(), PaymentRequestDto.class);
    log.info("======보상 트랜잭션 동작 , 쿠폰 번호 :{} =====", paymentRequestDto.getCouponNo());
    couponService.revertCouponStatus(paymentRequestDto);
}
## Kafka 컨테이너에 접속하여 플러그인이 위치할 connectors 폴더를 생성
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1
mkdir connectors
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
## 압축 풀기!
tar -zxvf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
cd /opt/kafka_2.13-2.8.1/config
vim connect-distributed.properties
# 설정 파일에 아래 라인 추가!
plugin.path=/opt/kafka_2.13-2.8.1/connectors
## Kafka Connect를 distributed 모드로 실행해줍니다.
connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties
# Kafka Connect 확인
curl http://localhost:8083/
## 결과
[
    {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "2.7.0.Final"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.8.1"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.8.1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "1"
    }
]
curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{
  "name": "coupon-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<MYSQL_HOST>",
    "database.port": "<MYSQL_PORT>",
    "database.user": "<MYSQL_USER>",
    "database.password": "<MYSQL_PASSWORD>",
    "database.server.id": "1",
    "database.include.list": "coupon",
    "table.include.list": "coupon.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.timestamp": "created_at",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.type": "event_type",
    "database.history.kafka.bootstrap.servers": "<KAFKA_BROKER_1>:<PORT>,<KAFKA_BROKER_2>:<PORT>,<KAFKA_BROKER_3>:<PORT>",
    "schema.history.internal.kafka.bootstrap.servers": "<KAFKA_BROKER_1>:<PORT>,<KAFKA_BROKER_2>:<PORT>,<KAFKA_BROKER_3>:<PORT>",
    "schema.history.internal.kafka.topic": "schema-changes.coupon",
    "topic.prefix": "coupon.outbox"
  }
}'
이 설정으로 Debezium은 outbox_events 테이블의 변경 사항을 감지하고, 이를 지정된 Kafka 토픽으로 전송합니다.
curl -X GET http://localhost:8083/connectors/coupon-outbox-connector/status
curl -X GET http://localhost:8083/connectors/payment-outbox-connector/status
## 결과
{"name":"coupon-outbox-connector","connector":{"state":"RUNNING","worker_id":"172.18.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.18.0.3:8083"}],"type":"source"}
{"name":"payment-outbox-connector","connector":{"state":"RUNNING","worker_id":"172.18.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.18.0.3:8083"}],"type":"source"}
이 과정을 통해 Outbox 패턴과 CDC를 결합한 이벤트 발행 시스템을 구축할 수 있습니다. 이 시스템은 데이터베이스 트랜잭션과 이벤트 발행의 일관성을 보장하며, 장애 상황에서도 안정적으로 동작할 수 있습니다.
outbox_events 테이블에 넣을 데이터를 준비해줍니다!
INSERT INTO outbox_events (id, aggregate_id, aggregate_type, created_at, event_type, payload)
VALUES (
'3x2clq32-31xx-4743-b93d-5d84ed8a5236',
‘4’,
'Coupon',
'2024-07-17 18:19:59.236696',
'payment',
'{"couponNo": 4, "memberNo": 1, "payMoney": 20300, "sellerNo": 2, "couponStatus": "쿠폰사용", "discountPrice": 3000}'
);
Debezium Kafka Connect가 해당 테이블의 로그를 통해 변경 사항을 감지합니다



Debezium MySQL Connector를 pause 시켜놓고 이벤트가 유실이 안되는지 확인해보도록 하겠습니다.
curl -X PUT http://localhost:8083/connectors/[connector 이름]/pause

멈춰놓고 다시 outbox_event 테이블에 데이터를 insert를 해주고, 다시 커넥터를 재시작하고 로그를 확인해보면 정상적으로 이벤트를 전달하는 것을 확인할 수 있습니다!

오늘 포스팅을 끝으로 SAGA 패턴을 적용한 쿠폰 서비스 + 결제 서비스 프로세스를 개선하는 과정을 알아봤습니다. 그전까지 진행했던 과정을 정리해보도록 하겠습니다!
가용성과 확장성을 크게 향상시켰습니다.polling 방식이 아닌 Debezium을 활용한 CDC로 데이터베이스 변경 사항을 감지하기 때문에 결제 프로세스에 실시간성을 향상시켰습니다긴 포스팅 읽어주셔서 감사합니다!! 잘못된 점이나 궁금하신 점은 편하게 피드백 주시면 감사하겠습니다 🙏🙏
https://techblog.woowahan.com/10000/
https://techblog.uplus.co.kr/debezium%EC%9C%BC%EB%A1%9C-db-synchronization-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B0-1b6fba73010f
https://velog.io/@jm94318/3%ED%8E%B8-MSA-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EB%B0%9C%ED%96%89-Debezium-Transformations
https://blog.gangnamunni.com/post/transactional-outbox/