이 포스팅은 총 3편으로 이루어져 있습니다.
MSA 환경
에서 메시지가 유실되는 환경을 직접 구현해보며 문제점을 알아봅니다.Transactional Outbox Pattern
을 Polling-Publisher 모델로 구현해봅니다.Transactional Outbox Pattern
을 CDC(Change Data Capture)를 활용해 구현해봅니다.MSA
환경에서 개발을 하다보면, 특정 서비스에서 일어난 데이터의 변경
을 다른 서비스에 전파해야 하는 경우가 많이 발생합니다.
보통 Event-Driven-Architecture
를 채택해서 특정 서비스에서 일어난 데이터의 변경을 하나의 이벤트로 보고 다른 서비스로 전파해서 데이터의 결과적 일관성을 보장합니다.
이 과정에서 외부 메시지 브로커가 많이 사용되는데, 대체로 Kafka
를 많이 사용하고 있습니다.
만약, 메시지가 전송 중 유실
되어 다른 서비스에 전파
되지 않는다면 어떤 일이 일어날까요?
결제 서비스와 재고 관리 서비스가 있다고 예시를 들어보겠습니다.
사용자가 결제를 취소했다면 주문 했던 상품의 재고를 주문하기 전의 수량으로 다시 돌려놓아야 합니다.
이때 메시지
가 유실된다면, 실제 재고와 재고 데이터가 일치하지 않음은 물론 장기적으로 주문할 수 있는 상품을 주문하지 못해 영업 손실
로 이어질 수 있습니다.
어떻게 메시지
가 유실될 수 있는지 간단한 예제
를 통해서 알아보겠습니다.
링크드인과 같은 SNS는 특정 회원
을 구독하면 회원이 글을 작성하거나 상태 변경시 알람이 발송됩니다.
Post-Service
와 Subscriber-Service
를 활용해 간단한 서비스를 구현해보겠습니다.
간단하게 Flow Chart
를 작성해보았습니다. 각 단계에 대해서 간단하게 살펴보겠습니다.
Post-Service DB
에 글을 저장하고, 이벤트를 발생시킵니다.EventHandler
는 외부 메시지 브로커에 메시지를 발행합니다.Subscriber-Service
의 메시지 Consumer는 메시지를 수신합니다.처리된 이벤트
인지 확인합니다.메시지 수신
후, 구독자를 찾아 알람
을 발송
합니다.모든 개발자가 그렇듯이 이렇게 정상 케이스
만 생각하면 행복회로가 굴러갑니다.
우리의 행복회로
는 메시지 유실로 활활 타버리고 있었는데 말이죠,,
CreatePostService
(게시글을 작성하고 이벤트를 발생시킨다)@Service
@Transactional
@RequiredArgsConstructor
public class CreatePostService implements CreatePostUseCase {
private final PostRepository postRepository;
private final ApplicationEventPublisher eventPublisher;
@Override
public void create(Long authorId, String title, String content) {
final Post post = Post.of(authorId, title, content);
postRepository.save(post);
// 게시글 생성 이벤트 발송
final CreatePostEvent event = CreatePostEvent.of(UUID.randomUUID(), post.getPostId(), authorId, post.getTitle());
eventPublisher.publishEvent(event);
}
}
CreatePostEventHandler
(이벤트를 수신하고 외부 메시지 브로커에 메시지를 발송한다)@Component
@RequiredArgsConstructor
public class CreatePostEventHandler {
private final KafkaTemplate<String, CreatePostEvent> kafkaTemplate;
@Async
@TransactionalEventListener
public void handleEvent(CreatePostEvent event) {
kafkaTemplate.send("post-create", event);
}
}
@TransactionalEventListener
사용으로, 게시글이 정상적으로 저장되어야 이벤트가 수신됨을 보장합니다.
Subscriber-Service
는 이벤트를 수신하고, 메시지에 포함된 작성자 정보를 이용하여 구독자를 조회하고 알림을 발송합니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class CreatePostListener {
private final SendNotificationUseCase sendNotificationUseCase;
@KafkaListener(topics = "post-create", groupId = "post-group", containerFactory = "concurrentKafkaListenerContainerFactory")
void consumeEvent(@Payload CreatePostEvent event) {
log.info("Create Post Event 수신 Request Data = [{}], 수신 날짜 = [{}]", event, LocalDateTime.now());
final SendNotificationServiceRequest serviceRequest = SendNotificationServiceRequest.of(event.eventId(), event.postId(), event.authorId(), event.title());
sendNotificationUseCase.sendNotification(serviceRequest);
}
}
@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class SendNotificationService implements SendNotificationUseCase {
private final SubscribeQueryRepository queryRepository;
private final PostEventRepository postEventRepository;
private final MemberRepository memberRepository;
private final RestClient restClient;
@Override
public void sendNotification(SendNotificationServiceRequest serviceRequest) {
// 이미 처리된 이벤트인지 확인
final Optional<PostEvent> findEvent = postEventRepository.findById(serviceRequest.eventId());
if (findEvent.isEmpty()) {
final PostEvent postEvent = PostEvent.of(serviceRequest.eventId(), serviceRequest.authorId());
final String authorNickname = memberRepository.loadMemberNickname(serviceRequest.authorId());
final WebhookCommand webhookCommand = WebhookCommand.of(serviceRequest.title(), authorNickname);
// 구독자의 Webhook-URL을 모두 조회하여 알람을 발송한다.
queryRepository.loadAllWebhookUrls(serviceRequest.authorId())
.forEach(webHookUrl -> {
log.info("요청 시도 중 = {}", webHookUrl);
int result = restClient.post()
.uri(webHookUrl)
.body(webhookCommand)
.retrieve()
.toBodilessEntity().getStatusCode().value();
log.info("상태 코드 = {}", result);
});
postEventRepository.save(postEvent);
}
}
}
그렇다면, 어디서 문제가 생기는지 직접 테스트 해보겠습니다.
알람 발송이 정상적으로 잘 되는지 확인하기 위해, 잔디의 Incoming Webhook 서비스를 활용했습니다.
잔디 토픽
생성 후 Incoming Webhook 연동게시글 작성
요청 발송
모든 구독자
에게 알람 수신
정상 케이스
에서는 문제 없이 동작하는 것을 볼 수 있습니다.
외부 메시지 브로커로 Kafka
를 사용중인데, Kafka
종료 상태에서는 어떻게 동작하는지 확인해보겠습니다.
Kafka
와 Zookeeper
를 종료하고 아까와 같은 게시글 작성 요청을 보내보겠습니다.
게시글 작성 요청
발송
메시지 발송 시간 초과로, TimeoutException
발생
Kafka
재실행
Kafka
가 다시 실행되어 장애가 복구되었지만, 우리의 메시지
는 도착하지 않습니다.
왜 메시지
가 안오냐구요? 우리 메시지는 공중분해 되버렸거든요!
Spring
에서 제공하는 @Retryable
을 사용하더라도, Kafka
가 최대 재시도 횟수 처리 후 복구 된다면, 결국 메시지는 유실됩니다.
따라서 몇 번의 재시도 로직을 추가한다고 해서 해결 될 문제
는 아닌 것입니다.
Kafka
복구 시점까지 계속 재시도 할 수 있지만 Thread
를 반납하지 않고 지속적으로 요청을 보내게되고, 이로 인해 Network I/O가 발생하여 리소스 관리
측면에서 좋은 방법
이라고 하기 어렵습니다.
Transactional Outbox Pattern
이 적용되지 않아, 메시지 브로커 장애시 구독자에게 알람이 가지 않는 것을 확인했습니다.
다음 포스팅에서는 Transactional Outbox Pattern
을 구현할 수 있는 방법 중 지속적으로 Outbox DB에 Polling 하여 이벤트를 처리하는 Polling-Publisher
모델을 살펴보겠습니다.
오늘도 읽어주셔서 감사합니다.
V1 코드 레포지토리
-> 이동다음 포스팅으로 이동 -> 다음 포스팅으로 이동하기
항상 좋은 글 감사합니다~ 이 집 블로그 맛집임 ㅎㅎㅎ