
Consumer에 스케줄링 기능을 추가해서 은행 점검 시간 과 비슷한 구조를 갖도록 할 것 입니다.
00:00 ~ 00:30 까지 은행 점검 시간으로 인해 결제가 되지 않는다고 가정하면 00:00 부터 쌓이는 결제 관련 message 들은 Consumer에 의해 처리되지 않도록 해야 합니다.
그리고 00:30이 되면 Consumer는 다시 결제 관련 message를 처리해야 합니다.
이와 같은 기능을 Spring Scheduler를 이용해서 구현할 것 입니다.
++
은행 점검과 조금 다른 점 하나는 Consumer가 동작을 멈춘 동안에도 message는 쌓인다는 것 입니다.
RabbitMQ 의존성 추가
implementation 'org.springframework.boot:spring-boot-starter-amqp'
스케줄러는 spring이 제공하는 scheduler를 그대로 사용합니다.
RabbitListenerEndpointRegistry를 이용해서 현재 구동중인 Consumer를 중단하고 시작하는 것이 가능합니다.
@Service
public class RabbitmqScheduler {
private final RabbitListenerEndpointRegistry registry;
private static final Logger LOG = LoggerFactory.getLogger(RabbitmqScheduler.class);
public RabbitmqScheduler(RabbitListenerEndpointRegistry registry) {
this.registry = registry;
}
@Scheduled(cron = "0 54 19 * * *")
public void stopAll() {
LOG.info("stopAll");
registry.getListenerContainers().forEach(c -> {
LOG.info("stop listener container: {}", c);
c.stop();
});
}
@Scheduled(cron = "0 55 19 * * *")
public void startAll() {
LOG.info("startAll");
registry.getListenerContainers().forEach(c -> {
LOG.info("start listener container: {}", c);
c.start();
});
}
}
stop()과 start() 메서드로 중단, 재시작이 가능합니다.
Producer는 1초에 한 개씩 message를 보내게 해두고 Consumer가 멈추고 시작되는 것을 테스트합니다.
점검 시작시간: 19:54
점검 종료시간: 19:55

19:53:xx 까지 message를 처리하고 19:54:xx 부터 Consumer의 동작이 멈추는 것을 확인할 수 있습니다.
그리고 19:55:xx가 되는 순간 Consumer는 다시 동작합니다.
위 코드의 경우 현재 구동 중인 모든 Consumer를 제어합니다.
특정 Consumer에 대해서만 제어가 가능하도록 변경해볼께요.
Consumer를 식별하기 위해 Consumer에 id값을 설정해줘야 합니다.
@Service
public class DummyConsumer {
private static final Logger LOG = LoggerFactory.getLogger(DummyConsumer.class);
@RabbitListener(queues = {"q.dummy"}, id = "q_dummy")
private void listener(DummyMessage message) {
LOG.info("dummyMessage={}", message);
}
}
@RabbitListner에 id값을 q_dummy로 설정했습니다.
@Service
public class RabbitmqScheduler {
private final RabbitListenerEndpointRegistry registry;
private static final Logger LOG = LoggerFactory.getLogger(RabbitmqScheduler.class);
private final String containerId = "q_dummy";
public RabbitmqScheduler(RabbitListenerEndpointRegistry registry) {
this.registry = registry;
}
@Scheduled(cron = "0 40 21 * * *")
public void stopAll() {
LOG.info("stopAll");
registry.getListenerContainer(containerId).stop();
}
@Scheduled(cron = "0 41 21 * * *")
public void startAll() {
LOG.info("startAll");
registry.getListenerContainer(containerId).start();
}
}
getListenerContainers가 아닌 getListenerContainer를 사용해서 특정 id에 대해 제어하도록 변경했습니다.
점검 시작시간: 21:40
점검 종료시간: 21:41

설정한 시간에 맞춰 Consumer의 동작이 제어되는 것을 확인할 수 있습니다.
:)