[식구하자_MSA] Saga 패턴 이후의 추가 장애 대응: CDC+ Transactional Outbox Pattern 적용 - 3

이민우·2024년 7월 22일
6

🍀 식구하자_MSA

목록 보기
16/21
post-thumbnail

오늘 포스팅은 기존 Saga 패턴 이후의 추가 장애 대응 편의 마지막 포스팅이고, 다루게 될 내용은 CDCTransactional Outbox Pattern을 통해 메시지 유실 없이 보다 더 신뢰성 있는 MSA 시스템을 구축해 보는 과정을 알아보도록 하겠습니다.

👉👉기존 SAGA 패턴 적용 포스팅
👉👉Github 참고
👉👉Saga 패턴 이후의 추가 장애 대응 1편
👉👉Saga 패턴 이후의 추가 장애 대응 2편

👿 기존 SAGA 패턴만 사용했을 때의 문제 상황

SAGA 패턴 흐름도

쿠폰 사용+결제 프로세스는 SAGA 패턴을 통해 결과적 일관성을 확보하여 분산 트랜잭션을 제어했습니다. 위 그림은 정상적인 쿠폰 적용 후 결제 진행 및 문제 발생 시 SAGA 패턴의 흐름을 보여줍니다.

SAGA 패턴을 사용하여:

  • 쿠폰 사용 상태를 DB에 저장
  • 결제 서비스에서 문제 발생 시, rollback을 위한 보상 트랜잭션 발생

하지만, 큰 문제가 있었습니다:
결제 요청 이벤트나 보상 트랜잭션 이벤트를 발행하기 직전에 서비스가 중단된다면?

👉 발행되어야 할 이벤트가 손실되는 심각한 문제 발생

💡 분산 트랜잭션의 일관성 확보: Outbox Pattern with Kafka Connect 적용

Transactional Outbox Pattern이란?

Transaction Outbox Pattern이란 DB를 업데이트하는 트랜잭션의 일부로 데이터베이스에 메시지를 저장하는 방법이 있습니다. 그런 다음에 별도의 프로세스가 저장된 이벤트를 읽어 메시지 브로커에 전송하는 것입니다. 이 방법이 Outbox Pattern 입니다.
애플리케이션은 데이터베이스의 outbox 테이블에 메시지 내용을 저장합니다. 다른 애플리케이션이나 프로세스는 outbox 테이블에서 데이터를 읽고 해당 데이터를 사용하여 작업을 수행할 수 있습니다. 실패시 완료될 때까지 다시 시도할 수 있습니다. 따라서 outbox pattern은 적어도 한 번 이상(at-least once) 메시지가 성공적으로 전송되었는지 확인할 수 있습니다

장점:

  • 데이터 일관성 보장
  • 메시지 손실 방지
  • 시스템 장애 시 복구 용이

CDC(Change Data Capture)란?

CDC는 데이터베이스의 변경 사항을 실시간으로 감지하고 다른 시스템으로 전파하는 기술입니다.

  • Debezium과 같은 CDC 도구를 사용하여 outbox 테이블의 변경 사항을 감지
  • 감지된 변경 사항을 Kafka 등의 메시지 브로커로 전송

CDC도 여러가지가 있지만 그 중에서 Debezium MySQL conenctor를 사용하도록 하겠습니다.

🧑‍⚖️ 기술적 의사결정 과정과 장애 대응 분석

  1. Q: Outbox 패턴을 사용 하지 않고 SAGA 패턴만으로는 충분하지 않은가?

    A: 서비스 중단 시 보상 트랜잭션 발생 전 문제 발생 가능성 존재
  2. Q: Outbox 패턴만 사용하면 되지 않을까?

    A: 단일 서비스 내 일관성은 보장하나, 여러 서비스 간 트랜잭션 관리에 한계
  3. Q: CDC 대신 배치 보상 프로세스를 사용하면?

    A: 시스템 자원 비효율적 사용, 실시간성 저하
  4. Q: 상태 변경 후 이벤트를 발생시키는 프로듀서에 문제 발생 시?

    A: Transactional Outbox 패턴을 사용하면 이 문제를 해결할 수 있습니다. 쿠폰 사용 상태를 저장하고 메시지를 outbox 테이블에 저장하는 작업을 하나의 트랜잭션으로 묶습니다. 이 트랜잭션이 성공적으로 커밋되면, CDC 시스템이 outbox 테이블에서 변경 사항을 캡처하여 메시지를 안전하게 발행합니다.
  5. Q: CDC CONNECTOR가 다운되버리는 상황이면 이벤트 유실이 발생하지 않나?

    A: Debezium은 데이터베이스의 커밋 로그를 기반으로 하므로, 서버가 다운되었다가 복구된 후에도 마지막으로 처리된 오프셋 이후의 변경 사항을 자동으로 캡처하고 Kafka로 전송합니다. 별다른 추가 설정 없이도 Debezium이 자동으로 이벤트를 발행할 수 있습니다.
  6. Q: 컨슈머가 다운되버리는 상황이면 이벤트 유실이 발생하지 않나?

    • 컨슈머의 자동 오프셋 커밋 기능 활용 (auto.offset.commit 설정이 true일 때 활성화, 기본값은 true)
    • CDC와 Transactional Outbox 패턴의 결합으로 해결 가능:
      1. 보상 트랜잭션 이벤트를 outbox 테이블에 기록
      2. CDC 시스템이 이를 캡처하여 이벤트 발행
      3. 서비스 중단 시에도 데이터베이스에 기록된 내용을 기반으로 이벤트 재처리 가능

