MQ별 특징 1. - RabbitMQ

김예지·2024년 2월 7일
1

23-2 캡스톤 디자인

목록 보기
4/8

RabbitMQ이란

RabbitMQ는 얼랭(Erlang)으로 AMQP를 구현한 오픈소스 메시지 브로커이다.

AMQP(Advanced Message Queuing Protocol)란?

메시지 지향 미들웨어 시스템 간 통신을 위한 개방형 네트워크 프로토콜이다.

간단히 말해, Producer와 Consumer간에 안정적이고 효율적으로 메시지를 전송하는 방법에 대한 설명이다. 다양한 MQ에서 AMQP를 구현하고 있으며, 그 중 가장 대표적인 것이 RabbitMQ이다.

AMQP에서는 Queue, Exchange, Binding을 통해 Producer와 Consumer의 직접적인 결합성을 감소시킨다.
AMQP의 구조는 RabbitMQ의 아키텍쳐에 그대로 드러난다.

아키텍쳐

  • Producer: 메시지를 생성하고 발송하는 주체
  • Consumer: 메시지를 수신하고 소비하는 주체. Consumer는 메시지를 수신하면 Queue에게 Ack를 전송해, Queue에서 해당 메시지를 삭제할 수 있도록 한다.
  • Queue: 발송된 메시지가 소비되기 전까지 보관되는 장소. 이름으로 구분한다.
  • Exchange: 메시지를 알맞은 큐에 전달하는 역할
  • Binding: Exchange에게 메시지를 라우팅 할 규칙을 지정해준다.
    (Exchange과 Queue는 M:N binding이 가능하다.)

Exchange의 4가지 타입

1. Direct Exchange

  • Unicast: Routing Key가 정확히 일치하는 Queue에 메시지를 전송한다.
  • 하나의 큐에 여러 개의 라우팅 키를 지정할 수 있다.

2. Topic Exchange

  • Multicast: Routing key 패턴이 일치하는 Queue에 메시지를 전송한다.

3. Headers Exchange

  • Multicast: key-value로 이루어진 header 값을 기준으로, 일치하는 Queue에 메시지를 전송한다.
  • 이 때, 모든 header가 일치할 때/한 개라도 일치할 때 등의 옵션을 줄 수 있다.

4. Fanout Exchange

  • Broadcast: 해당 Exchange에 등록된 모든 Queue에게 메시지를 전송한다.

특징

https://aws.amazon.com/ko/compare/the-difference-between-rabbitmq-and-kafka/

RabbitMQ는 End-to-End 메시지 전달의 우선 순위를 지정하는 범용 메시지 브로커이다. RabbitMQ는 간단한 아키텍처로 복잡한 메시지 라우팅을 제공할 수 있다.

확장성 및 내결함성

확장성

RabbitMQ는 메시지 처리 용량을 수평 및 수직으로 확장할 수 있다. RabbitMQ 서버에 더 많은 컴퓨팅 리소스를 할당하여 메시지 교환 효율을 높일 수 있으며, 상황에 따라 개발자는 RabbitMQ 일관성 해시 교환이라는 메시지 분산 기술을 사용하여 브로커 간의 부하 처리 균형을 맞춘다.

내결함성

RabbitMQ는 시스템 장애에 대한 복원력이 뛰어나다.
여러 RabbitMQ 브로커를 클러스터로 그룹화하여 여러 서버에 배포할 수 있으며
또한 RabbitMQ는 대기열에 있는 메시지를 모든 분산 노드에 복제하여, 이를 통해 시스템은 서버에 영향을 미치는 장애를 복구할 수 있다.

메시징 처리 방식

메시지 정렬

RabbitMQ는 메시지를 특정 순서로 보내고 대기열에 추가한다. 우선 순위가 더 높은 메시지가 시스템 대기열에 추가되지 않는 한 소비자는 전송된 순서대로 메시지를 받는다.

메시지 우선순위

RabbitMQ 브로커를 사용하면 Producer는 우선 순위 대기열을 사용하여 특정 메시지를 에스컬레이션할 수 있다. 브로커는 선입선출 순서로 메시지를 보내는 대신 우선 순위가 높은 메시지를 일반 메시지보다 먼저 처리한다.

메시지 삭제

Consumer는 메시지를 읽으면 브로커에 확인(ACK) 응답을 보내고, 그러면 브로커가 대기열에서 해당 메시지를 삭제한다.

RabbitMQ 사용 시기

복잡한 라우팅 아키텍처

RabbitMQ는 요구 사항이 모호하거나 라우팅 시나리오가 복잡한 클라이언트에게 유연성을 제공한다. Binding과 Exchange를 통해 서로 다른 애플리케이션으로 데이터를 라우팅하도록 RabbitMQ를 설정할 수 있다.

