[MSA] SAGA Pattern 적용하기

C_Mungi·2024년 10월 1일

MSA

목록 보기
8/8
post-thumbnail

SAGA Pattern이란

SAGA Pattern을 설명하기 이전에 기존 모놀리식 아키텍처에서는 트랜잭션을 어떻게 처리를 했는지 또 마이크로 서비스 아키텍처에서 널리 사용되는 패턴 중 하나인 2PC에 대해 짚고 넘어가겠습니다.

1. 모놀리식 아키텍처에서의 트랜잭션 처리

이미지 출처 : https://blog.bitsrc.io/distributed-transactions-in-microservices-d07aba281f90

이와 같이 모놀리식 아키텍처에서는 요청을 보내면 DB에서 트랜잭션이 생성해 상태 확인이나 요청처리를 수행합니다. 수행이 정상적으로 이루어졌다면 해당 리스폰스가 반환이 되겠지만 이 과정에서 실패하게 된다면, 해당 트랜잭션은 롤백됩니다.

2. 마이크로 서비스 아키텍처에서의 트랜잭션 처리

MSA에서는 독립된 서비스들은 각자 별도의 DB를 구성되어 있습니다. 그렇기에 요청을 보내면 2개 이상의 서비스가 호출이 필요할 수 있는 상황이 생기고 각 DB에서 해당 요청 사항에 대한 처리를 해줘야 합니다.

하지만 독립된 서비스인만큼 ACID 트랜잭션을 유지하기 힘듭니다.

특히, transaction-1이 실패한 경우 transaction-2는 어떻게 처리할지 등 고려해야 합니다.

이러한 고려점에 대한 방안으로 2PC, SAGA Pattern이 있습니다.ㅣ

2-1. 2PC

이미지 출처 : https://timilearning.com/posts/mit-6.824/lecture-12-distributed-transactions/
원본 출처 : Martin Kleppmann - 데이터 중심 애플리케이션 설계

Two-Phase Commit(2PC)는 분산 트랜잭션에서 원자성을 보장하는데 사용되는 프로토콜입니다.
분산 트랜잭션의 경우 2PC는 다음과 같이 동작합니다.

  • DB는 트랜잭션을 담당하는 트랜잭션 코디네이터 라는 또 다른 엔티티를 추가합니다.
  • 트랜잭션에 참여하는 다른 모든 서버를 참가자 라고 합니다.
  • 트랜잭션 코디네이터는 먼저 트랜잭션의 쓰기 권한을 참여자에게 위임합니다. 각 참여자는 원래 트랜잭션에서 중첩된 트랜잭션을 만들고, Lock을 유지해야 할 수 있는 작업을 실행하고, 코디네이터에게 확인을 보냅니다.
  • 코디네이터가 확인 메시지를 받으면 프로토콜의 첫 번째 단계가 시작됩니다. 이 단계에서 코디네이터는 참가자에게 PREPARE 메시지를 보냅니다. 그런 다음 각 참가자는 중첩된 트랜잭션의 결과에 따라 트랜잭션을 커밋하거나 중단할 준비가 되었는지 코디네이터에게 알려서 응답합니다.
  • 참가자 중 누구라도 중단 메시지로 응답 하면 코디네이터는 전체 트랜잭션을 중단하기로 결정합니다. 코디네이터는 모든 참가자가 커밋할 준비가 된 경우에만 트랜잭션을 커밋합니다. 두 번째 단계는 코디네이터가 이러한 조건에 따라 전체 트랜잭션에 대한 COMMITTED 또는 ABORTED 레코드를 생성하고 해당 결과를 내구성 있는 로그에 저장할 때 시작됩니다. 그런 다음 해당 결정을 전체 트랜잭션의 결과로 참가자 노드에 전달합니다.

여기서 2PC는 다음과 같은 약속이 있습니다. 이러한 약속을 통해 2PC의 원자성이 보장됩니다.

  1. 참가자가 YES를 응답했을 때 이후엔 반드시 커밋을 해야한다.
  2. 코디네이터가 결정을 하면 반드시 결정을 강제해야한다.

단점

