카프카 혹은 SQS에는 dead letter queue라는 개념이 존재한다.
수신부 측에서 데이터 적합성을 책임지다보니 소비하는 측에서 실패했을 경우에 대한 고민이 항상 많아진다.
그래서 나온 개념이라고 생각하며 회사에서 컨슘했을 때 성공 혹은 실패했을 경우에 무조건적으로 실행해야되는 작업이 있어 공부하고 적용하게 되었다.
또한 dlt를 사용하면 실패한 이벤트가 무엇인지 트래킹도 할 수 있어 좋은 방향이라 생각했다.
결론적으로 토픽을 하나 더 생성해야되는 것이기 때문에 잘 고민해봐야한다.
총 3가지에 대한 고민을 할 수 있을 것 같다.
필자는 다 필요하다. (욕심 그득)
acksMode MANUAL_IMMEDIATE
enable-auto-commit false
import org.springframework.kafka.support.Acknowledgment
import org.springframework.kafka.annotation.KafkaListener
@KafkaListener(
topics = ["topic"],
containerFactory = Config.KAFKA_CONSUMER_FACTORY // 설정해둔 컨슈머 팩토리
concurrency = "1"
)
fun comsume (@Payload consumeResponse: Event<DataList>, acknowledgement: Acknowledgment) {
try {
// 처리해야 될 작업
} catch (err: Exception) {
log.error(err)
} finally {
acknowledgement.acknowledge()
}
}
현재는 contanierFactory안에 있는 errorHandler에서 재처리만 시도하는 방식으로 진행하고 있었다.
사용하는 부분
factory.setCommonErrorHandler(KafkaErrorHandler())
...
@Bean(name = "kafkaErrorHandler")
fun KafkaErrorHandler(): DefaultErrorHandler = KafkaConsumeErrorHandler()
에러 핸들러 부분
class KafkaConsumeErrorHandler: DefaultErrorHandler (
{ record, exception ->
// 실패한 이벤트에 대한 record로 실행할 작업
// 기본적으로 로그를 넣음
},
FixedBackOff(1000L, 2L)
)
이렇게 되면 실패하게 되면 1초에 2번 정도 시도한 후에 catch에서 에러를 잡아 처리한 후 acknowledge를 통해 commit하게 된다.
여기서 2번 시도했는데 실패하면? commit은 이루어져있으니 실패한 값은 무시한 채 끝날 것이다.

잘가..
그럼 안되니 스프링 카프카에서 제공하는 dlt 기본 설정을 사용해보고자 했다.
https://docs.spring.io/spring-kafka/reference/retrytopic/dlt-strategies.html
@RetryableTopic(
attempts = KafkaConsumerConstants.MAX_ATTEMPT_COUNT,
backoff = @Backoff(KafkaConsumerConstants.BACK_OFF_PERIOD),
dltStrategy = DltStrategy.FAIL_ON_ERROR,
listenerContainerFactory = "retryConcurrentFactory",
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE
)
@KafkaListener(
topics = ["topic"],
containerFactory = Config.KAFKA_CONSUMER_FACTORY // 설정해둔 컨슈머 팩토리
concurrency = "1"
)
fun comsume (@Payload consumeResponse: Event<DataList>, acknowledgement: Acknowledgment) {
...
factory.setCommonErrorHandler(
DefaultErrorHandler(
DeadLetterPublishingRecoverer(
producerKafkaTemplate,
FixedBackOff(1000L, 2L)
)
)
)
여기서 문제점이 여러개가 존재했다.
(필자만 그럴 수 있음 주의)
그래서 코드는 심플하지만 고려할 점이 더 많아지는 단점이 있었다.
잘 정리하신 블로그에 잘 나와있어 이것으로 대체해도 될 것 같다.
RetryTopicConfigurationBuilder를 사용하는 방법이다.
필자는 dltHandlerMethod의 사용 방법을 아직 잘 이해하지 못했지만 여러 컨슈머에 대한 dlt 설정을 할 수 있어 좋아보였다.
하지만 위에서 말했던 직렬화 역직렬화 문제가 있어 이 방법도 어렵게 느껴졌다.
내가 사용하려는 이유
그럼 retry는 기존 에러 핸들러에서 작업하고 dlt용 producer를 두어 dlt처럼 사용하면 되지 않을까? 라는 결론에 들게 되었다.
결론! dlt용 producer를 만들어 위의 코드에서 catch 부분에 넣어준다.
지금 내 지식으로는 이 방법이 제일 입맛에 맞게 처리할 수 있는 방법으로 느껴졌고 이런식으로 작업을 진행하였다.
throw UnKnownError("error")를 사용했다가 계속 dlt가 실행이 안되는 이슈가 발생했다.