이제 의사 결정과 추가 장애 대응 분석을 마치고
직접 Transactional Outbox Pattern +Debezium MySQL Connector을 설정해보겠습니다.

🎨 Transactional Outbox Pattern with Kafka Connect 적용한 최종 설계

설계도 2

  1. 쿠폰 서비스:

    • 쿠폰 사용 처리 + Outbox 테이블에 이벤트 저장 (단일 트랜잭션)
    • Debezium이 Outbox 테이블 변경 감지 및 Kafka로 전송
  2. 결제 서비스:

    • Kafka에서 쿠폰 사용 이벤트 소비
    • 결제 처리 + Outbox 테이블에 결제 완료 이벤트 저장
    • 실패 시, 보상 트랜잭션 이벤트를 Outbox 테이블에 저장
  3. Kafka Connect (Debezium):

    • 각 서비스의 Outbox 테이블 모니터링
    • 변경 사항을 해당 Kafka 토픽으로 전송

이 설계로 서비스 간 데이터 일관성을 보장하고, 메시지 손실 위험을 최소화할 수 있습니다.

🛠 실습 과정

Outbox Pattern 적용 Application 로직

[OutboxEvent 엔티티]

@Entity
@Getter
@Table(name = "outbox_events")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class OutboxEvent {
    @Id
    private String id;

    @Column(name = "aggregate_id", nullable = false)
    private String aggregateId;

    @Column(name = "aggregate_type", nullable = false)
    private String aggregateType;

    @Column(name = "event_type", nullable = false)
    private String eventType;

    @Column(name = "payload", nullable = false, columnDefinition = "json")
    private String payload;

    @Column(name = "created_at", nullable = false)
    private LocalDateTime createdAt;

    public OutboxEvent(String aggregateId, String aggregateType, String eventType, String payload) {
        this.id = UUID.randomUUID().toString();
        this.aggregateId = aggregateId;
        this.aggregateType = aggregateType;
        this.eventType = eventType;
        this.payload = payload;
        this.createdAt = LocalDateTime.now(ZoneId.of("Asia/Seoul"));
    }

}

[쿠폰 마이크로 서비스 아웃박스 이벤트 저장]

@Transactional
public void useCouponAndPayment(PaymentRequestDto paymentRequestDto) throws JsonProcessingException {
    // Transactional outbox pattern + CDC 적용
    OutboxEvent outboxEvent = parsingEvent(paymentRequestDto);
    
    if (paymentRequestDto.getCouponStatus() == CouponStatus.쿠폰미사용) {
        log.info("======couponUseConsumer Data :{}======", paymentRequestDto);
        outboxEventRepository.save(outboxEvent);
        return;
    }
    
    Coupon coupon = couponRepository.findByMemberNoAndCouponNo(paymentRequestDto.getMemberNo(), paymentRequestDto.getCouponNo());
    //사용완료 및 변경감지
    coupon.useCoupon();
    log.info("======couponUseConsumer Data :{}======", paymentRequestDto);
    
    //쿠폰 적용완료후 결제 요청
    outboxEventRepository.save(outboxEvent);
}
이 메서드는 쿠폰 사용과 결제 요청을 하나의 트랜잭션으로 처리합니다. 쿠폰 상태 변경과 Outbox 이벤트 저장이 원자적으로 이루어져, 데이터 일관성을 보장합니다.

[결제 마이크로 서비스 컨슈머]

@KafkaListener(topics = "Coupon.events", containerFactory = "paymentListenerContainerFactory")
public void handlePayment(String eventPayload) throws JsonProcessingException {
    OutboxEvent outboxEvent = objectMapper.readValue(eventPayload, OutboxEvent.class);
    PaymentRequestDto paymentRequestDto = objectMapper.readValue(outboxEvent.getPayload(), PaymentRequestDto.class);
    log.info("결제 요청 시작 ==> 결제 정보 : {}", paymentRequestDto);
    paymentService.tradePayMoney(paymentRequestDto);
}
이 컨슈머는 Kafka의 'Coupon.events' 토픽을 구독하여 쿠폰 사용 이벤트를 처리합니다.