2PC의 가장 큰 단점은 코디네이터가 참가자에게 결과를 전달하기 전에 실패하면 참가자가 대기 상태에 갇힐 수 있다는 것입니다. 커밋할 준비가 되었다고 표시한 참가자는 다른 참가자가 중단할 준비가 되어 있을 수 있으므로 스스로 트랜잭션의 결과를 결정할 수 없습니다. 또한, 갇힌 참가자는 코디네이터가 충돌하기 전에 다른 참가자에게 커밋 메시지를 보냈을 수 있으므로 스스로 트랜잭션을 중단할지 결정할 수 없습니다.

이 방법은 참가자가 대기 상태에 갇혀 있는 동안 공유 객체에 대한 Lock을 유지할 수 있으므로 이상적이지 않습니다. 이로 인해 다른 트랜잭션이 진행되지 못할 수 있습니다.

2-2. SAGA Pattern

SAGA는 일련의 로컬 트랜잭션입니다. 각 로컬 트랜잭션은 익숙한 ACID 트랜잭션 프레임워크를 사용하여 로컬 데이터베이스를 업데이트하고 이벤트를 게시하여 사가의 다음 로컬 트랜잭션을 트리거합니다. 로컬 트랜잭션이 실패하면 사가는 일련의 보상 트랜잭션을 실행하여 이전 로컬 트랜잭션에서 완료한 변경 사항을 취소합니다.

이는 비동기적이며 일관된 트랜잭션 방식으로, 분산 트랜잭션이 관련 마이크로서비스의 비동기 트랜잭션에 의해 실행되는 전형적인 마이크로서비스 애플리케이션 아키텍처와 매우 유사합니다.

SAGA 메시지의 비동기적인 특성의 큰 장점은 SAGA의 일부 참가자가 일시적으로 사용 불가능하더라도 모든 단계가 실행되도록 보장한다는 점입니다. 또한, 다른 마이크로서비스나 객체를 방해하지 않고 장기적인 트랜잭션도 처리할 수 있습니다.

또한 SAGA Pattern에는 Orchestration과 Choreography 2종류가 존재합니다.

2-2-1. Orchestration SAGA Pattern

이미지 출처 : https://medium.com/cloud-native-daily/microservices-patterns-part-04-saga-pattern-a7f85d8d4aa3

Orchestration SAGA Pattern은 분산 트랜잭션을 관리하는 방식으로, 중앙 조정자(Orchestrator)가 각 서비스의 작업을 순차적으로 지시하고 관리하는 패턴입니다. 중앙 조정자가 트랜잭션의 전체 흐름을 제어하며, 각 서비스는 중앙 조정자로부터 명령을 받아 수행합니다.

그림에서 볼 수 있듯이, Micro service 01에 구현된 SAGA Orchestrator가 SAGA 트랜잭션(예: 주문 생성 SAGA)을 시작합니다. 앞서 설명한 것처럼, 상호작용 방식은 비동기 요청/응답 방식이며, 요청은 "명령 메시지"로 전달됩니다. 비동기 요청/응답 방식을 사용하기 때문에, 메시지 브로커 내에서 별도의 요청 및 응답 채널을 사용합니다.

Micro service 01이 create() 요청을 받으면, SAGA Orchestrator가 생성됩니다. 이 Orchestrator는 전체 승인이 완료될 때까지 서비스 요청을 PENDING 상태로 설정합니다. 이 과정에서 SAGA Orchestrator는 Micro service 2와 3에 명령 요청을 보내고, 메시지 브로커의 SAGA Orchestrator 응답 채널을 통해 응답을 받습니다. 두 응답의 결과에 따라 Orchestrator는 요청을 승인하거나 거부하게 됩니다.

장점

  • 더 간단한 의존성: Orchestrator가 항상 SAGA 참가자들을 호출하고, 그 반대는 발생하지 않으므로 순환 의존성이 없습니다.

  • 결합도 감소: Choreography SAGA Pattern과는 달리, 다른 SAGA 참가자들이 발행하는 이벤트나 구현된 비즈니스 로직에 대해 알 필요가 없습니다. 이렇게 하면 결합도가 낮아지고 비즈니스 로직이 훨씬 단순해집니다.

