[RabbitMQ] Consumer에서 예외발생 시 n번 재시도 후 DLX로 큐잉하기 ( with Spring)

Kim Dae Hyun·2022년 2월 23일
0

RabbitMQ

목록 보기
3/5
post-thumbnail

위 그림대로 ExchangeQueue를 구성합니다.

Employee 객체를 Json 형태 메시지로 Exchange에 전송하고, 해당 Exchange에 바인딩 된 모든 Queue로 메시지를 중계합니다.

Queue에 전달된 메시지는 Consumer에 의해 처리되는데 이 때 여러 문제로 인해 예외가 발생할 수 있습니다.

Consumer에서 예외가 발생하면 기본적으로 예외가 발생한 메시지를 Queue에 다시 넣고 성공할 때까지 무한히 재시도 합니다.

이제 해볼 것은 예외 발생시 무한히 재시도 되지 않고 n번 정도 재시도 한 결과 계속해서 예외가 발생한다면 해당 메시지를 DLX (Dead-Letter-Exchange로 보내는 것 입니다.


위 구성에서 Producer로부터 메시지를 받는 Exchangefan-out 타입입니다. DLX의 경우 각 Queue마다 다른 큐에 관리하고자 direct 타입으로 해주었습니다.

Queue 생성시 Dead letter exchangeDead letter routing key를 설정해주어 DLX와 매핑시켜주면 됩니다.


📌 Springboot & RabbitMQ wow

Springboot를 사용하면 재시도와 같은 로직을 Java 코드 없이 설정파일로 가능합니다.

재시도에 대한 설정은 Consumer 에만 필요하므로 Consumer에만 설정합니다.

spring:
  main:
    banner-mode: OFF
  rabbitmq:
    addresses: localhost:5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: auto # manual X auto O
        retry:
          enabled: true # 재시도
          initial-interval: 3s # 최초 메시지 처리 실패 후 재시도까지의 인터벌
          max-interval: 10s # 최대 재시도 인터벌
          max-attempts: 2 # 최대 재시도 횟수
          multiplier: 2 # 이전 interval * multiplier = 다음 interval

📌 테스트

@SpringBootApplication
public class ProducerApplication implements  CommandLineRunner{

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}

	private final SpringRetryEmployeeProducer producer;

	public ProducerApplication(SpringRetryEmployeeProducer producer) {
		this.producer = producer;
	}

	@Override
	public void run(String... args) throws Exception {
		for (int i=0;i<5;i++){
			Employee employee;
			if (i%2==0) {
				employee = new Employee((long)i, "", LocalDate.now());
			} else {
				employee = new Employee((long)i, "name" + i, LocalDate.now());
			}
			producer.sendMessage(employee);
		}
	}
}    

5개 메시지를 발행하는데 짝수번째 messagename을 빈 문자열로 전달하고 빈 문자열의 경우 consumer에서 예외가 발생하도록 합니다.

✅ Consumer code

@Service
public class SpringRetryEmployeeConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(SpringRetryEmployeeConsumer.class);
    private final ObjectMapper om;

    public SpringRetryEmployeeConsumer(ObjectMapper om) {
        this.om = om;
    }

    @RabbitListener(queues = {"q.employee.development.work"})
    public void listenerDevelopment(String message) throws IOException {
        Employee employee = om.readValue(message, Employee.class);
        validateEmployee(employee);

        LOG.info("development employee: {}", employee);
    }

    @RabbitListener(queues = {"q.employee.marketing.work"})
    public void listerVector(String message) throws IOException {
        Employee employee = om.readValue(message, Employee.class);
        validateEmployee(employee);

        LOG.info("marketing employee: {}", employee);
    }

    // name이 empty인 경우 예외 발생
    private void validateEmployee(Employee employee) {
        if (employee.getName().isEmpty()) {
            throw new IllegalArgumentException("employee name cannot be empty");
        }
    }
}

이제 두 큐 (marketing, development)는 5개 메시지를 받아 처리하게 되는데 짝수번째 메시지에서 예외가 터지게 됩니다.

예외가 터지면 2번까지 재시도 후 DLXrouting-key값과 함께 메시지를 전달하면서 해당 메시지의 처리를 끝냅니다.

ConsumerProducer를 실행했을 때 5개 메시지 중 1,3 번째 메시지는 정상적으로 처리되고 0,2,4번째 메시지는 예외가 터져 두 번의 재시도를 거치고 DLX로 전달되는 것을 기대하고 실행합니다.

✅ 테스트 결과

fan-out 타입 Exchange로 전달된 메시지를 direct 타입 DLX로 전달한 테스트 결과입니다. :)


📌 참고

RabbitMQ & Java (Spring Boot) for System Integration

profile
좀 더 천천히 까먹기 위해 기록합니다. 🧐

0개의 댓글