
FCM 푸시 알림간 카프카를 최근에 사용하는 일이 있었다.
이 때 네트워크 문제나 기타 문제 발생 시 이를 DLQ(Dead Letter Queue) 또는 DLT(Dead Letter Topic
를 통해서 실패 메세지를 재처리 하고 해당 실패 메세지가 다른 메세지 처리에 영향을 주지 않도록 하였다.
이번 글에서는 DLQ의 개념과 Spring에서 설정하는 방법, 그리고 내가 사용한 DLQ의 방식이 최선 이었는지에 대해서 이야기 해보고자 한다.

DLQ(Dead Letter Queue) 는 메시지 처리 실패 시, 정상적인 큐와 분리하여 처리할 수 없는 메시지를 저장하는 메커니즘이다.
DLT(Dead Letter Topic 라고도 부르는 이유는 Consumer가 정상적으로 처리하지 못한 메시지를 따로 저장하는 하나의 토픽 이기도 하기 때문이다.
카프카에서 DLQ를 통해서 시스템의 안정성을 높이고, 실패한 메시지를 추적하여 문제 해결에 도움을 줄 수 있다.

실패한 메세지를 왜 따로 저장을 하여 재처리를 해야할까?
실패한 메세지가 FCM 푸시 알림일 경우에는 중요도가 떨어져 보일 수도 있지만, 은행의 송금이라고 생각을 해보자.
돈이라고만 생각을 해도, 충분히 이해가 되었을 것이라고 생각한다.
추가로 Spring-kafka를 통해서 Consumer를 구현하고, 이 Consumer에서 메시지를 처리하는 과정에서 오류가 발생하면 기본 설정으로 최초 요청을 포함해 최대 10회까지 재시도 한다.
최대 10회까지 재시도하고, 모든 재시도가 실패하면 그 때는 해당 메세지를 스킵 하게 된다.
아래 글에서 Spring-kafka에서 어떤 코드를 통해서 초기값 10번이 지정 된 것인지 잘 나와있다.
https://lsj8367.tistory.com/entry/Kafka가-내-로직을-9번이나-재시도를-했다
더 자세한 것은 아래 공식 문서를 참고하자.
https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#default-eh
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServerIp;
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
template, (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition())
);
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
backOff.setInitialInterval(Duration.ofSeconds(1).toMillis());
backOff.setMultiplier(2.0);
backOff.setMaxInterval(Duration.ofSeconds(10).toMillis());
return new DefaultErrorHandler(recoverer, backOff);
}
@Bean
public ConsumerFactory<String, MessageDTO> messageDtoConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerIp);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "push-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, MessageDTO.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageDTO> messageDtoKafkaListenerContainerFactory(
ConsumerFactory<String, MessageDTO> cf,
DefaultErrorHandler errorHandler
) {
ConcurrentKafkaListenerContainerFactory<String, MessageDTO> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}
위 코드는 컨슈머와 DLT 관련 설정을 기입한 소스 코드이다.
이 때 각 Bean 등록 메서드를 상세히 살펴보고, 각 객체가 어떠한 역할을 수행 하는지 살펴보자.
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
return new KafkaTemplate<>(pf);
}

KafkaTemplate는 Kafka로 메시지를 프로듀싱(보내기) 할 때 사용되는 클래스이다.
카프카 프로듀서에서는 아래와 같이 kafkaTemplate 객체를 통해서 메시지를 카프카로 프로듀싱 하게 된다.
@Override
@EventListener
@Async
@TransactionalEventListener(phase = AFTER_COMMIT)
public void handlePushAlarmEvent(MessageDTO dto) {
kafkaTemplate.send("push-notification-topic", dto)
.addCallback(
success -> log.info("Message sent successfully: {}", success.getRecordMetadata()),
failure -> log.error("Failed to send message", failure)
);
}

KafkaTemplate 빈 등록시 인자로 주입받는 ProducerFactory는 Kafka 메시지를 보낼 수 있는 객체를 생성하는 역할을 한다.
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<String, Object> template) {
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
template, (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition())
);
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5);
backOff.setInitialInterval(Duration.ofSeconds(1).toMillis());
backOff.setMultiplier(2.0);
backOff.setMaxInterval(Duration.ofSeconds(10).toMillis());
return new DefaultErrorHandler(recoverer, backOff);
}

DefaultErrorHandler는 이전에 사용되던 SeekToCurrentErrorHandler, RecoveringBatchErrorHandler 를 대체하는 새로운 에러 핸들러이다.
Kafka에서 메시지 처리 중 예외가 발생했을 때, 재시도 / 복구 / 스킵 처리를 담당한다.
DefaultErrorHandler는 ConsumerRecordRecoverer와 BackOff 객체를 인자로 받는다.

DeadLetterPublishingRecoverer는 ConsumerRecordRecoverer를 상속 받은 ConsumerAwareRecordRecoverer의 구현체이다.
DeadLetterPublishingRecoverer는 메시지 처리 중 예외가 발생 했을 때 재시도 이후에도 실패한 메세지를 어떻게 처리할 것인지를 정의하는 인터페이스이다.
해당 로직에서는 메시지 처리 중 예외 발생 시, 해당 메시지를 .DLT suffix가 붙은 토픽으로 보내는 역할을 수행한다.