단점

  • Orchestrator 수준에서의 비즈니스 로직 감소: SAGA 오케스트레이터 내에 비즈니스 로직을 포함하지 않도록 하여 더 낮은 결합도를 유지하는 것이 좋습니다. 관련 서비스 외부에 비즈니스 로직을 두는 것은 바람직하지 않으며, Orchestrator 비즈니스 로직을 포함하는 것은 권장되지 않습니다.

  • 격리성 부족: 전반적으로 마이크로서비스 아키텍처는 전통적인 ACID 트랜잭션에 비해 격리성이 부족합니다. 이는 SAGA 참가자들이 전체 트랜잭션이 완료되기 전에 로컬 트랜잭션으로 변경 사항을 커밋하기 때문에 발생합니다. 이로 인해 데이터베이스 레벨에서 불일치가 생길 수 있습니다.

  • 단일 장애 지점: 중앙 조정자가 실패하면 전체 트랜잭션 흐름이 중단될 수 있습니다.

2-2-2. Choreography SAGA Pattern

이미지 출처 : https://medium.com/cloud-native-daily/microservices-patterns-part-04-saga-pattern-a7f85d8d4aa3

Choreography SAGA Pattern의 방식에서는 Orchestration SAGA와 달리, SAGA 참가자들에게 무엇을 해야 할지 지시하는 중앙 조정자가 없습니다. SAGA 참가자들은 서로의 이벤트를 구독하고 그에 맞게 대응합니다. 이에 따라 롤백에 대한 책임도 각 SAGA 참가자들에게 있습니다.

장점

  • Orchestration SAGA Pattern에 비해 로직을 구성하기 편합니다.

  • 비즈니스 흐름의 자연스러운 표현: 이벤트 기반의 상호작용을 통해 비즈니스 흐름을 자연스럽게 표현할 수 있어, 시스템의 가독성이 향상됩니다.

단점

  • 흐름 이해의 어려움: 일반적으로 이 방식은 SAGA 구현을 서비스 간에 분산시킵니다. 따라서 SAGA 흐름을 정의할 중앙집중적인 장소가 없습니다.

  • 서비스 간 순환 의존성: SAGA 참가자 간에 순환 의존성이 발생할 가능성이 있으며, 예를 들어 Micro service 01 → Micro service 02 → Micro service 01과 같은 형태가 될 수 있습니다.

  • 높은 결합도의 위험: 자신들에게 영향을 미치는 모든 이벤트를 구독해야 하므로 서비스 간 결합도가 높아질 위험이 있습니다.


정리

Chreography나 Orchestration중 어떤게 좋냐 라기보단 상황에 맞춰서 적용하는 것이 가장 좋습니다.

Chreography는 다음과 같은 이유로 규모가 작은 애플리케이션에 적용하기 좋습니다.

  • 규모가 작으면 흐름을 읽기 힘들정도로 복잡해지는 경우가 없습니다.

  • 서비스들의 이벤트를 서로 구독하고 이벤트 처리를 해주면 되기에 로직자체는 비교적 간단합니다.

Orchestration의 경우는 대규모 애플리케이션이 좀 더 적용하기 좋습니다.

  • Orchestrator가 트랜잭션의 흐름을 관리하게 되므로, 복잡한 비즈니스 로직을 효과적으로 관리할 수 있습니다.

  • Orchestrator가 오류를 중앙에서 처리할 수 있어 일관된 오류 처리가 가능합니다.


Choreography SAGA Pattern 적용하기

저의 경우 위에 언급했듯이 규모가 작은 애플리케이션이기에 Chreography SAGA Pattern을 적용하기로 했습니다. 그외의 이유로는 Orchestrator를 구현할 만큼 시간적 여유가 많지 않다라는 점과 당장 적용하기에는 Orchestration SAGA Pattern의 학습 곡선이 높다는 이유가 있었습니다.

Producer

1. Configuration 작성

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return new DefaultKafkaProducerFactory<>(configMap);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2. Dto 작성하기

ReservationMessageResponse.java

public record ReservationMessageResponse(@JsonProperty("reservationId") Long reservationId,
                                         @JsonProperty("status") ReservationStatus status,
                                         @JsonProperty("message") String message) {

} ,

ReservationStatus.java

@Getter
@AllArgsConstructor
public enum ReservationStatus {

