MSA 환경에서 메시지 유실? 지옥가는 지름길 - 1

DevSeoRex·2024년 7월 1일
13
post-thumbnail

🫠 들어가며,,

이 포스팅은 총 3편으로 이루어져 있습니다.

  • 1편
    • MSA 환경에서 메시지가 유실되는 환경을 직접 구현해보며 문제점을 알아봅니다.
  • 2편
    • Transactional Outbox PatternPolling-Publisher 모델로 구현해봅니다.
  • 3편
    • Transactional Outbox PatternCDC(Change Data Capture)를 활용해 구현해봅니다.

😥 메시지가 유실되면 어떤 일이 일어날까?

MSA 환경에서 개발을 하다보면, 특정 서비스에서 일어난 데이터의 변경을 다른 서비스에 전파해야 하는 경우가 많이 발생합니다.

보통 Event-Driven-Architecture를 채택해서 특정 서비스에서 일어난 데이터의 변경을 하나의 이벤트로 보고 다른 서비스로 전파해서 데이터의 결과적 일관성을 보장합니다.

이 과정에서 외부 메시지 브로커가 많이 사용되는데, 대체로 Kafka를 많이 사용하고 있습니다.

만약, 메시지가 전송 중 유실되어 다른 서비스에 전파되지 않는다면 어떤 일이 일어날까요?

결제 서비스와 재고 관리 서비스가 있다고 예시를 들어보겠습니다.
사용자가 결제를 취소했다면 주문 했던 상품의 재고를 주문하기 전의 수량으로 다시 돌려놓아야 합니다.

이때 메시지가 유실된다면, 실제 재고재고 데이터가 일치하지 않음은 물론 장기적으로 주문할 수 있는 상품을 주문하지 못해 영업 손실로 이어질 수 있습니다.

🫢 왜 메시지가 유실될까?

어떻게 메시지가 유실될 수 있는지 간단한 예제를 통해서 알아보겠습니다.

링크드인과 같은 SNS는 특정 회원을 구독하면 회원이 글을 작성하거나 상태 변경시 알람이 발송됩니다.
Post-ServiceSubscriber-Service를 활용해 간단한 서비스를 구현해보겠습니다.

🤩 Flow Chart

간단하게 Flow Chart를 작성해보았습니다. 각 단계에 대해서 간단하게 살펴보겠습니다.

  1. 특정 회원이 글을 작성합니다.
  2. Post-Service DB에 글을 저장하고, 이벤트를 발생시킵니다.
  3. EventHandler는 외부 메시지 브로커에 메시지를 발행합니다.
  4. Subscriber-Service의 메시지 Consumer는 메시지를 수신합니다.
  5. 이미 처리된 이벤트인지 확인합니다.
  6. 메시지 수신 후, 구독자를 찾아 알람발송합니다.

모든 개발자가 그렇듯이 이렇게 정상 케이스만 생각하면 행복회로가 굴러갑니다.

우리의 행복회로는 메시지 유실로 활활 타버리고 있었는데 말이죠,,

😎 이제 진짜 구현해보자!

  • CreatePostService (게시글을 작성하고 이벤트를 발생시킨다)
@Service
@Transactional
@RequiredArgsConstructor
public class CreatePostService implements CreatePostUseCase {

    private final PostRepository postRepository;
    private final ApplicationEventPublisher eventPublisher;

    @Override
    public void create(Long authorId, String title, String content) {
        final Post post = Post.of(authorId, title, content);
        postRepository.save(post);

        // 게시글 생성 이벤트 발송
        final CreatePostEvent event = CreatePostEvent.of(UUID.randomUUID(), post.getPostId(), authorId, post.getTitle());
        eventPublisher.publishEvent(event);
    }
}

  • CreatePostEventHandler (이벤트를 수신하고 외부 메시지 브로커에 메시지를 발송한다)
@Component
@RequiredArgsConstructor
public class CreatePostEventHandler {

    private final KafkaTemplate<String, CreatePostEvent> kafkaTemplate;

    @Async
    @TransactionalEventListener
    public void handleEvent(CreatePostEvent event) {
        kafkaTemplate.send("post-create", event);
    }
}

@TransactionalEventListener 사용으로, 게시글이 정상적으로 저장되어야 이벤트가 수신됨을 보장합니다.

Subscriber-Service는 이벤트를 수신하고, 메시지에 포함된 작성자 정보를 이용하여 구독자를 조회하고 알림을 발송합니다.

@Slf4j
@Component
@RequiredArgsConstructor
public class CreatePostListener {

    private final SendNotificationUseCase sendNotificationUseCase;

