Kafka Consumer @RetryableTopic

dh·2025년 4월 21일
post-thumbnail

컨슈머에서 메시지를 소비해서 처리하다가 에러가 발생시 같은 메시지를 재시도 하도록 Retry설정을 했다.

설정한 Retry횟수를 초과하면 Dead Letter Topic(DLT)로 이동하여 별도의 DLT컨슈머에서 처리를한다.

//Retry 컨슈머
	@KafkaListener(topics = "retry-test", groupId = "retry-group-test")
    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 2000), // 초기 간격은 1초로 재시도 간격이 2배씩 증가
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltTopicSuffix = "-dead-t" // 기존 토픽 이름에서 -dead-t 접미사가 추가된 이름으로 Dead Letter Topic 생성
    )
    void achievementConsumer(Map<String, Object> eventMap) {
        System.out.println("=================> Consumer 호출");

        AchieveEventDto achieveEventDto = objectMapper.convertValue(eventMap, AchieveEventDto.class);
        if(achieveEventDto.getAchievementTitle().equals("retry-test")) {
            throw new RuntimeException("retry-test");
        }
        messageQueue.offer(eventMap);
    }

// DLT 컨슈머
    @KafkaListener(topics = "retry-test-dead-t", groupId = "retry-group-test")
    void achievementConsumerDead(Map<String, Object> eventMap) {
        System.out.println("Dead Letter Topic 메시지 도착: " + eventMap);
        messageQueue.offer(eventMap); // 실패 메시지를 messageQueue에 넣어줌
    }

    @Test
    @DisplayName("업적 메시지 전송 테스트")
    void retryDLTConsumerTest() {
        AchieveEventDto achieveEventDto = AchieveEventDto.builder()
                .userId(UUID.fromString("aaaaaaaa-0bda-11f0-b183-cad3a17bbf53"))
                .userMediaId("U08MRJYTBL1")
                .achievementId(UUID.fromString("aaaaaaaa-bbbb-cccc-b183-cad3a17bbf53"))
                .achievementTitle("retry-test")
                .achievementDescription("test description")
                .build();
        kafkaTemplate.send("retry-test", achieveEventDto);

        try {
            Map<String, Object> poll = messageQueue.poll(10, TimeUnit.SECONDS);
            AchieveEventDto pollDto = objectMapper.convertValue(poll, AchieveEventDto.class);
            System.out.println("컨슈머 소비 : " + poll);

            Assertions.assertThat(pollDto).usingRecursiveComparison().isEqualTo(achieveEventDto);

        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

@RetryableTopic어노테이션 설정으로 Retry설정을 했다.

  • attempts : 재시도 작업의 반복 횟수 결정. 첫 번째 시도도 포함되고 기본값은 3.
  • backoff : 재시도 사이의 예상 시간을 결정.
    • backoff = @Backoff(delay = 2000)= 2초마다 재시도
    • backoff = @Backoff(delay = 2000, multiplier = 2)= 2, 4, 8초마다 재시도
    • backoff = @Backoff(delay = 1000, maxDelay = 5000, random = true)= 1~5초 사이 랜덤한 간격으로 재시도
  • topicSuffixingStrategy :
    • TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE: {topic_name}-retry-0, {topic_name}-retry-1 등의 이름으로 토픽 생성
    • TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE : {topic_name}-retry-1000, {topic_name}-retry-2000 등의 이름으로 토픽 생성
  • dltTopicSuffix :
    • {topic_name}{suffix}의 이름으로 DLT토픽 생성. 위 코드는 retry-test-dead-t라는 이름의 토픽 생성

전체흐름

kafka 컨슈머 테스트 코드인데 흐름은 retry-test라는 토픽을 프로듀서에서 보내면 해당 토픽을 받는 컨슈머(achievementConsumer)에서 이벤트를 소비하게 된다.

@RetryableTopic 어노테이션 설정으로 에러 발생시 5번의 Retry시도를 하도록 설정했다.

이 횟수를 초과하면 retry-test-dead-t라는 토픽을 발행하게 된다.

retry-test-dead-t 을 처리하는 컨슈머에서 이벤트를 처리한다.

3번의 재시도를 하면서 Retry컨슈머가 3번 실행이 되고, 초과하면 DLT토픽으로 보내게 된다.

그래서 DLT컨슈머에서 메시지를 받는 것을 확인할 수 있다.

0개의 댓글