[kafka] dead letter topic 사용기

김지원·2024년 2월 3일

kafka

목록 보기
1/2

서론

카프카 혹은 SQS에는 dead letter queue라는 개념이 존재한다.
수신부 측에서 데이터 적합성을 책임지다보니 소비하는 측에서 실패했을 경우에 대한 고민이 항상 많아진다.
그래서 나온 개념이라고 생각하며 회사에서 컨슘했을 때 성공 혹은 실패했을 경우에 무조건적으로 실행해야되는 작업이 있어 공부하고 적용하게 되었다.
또한 dlt를 사용하면 실패한 이벤트가 무엇인지 트래킹도 할 수 있어 좋은 방향이라 생각했다.

결론적으로 토픽을 하나 더 생성해야되는 것이기 때문에 잘 고민해봐야한다.

실패된 이벤트에 대한 처리에 대한 고민

  1. Retry 전략
    • retry를 통해 실패된 이벤트를 재시도
  2. 에러 핸들러
  3. dead letter queue

총 3가지에 대한 고민을 할 수 있을 것 같다.
필자는 다 필요하다. (욕심 그득)

컨슘 설정

  • 현재 고민하게 된 상황의 설정

acksMode MANUAL_IMMEDIATE

  • Acknowledgement.acknowledge() 로 즉시 커밋하도록 설정해두었다.

enable-auto-commit false

  • acksMode가 MANUAL_IMMEDIATE 이기 때문에 당연히 false
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.annotation.KafkaListener

@KafkaListener(
	topics = ["topic"],
    containerFactory = Config.KAFKA_CONSUMER_FACTORY  // 설정해둔 컨슈머 팩토리
    concurrency = "1"
)
fun comsume (@Payload consumeResponse: Event<DataList>, acknowledgement: Acknowledgment) {
  try {
	// 처리해야 될 작업
  } catch (err: Exception) {
  	log.error(err)
  } finally {
  	acknowledgement.acknowledge()
  }
}

현재는 contanierFactory안에 있는 errorHandler에서 재처리만 시도하는 방식으로 진행하고 있었다.

사용하는 부분


factory.setCommonErrorHandler(KafkaErrorHandler())
...


@Bean(name = "kafkaErrorHandler")
fun KafkaErrorHandler(): DefaultErrorHandler = KafkaConsumeErrorHandler()

에러 핸들러 부분

class KafkaConsumeErrorHandler: DefaultErrorHandler (
	{ record, exception -> 
    	// 실패한 이벤트에 대한 record로 실행할 작업
        // 기본적으로 로그를 넣음
    },
    FixedBackOff(1000L, 2L)
)

이렇게 되면 실패하게 되면 1초에 2번 정도 시도한 후에 catch에서 에러를 잡아 처리한 후 acknowledge를 통해 commit하게 된다.

현재 설정의 문제점 🥲

여기서 2번 시도했는데 실패하면? commit은 이루어져있으니 실패한 값은 무시한 채 끝날 것이다.

잘가..

잘가..

방법 1. 기본으로 제공해주는 설정 사용

그럼 안되니 스프링 카프카에서 제공하는 dlt 기본 설정을 사용해보고자 했다.
https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html

  1. 어노테이션으로 retry 토픽을 적용하는 방법
@RetryableTopic(
	attempts = KafkaConsumerConstants.MAX_ATTEMPT_COUNT,
	backoff = @Backoff(KafkaConsumerConstants.BACK_OFF_PERIOD),
	dltStrategy = DltStrategy.FAIL_ON_ERROR,
	listenerContainerFactory = "retryConcurrentFactory",
	topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(
	topics = ["topic"],
    containerFactory = Config.KAFKA_CONSUMER_FACTORY  // 설정해둔 컨슈머 팩토리
    concurrency = "1"
)
fun comsume (@Payload consumeResponse: Event<DataList>, acknowledgement: Acknowledgment) {
...
  1. 에러핸들러에 추가하는 방법
factory.setCommonErrorHandler(
	DefaultErrorHandler(
		DeadLetterPublishingRecoverer(
    		producerKafkaTemplate, 
    		FixedBackOff(1000L, 2L)
    	)
	)
)

여기서 문제점이 여러개가 존재했다.
(필자만 그럴 수 있음 주의)

1. key가 null일 때 실패 후 무한 에러

  • 기본적으로 key가 nullable하지 않은 것은 제공하지 않는다.
    그래서 key가 nullable하다면 무한 에러가 발생했다.
    (다른 옵션이 있을 수 있다)

2. 직렬화 역직렬화 문제

  • 필자는 value에 대한 직렬화를 Json으로 사용하고 있었다.
  • 하지만 value가 들어있는 record.value() 같은 경우 Any 값을 가지고 있기 때문에 직렬화 문제가 있었다.

그래서 코드는 심플하지만 고려할 점이 더 많아지는 단점이 있었다.

방법 2. DLT 빌더 접근 방식으로 사용

잘 정리하신 블로그에 잘 나와있어 이것으로 대체해도 될 것 같다.

RetryTopicConfigurationBuilder를 사용하는 방법이다.

필자는 dltHandlerMethod의 사용 방법을 아직 잘 이해하지 못했지만 여러 컨슈머에 대한 dlt 설정을 할 수 있어 좋아보였다.
하지만 위에서 말했던 직렬화 역직렬화 문제가 있어 이 방법도 어렵게 느껴졌다.

생각해보자.. 🤔

내가 사용하려는 이유

  1. 실패든 성공이든 필요한 작업을 통해 데이터의 적합성을 맞춰줘야한다.
  2. 실패한 이벤트를 저장해 나중에 확인하거나 나중에 처리해주어야한다.
  3. non-blocking 작업으로 진행해서 현재의 상황에 무리를 주지 말아야 한다.

그럼 retry는 기존 에러 핸들러에서 작업하고 dlt용 producer를 두어 dlt처럼 사용하면 되지 않을까? 라는 결론에 들게 되었다.

결론! dlt용 producer를 만들어 위의 코드에서 catch 부분에 넣어준다.

지금 내 지식으로는 이 방법이 제일 입맛에 맞게 처리할 수 있는 방법으로 느껴졌고 이런식으로 작업을 진행하였다.

주의할 점 🚨

삽질 일기 🔨

  • 에러를 내기 위해 try 부분에서 throw UnKnownError("error")를 사용했다가 계속 dlt가 실행이 안되는 이슈가 발생했다.
    • catch가 Exception이기 때문에 더 최상단 개념인 Error를 catch하지 못하고 commit만 진행한 것이다ㅠ
    • 이 내용은 또 다음에 정리해봐야겠다.

참고 글

profile
backend-developer

0개의 댓글