
이전에 Exchange를 종류대로 사용하는법을 설명하고 공부하면서 RabbitMQ와 같이 실시간 메세지 브로커에서 과연 연결이 안되는 상황에는 어떤걸 사용해야 하는가에 대해 생각해본 적이 있습니다. 다행히 관련해서 자료를 찾아보니 메세지가 안보내지는 상황에 어떻게 해야하는지 방법이 있었고 그부분을 공부하고 공유하고자 이렇게 글을 작성하게 되었습니다. 그리고 겸사겸사 TTL이 무엇이고 RabbitMQ에서는 이걸 어떻게 사용하는지 알아보는 시간을 가져보겠습니다.
https://velog.io/@half-phycho/RabbitMQ-Exchange-종류대로-사용해보기
이전에 작성한 게시글 입니다.
TTL은 간단히 말하면 유효시간 입니다. 의외로 TTL은 IT 여러분야에서 볼 수 있다고 생각하는데 대표적으로 네트워크, 데이터베이스가 있습니다. 그리고 RabbitMQ에서도 사용가능한데 메세지 자체에 TTL을 설정하여 시간을 다르게 하거나 아니면 Queue자체에 TTL을 설정해 들어오는 모든 메세지에 일관된 유효시간을 부여할 수 있습니다.
메세지에서 TTL 설정하고 Queue에서도 TTL 설정을 한다면 어떤결과가 나오는지 찾아보니 메세지 TTL이 짧은쪽을 기준으로 유효시간을 가진다고 합니다. 결국 Queue TTL보다 메세지 TTL이 짧게 설정되면 해당하는 메세지만 삭제되고 메세지의 TTL이 Queue TTL보다 길면 Queue TTL 수명이 다되면 Queue에 남아있는 모든 메세지가 삭제 됩니다.
앞에서 말한대로 메시지 자체에 TTL을 설정할 수 있고 Queue에서도 TTL설정이 가능합니다.
메세지 레벨에서 TTL 설정하기
public String sendDirectTTLMessage(MessageDTO messageDTO) {
ObjectMapper objectMapper = new ObjectMapper();
try {
// DTO -> String 직렬화 수행
String objectToJSON = objectMapper.writeValueAsString(messageDTO);
// 라우터를 기반으로 큐에 메시지 전송
// ttl 시간은 5초 (5000) 으로 설정
rabbitTemplate.convertAndSend(rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_NAME()
, rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_KEY()
, objectToJSON
, message -> {
message.getMessageProperties().setExpiration("5000");
return message;
});
} catch (JsonProcessingException ex) {
log.error("parsing error : {}", ex.getMessage(), ex);
}
return "success_ttl_direct";
}
Queue 레벨에서 TTL 설정
@Bean
public Queue directQueue() {
// TTL 설정이 미포함된 Queue
//return new Queue(rabbitmqExchangeInfo.get_DIRECT_QUEUE_NAME(), true);
// TTL 설정이 포함된 Queue
// QueueBuilder에 durable 메서드 안에 Queue이름을 넣으면
// RabbitMQ가 재부팅되도 Queue 대기열에 남는다.
return QueueBuilder.durable(rabbitmqExchangeInfo.get_DIRECT_QUEUE_NAME())
.withArgument("x-message-ttl", 3000)
.build();
}
메세지가 정상적으로 처리되지 못하고 RabbitMQ의 큐 레벨에서 제거될 때 발생하는 처리 방식을 말합니다. 메세지가 정상적으로 처리되지 못하면 데드 레터 큐로 보내집니다.
데드 레터링 발생 경우
- 메세지가 큐에서 거부(Reject)되거나 NACK 상태 혹은 다시 큐(requeue)로 돌아오는 경우
- 메세지가 큐에 있었던 시간이 메세지의 TTL을 초과하여 만료된 경우
- 메세지가 큐의 최대 길이를 초과하여 메시지를 추가할 수 없는 경우
- 큐가 가득 차서 메시지를 더 이상 저장할 수 없는 경우
(큐 전체가 만료되어도 큐에 있는 메세지는 데드 레터링 되지 않습니다.)
NACK(Negative Acknowledgement)
네트워크에서 사용하는 용어로 네트워크 기기 간에 데이터를 주고 받았을 때, 수신 장비에 데이터가 도착하지 않았을 때 보내는 신호 그 반대로 ACK(Acknowledgement)는 장비에 데이터가 도착했을때 보내는 신호이다.
RabbitMQ에서 제공하는 익스체인지 유형으로 메세지가 큐에서 제거되는 경우에 사용됩니다. 기존 Exchange를 설정한다면 DirectExchange, FanoutExchange 이런 식으로 특정 Exchange를 설정했는데 데드 레터 Exchange를 설정하려면 Queue에 "x-dead-letter-exchange"라는 매개변수를 설정하고 메시지가 큐에서 제거될 때 메시지를 받을 익스체인지의 이름을 파라미터로 설정해주면 됩니다.

