그렇다면 메시지
유실을 방지하기 위해서 어떻게 V1 프로젝트
를 개선해야 할까요?
MSA 환경
에서 메시지 전송 신뢰성을 보장할 수 있는 한 패턴을 소개하려고 합니다.
Transactional Outbox Pattern
은 데이터 변경과 발송할 메시지를 저장하는 행위를 한 트랜잭션
에 묶어서 데이터만 변경되고 메시지가 발송되지 않는 상황을 방지해줍니다.
V1 프로젝트
에서는 메시지 발송이 실패하면, 메시지 전송
을 더 이상 시도하지 않아서 발송해야 할 메시지 객체가 메모리에서 삭제되게 됩니다.
따라서 이번에는 전송할 메시지
를 Outbox 테이블에 저장하여 영구적으로 보존하여 메시지 데이터가 사라지지 않게 함으로서 전송
을 보장
할 수 있는 것입니다.
Message Relay
모듈이 Outbox 테이블
에 저장된 메시지 데이터를 읽어서 외부 메시지 브로커에게 메시지를 전송하게 됩니다.
V2 Flow Chart
를 단계별로 살펴보겠습니다.
사용자
가 글을 작성합니다.Post 테이블
에 저장하고, 이벤트 데이터를 Outbox 테이블에 저장합니다.Message Relay
모듈 역할을 하는 스케줄러가 지속적으로 Outbox 테이블의 데이터를 Polling 합니다.Consumer
가 메시지를 수신합니다.이벤트
인지 확인합니다.수신한 메시지
의 데이터로 구독자를 찾고 알람을 발송합니다.V2에서 V1에 비해 달라지는 부분은 Post-Service
에만 변화가 있습니다.
Post-Service
는 Outbox 테이블이 추가되고 지속적으로 메시지
를 Polling
하는 스케줄러가 추가됩니다.
@Service
@Transactional
@RequiredArgsConstructor
public class CreatePostService implements CreatePostUseCase {
private final PostRepository postRepository;
private final CreatePostEventOutboxService createPostEventOutboxService;
@Override
public void create(Long authorId, String title, String content) {
final Post post = Post.of(authorId, title, content);
postRepository.save(post);
createPostEventOutboxService.create(post.getPostId(), authorId, title);
}
}
게시글 생성 후 이벤트를 발생시키지 않고, Outbox 테이블
에 메시지 데이터를 저장합니다.
@Slf4j
@Component
@Transactional
@RequiredArgsConstructor
public class SendPostEventScheduler {
private final LoadPostEventOutboxUseCase loadPostEventOutboxUseCase;
private final KafkaTemplate<String, CreatePostEvent> kafkaTemplate;
@Scheduled(fixedRate = 1000)
void sendPostEvent() {
log.info("1초 마다 스케쥴링 테스트 : 수행 시간 = [{}]", LocalDateTime.now());
List<PostEventOutbox> postEventOutboxList = loadPostEventOutboxUseCase.loadBeforeProcessingEvents();
// 메시지 발송이 되지 않은 Outbox List를 순회하면서, 메시지 발송 처리
postEventOutboxList.forEach(postEventOutbox -> {
final CreatePostEvent createPostEvent =
CreatePostEvent.of(postEventOutbox.getEventId(), postEventOutbox.getPostId(),
postEventOutbox.getAuthorId(), postEventOutbox.getTitle());
kafkaTemplate.send("post-create", createPostEvent);
// 전송된 메시지에 대해, 상태를 "완료"로 수정한다.
postEventOutbox.completeEvent();
});
log.info("처리한 이벤트 개수 = [{}], 종료 시간 = [{}]", postEventOutboxList.size(), LocalDateTime.now());
}
}
스케줄러
는 1초마다 처리되지 않은 메시지를 Outbox 테이블
에서 조회합니다.
조회된 메시지 리스트를 순회하면서 외부 메시지 브로커
에 메시지를 보내게 됩니다.
메시지 발송이 완료되면 Outbox Entity
의 완료 상태를 변경함으로써 DB에 반영합니다.
메시지 처리
중 문제가 생기면 중복 메시지 발행의 문제가 생길 수 있는데, 이런 문제를 해결하기 위해 Subscriber-Service
의 Message Consumer는 멱등성
을 보장해야 합니다.
예시
를 들어보자면 아래와 같은 상황에서 중복 메시지 발행 가능성이 있습니다.
Outbox 테이블
에서 처리되지 않은 메시지 A,B,C를 조회A,B
메시지를 전송 후 처리 상태를 "완료"로 변경C
메시지 전송 중 예외 발생A,B
메시지는 발송 되었으나 Update Query가 실행되지 않음스케줄러
가 처리되지 않은 메시지 A,B,C에 대해서 발송 시도C 메시지
의 처리 실패로 변경된 엔티티에 대해서 업데이트 쿼리가 나가지 않아 생기는 문제입니다.
Message Consumer
가 이벤트 처리의 멱등성을 보장해준다면 충분히 해결 가능한 문제입니다.
이제 V1
에서 문제가 되던 부분이 개선
되었는지 테스트를 수행해보겠습니다.
이번에는 Jmeter
를 활용해서 1초 동안 10개의 글 등록 요청
을 보내보겠습니다.
Jmeter
를 활용해 1초 동안 10개의 글 등록 요청 발송
각 구독자
는 10개
의 메시지 수신
모든 구독자
에게 메시지
가 정상적으로 수신되는 것을 확인할 수 있습니다.
V1
과 동일한 방식으로 Kafka
를 종료하고 Jmeter
를 활용해 정상 케이스와 같은 요청을 보내보겠습니다.
Jmeter
를 활용해 1초 동안 10개의 글 등록 요청 발송
Kafka 재실행
각 구독자
는 10개
의 메시지 수신
V1과 다르게, Kafka
의 장애가 복구되면 정상적으로 메시지가 수신되는 것을 볼 수 있습니다.
Kafka
가 장애로 인해 동작하지 않더라도, 스케줄러
가 1초
마다 처리되지 않은 메시지 발송을 시도하기 때문에 Kafka
의 장애가 복구되면 메시지가 순차적으로 처리되는 것입니다.
Transactional Outbox Pattern
을 이용하여 Polling-Publisher
형태로 메시지 전송 신뢰성을 보장하는 모델을 구현해보았습니다.
Polling-Publisher 모델을 활용하면 메시지 전송 신뢰성을 보장할 수 있지만,
지속적인 Polling으로 인해 데이터베이스에 심각한 부하를 줄 수 있다는 문제점도 내포하고 있습니다.
마지막 포스팅에서는 CDC(Change Data Capture)를 이용하여 DB에 부하를 주지 않고 변경된 데이터를 감지하여 메시지를 발송해주는 방식으로 개선해보도록 하겠습니다.
오늘도 읽어주셔서 감사합니다.
V2 코드 레포지토리
-> 이동
다음 포스팅으로 이동 -> 다음 포스팅으로 이동하기
잘 했다.
젊은 해병
네가 목숨을 걸고 만들어낸 용기있는 몇 초는 좋든 안 좋든 바로 지금 세계의 운명을 크게 바꿨다.
이 전쟁을... 끝내러 왔다.