[RabbitMQ] Consumer 스케줄링 (Spring Scheduler, RabbitListenerEndpointRegistry)

Kim Dae Hyun·2022년 2월 24일
0

RabbitMQ

목록 보기
4/5
post-thumbnail

Consumer에 스케줄링 기능을 추가해서 은행 점검 시간 과 비슷한 구조를 갖도록 할 것 입니다.

00:00 ~ 00:30 까지 은행 점검 시간으로 인해 결제가 되지 않는다고 가정하면 00:00 부터 쌓이는 결제 관련 message 들은 Consumer에 의해 처리되지 않도록 해야 합니다.

그리고 00:30이 되면 Consumer는 다시 결제 관련 message를 처리해야 합니다.

이와 같은 기능을 Spring Scheduler를 이용해서 구현할 것 입니다.


++
은행 점검과 조금 다른 점 하나는 Consumer가 동작을 멈춘 동안에도 message는 쌓인다는 것 입니다.


📌 Code

RabbitMQ 의존성 추가

implementation 'org.springframework.boot:spring-boot-starter-amqp'

스케줄러는 spring이 제공하는 scheduler를 그대로 사용합니다.

RabbitListenerEndpointRegistry를 이용해서 현재 구동중인 Consumer를 중단하고 시작하는 것이 가능합니다.

✅ Consumer Code

@Service
public class RabbitmqScheduler {

    private final RabbitListenerEndpointRegistry registry;
    private static final Logger LOG = LoggerFactory.getLogger(RabbitmqScheduler.class);

    public RabbitmqScheduler(RabbitListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Scheduled(cron = "0 54 19 * * *")
    public void stopAll() {
        LOG.info("stopAll");
        registry.getListenerContainers().forEach(c -> {
                    LOG.info("stop listener container: {}", c);
                    c.stop();
                });
    }

    @Scheduled(cron = "0 55 19 * * *")
    public void startAll() {
        LOG.info("startAll");
        registry.getListenerContainers().forEach(c -> {
            LOG.info("start listener container: {}", c);
            c.start();
        });
    }
}

stop()start() 메서드로 중단, 재시작이 가능합니다.

✅ Test

Producer는 1초에 한 개씩 message를 보내게 해두고 Consumer가 멈추고 시작되는 것을 테스트합니다.

점검 시작시간: 19:54
점검 종료시간: 19:55

19:53:xx 까지 message를 처리하고 19:54:xx 부터 Consumer의 동작이 멈추는 것을 확인할 수 있습니다.

그리고 19:55:xx가 되는 순간 Consumer는 다시 동작합니다.

📌 특정 Consumer의 동작만 제어하도록 처리

위 코드의 경우 현재 구동 중인 모든 Consumer를 제어합니다.

특정 Consumer에 대해서만 제어가 가능하도록 변경해볼께요.

Consumer를 식별하기 위해 Consumerid값을 설정해줘야 합니다.

@Service
public class DummyConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(DummyConsumer.class);

    @RabbitListener(queues = {"q.dummy"}, id = "q_dummy")
    private void listener(DummyMessage message) {
        LOG.info("dummyMessage={}", message);
    }

}

@RabbitListnerid값을 q_dummy로 설정했습니다.

스케줄러 코드

@Service
public class RabbitmqScheduler {

    private final RabbitListenerEndpointRegistry registry;
    private static final Logger LOG = LoggerFactory.getLogger(RabbitmqScheduler.class);
    private final String containerId = "q_dummy";

    public RabbitmqScheduler(RabbitListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Scheduled(cron = "0 40 21 * * *")
    public void stopAll() {
        LOG.info("stopAll");
        registry.getListenerContainer(containerId).stop();
    }

    @Scheduled(cron = "0 41 21 * * *")
    public void startAll() {
        LOG.info("startAll");
        registry.getListenerContainer(containerId).start();
    }
}

getListenerContainers가 아닌 getListenerContainer를 사용해서 특정 id에 대해 제어하도록 변경했습니다.

✅ Test

점검 시작시간: 21:40
점검 종료시간: 21:41

설정한 시간에 맞춰 Consumer의 동작이 제어되는 것을 확인할 수 있습니다.

:)


📌 참고

RabbitMQ & Java (Spring Boot) for System Integration

profile
좀 더 천천히 까먹기 위해 기록합니다. 🧐

0개의 댓글