푸시 모델 적용

RabbitMQ는 푸시 모델을 적용하여 Producer는 Consumer가 메시지를 사용했는지 여부를 알 수 있다. 데이터를 교환하고 분석할 때 특정 순서 및 전달 보장을 준수해야 하는 애플리케이션에 적합하다.

로그 집계에는 적합하지 않음

사용된 메시지는 Queue에서 삭제되기 때문에 RabbitMQ를 사용하여 로그를 집계하기는 어렵다.

Spring 적용

Broker에서 RabbitMQ 실행

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management

⭐RabbitMQ와의 연결은 5672 포트를 이용하며, 15672 포트로 접근하면 RabbitMQ를 더 쉽게 사용할 수 있도록 웹 관리 콘솔이 제공된다.

기본 Username과 PW는 'guest'이다.

Producer & Consumer 공통

1. build.gradle 추가

implementation 'org.springframework.boot:spring-boot-starter-amqp'

2. application.yml 작성 (선택)

rabbitmq:
  host: ${RABBITMQ_URL}
  port: 5672
  username: ${RABBITMQ_DEFAULT_USER}
  password: ${RABBITMQ_DEFAULT_PASS}
  exchange: ${RABBITMQ_EXCHANGE}
  queue: ${RABBITMQ_QUEUE}
  routing: ${RABBITMQ_ROUTING}

3. config 작성

@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    @Value("${spring.rabbitmq.exchange}")
    private String exchange;

    @Value("${spring.rabbitmq.queue}")
    private String queue;

    @Value("${spring.rabbitmq.routing}")
    private String routingKey;

    //지정된 큐 이름으로 Queue 빈을 생성. @return Queue 빈 객체
    @Bean
    public Queue exampleQueue() {
        return new Queue(queue);
    }


    //지정된 익스체인지 이름으로 DirectExchange 빈을 생성. @return TopicExchange 빈 객체
    @Bean
    public DirectExchange exampleExchange() {
        return new DirectExchange(exchange);
    }


    /**
     * 주어진 큐와 익스체인지를 바인딩하고 라우팅 키를 사용하여 Binding 빈을 생성
     *
     * @param queue    바인딩할 Queue
     * @param exchange 바인딩할 TopicExchange
     * @return Binding 빈 객체
     */
    @Bean
    public Binding exampleBinding(Queue exampleQueue, DirectExchange exampleExchange) {
        return BindingBuilder.bind(exampleQueue).to(exampleExchange).with(routingKey);
    }


    //RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory;
    }

    /**
     * RabbitTemplate을 생성하여 반환
     *
     * @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
     * @return RabbitTemplate 객체
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
     *
     * @return MessageConverter 객체
     */
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Producer

@Service
private final RabbitTemplate rabbitTemplate;
@RequiredArgsConstructor
public class ProducerService{

  public BasicResponseDto.ResultDto rabbitMqProduce(byte[] content) {

      ObjectMapper objectMapper = new ObjectMapper();
      try {
          String value = objectMapper.writeValueAsString(content);

          //키-값으로 메시지를 전송하는 방법. 이 외에도 다양한 방법으로 전송할 수 있다.
          HashMap<String, String> map = new HashMap<>();

          map.put("info", value);

          //전송 시간 등 간단한 정보는 Header에 담아 전달할 수 있다.
          MessageProperties messageProperties = new MessageProperties();
          messageProperties.getHeaders()
              .put("claimTime",String.valueOf(System.currentTimeMillis()));

          Message message = new Message(value.getBytes(), messageProperties);

          rabbitTemplate.convertAndSend(emailExchange, emailRoutingKey, message);

      } catch (JsonProcessingException e) {
          throw new RuntimeException(e);
      }

      return BasicResponseDto.ResultDto.builder()
              .succeed("success")
              .build();
  }
}

Consumer

@Slf4j
@Service
@RequiredArgsConstructor
public class RabbitMqService {

    @RabbitListener(queues = "${spring.rabbitmq.queue}")
    public void receiveMessage(Message message) throws IOException {

        ObjectMapper objectMapper = new ObjectMapper();
        String info = message.getBody().toString();

        Long claimTime = Long.parseLong(message.getMessageProperties().getHeaders().get("claimTime").toString());
        
		//info를 이용한 로직 작성
    }
}

추가

Plugins(timestamp)

https://www.rabbitmq.com/installing-plugins.html

RabbitMQ에서는 기능을 확장할 수 있는 Plugins를 제공하고 있다.

