

위 그림대로 Exchange와 Queue를 구성합니다.
Employee 객체를 Json 형태 메시지로 Exchange에 전송하고, 해당 Exchange에 바인딩 된 모든 Queue로 메시지를 중계합니다.
Queue에 전달된 메시지는 Consumer에 의해 처리되는데 이 때 여러 문제로 인해 예외가 발생할 수 있습니다.
Consumer에서 예외가 발생하면 기본적으로 예외가 발생한 메시지를 Queue에 다시 넣고 성공할 때까지 무한히 재시도 합니다.
이제 해볼 것은 예외 발생시 무한히 재시도 되지 않고 n번 정도 재시도 한 결과 계속해서 예외가 발생한다면 해당 메시지를 DLX (Dead-Letter-Exchange로 보내는 것 입니다.
위 구성에서 Producer로부터 메시지를 받는 Exchange는 fan-out 타입입니다. DLX의 경우 각 Queue마다 다른 큐에 관리하고자 direct 타입으로 해주었습니다.
Queue 생성시 Dead letter exchange와 Dead letter routing key를 설정해주어 DLX와 매핑시켜주면 됩니다.
Springboot를 사용하면 재시도와 같은 로직을 Java 코드 없이 설정파일로 가능합니다.
재시도에 대한 설정은 Consumer 에만 필요하므로 Consumer에만 설정합니다.
spring:
main:
banner-mode: OFF
rabbitmq:
addresses: localhost:5672
username: guest
password: guest
listener:
simple:
acknowledge-mode: auto # manual X auto O
retry:
enabled: true # 재시도
initial-interval: 3s # 최초 메시지 처리 실패 후 재시도까지의 인터벌
max-interval: 10s # 최대 재시도 인터벌
max-attempts: 2 # 최대 재시도 횟수
multiplier: 2 # 이전 interval * multiplier = 다음 interval
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
private final SpringRetryEmployeeProducer producer;
public ProducerApplication(SpringRetryEmployeeProducer producer) {
this.producer = producer;
}
@Override
public void run(String... args) throws Exception {
for (int i=0;i<5;i++){
Employee employee;
if (i%2==0) {
employee = new Employee((long)i, "", LocalDate.now());
} else {
employee = new Employee((long)i, "name" + i, LocalDate.now());
}
producer.sendMessage(employee);
}
}
}
5개 메시지를 발행하는데 짝수번째 message의 name을 빈 문자열로 전달하고 빈 문자열의 경우 consumer에서 예외가 발생하도록 합니다.
@Service
public class SpringRetryEmployeeConsumer {
private static final Logger LOG = LoggerFactory.getLogger(SpringRetryEmployeeConsumer.class);
private final ObjectMapper om;
public SpringRetryEmployeeConsumer(ObjectMapper om) {
this.om = om;
}
@RabbitListener(queues = {"q.employee.development.work"})
public void listenerDevelopment(String message) throws IOException {
Employee employee = om.readValue(message, Employee.class);
validateEmployee(employee);
LOG.info("development employee: {}", employee);
}
@RabbitListener(queues = {"q.employee.marketing.work"})
public void listerVector(String message) throws IOException {
Employee employee = om.readValue(message, Employee.class);
validateEmployee(employee);
LOG.info("marketing employee: {}", employee);
}
// name이 empty인 경우 예외 발생
private void validateEmployee(Employee employee) {
if (employee.getName().isEmpty()) {
throw new IllegalArgumentException("employee name cannot be empty");
}
}
}
이제 두 큐 (marketing, development)는 5개 메시지를 받아 처리하게 되는데 짝수번째 메시지에서 예외가 터지게 됩니다.
예외가 터지면 2번까지 재시도 후 DLX로 routing-key값과 함께 메시지를 전달하면서 해당 메시지의 처리를 끝냅니다.
Consumer와 Producer를 실행했을 때 5개 메시지 중 1,3 번째 메시지는 정상적으로 처리되고 0,2,4번째 메시지는 예외가 터져 두 번의 재시도를 거치고 DLX로 전달되는 것을 기대하고 실행합니다.

fan-out 타입 Exchange로 전달된 메시지를 direct 타입 DLX로 전달한 테스트 결과입니다. :)