    PENDING("예약 대기"),
    SUCCESS("예약 성공"),
    CANCEL("예약 취소"),
    FAILURE("예약 실패");

    private final String message;
}

3. Producer 관련 클래스 작성하기

@Component
@RequiredArgsConstructor
public class TransactionProducer {

    private static final String TOPIC_RESERVATION_TRANSACTION_RESULT = "reservation-transaction-result";
    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public void sendTransactionResult(final ReservationMessageResponse responseMessage)
        throws JsonProcessingException {
        final String message = objectMapper.writeValueAsString(responseMessage);
        kafkaTemplate.send(TOPIC_RESERVATION_TRANSACTION_RESULT, message);
    }

}

4. 기존 Consumer 로직 수정

@Component
@RequiredArgsConstructor
public class ReservationConsumer {

    private final RoomService roomService;
    private final TransactionProducer transactionProducer;
    private final ObjectMapper objectMapper;

    @KafkaListener(topics = "room-reserve", groupId = "reservation-group")
    public void consumeReservationEvent(final String reservationMessage)
        throws JsonProcessingException {
        final ReservationMessage message = objectMapper.readValue(reservationMessage,
            ReservationMessage.class);

        try {
            roomService.decreaseCountByOne(message.roomId(), message.checkInDate(),
                message.checkOutDate());
            final ReservationMessageResponse response = new ReservationMessageResponse(
                message.reservationId(), ReservationStatus.SUCCESS,
                StringUtils.EMPTY);
            transactionProducer.sendTransactionResult(response);

        } catch (Exception e) {
            final ReservationMessageResponse response = new ReservationMessageResponse(
                message.reservationId(), ReservationStatus.FAILURE,
                e.getMessage());
            transactionProducer.sendTransactionResult(response);
        }
    }
    
    ...
    
}

5. room-dev.yml

## msa server-dev
server:
  port: 8084

## db
spring:

	...

  kafka:
    ## kafka consumer
    consumer:
      bootstrap-servers: localhost:9092
      group-id: reservation-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    ## kafka producer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

Consumer

1. Configuration 작성

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        return new DefaultKafkaConsumerFactory<>(configMap);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

2. Dto 작성하기

ReservationMessageResponse.java

public record ReservationMessageResponse(@JsonProperty("reservationId") Long reservationId,
                                         @JsonProperty("status") ReservationStatus status,
                                         @JsonProperty("message") String message) {

}

ReservationStatus.java

@Getter
@AllArgsConstructor
public enum ReservationStatus {

    PENDING("예약 대기"),
    SUCCESS("예약 성공"),
    CANCEL("예약 취소"),
    FAILURE("예약 실패");

    private final String message;
}

3. Consumer 관련 클래스 작성하기

@Slf4j
@Component
@RequiredArgsConstructor
public class TransactionConsumer {

    private final ReservationRepository reservationRepository;
    private final ObjectMapper objectMapper;

    @Transactional
    @KafkaListener(topics = "reservation-transaction-result", groupId = "reservation-transaction-result-group")
    public void consumeReservationTransactionResultEvent(final String roomResponseMessage)
        throws JsonProcessingException {
        final ReservationMessageResponse response = objectMapper.readValue(roomResponseMessage,
            ReservationMessageResponse.class);

        if (response.status() == ReservationStatus.FAILURE) {
            reservationRepository.updateStatusToFailureById(response.reservationId());
            log.info("Reservation transaction failed with Id And Message. ID : {}, MESSAGE {}",
                response.reservationId(), response.message());
            return;
        } else if (response.status() == ReservationStatus.CANCEL) {
            reservationRepository.deleteById(response.reservationId());
            log.info("Reservation cancel transaction with Id And Message. ID : {}, MESSAGE {}",
                response.reservationId(), response.message());
            return;
        }
        reservationRepository.updateStatusToSuccessById(response.reservationId());
    }
}

4. reservation-dev.yml

## msa server-dev
server:
  port: 8083

## db
spring:

  ...

  kafka:
    ## kafka producer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    ## kafka consumer
    consumer:
      bootstrap-servers: localhost:9092
      group-id: reservation-transaction-result-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
profile
백엔드 개발자의 수집상자

0개의 댓글