원래는 참고한 게시글을 기반으로 대부분 작업하려고 했으나 생각보다 잘 되지않았습니다. 그래서 방향을 틀어서 ttl 시간이 만료되면 dead Queue로 이동하는게 아닌 Exception을 의도적으로 발생시켜 dead Queue로 보내는 방법을 생각했습니다.
// RabbitMQ 설정
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 설정 파일
*/
@Configuration
@RequiredArgsConstructor
public class RabbitmqConfig {
private final RabbitmqExchangeInfo rabbitmqExchangeInfo;
/**
* 메세지 성공을 못하는 경우 direct.queue로 라우팅하고 dead.queue로 이동
*/
@Bean
public DirectExchange deadExchange(){
return ExchangeBuilder
.directExchange(rabbitmqExchangeInfo.get_DEAD_EXCHANGE_NAME())
.build();
}
/**
* Direct Exchange 구성 : direct.queue를 라우팅 하는데 사용
*/
@Bean
public DirectExchange directExchange(){
return ExchangeBuilder
.directExchange(rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_NAME())
.build();
}
/**
* deadQueue와 라우팅 키(Routing key)를 기반으로 바인딩 수행.
*
* @param deadQueue 성공적으로 처리하지 못한 메시지를 담는 공간
* @param deadExchange 성공적으로 처리하지 못한 메시지를 라우팅
*/
@Bean
public Binding deadBinding(Queue deadQueue, DirectExchange deadExchange){
return BindingBuilder
.bind(deadQueue)
.to(deadExchange)
.with(rabbitmqExchangeInfo.get_DEAD_ROUTING_KEY());
}
/**
* Direct Exchange 와 direct Queue 간의 바인딩을 수행합니다.
*
* @param directQueue 메시지를 담을 큐
* @param directExchange 메시지를 담기 위한 라우팅
*/
@Bean
public Binding directBinding(Queue directQueue, DirectExchange directExchange){
return BindingBuilder
.bind(directQueue)
.to(directExchange)
.with(rabbitmqExchangeInfo.get_DIRECT_ROUTING_KEY());
}
/**
* 만약 메세지 전송에 실패하면 해당 Queue(dead.queue) 로 이동
*
* @return dead Queue
*/
@Bean
public Queue deadQueue(){
return new Queue(rabbitmqExchangeInfo.get_DEAD_QUEUE_NAME(), true);
}
/**
* directQueue 이름의 큐를 구성
* - 해당 큐에서는 속성 값으로 x-dead-letter-exchange가 발생시 rabbit.dead 로 라우팅
* - 해당 큐에서는 속성 값으로 x-dead-letter-routing-key를 통해 Direct Queue의 라우팅 키를 전달하여 라우팅
*
* @return direct Queue
*/
@Bean
public Queue directQueue(){
return QueueBuilder.durable(rabbitmqExchangeInfo.get_DIRECT_QUEUE_NAME())
.withArgument("x-dead-letter-exchange", rabbitmqExchangeInfo.get_DEAD_EXCHANGE_NAME())
.withArgument("x-dead-letter-routing-key", rabbitmqExchangeInfo.get_DEAD_ROUTING_KEY())
.build();
}
}
// 주요 리스너 설정
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* 큐에 등록된 메세지 리턴 component
*/
@Slf4j
@Component
public class RabbitmqMessage {
// direct Queue 메세지
@RabbitListener(queues = "direct.queue")
public void directMessage(Message message){
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("direct.queue 내의 메시지 반환 : {}", body);
// 해당 Exception은 바로 해당 Queue에 등록된 Dead Queue로 이동시키는 Exception 입니다.
throw new AmqpRejectAndDontRequeueException("Dead Queue 테스트");
}
// ttl이 적용된 direct Queue 메세지
@RabbitListener(queues = "direct.queue.ttl")
public void directTTLMessage(Message message){
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("direct.queue.ttl 내의 메시지 반환 : {}", body);
}
// dead Queue 메세지
@RabbitListener(queues = "dead.queue")
public void deadDirectMessage(Message message){
String body = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("dead.queue 내의 메시지 반환 : {}", body);
}
}
Service 혹은 Controller 파일은 기존과 동일하여 게시글에는 넣지는 않았습니다.
AmqpRejectAndDontRequeueException
이번에 새롭게 알게된 Exception 입니다. 해당 Exception은 다른 일반적인 Exception과는 다르게 만약 리스너에 해당하는 Queue에 등록된 dead queue로 이동 시킵니다.
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring boot의 .properties 파일의 설정중 하나로 해당 설정을 추가하면 리스너에서 자동으로 예외 혹은 에러가 발생하면 자동으로 해당 queue에 둥록된 dead.queue로 이동 하도록 해줍니다. 그리고 해당 설정을 해주면 의도적으로 AmqpRejectAndDontRequeueException 을 따로 안해주어도 됩니다.
만약 Exception을 해제한다면 direct.queue 리스너로 정상적으로 넘어가 다음과 같이 로그가 출력되는 것을 확인할 수 있습니다.
만약 의도적으로 Exception을 날렸다면 처음에는 direct.queue가 나오고 그 다음에는 의도한대로 Exception이 동작하고 dead.queue로 이동하는 것을 확인 할 수 있습니다.

만약 설정이 잘되었다면 Queue에는 다음과 같이 설정이 나오게 됩니다.
오히려 예시대로 안되었기에 AmqpRejectAndDontRequeueException 라는 새로운 Exception에 대해 알게 되었고 RabbitMQ의 추가 설정에 대해 알 수 있었습니다. 가끔은 실패해도 거기 안에서 본인이 몰랐던 새로운걸 알게되는 거는 더더욱 의미 있는거 같다고 생각합니다.
https://github.com/delight-HK3/rabbitmq-test
https://www.rabbitmq.com/docs/dlx
https://ko.wikipedia.org/wiki/%EB%B6%80%EC%A0%95_%EC%9D%91%EB%8B%B5
https://adjh54.tistory.com/501?category=1187853
https://www.cloudamqp.com/blog/when-and-how-to-use-the-rabbitmq-dead-letter-exchange.html
(항상 감사합니다.)