RabbitMQ DLX를 사용하여 예외발생 시 무한 Retry를 제한하자.

Jeonghwa·2024년 4월 11일
2

RabbitMQ

목록 보기
2/3

서론

메세지를 Consume하던 중 예외가 발생하면 어떻게 될까요? RabbitMQ는 Consumer가 성공적으로 처리했음을 확인할 때까지 메시지를 가지고있으며 성공할 때까지 무한으로 시도하게됩니다.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.modoospace.alarm.consumer.AlarmConsumer.handler(java.lang.String)' threw exception
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:272) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:209) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1674) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1593) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1581) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1572) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1516) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1001) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:948) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:86) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1326) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1232) ~[spring-rabbit-2.4.12.jar:2.4.12]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]

이때 필요한게 바로 DLX입니다.

DLX(Dead-Letter-Exchange)

DLX는 Dead Letter Exchange의 줄임말입니다. 특별한 Exchange 처럼 보이지만 이는 일반적인 교환기와 다를게 없습니다. 차이점은 실패한 메시지를 다루는 Exchange로 만약 특정 메시지를 다른 Queue가 처리하지 못할 경우 해당 Exchange로 전송합니다. 그리고 DLX를 통해 오류 발생 시 다른방식으로 메시지를 처리할 수 있는 기회를 제공할 수 있습니다.

설정방법

1. DLX 생성

예외가 발생한 메시지를 받을 Exchange(x.alarm.dead)를 생성합니다. 저는 하나의 Queue로만 메시지를 처리할 것이기 때문에 Fanout Type으로 생성하였습니다.

2. DLQ 생성

DLX와 연결할 Queue(q.alarm.dead)를 생성합니다.

그리고 1번에서 생성한 DLX와 바인딩해줍니다.

3. Work Queue 생성 및 DLX 설정

비지니스로직을 처리할 Exchange와 Queue를 생성하고 Queue에는 x-dead-letter-exchange를 설정해줍니다. 만약 메시지 처리를 실패하게 된다면 해당 DLX로 전달될 것 입니다.

[Exchange 생성]

[Queue 생성]

q.alarm.work overview > Details를 보면 x-dead-letter-exchange가 설정된것을 볼 수 있습니다.

메세지 실패 처리하기

메세지를 Consume하던 도중 실패하면 DLX로 전달됩니다. 실패처리하는 방법에는 아래와같이 2가지 방법이 있습니다.

1. AmqpRejectAndDontRequeueException 던지기

@RequiredArgsConstructor
@Component
@Slf4j
public class AlarmConsumer {

    private final AlarmService alarmService;
    private final ObjectMapper objectMapper;

    @RabbitListener(queues = "q.alarm.work")
    public void handler(String message) {
        log.info("AlarmEvent consume from q.alarm.work");
        try {
            AlarmEvent alarmEvent = objectMapper.readValue(message, AlarmEvent.class);
            alarmService.saveAndSend(alarmEvent);
        } catch (Exception e) {
            // 예외가 발생 시 AmqpRejectAndDontRequeueException으로 메시지를 실패처리한다.
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }
}

AmqpRejectAndDontRequeueException은 Spring AMQP에서 제공하는 특수한 예외클래스 입니다. 해당 예외는 RabbitMQ 메세지 소비 중 발생하는 예외를 처리할 때 사용되며, 메시지가 자동으로 거부되고 큐에 다시 들어가지 않도록 합니다. 이를 통해 오류가 발생한 메시지가 무한히 재처리 되는것을 방지할 수 있으며, Dead Letter Exchange(DLX)로 전달될 수 있습니다.

위 코드는 q.alarm.work 큐로부터 메시지를 Consume하는 도중 발생하는 Exception을 AmqpRejectAndDontRequeueException로 포장하여 다시 던짐으로써 메시지가 큐에 다시 들어가지 않도록 처리합니다.

2. 직접 거부하기

  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
    # 수동 설정
    listener:
      simple:
        acknowledge-mode: manual

먼저 스프링 설정파일에 들어가 메시지를 수동으로 승인/거부할 수 있도록 수정해줍니다.

@RequiredArgsConstructor
@Component
@Slf4j
public class AlarmConsumer {

    private final AlarmService alarmService;
    private final ObjectMapper objectMapper;

    @RabbitListener(queues = "q.alarm.work")
    public void handler(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        log.info("AlarmEvent consume from q.reservation");
        try {
            AlarmEvent alarmEvent = objectMapper.readValue(message, AlarmEvent.class);
            alarmService.saveAndSend(alarmEvent);
        } catch (Exception e) {
        	// 예외발생 시 메시지를 직접 거부한다.
            channel.basicReject(tag, false);
        }
		
        // 예외가 발생하지 않는다면 메시지를 수동으로 승인한다.
        channel.basicAck(tag, false);
    }
}

channel은 Spring서버와 RabbitMQ 사이에서 메시지가 이동하는 터널과 같습니다. Exception을 던지는 대신 메세지를 직접거부하여 DLX로 메시지를 전달할 수 있습니다.

하지만 여기엔 부작용이 있습니다. 스프링 설정파일에서 acknowledge-mode을 manual로 바꿔주었기때문에 승인 또한 수동으로 해줘야합니다.

실패한 메세지 Retry하기

오류 처리기를 직접 구현하는 방법도 존재하지만 스프링에서 제공하는 방법으로 간단하게 재처리 로직을 구성할 수 있습니다.

  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672
    # 재처리 설정
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 3s
          max-interval: 10s
          max-attempts: 5
          multiplier: 2
  • initial-interval: 초기 간격
  • max-interval: 최대 간격
  • max-attempts: 최대 재시도 횟수
  • multiplier: 간격 증가율

위 예시대로 설정을 하게된다면 최대 5번 시도를 하며 아래와 같은 메커니즘으로 재시도를 하게 됩니다.

첫번째 시도 ➡️ 3s ➡️ 두번째 시도 ➡️ 6s ➡️ 세번째 시도 ➡️ 10s ➡️ 네번째 시도 ➡️ 10s ➡️ 다섯번째 시도

만약 다섯번째 시도까지 실패한다면 Spring은 자동으로 메세지를 DLX로 전달합니다.

더 나아가기

이는 과연 완벽한 예외처리일까요?

해당 메커니즘엔 큰 단점이 존재하는데 만약 재시도 횟수를 3번, 재시도 간격을 1분으로 설정했다면 Spring은 메시지 A가 성공적으로 처리되거나 재시도 기간이 끝날 때까지 메시지 B의 처리를 시작하지 않습니다.

또한 메시지 A가 영구적인 오류를 가지고 있을 경우엔 메세지 처리 과정을 무려 3분이나 차단하게 됩니다. 만약 기술적인 예외(네트워크 지연 등)에 대한 예외는 해당 재처리가 유의미할 수 있지만 애초에 잘못된 메시지로 인한 예외는 재처리 과정이 불필요할 것입니다.

다음 글에서는 예외에 따라 재처리를 직접구현하는 글을 작성해보겠습니다.

참고: RabbitMQ & Java (Spring Boot 3) Bootcamp - Basic To Advanced

profile
backend-developer🔥

0개의 댓글

관련 채용 정보