ExponentialBackOffWithMaxRetries는 BackOff를 구현한 ExponentialBackOff를 상속 받은 클래스이다.
ExponentialBackOffWithMaxRetries는 재시도 횟수와 지연 전략을 제어 하는 역할을 한다.
ExponentialBackOffWithMaxRetries 는 백오프 전략에 대한 값들을 주입 받게 된다.
백오프 전략이란 오류 발생 시 재시도간 시간 간격을 조절하여서 시스템 안정성을 높이는 방식을 의미한다.
재시도간 시간 간격을 조절 하는 이유는 반복 실패를 줄이고, 재시도 시도 간격을 넓혀서 시스템이 회복할 시간을 확보할 수 있기 때문이다.
백오프 전략에는 다음과 같은 종류가 있다.
고정(Fixed) 백오프 : 실패 후 항상 일정 시간 지연
ex : 3초 후, 3초 후, 3초 후…
지수(Exponential) 백오프 : 점점 증가하는 지연 시간.
ex : 1초 → 2초 → 4초 → 8초…
랜덤(Random) 백오프 : 랜덤값 만큼 지연 시간
ex : 1.2초 → 2.8초 → 3.9초 등
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(5); // 최대 재시도 5회
backOff.setInitialInterval(1000); // 최초 재시도 지연 시간: 1초
backOff.setMultiplier(2.0); // 배수: 2배씩 증가
backOff.setMaxInterval(10000); // 최대 지연 시간: 10초
현재 위 코드는 지수 백오프이고, 1초 → 2초 → 4초 → 8초 → 10초 최대 5회까지 재시도하는 설정이다.
@Bean
public ConsumerFactory<String, MessageDTO> messageDtoConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerIp);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "push-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, MessageDTO.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}

ConsumerFactory에서는 Kafka Consumer 설정을 위한 기본 속성들을 설정한다.
이번 글에서는 컨슈머에 대한 글은 아니기에, 디테일한 설정들은 따로 설명 하지는 않겠다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageDTO> messageDtoKafkaListenerContainerFactory(
ConsumerFactory<String, MessageDTO> cf,
DefaultErrorHandler errorHandler
) {
ConcurrentKafkaListenerContainerFactory<String, MessageDTO> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
ConcurrentKafkaListenerContainerFactory는 실제 메시지 소비를 담당할 @KafkaListener에 주입 될 Consumer와 에러 핸들러를 지정 할 수 있다.
@Service
public class PushAlarmConsumerService {
@KafkaListener(topics = "push-notification-topic", groupId = "push-consumer-group", containerFactory = "messageDtoKafkaListenerContainerFactory")
public void consume(MessageDTO message) {
// 푸시 알림 로직 수행...
}
@KafkaListener(
topics = "push-notification-topic.DLT",
groupId = "push-consumer-group-dlt",
containerFactory = "messageDtoKafkaListenerContainerFactory"
)
public void listenDlq(ConsumerRecord<String, MessageDTO> messageRecord) throws FirebaseMessagingException {
// 푸시 알림 로직 수행...
try {
log.error("Failed to send FCM message: {}", e.getMessage(), e);
} catch (Exception e) {
log.error("still same problem in DLQ: {}", e.getMessage(), e);
}
}
}
위 코드를 살펴보자.
위 코드에서는 FCM 푸시 알림을 발송 하기 위해서 컨슈머에서 메시지를 처리 하다가 에러가 발생한 메시지는 DLQ에 넣어준다.
이 때 현재 코드에서는 사실상 DLQ에서는 실패한 시도를 한번 더 해주는 것 밖에는 역할이 없다.
그렇기에 실질적으로 DLQ를 본연의 목적(실패 한 메세지를 확실하게 재처리)으로 사용 하기 보다는 단순한 재처리 횟수 += 1 를 수행하는 역할에 그치게 되었다.

FCM 알림 발송간에 문제가 발생 했을 때는 대부분의 문제가 네트워크 또는 FCM 서버의 문제, 즉 일시적인 문제일 가능성이 제일 크다.
그렇기에 내 로직과 같이 단순히 DLQ를 재시도 처리만 하는 것은 크게 의미가 없다.
1초도 안되는 작은 시간 사이에 네트워크나 FCM 서버의 문제가 해결 될 가능성은 적기 때문이다.
그렇다면 문제가 해결 될 때까지 컨슈머에서 에러가 발생한 메시지들을 DLQ로 넣은 뒤 DLQ Consumer를 On Off 하여 복구되는 경우 Consumer를 On 하는 방식이 있다.
@Component
@Requiredargsconstructor
public class ConsumerController {
private final KafkaListenerEndpointRegistry registry;
/**
* 특정 컨슈머 시작
*/
public void startConsumer(String listenerId) {
var container = registry.getListenerContainer(listenerId);
if (container != null && !container.isRunning()) {
container.start();
System.out.println(listenerId + " started");
}
}
/**
* 특정 컨슈머 중지
*/
public void stopConsumer(String listenerId) {
var container = registry.getListenerContainer(listenerId);
if (container != null && container.isRunning()) {
container.stop();
System.out.println(listenerId + " stopped");
}
}
}ㅌㅈ
이렇게 된다면 확실하게 DLQ의 메시지들이 네트워크 및 FCM 서버 등의 에러가 해결 된 후에 소비 될 수 있게 할 수 있다.
컨슈머를 on/off 하는 방식 외에도 DLT로 간 메시지를 DB에 저장하거나 S3에 보관하여 추후 분석 및 재처리를 할 수 있도록 보존하는 방식도 있다.