[결제 마이크로 서비스 메서드]

@Transactional
public void tradePayMoney(PaymentRequestDto paymentRequestDto) throws JsonProcessingException {
    OutboxEvent outboxEvent = parsingEvent(paymentRequestDto);
    try {
        Payment buyerPayment = paymentRepository.findByMemberNo(paymentRequestDto.getMemberNo());
        Integer buyerPayMoney = paymentRequestDto.getPayMoney();
        if (paymentRequestDto.getCouponStatus() == CouponStatus.쿠폰사용) {
            buyerPayMoney += paymentRequestDto.getDiscountPrice();
        }
        paymentRepository.tradePayMoney(paymentRequestDto.getSellerNo(), buyerPayment.getMemberNo(), paymentRequestDto, buyerPayMoney);
    } catch (Exception e) {
        log.error("===[결제 요청 오류] -> coupon-rollback , 쿠폰 번호 :{} / {}====", paymentRequestDto.getCouponNo(), e.getMessage());
        outboxEventRepository.save(outboxEvent);
    }
}
이 메서드는 결제 처리를 수행하며, 실패 시 보상 트랜잭션을 위한 Outbox 이벤트를 저장합니다.

[쿠폰 서비스 보상 트랜잭션 받는 컨슈머]

@KafkaListener(topics = "Payment.events", containerFactory = "paymentFailedListenerContainerFactory")
public void handleCouponRollbackEvent(String eventPayload) throws JsonProcessingException {
    OutboxEvent outboxEvent = objectMapper.readValue(eventPayload, OutboxEvent.class);
    PaymentRequestDto paymentRequestDto = objectMapper.readValue(outboxEvent.getPayload(), PaymentRequestDto.class);
    log.info("======보상 트랜잭션 동작 , 쿠폰 번호 :{} =====", paymentRequestDto.getCouponNo());
    couponService.revertCouponStatus(paymentRequestDto);
}
이 컨슈머는 결제 실패 시 발생하는 보상 트랜잭션 이벤트를 처리하여 쿠폰 상태를 원래대로 되돌립니다.

Debezium Kafka Connect 구축 과정

MySQL 플러그인 다운로드 및 설치

## Kafka 컨테이너에 접속하여 플러그인이 위치할 connectors 폴더를 생성
docker exec -it kafka bash
cd /opt/kafka_2.13-2.8.1
mkdir connectors
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz

## 압축 풀기!
tar -zxvf debezium-connector-mysql-2.7.0.Final-plugin.tar.gz

Kafka Connect 설정


cd /opt/kafka_2.13-2.8.1/config
vim connect-distributed.properties

# 설정 파일에 아래 라인 추가!
plugin.path=/opt/kafka_2.13-2.8.1/connectors

Kafka Connect 실행 및 확인

## Kafka Connect를 distributed 모드로 실행해줍니다.
connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties

# Kafka Connect 확인
curl http://localhost:8083/


## 결과
[
    {
        "class": "io.debezium.connector.mysql.MySqlConnector",
        "type": "source",
        "version": "2.7.0.Final"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "type": "sink",
        "version": "2.8.1"
    },
    {
        "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "type": "source",
        "version": "2.8.1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
        "type": "source",
        "version": "1"
    },
    {
        "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "type": "source",
        "version": "1"
    }
]

MySQL Connector 설정 및 등록

curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{
  "name": "coupon-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "<MYSQL_HOST>",
    "database.port": "<MYSQL_PORT>",
    "database.user": "<MYSQL_USER>",
    "database.password": "<MYSQL_PASSWORD>",
    "database.server.id": "1",
    "database.include.list": "coupon",
    "table.include.list": "coupon.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}.events",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.timestamp": "created_at",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.type": "event_type",
    "database.history.kafka.bootstrap.servers": "<KAFKA_BROKER_1>:<PORT>,<KAFKA_BROKER_2>:<PORT>,<KAFKA_BROKER_3>:<PORT>",
    "schema.history.internal.kafka.bootstrap.servers": "<KAFKA_BROKER_1>:<PORT>,<KAFKA_BROKER_2>:<PORT>,<KAFKA_BROKER_3>:<PORT>",
    "schema.history.internal.kafka.topic": "schema-changes.coupon",
    "topic.prefix": "coupon.outbox"
  }
}'
  • connector.class: Debezium MySQL 커넥터를 사용함을 명시
  • database.hostname, database.port: 모니터링할 MySQL 서버 정보
  • table.include.list: 변경 사항을 추적할 테이블 지정 (여기서는 outbox_events 테이블)
  • transforms: Outbox 변환기를 사용하여 이벤트를 라우팅
  • transforms.outbox.route.by.field: 이벤트 라우팅에 사용할 필드 지정
  • transforms.outbox.table.field.*: Outbox 테이블의 각 필드와 이벤트 필드 매핑

