운영하는 서비스가 커지고 다른 서비스와 통신이 많아지다보니 결합도가 올라가서 일부 비동기 통신이 가능한 부분에 대해서는 카프카 메시지 큐를 도입하여 서비스를 운영했습니다.
도입 시점에는 메시지 소비가 실패했을 경우 상황에 대비하여 재시도 토픽
을 두었으며, 설정한 재시도 횟수가 초과했을 경우에는 DLT(Dead letter topic)
에 해당 메시지를 보관하며, DLT 토픽에 담긴 메시지에 대해 에러 로깅 후 모니터링 하는 구조로 설정하여 운영 중입니다.
@RetryableTopic(
attempts = KafkaConsumerConstants.MAX_ATTEMPT_COUNT,
backoff = @Backoff(KafkaConsumerConstants.BACK_OFF_PERIOD),
dltStrategy = DltStrategy.FAIL_ON_ERROR,
listenerContainerFactory = "retryConcurrentFactory",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(
topics = KafkaConsumerTopics.TEST,
groupId = KafkaConsumerGroupIds.GROUP,
containerFactory = "concurrentFactory")
public void consume(@Payload ConsumeMessage consumeMessage) {
// ... 로직 생략
}
컨슈밍 로직을 작성할 때는 @KafkaListener
로 리스너를 작성했는데
초기 도입 시에는 @KafkaListener
로 작성한 리스너에 @RetryableTopic
으로 재시도 토픽을 명시했습니다.
그 이후 @RetryableTopic
옵션을 설정하려고 하면 수많은 리스너에 해당 어노테이션이 붙고 중복 코드가 방대해졌습니다.
/**
* Dlt에 메세지 쌓일 때 실패 로그 쌓음
*/
@DltHandler
public void dltHandler(ConsumerRecord<String, String> record,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage) {
log.error("received message='{}' with partitionId='{}', offset='{}', topic='{}'", record.value(), offset, partitionId, topic);
kafkaConsumerService.saveFailedMessage(topic, partitionId, offset, record.value(), errorMessage);
}
그리고 메서드로 DLT 토픽 소비 시 후처리를 위한 @DltHandler
를 매번 @KafkaListener
가 있는 클래스에 하나하나 메서드로 생성해주어야 합니다.
뭔가 기능이 동작하는데는 문제가 없지만, 매우 불편합니다,,
고민한 내용을 정리하면 다음과 같습니다.
@KafkaListener
, @RetryableTopic
의 중복 코드를 설정값으로 공통 적용 할 수 있는 방법이 없을까?@DltHandler
의 동작은 어느 토픽에서나 동일하게 동작하는 로직인데 클래스마다 작성하지 않고 한 곳에서 관리할 수 있는 방법이 없을까?정답은 항상 공식 문서에 있습니다.
공통 설정들이 별도로 관리되도록 바꿔봅시다.
@kafkaListener
의 경우 containerFactory를 지정 안했을때 기본적으로 kafkaListenerContainerFactory
라는 이름의 빈을 참조합니다.
카프카 설정할때 팩토리 빈 네이밍을 위의 kafkaListenerContainerFactory
로 지정하면 됩니다.
방법은 다음과 같습니다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory()); // 각 서비스 별 consumer 설정값
factory.setMessageConverter(new JsonMessageConverter(objectMapper)); // @Payload로 객체 파싱할 때 필요
return factory;
}
빈 네이밍이 kafkaListenerContainerFactory
로 잡히도록 설정해두면 됩니다.
@RetryableTopic
의 경우엔 어노테이션 말고 RetryTopicConfigurationBuilder
라는 설정 빌더 클래스를 제공합니다.
설정 방법은 다음과 같습니다.
@Bean
public RetryTopicConfiguration retryTopicConfig(KafkaTemplate<String, String> kafkaTemplate) throws NoSuchMethodException {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(REPLICA_COUNT, KafkaConsumerConstants.REPLICATION_FACTOR)
.maxAttempts(KafkaConsumerConstants.MAX_ATTEMPT_COUNT)
.fixedBackOff(KafkaConsumerConstants.BACK_OFF_PERIOD)
.listenerFactory(kafkaListenerContainerFactory())
.setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
.dltHandlerMethod(new EndpointHandlerMethod(ConsumerErrorsHandler.class, "postProcessDltMessage"))
.create(kafkaTemplate);
}
RetryTopicConfiguration 을 빈으로 등록하면 전역적으로 재시도 토픽과 DLT 토픽이 적용이 됩니다.
조금 주의해야할 사항은 listenerFactory 설정인데 @KafkaListener 의 factory 설정과 동일해야 MessageConversionException
을 방지할 수 있습니다.
dltHandlerMethod 설정시에는 클래스 정보와 메서드 이름을 넣으면 됩니다.
예시의 ConsumerErrorsHandler 클래스 내부는 아래와 같습니다.
public void postProcessDltMessage(ConsumerRecord<String, String> record,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) Long offset,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String errorMessage,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
log.error("[DLT Log] received message='{}' with partitionId='{}', offset='{}', topic='{}', groupId='{}'", record.value(), partitionId, offset, topic, groupId);
kafkaConsumerService.saveFailedMessage(topic, partitionId, offset, record.value(), errorMessage);
}
주의할 점은 설정 이후 @DltHandler 어노테이션과 함께 사용 가능하나 RetryTopicConfigurationBuilder 의 dltHandlerMethod 가 우선 순위 더 높음에 주의!
@KafkaListener(
topics = KafkaConsumerTopics.TEST,
groupId = KafkaConsumerGroupIds.GROUP)
public void consume(@Payload ConsumeMessage consumeMessage) {
// ... 로직 생략
}
어떤 토픽을 소비할 컨슈머를 만들건지, 소비 그룹은 어떤 것인지에 대한 설정만 적용 해도 정상적으로 동작합니다.
https://docs.spring.io/spring-kafka/reference/html/#features
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html