Kafka 재시도, DLT 빌더 접근 방식으로 리팩토링

wwlee94·2022년 12월 2일
2

리팩토링

목록 보기
1/1
post-thumbnail

카프카 도입

운영하는 서비스가 커지고 다른 서비스와 통신이 많아지다보니 결합도가 올라가서 일부 비동기 통신이 가능한 부분에 대해서는 카프카 메시지 큐를 도입하여 서비스를 운영했다.

도입 시점에는 메시지 소비가 실패했을 경우 상황에 대비하여 재시도 토픽을 두었으며, 설정한 재시도 횟수가 초과했을 경우에는 DLT(Dead letter topic)에 해당 메시지를 보관하며, DLT 토픽에 담긴 메시지에 대해 에러 로깅 후 모니터링 하는 구조로 설정하여 운영 중이다.

카프카 도입 이후 boilerplate에 대한 고민

    @RetryableTopic(
            attempts = KafkaConsumerConstants.MAX_ATTEMPT_COUNT,
            backoff = @Backoff(KafkaConsumerConstants.BACK_OFF_PERIOD),
            dltStrategy = DltStrategy.FAIL_ON_ERROR,
            listenerContainerFactory = "retryConcurrentFactory",
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
    @KafkaListener(
            topics = KafkaConsumerTopics.TEST,
            groupId = KafkaConsumerGroupIds.GROUP,
            containerFactory = "concurrentFactory")
    public void consume(@Payload ConsumeMessage consumeMessage) {
        // ... 로직 생략
    }

컨슈밍 로직을 작성할때는 @KafkaListener 로 리스너를 작성했는데
초기 도입시에는 @KafkaListener로 작성한 리스너에 @RetryableTopic 으로 재시도 토픽을 명시했다.
그 이후 @RetryableTopic 옵션을 설정하려고 하면 수많은 리스너에 해당 어노테이션이 붙고 중복 코드가 방대해졌다.

    /**
     * Dlt에 메세지 쌓일 때 실패 로그 쌓음
     */
    @DltHandler
    public void dltHandler(ConsumerRecord<String, String> record,
                           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                           @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
                           @Header(KafkaHeaders.OFFSET) Long offset,
                           @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
        log.error("received message='{}' with partitionId='{}', offset='{}', topic='{}'", record.value(), offset, partitionId, topic);
        kafkaConsumerService.saveFailedMessage(topic, partitionId, offset, record.value(), errorMessage);
    }

그리고 메서드로 DLT 토픽 소비시 후처리를 위한 @DltHandler 를 매번 @KafkaListener 가 있는 클래스에 하나하나 메서드로 생성해주어야 한다.

뭔가 기능이 동작하는데는 문제가 없지만, 매우 불편하다.

고민한 내용을 정리하면 다음과 같다.
1. @KafkaListener, @RetryableTopic중복 코드를 설정값으로 공통 적용 할 수 있는 방법이 없을까?
2. @DltHandler 의 동작은 어느 토픽에서나 동일하게 동작하는 로직인데 클래스마다 작성하지 않고 한 곳에서 관리할 수 있는 방법이 없을까?

해결 방법

정답은 항상 공식 문서에 있다

공통 설정 관리

공통 설정들이 별도로 관리되도록 바꿔보자.

containerFactory 옵션 지우기

@kafkaListener 의 경우 containerFactory를 지정 안했을때 기본적으로 kafkaListenerContainerFactory 라는 이름의 빈을 참조한다.

카프카 설정할때 팩토리 빈 네이밍을 위의 kafkaListenerContainerFactory 로 지정하면 된다.

방법은 다음과 같다.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory()); // 각 서비스 별 consumer 설정값
        factory.setMessageConverter(new JsonMessageConverter(objectMapper)); // @Payload로 객체 파싱할 때 필요
        return factory;
    }

빈 네이밍이 kafkaListenerContainerFactory 로 잡히도록 설정해두면 된다.

@RetryableTopic 설정 & 모든 클래스에 @DltHandler 메서드 지우기

@RetryableTopic 의 경우엔 어노테이션 말고 RetryTopicConfigurationBuilder 라는 설정 빌더 클래스를 제공한다.

설정 방법은 다음과 같다.

    @Bean
    public RetryTopicConfiguration retryTopicConfig(KafkaTemplate<String, String> kafkaTemplate) throws NoSuchMethodException {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .autoCreateTopicsWith(REPLICA_COUNT, KafkaConsumerConstants.REPLICATION_FACTOR)
                .maxAttempts(KafkaConsumerConstants.MAX_ATTEMPT_COUNT)
                .fixedBackOff(KafkaConsumerConstants.BACK_OFF_PERIOD)
                .listenerFactory(kafkaListenerContainerFactory())
                .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
                .dltHandlerMethod(new EndpointHandlerMethod(ConsumerErrorsHandler.class, "postProcessDltMessage"))
                .create(kafkaTemplate);
    }

RetryTopicConfiguration 을 빈으로 등록하면 전역적으로 재시도 토픽과 DLT 토픽이 적용이 된다.
조금 주의해야할 사항은 listenerFactory 설정인데 @KafkaListener 의 factory 설정과 동일해야 MessageConversionException 을 방지할 수 있다.

dltHandlerMethod 설정시에는 클래스 정보와 메서드 이름을 넣으면 된다.
예시의 ConsumerErrorsHandler 클래스 내부는 아래와 같다.

  • @DltHandler 에서 사용한 메서드 그대로를 넣었다.
    public void postProcessDltMessage(ConsumerRecord<String, String> record,
                                      @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
                                      @Header(KafkaHeaders.OFFSET) Long offset,
                                      @Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
                                      @Header(KafkaHeaders.GROUP_ID) String groupId) {
        log.error("[DLT Log] received message='{}' with partitionId='{}', offset='{}', topic='{}', groupId='{}'", record.value(), partitionId, offset, topic, groupId);
        kafkaConsumerService.saveFailedMessage(topic, partitionId, offset, record.value(), errorMessage);
    }

주의할 점은 설정 이후 @DltHandler 어노테이션과 함께 사용 가능하나 RetryTopicConfigurationBuilder 의 dltHandlerMethod 가 우선 순위 더 높음에 주의하자.

결과

    @KafkaListener(
            topics = KafkaConsumerTopics.TEST,
            groupId = KafkaConsumerGroupIds.GROUP)
    public void consume(@Payload ConsumeMessage consumeMessage) {
        // ... 로직 생략
    }

어떤 토픽을 소비할 컨슈머를 만들건지, 소비 그룹은 어떤 것인지에 대한 설정만 적용 해도 정상적으로 동작한다.

레퍼런스

https://docs.spring.io/spring-kafka/reference/html/#features

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html

profile
우엉이의 개발 블로그 📝

0개의 댓글