이 설정으로 Debezium은 outbox_events 테이블의 변경 사항을 감지하고, 이를 지정된 Kafka 토픽으로 전송합니다.

Connector 상태 확인

curl -X GET http://localhost:8083/connectors/coupon-outbox-connector/status

curl -X GET http://localhost:8083/connectors/payment-outbox-connector/status


## 결과
{"name":"coupon-outbox-connector","connector":{"state":"RUNNING","worker_id":"172.18.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.18.0.3:8083"}],"type":"source"}
{"name":"payment-outbox-connector","connector":{"state":"RUNNING","worker_id":"172.18.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.18.0.3:8083"}],"type":"source"}

이 과정을 통해 Outbox 패턴과 CDC를 결합한 이벤트 발행 시스템을 구축할 수 있습니다. 이 시스템은 데이터베이스 트랜잭션과 이벤트 발행의 일관성을 보장하며, 장애 상황에서도 안정적으로 동작할 수 있습니다.

📜 정상 동작 테스트

outbox_events 테이블에 넣을 데이터를 준비해줍니다!

INSERT INTO outbox_events (id, aggregate_id, aggregate_type, created_at, event_type, payload)
VALUES (
'3x2clq32-31xx-4743-b93d-5d84ed8a5236',4,
'Coupon',
'2024-07-17 18:19:59.236696',
'payment',
'{"couponNo": 4, "memberNo": 1, "payMoney": 20300, "sellerNo": 2, "couponStatus": "쿠폰사용", "discountPrice": 3000}'
);

Debezium Kafka Connect가 해당 테이블의 로그를 통해 변경 사항을 감지합니다

[결제 서비스 컨슈머]

[보상 트랜잭션 발생 시]

Connector 다운시 동작 테스트

Debezium MySQL Connector를 pause 시켜놓고 이벤트가 유실이 안되는지 확인해보도록 하겠습니다.

curl -X PUT http://localhost:8083/connectors/[connector 이름]/pause


멈춰놓고 다시 outbox_event 테이블에 데이터를 insert를 해주고, 다시 커넥터를 재시작하고 로그를 확인해보면 정상적으로 이벤트를 전달하는 것을 확인할 수 있습니다!

🧑‍🏫 정리


오늘 포스팅을 끝으로 SAGA 패턴을 적용한 쿠폰 서비스 + 결제 서비스 프로세스를 개선하는 과정을 알아봤습니다. 그전까지 진행했던 과정을 정리해보도록 하겠습니다!

Kafka 클러스터 구축:

  • 시스템의 가용성확장성을 크게 향상시켰습니다.
  • 브로커 장애 시에도 지속적인 서비스 제공이 가능해졌습니다.

Transactional Outbox Pattern과 CDC(Change Data Capture) 구현:

  • 데이터베이스 업데이트와 메시지 발행을 원자적 작업으로 처리하여 데이터 일관성을 보장했습니다.
  • polling 방식이 아닌 Debezium을 활용한 CDC로 데이터베이스 변경 사항을 감지하기 때문에 결제 프로세스에 실시간성을 향상시켰습니다
  • 이 조합을 통해 서비스 간 데이터 동기화의 신뢰성을 높이고, 서비스 장애 시에도 메시지 손실 위험을 최소화했습니다.

멱등성 프로듀서 설정:

  • 메시지의 중복 전송을 방지하여 시스템의 안정성을 향상시켰습니다.

긴 포스팅 읽어주셔서 감사합니다!! 잘못된 점이나 궁금하신 점은 편하게 피드백 주시면 감사하겠습니다 🙏🙏

참고

https://techblog.woowahan.com/10000/
https://techblog.uplus.co.kr/debezium%EC%9C%BC%EB%A1%9C-db-synchronization-%EA%B5%AC%EC%B6%95%ED%95%98%EA%B8%B0-1b6fba73010f
https://velog.io/@jm94318/3%ED%8E%B8-MSA-%EC%9D%B4%EB%B2%A4%ED%8A%B8-%EB%B0%9C%ED%96%89-Debezium-Transformations
https://blog.gangnamunni.com/post/transactional-outbox/

profile
백엔드 공부중입니다!

0개의 댓글

관련 채용 정보