    @KafkaListener(topics = "post-create", groupId = "post-group", containerFactory = "concurrentKafkaListenerContainerFactory")
    void consumeEvent(@Payload CreatePostEvent event) {
        log.info("Create Post Event 수신 Request Data = [{}], 수신 날짜 = [{}]", event, LocalDateTime.now());

        final SendNotificationServiceRequest serviceRequest = SendNotificationServiceRequest.of(event.eventId(), event.postId(), event.authorId(), event.title());
        sendNotificationUseCase.sendNotification(serviceRequest);
    }
}
@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class SendNotificationService implements SendNotificationUseCase {

    private final SubscribeQueryRepository queryRepository;
    private final PostEventRepository postEventRepository;
    private final MemberRepository memberRepository;
    private final RestClient restClient;

    @Override
    public void sendNotification(SendNotificationServiceRequest serviceRequest) {
    	// 이미 처리된 이벤트인지 확인
        final Optional<PostEvent> findEvent = postEventRepository.findById(serviceRequest.eventId());

        if (findEvent.isEmpty()) {
            final PostEvent postEvent = PostEvent.of(serviceRequest.eventId(), serviceRequest.authorId());
            final String authorNickname = memberRepository.loadMemberNickname(serviceRequest.authorId());

            final WebhookCommand webhookCommand = WebhookCommand.of(serviceRequest.title(), authorNickname);

		   // 구독자의 Webhook-URL을 모두 조회하여 알람을 발송한다.           
           queryRepository.loadAllWebhookUrls(serviceRequest.authorId())
                            .forEach(webHookUrl -> {
                                log.info("요청 시도 중 = {}", webHookUrl);

                                int result = restClient.post()
                                        .uri(webHookUrl)
                                        .body(webhookCommand)
                                        .retrieve()
                                        .toBodilessEntity().getStatusCode().value();

                                log.info("상태 코드 = {}", result);
                            });

            postEventRepository.save(postEvent);
        }

    }
}

그렇다면, 어디서 문제가 생기는지 직접 테스트 해보겠습니다.

🙄 정상 케이스 테스트

알람 발송이 정상적으로 잘 되는지 확인하기 위해, 잔디의 Incoming Webhook 서비스를 활용했습니다.

  • 잔디 토픽 생성 후 Incoming Webhook 연동

  • 게시글 작성 요청 발송

  • 모든 구독자에게 알람 수신

정상 케이스 에서는 문제 없이 동작하는 것을 볼 수 있습니다.

🫤 비정상 케이스 테스트 - Kafka 장애

외부 메시지 브로커로 Kafka 를 사용중인데, Kafka 종료 상태에서는 어떻게 동작하는지 확인해보겠습니다.

KafkaZookeeper를 종료하고 아까와 같은 게시글 작성 요청을 보내보겠습니다.

  • 게시글 작성 요청 발송

  • 메시지 발송 시간 초과로, TimeoutException 발생

  • Kafka 재실행

Kafka가 다시 실행되어 장애가 복구되었지만, 우리의 메시지 는 도착하지 않습니다.
메시지가 안오냐구요? 우리 메시지는 공중분해 되버렸거든요!

Spring 에서 제공하는 @Retryable 을 사용하더라도, Kafka 가 최대 재시도 횟수 처리 후 복구 된다면, 결국 메시지는 유실됩니다.

따라서 몇 번의 재시도 로직을 추가한다고 해서 해결 될 문제는 아닌 것입니다.

Kafka 복구 시점까지 계속 재시도 할 수 있지만 Thread 를 반납하지 않고 지속적으로 요청을 보내게되고, 이로 인해 Network I/O가 발생하여 리소스 관리 측면에서 좋은 방법이라고 하기 어렵습니다.

😘 다음으로..

Transactional Outbox Pattern이 적용되지 않아, 메시지 브로커 장애시 구독자에게 알람이 가지 않는 것을 확인했습니다.

다음 포스팅에서는 Transactional Outbox Pattern을 구현할 수 있는 방법 중 지속적으로 Outbox DB에 Polling 하여 이벤트를 처리하는 Polling-Publisher 모델을 살펴보겠습니다.

오늘도 읽어주셔서 감사합니다.

  • V1 코드 레포지토리 -> 이동

다음 포스팅으로 이동 -> 다음 포스팅으로 이동하기

🙇🏻

5개의 댓글

comment-user-thumbnail
2024년 7월 1일

항상 좋은 글 감사합니다~ 이 집 블로그 맛집임 ㅎㅎㅎ

1개의 답글
comment-user-thumbnail
2024년 7월 4일

ApplicationEventPublisher 와 @Async 사용을 spring boot 비동기 이벤트 처리로 작성이 되어있는데
비동기 이벤트 쓰레드 사용을 위해 별도 쓰레드 config를 작성하여 사용했을거로 보입니다.
트래픽이 증가함에 따라 비동기 쓰레드 부족 등 여러 에러로 인해서도 유실 가능성이 있을 수 있어 보이는데
해당 이슈에 대해서는 어떻게 생각하시는지 궁금합니다!

1개의 답글