Kafka DLT 메시지를 원본 토픽으로 재전송하는 방법

김기현·2024년 10월 17일
1

TL;DR

Kafka에서 DLT(Dead Letter Topic)에 쌓인 메시지를 원본 토픽으로 재전송하는 방법에는 두 가지가 있습니다:

1. **KafkaEndpointRegistry를 이용해 컨슈머를 정지시키고 오프셋을 초기화하는 방법**
2. **DLT를 원본 토픽으로 재생산하는 API를 구현하는 방법**

저희는 **두 번째 방법인 DLT를 원본 토픽으로 재생산하는 API 구현을 선택**했습니다. 이 방법은 서비스 중단 없이 필요한 메시지만 선택적으로 재처리할 수 있어 효율적이기 때문입니다.

서론

안녕하세요! 오늘은 KotlinSpring Kafka를 사용하는 환경에서 Dead Letter Topic(DLT)에 쌓인 메시지를 원본 토픽으로 재전송하는 방법에 대해 알아보겠습니다. Spring Boot 3.x 버전을 기준으로 설명드리며, 두 가지 방법의 장단점을 비교하고, 저희가 최종적으로 선택한 방법을 공유하고자 합니다.

Kafka를 운영하다 보면, 처리되지 못한 메시지들이 DLT에 쌓이는 경우가 있습니다. 이러한 메시지들을 다시 처리해야 할 필요가 있을 때, 어떻게 원본 토픽으로 재전송할 수 있을까요?


방법 1: KafkaEndpointRegistry를 이용한 컨슈머 정지 및 오프셋 초기화

구현 방법

  1. KafkaListenerEndpointRegistry를 주입받아 컨슈머를 정지 또는 일시 중지(pause) 시킵니다.

    @Autowired
    private lateinit var kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
    
    fun pauseConsumers() {
        kafkaListenerEndpointRegistry.listenerContainers.forEach { it.pause() }
    }
  2. KafkaAdminClient 또는 kafka-consumer-groups.sh 스크립트를 이용하여 해당 컨슈머 그룹의 오프셋을 초기화하거나 특정 위치로 이동시킵니다.

    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group your-consumer-group --reset-offsets --to-earliest --execute --all-topics
  3. 컨슈머를 다시 재개(resume) 하여 원본 토픽의 메시지를 재처리하도록 합니다.

    fun resumeConsumers() {
        kafkaListenerEndpointRegistry.listenerContainers.forEach { it.resume() }
    }

특징

  • 컨슈머를 일시적으로 일시 중지(pause) 시킨 후 오프셋을 조정하여 재처리합니다.
  • 서비스 중단 없이 운영할 수 있으며, 필요한 경우 특정 파티션이나 오프셋만 조정할 수 있습니다.
  • 별도의 메시지 이동 없이 원본 토픽의 메시지를 재소비합니다.

방법 2: DLT를 원본 토픽으로 재생산하는 API 구현

구현 방법

  1. DLT로부터 메시지를 읽어들이는 로직을 구현합니다.

    @KafkaListener(topics = ["dlt-topic"], groupId = "dlt-group")
    fun listenToDlt(record: ConsumerRecord<String, String>) {
        // DLT 메시지 처리 로직
    }
  2. 읽어들인 메시지를 원본 토픽으로 재생산하는 API를 제공합니다.

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, String>
    
    fun resendToOriginTopic(record: ConsumerRecord<String, String>) {
        kafkaTemplate.send("origin-topic", record.key(), record.value())
    }
  3. 필요한 경우 REST API로 노출하여 특정 조건에 맞는 메시지만 재전송할 수 있도록 합니다.

    @RestController
    class DltController {
    
        @PostMapping("/resend")
        fun resend(@RequestBody request: ResendRequest) {
            // 재전송 로직
        }
    }

특징

  • DLT에 쌓인 특정 메시지만 선택적으로 원본 토픽으로 재전송합니다.
  • 기존 컨슈머 로직에 영향 없이 개별 메시지를 재처리할 수 있습니다.
  • 서비스 중단 없이 운영 가능합니다.

장단점 비교

방법 1의 장단점

장점:

  • 서비스 중단 없이 모든 메시지를 일괄적으로 재처리할 수 있습니다.
  • 추가적인 API 개발이 필요 없습니다.
  • 오프셋을 조정하여 필요한 범위의 메시지만 재처리할 수 있습니다.

단점:

  • 오프셋을 초기화하면 이미 처리된 메시지도 다시 처리되어 중복 처리의 위험이 있습니다.
  • 오프셋 관리에 대한 이해와 주의가 필요합니다.
  • 모든 컨슈머에 대해 일괄 적용되므로, 세분화된 제어가 어렵습니다.

방법 2의 장단점

장점:

  • 필요한 메시지만 선택적으로 재처리할 수 있어 효율적입니다.
  • 기존 컨슈머 로직에 영향을 주지 않습니다.
  • 서비스 중단 없이 운영 가능합니다.

단점:

  • 추가적인 API 및 로직 구현이 필요하여 개발 비용이 증가합니다.
  • 메시지를 재생산하는 과정에서 메시지 순서일관성에 문제가 발생할 수 있습니다.
  • 보안 및 접근 제어에 대한 추가 고려가 필요합니다.

결론

저희는 방법 2인 DLT를 원본 토픽으로 재생산하는 API 구현을 선택했습니다. 그 이유는 다음과 같습니다:

  • 특정 메시지만 선택적으로 재처리할 수 있어 효율적입니다.
  • 기존 컨슈머 로직과 오프셋에 영향을 주지 않아 안정성을 확보할 수 있습니다.
  • 서비스 중단 없이 운영할 수 있어 가용성을 높일 수 있습니다.

방법 1도 서비스 중단 없이 수행할 수 있지만, 오프셋을 조정하는 과정에서 중복 처리의 위험이 있고, 모든 메시지를 재처리해야 하는 경우에만 적합합니다. 반면, 저희는 특정 조건의 메시지만 재처리하고자 했기 때문에 방법 2가 더 적합하다고 판단했습니다.

각 방법의 장단점을 고려하여 서비스의 특성요구사항에 맞는 최적의 방법을 선택하시기 바랍니다.

감사합니다!

0개의 댓글