RabbitMQ Sharding with RabbitMQ Plugin (2)

na.ram·2025년 11월 10일

all in auction

목록 보기
7/14
post-thumbnail

RabbitMQ Sharding with RabbitMQ Plugin (1)로부터 이어집니다!

😮‍💨 문제 상황

이제 3개의 노드에 존재하는 모든 샤드에서 메시지를 받아볼 수 있으리라고 생각했는데요 ..

네 . . .. 이상하게도 3번 노드, 0번 샤드의 메시지만 소비되는 것을 확인할 수 있었습니다.
왜 이런 문제가 발생하는걸까요?


🔎 이유

이를 해결하기 위해서 구글링을 통해 해결법을 찾다 이 문제와 관련된 메일을 보게 되었습니다.

단일 소비자를 통해 모든 노드를 소비할 수 없습니다. 이 플러그인이 달성하려는 목표, 즉 단일 논리 큐에서 병렬 처리하는 것은 달성할 수 없습니다. (생략) 플러그인을 단일 소비자와 함께 사용하는 것은 전혀 의미가 없습니다.

현재 3개 노드에 걸쳐 분산된 총 9개의 샤드가 하나의 논리적 큐(Pseudo-Queue)로 묶여있습니다.
소비자가 논리적 큐 이름으로 basic.consume을 요청하면, Sharding Plugin은 이 요청을 인터셉트하여 9개의 샤드 중 하나를 선택해 컨슈머에 연결합니다.
단일 컨슈머(Channel)는 하나의 샤드에 고정됩니다. 다른 8개의 샤드는 메시지를 쌓아두고 소비되지 않으므로, 병렬 소비를 위해서는 샤드 수만큼 컨슈머를 늘려야 합니다.
만약 단 하나의 Channel, 단 하나의 Consumer만 등록되면 이 컨슈머는 전체 샤드 중에서 하나만 선택하여 연결됩니다.

그러므로, concurrency를 설정해 채널 수를 늘리거나 Consumer를 노드 수 X 샤드 수만큼 등록해주어 해결할 수 있습니다. 저는 3개의 노드, 그리고 노드당 3개의 샤드로 총 9개의 컨슈머가 필요합니다.

💡 해결

RabbitMQ Config 수정

    @Value("${spring.rabbitmq.host}")
    private String hostname;
    
    @Bean
    @Primary
    public ConnectionFactory node1ConnectionFactory() {
        return new CachingConnectionFactory(hostname, 5672);
    }

    @Bean
    public ConnectionFactory node2ConnectionFactory() {
        return new CachingConnectionFactory(hostname, 5673);
    }

    @Bean
    public ConnectionFactory node3ConnectionFactory() {
        return new CachingConnectionFactory(hostname, 5674);
    }

    @Bean
    @Primary
    public SimpleRabbitListenerContainerFactory node1Factory(
            @Qualifier("node1ConnectionFactory") ConnectionFactory cf
    ) {
        var factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory node2Factory(
            @Qualifier("node2ConnectionFactory") ConnectionFactory cf
    ) {
        var factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf);
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory node3Factory(
            @Qualifier("node3ConnectionFactory") ConnectionFactory cf
    ) {
        var factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf);
        return factory;
    }

RabbitMQ Consumer 수정

    @RabbitListener(queues = AUCTION_EXCHANGE, queuesToDeclare = {}, containerFactory = "node1Factory")
    public void auctionConsumerNode1Shard0(String message) {
        try {
            log.info("Node 1, Shard 0 :: AuctionEvent = {}", message);
            AuctionEvent auctionEvent = objectMapper.readValue(message, AuctionEvent.class);
            auctionService.closeAuction(auctionEvent);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }
    }

    @RabbitListener(queues = AUCTION_EXCHANGE, queuesToDeclare = {}, containerFactory = "node1Factory")
    public void auctionConsumerNode1Shard1(String message) {
        try {
            log.info("Node 1, Shard 1 :: AuctionEvent = {}", message);
            AuctionEvent auctionEvent = objectMapper.readValue(message, AuctionEvent.class);
            auctionService.closeAuction(auctionEvent);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }
    }

    @RabbitListener(queues = AUCTION_EXCHANGE, queuesToDeclare = {}, containerFactory = "node1Factory")
    public void auctionConsumerNode1Shard2(String message) {
        try {
            log.info("Node 1, Shard 2 :: AuctionEvent = {}", message);
            AuctionEvent auctionEvent = objectMapper.readValue(message, AuctionEvent.class);
            auctionService.closeAuction(auctionEvent);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }
    }

    @RabbitListener(queues = AUCTION_EXCHANGE, queuesToDeclare = {}, containerFactory = "node2Factory")
    public void auctionConsumerNode2Shard0(String message) {
        try {
            log.info("Node 2, Shard 0 :: AuctionEvent = {}", message);
            AuctionEvent auctionEvent = objectMapper.readValue(message, AuctionEvent.class);
            auctionService.closeAuction(auctionEvent);
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
        }
    }
    
    // 나머지 생략..

이렇게 노드 수 X 샤드 수만큼의 컨슈머를 등록함으로써 위처럼 모든 노드, 모든 샤드의 메시지를 소비할 수 있었습니다.
그러나 이런 방식이라면 후에 노드가 늘어나거나 샤드 개수를 늘리게 되면 그에 따른 코드 수정이 불가피해집니다.
이외에도 노드 수에 맞게 컨슈머를 늘리거나, concurrency를 높이는 방식으로 문제를 완화할 수 있지만 concurrency를 지나치게 크게 설정하면 스레드 수와 채널 수가 불필요하게 늘어나 성능에 영향이 갈 수도 있음을 명심해야 할 것 같습니다.

결론적으로 처음에 시도했던대로 어플리케이션 단에서 해싱을 통해 메시지를 분산하는게 더 낫다는 결론에 이르게 되었습니다.. 이후에는 Consistent Hashing과 Virtual Node을 적용할 방법을 찾아봐야겠습니다!

0개의 댓글