1. 메시지가 Producer에서 출발하는 시간
2. 메시지가 RabbitMQ Server(Broker)에 도착하는 시간
3. 메시지가 Consumer에 도착하는 시간

내 경우, 위의 3개의 timestamp를 측정해 각 MQ의 성능을 파악하고자 하였는데 (2.)의 기능을 RabbitMQ에서는 지원하지 않았다. 따라서 Broker에서의 timestamp를 남기는 plugin을 추가하여 성능 테스트를 진행했다.

Plugin은 직접 docker로 빌드된 rabbitMQ 서버의 config 파일에 추가할 수도 있고, 아래와 같이 커스텀 이미지를 생성해 사용할 수도 있다.

FROM rabbitmq:3.9.11-management

# curl을 사용하여 플러그인 다운로드 및 추가
RUN apt-get update && apt-get install -y curl \
    && curl -O -L https://github.com/rabbitmq/rabbitmq-message-timestamp/releases/download/v3.9.11/rabbitmq_message_timestamp-3.9.11.ez \
    && mv rabbitmq_message_timestamp-3.9.11.ez /plugins/

# RabbitMQ 플러그인 활성화
RUN rabbitmq-plugins enable rabbitmq_message_timestamp
docker run --name rabbitmq-container -p 5672:5672 -p 15672:15672 custom-rabbitmq

신뢰성 & 영속성 보장

1. Queue & 메시지 보존

RabbitMQ server가 종료되고 재부팅하면 기본적으로 Queue는 모두 제거된다.

이를 방지하기 위해 Queue를 생성할 때 Durable 옵션에 true를 주고 생성해야 하며, Producer가 메시지를 발송할 때 RERSISTENT_TEXT_PLAIN 옵션을 주어야 메시지가 보존된다.

#Queue 생성 예시. 2번째 파라미터가 durable option에 대한 설정
rabbitmqChannel.queueDeclare(rabbitmqQueueName, true, false, false, null);

#Message 발송 예시
rabbitmqChannel.basicPublish(exchangeName, routingKey, 
	MessageProperties.RERSISTENT_TEXT_PLAIN, message.getBytes());

2. Message Durability

Queue의 메시지를 디스크에 저장할지 메모리에 저장할 지 선택할 수 있다. 해당 옵션은 durability(위의 durable option)을 설정하면 된다.

  • Disk 모드: Default 설정이며, Cluster 구성 시 하나 이상의 RabbitMQ는 반드시 Disk Mode로 동작시켜야 한다.
  • Ram 모드: Message, Message Index, Queue Index, 다른 RabbitMQ의 상태 정보를 제외한 나머지 모든 정보를 RAM에만 저장한다. 따라서 Exchange /Queue/Binding 등의 정보가 많고 설정이 자주 변경되는 환경에서 빠르게 설정을 변경하기 위해 사용된다.
    이 때, Message 관련 정보는 여전히 Disk에 저장되어 Message 처리량 자체는 증가하지 않는다.

3. Consumer Server가 죽었을 경우

Queue는 Consumer로부터 Ack를 받았을 때 해당 메시지를 삭제하므로, Ack를 받지 못한 메시지가 있는 경우 Queue는 대기하다가 메시지를 수신한 Consumer의 상태를 확인하여, 메시지 처리가 불가능한 상황일 때 다른 Consumer에게 동일한 메시지를 전달한다.

dispatching

여러 소비자가 1개의 Queue를 바라 보고 있다면, RabbitMQ에서는 Round-Robin을 사용해 메시지를 균등하게 분배한다.

1개의 Queue에 대해 3개의 Consumer가 메시지를 소비할 때, 20,000건의 메시지를 발송한 상황이다. Redis Stream에 비해 RabbitMQ와 Kafka는 꽤 공평하게 메시지를 소비하고 있다.

✅Prefetch count
각 메시지의 크기가 달라서 RR을 사용해도 메시지가 공평하게 소비되지 않는 경우가 발생할 수 있다.
prefetch count를 1로 설정한다면 Consumer로부터 Ack를 받지 못한 메시지가 한 개라도 존재할 때 해당 Consumer에게는 메시지를 전달하지 않도록 한다.

멀티스레드 개수

RabbitMQ의 기본 설정으로 충분하다면 추가적인 구성이 필요 없다.
하지만, 캡스톤 디자인 진행 중에 RabbitMQ 내의 병렬 스레드를 조정해가며 지연시간을 측정해보고 싶어 아래의 글을 참고하였다.

https://m.blog.naver.com/PostView.naver?isHttpsRedirect=true&blogId=oidoman&logNo=220833403583

0개의 댓글

관련 채용 정보