컨슈머에서 메시지를 소비해서 처리하다가 에러가 발생시 같은 메시지를 재시도 하도록 Retry설정을 했다.
설정한 Retry횟수를 초과하면 Dead Letter Topic(DLT)로 이동하여 별도의 DLT컨슈머에서 처리를한다.
//Retry 컨슈머
@KafkaListener(topics = "retry-test", groupId = "retry-group-test")
@RetryableTopic(
attempts = "3",
backoff = @Backoff(delay = 2000), // 초기 간격은 1초로 재시도 간격이 2배씩 증가
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
dltTopicSuffix = "-dead-t" // 기존 토픽 이름에서 -dead-t 접미사가 추가된 이름으로 Dead Letter Topic 생성
)
void achievementConsumer(Map<String, Object> eventMap) {
System.out.println("=================> Consumer 호출");
AchieveEventDto achieveEventDto = objectMapper.convertValue(eventMap, AchieveEventDto.class);
if(achieveEventDto.getAchievementTitle().equals("retry-test")) {
throw new RuntimeException("retry-test");
}
messageQueue.offer(eventMap);
}
// DLT 컨슈머
@KafkaListener(topics = "retry-test-dead-t", groupId = "retry-group-test")
void achievementConsumerDead(Map<String, Object> eventMap) {
System.out.println("Dead Letter Topic 메시지 도착: " + eventMap);
messageQueue.offer(eventMap); // 실패 메시지를 messageQueue에 넣어줌
}
@Test
@DisplayName("업적 메시지 전송 테스트")
void retryDLTConsumerTest() {
AchieveEventDto achieveEventDto = AchieveEventDto.builder()
.userId(UUID.fromString("aaaaaaaa-0bda-11f0-b183-cad3a17bbf53"))
.userMediaId("U08MRJYTBL1")
.achievementId(UUID.fromString("aaaaaaaa-bbbb-cccc-b183-cad3a17bbf53"))
.achievementTitle("retry-test")
.achievementDescription("test description")
.build();
kafkaTemplate.send("retry-test", achieveEventDto);
try {
Map<String, Object> poll = messageQueue.poll(10, TimeUnit.SECONDS);
AchieveEventDto pollDto = objectMapper.convertValue(poll, AchieveEventDto.class);
System.out.println("컨슈머 소비 : " + poll);
Assertions.assertThat(pollDto).usingRecursiveComparison().isEqualTo(achieveEventDto);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@RetryableTopic어노테이션 설정으로 Retry설정을 했다.
backoff = @Backoff(delay = 2000)= 2초마다 재시도backoff = @Backoff(delay = 2000, multiplier = 2)= 2, 4, 8초마다 재시도backoff = @Backoff(delay = 1000, maxDelay = 5000, random = true)= 1~5초 사이 랜덤한 간격으로 재시도TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE: {topic_name}-retry-0, {topic_name}-retry-1 등의 이름으로 토픽 생성TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE : {topic_name}-retry-1000, {topic_name}-retry-2000 등의 이름으로 토픽 생성kafka 컨슈머 테스트 코드인데 흐름은 retry-test라는 토픽을 프로듀서에서 보내면 해당 토픽을 받는 컨슈머(achievementConsumer)에서 이벤트를 소비하게 된다.
@RetryableTopic 어노테이션 설정으로 에러 발생시 5번의 Retry시도를 하도록 설정했다.
이 횟수를 초과하면 retry-test-dead-t라는 토픽을 발행하게 된다.
retry-test-dead-t 을 처리하는 컨슈머에서 이벤트를 처리한다.


3번의 재시도를 하면서 Retry컨슈머가 3번 실행이 되고, 초과하면 DLT토픽으로 보내게 된다.
그래서 DLT컨슈머에서 메시지를 받는 것을 확인할 수 있다.