RabbitMQ Sharding with RabbitMQ Plugin (1)

na.ram·2025년 11월 5일

all in auction

목록 보기
6/14
post-thumbnail

이전에 RabbitMQ 클러스터링 + 해싱을 통한 메시지 분산을 통해 Queue의 부하를 줄이려는 시도를 했던 적이 있습니다. 현재는 3개의 노드, 그리고 3개의 큐로 고정이 되어 있는데 만약 노드가 더 늘어나고 큐가 늘어난다면 이벤트를 소비하는 부분이나 큐를 정의하는 부분에 수정이 필요하게 됩니다.

이를 해결하기 위한 방법으로 RabbitMQ Sharding Plugin을 알게 되었습니다.

RabbitMQ Sharding Plugin?

저는 원래 어플리케이션 단에서 해싱을 통해 라우팅 키를 지정해 메시지를 분산시켜왔지만
RabbitMQ의 Sharding Plugin를 사용하면 아래 그림과 같이 Exchange에서 메시지를 분산시킵니다.

또, README.md의 Auto-scaling When Nodes are Added 섹션의 내용을 번역해보면
RabbitMQ 클러스터에 새 노드가 추가되면 플러그인이 자동으로 새 노드에 더 많은 샤드를 생성하게 됩니다.
노드 a에 4개의 대기열이 있는 샤드가 있고 노드 b가 방금 클러스터에 합류한다고 가정하면 플러그인은 노드 b에 자동으로 4개의 대기열을 생성하고 이를 샤드 파티션에 "가입"하게 합니다.
이미 전달된 메시지는 재조정되지 않지만 새로 도착한 메시지는 새 대기열로 분할됩니다.

추가로 How Evenly Will Messages Be Distributed? 섹션의 내용을 번역해보면
RabbitMQ Sharding Plugin은 해싱 함수를 기반으로 한 많은 데이터 분산 접근 방식과 마찬가지로, 샤드 간의 짝수 분산은 입력의 분포(변동성), 즉 라우팅 키에 따라 달라집니다.
라우팅 키 집합이 클수록 공유된 메시지 간의 균등한 메시지 분산이 이루어집니다. 모든 메시지가 동일한 라우팅 키를 가지고 있다면 모두 같은 샤드에 있게 됩니다.

위와 같은 RabbitMQ Sharding Plugin의 특성을 이용하면 원래의 코드보다 좀 더 확장성 있고, 유연한 코드가 될 것 같습니다. 한 번 적용해보도록 하겠습니다.

RabbitMQ Sharding Plugin 적용

플러그인 활성화

rabbitmq-plugins enable rabbitmq_sharding

정책 수립

rabbitmqctl set_policy --apply-to exchanges "auction-sharding-policy" "^exchange.auction$" '{"shards-per-node": 1}'

"auction-sharding-policy"라는 이름으로 정책을 만들어줍니다.
Exchange에 해당 정책이 적용될 수 있도록 --apply-to 옵션에 exchanges를 주고,
Definition({}로 감싸진 부분)으로 노드마다 총 1개의 샤드를 가질 수 있도록 정의해주었습니다.

Definition에 routing-key를 지정할 수도 있습니다. 그러나 x-modulus-hash exchange를 이용하는 경우에는 라우팅 키가 무시되고, 저는 x-modulus-hash exchange를 이용하기 때문에 정의하지 않았습니다. 사용하는 exchange type에 따라 라우팅 키 정책 정의가 메시지를 라우팅하는 동안 미치는 영향이 달라질 수 있습니다.


RabbitMQ Config 수정

Exchange의 type을 x-modulus-hash로 수정하고, 큐를 동적으로 생성하는 코드를 모두 주석 처리해주었습니다.

@Configuration
public class RabbitMqConfig {
     @Bean
     public CustomExchange auctionExchange() {
         Map<String, Object> arguments = new HashMap<>();
         arguments.put("x-delayed-type", "x-modulus-hash");
         return new CustomExchange(AUCTION_EXCHANGE, "x-delayed-message", true, false, arguments);
     }
}

RabbitMQ Pulisher 수정

원래는 AuctionEvent의 해시 코드를 기반으로 라우팅 키를 지정해 보내주었지만, 이제는 Exchange에서 적절하게 메시지를 분산시켜줄 것이므로 라우팅 키를 보내는 부분에 AuctionEvent의 ID를 보내도록 수정했습니다.

@Component
@RequiredArgsConstructor
public class AuctionPublisher {
    private final ObjectMapper objectMapper;
    private final RabbitTemplate rabbitTemplate;

    public void auctionPublisher(Object object, long targetTime1, long targetTime2) {
        try {
            AuctionEvent auctionEvent = (AuctionEvent) object;
            rabbitTemplate.convertAndSend(AUCTION_EXCHANGE, Long.toString(auctionEvent.getAuctionId()),
                    objectMapper.writeValueAsString(object), msg -> {
                        msg.getMessageProperties().setDelayLong(subtractTime(targetTime1, targetTime2));
                        return msg;
                    });
        } // 생략
    }
    
    // 생략
}

이제 메시지를 보내기 위한 준비가 모두 완료되었습니다!
자동으로 생성된 큐에 메시지가 잘 분산되는지 한 번 테스트 해보겠습니다.

다른 테스트들과 마찬가지로 총 3개의 노드, 노드당 1개의 큐, 총 10,000개의 메시지를 보냈습니다.

위와 같이 자동으로 생성된 큐에 메시지들이 잘 분산되는 것을 확인할 수 있습니다.

노드당 샤드를 더 늘리면 어떨까요? shards-per-node를 3으로 늘리고 다시 테스트 해봤습니다.

정책을 수립한대로 3개의 노드, 노드당 3개의 큐가 자동으로 생성되어 총 9개의 샤드가 있으며 메시지는 10,000개는 9개의 샤드에 분산되었습니다.

RabbitMQ Consumer 수정

원래는 @RabbitListener 어노테이션에 큐들의 이름을 명시해 메시지를 구독하고 있었습니다.
그러나 RabbitMQ Sharding Plugin을 통해 생성되는 큐는 sharding: exchange.auction - rabbit@rabbitmq3와 같은 큐의 이름으로는 메시지를 소비할 수 없습니다.

대신에 @RabbitListener(queues = AUCTION_EXCHANGE)와 같이 큐 이름을 Exchange와 동일하게 두어야 합니다.

이제 3개의 노드에 존재하는 모든 샤드에서 메시지를 받아볼 수 있으리라고 생각했는데요 ..
네 . . .. 이상하게도 딱 3번 노드의 0번 샤드의 메시지만 소비되는 것을 확인할 수 있었습니다.

0개의 댓글