RabbitMQ

장철현·2025년 6월 29일

Message Queue

목록 보기
1/1

Rabbit MQ에 대해 공부하고 왜 쓰는지와 어떻게 쓰는지에 대해 알아보자

메세지 큐

RabbitMQ는 메세지 큐이다. 큐는 FIFO 구조인 자료구조를 말하는 것이다.

메세지 큐란 Queue 자료구조를 채택해 메세지를 전달하는 시스템이다.

메세지를 발행하는 부분은 Producer라고 하고 메세지를 받아서 쓰는 쪽은 Consumer라고 한다.
메세지 큐는 MSA에서, 대용량 트래픽을 처리하기 위해 중요한 역할을 한다.

메세지 브로커

브로커는 메세지를 queue에 넣어주는 역할을 한다.

메세지 브로커와 이벤트 브로커로 두개가 있다.

메세지 브로커

메세지 브로커는 Producer가 생산한 메세지를 큐에 넣고 그 메세지를 Consumer가 가져와서 사용한다. 그리고 메세지가 소비됐으면 메세지가 큐에서 삭제되는 특징이 있다.(RabbitMQ, ActiveMQ, AWS SQS, Redis)

이벤트 브로커

이벤트 브로커는 기본적으로 메세지 브로커와 같이 Producer가 생산한 메시지를 큐에 넣고 그 메세지를 Consumer가 사용한다. 그러나 Consumer가 사용하면 삭제되는 것이 아니기에 소비한 데이터를 다시 소비할 수 있다.
또한 메세지 브로커보다 대용량 데이터를 처리할 수 있다.

메세지 큐의 장점

  1. 비동기(Asynchronous): Queue에 넣어두기 때문에 consumer가 사용하고 싶을때 사용할 수 있다.
  2. 낮은 결합도(Decoupling): 애플리케이션과 분리할 수 있다.
  3. 탄력성(Resilience): 일부가 실패 시 전체에 영향을 받지 않는다.
  4. 과잉(Redundancy): 실패 할 경우 재실행이 가능하다.
  5. 신뢰성(Guarantees): 작업이 처리된 걸 확인할 수 있다.
  6. 확장성(Scalable): 다수의 프로세스들이 큐에 메시지를 보낼 수 있다.

1.비동기

만약 메세지 큐가 없다면 End-To-End로 통신하기 때문에 해당 메세지를 보내는 작업이 끝날 때까지 다른 메세지를 전달하는 작업을 할 수 없다. 이러한 방식이 동기 방식인데 전송속도가 빠르고 전송 결과를 신속하게 알 수 있지만, 대용량 트래픽이 발생하는 서버에서는 매우 비효율적이다.
그래서 메세지큐를 사용한다면 producer는 consumer에게 보내는 것이 아닌 Queue라는 우체통과 같은 곳에 넣어두면 consumer가 알아서 메세지를 가져와서 소비한다.

2. 낮은 결합도

메세지큐를 통해 어플리케이션끼리의 결합도를 낮출 수 있다. 확장성, 유연성, 효율적인 유지보수, 장애 전파 방지 등 장점이 있다.

3. 탄력성

시스템이 예기치 않은 상황 또는 장애에 대응하고 유연하게 대처할 수 있는 능력이다.
예를들어 송금한다는 프로세스가 있다고 하자. A프로세스는 B프로세스에게 송금을 보내고 B프로세스는 송금을 받아 계좌에 반영한다.

만약 이 과정에서 메세지 큐가 없다고 하자. B프로세스가 장애가 생겨 사용하지 못하게 되고 A프로세스도 사용하지 못하게 된다.
하지만 메세지 큐가 있다면 B프로세스가 장애가 발생해서 사용하지 못한다해도 A프로세스는 메세지 큐에다가 넣어두고 B프로세스가 장애가 복구가 되고 처리하게 되면 된다. 그래서 A프로세스는 정상적으로 운영하다가 메세지큐에 넣어두고 B프로세스가 장애가 복구되면 메세지큐에 들어간 메세지들을 처리하면 되는 식이다

4. 과잉

과잉은 정상적인 메세지 송/수신이 실패하는 경우 재실행이 가능하다는 뜻이다.

만약 메세지 큐가 없다고 가정하자. A프로세스와 B프로세스가 End-To-End 통신을 하고 있는데 B프로세스가 장애가 생겨 A프로세스도 사용에 지장이 생긴다. 그리고 애플리케이션 수준에서 시스템 응답성 저하, 데이터 불일치, 메세지 유실 등 다양한 문제가 발생할 수 있다.

하지만 메세지 큐를 사용하면 일정 장애 기간동안 송신된 메세지는 큐에 남아있기에 장애 복구 시 정상적으로 재시도 및 복구가 가능하다.

5. 신뢰성

신뢰성은 메세지의 안전하고 정확한 전달을 의미한다. 메세지 큐 덕분에 메세지를 안전하고 확실하게 Consumer가 사용할 수 있다.

6. 확장성

확장성은 수평확장을 의미한다. 기존 메세지 큐를 이용한 통신에서 부하가 증가하거나 클라이언트의 동시다발적인 요청이 증가할 때, 메세지 큐에 Producer와 Consumer를 추가하여 비교적으로 간단하고 쉽게 확장할 수 있다.

RabbitMQ

여기서 AMQP라는 것이 있는데 이것은 메세지 지향 미들웨어를 위한 개방형 표준 응용 계층 프로토콜이다. AMQP는 메세지 큐와 비슷하지만 Exchange, Binding이라는 것을 통해 라우팅을 통해 특정 Queue로 메세지를 전달한다.

구성요소

  • Producer : 요청을 보내는 주체이며 보내고자 하는 메세지를 Exchange 에 Publish 한다.
  • Consumer : Producer 로 부터 메세지를 받아 처리하는 주체이다.
  • Exchange : Producer 로 부터 전달받은 메세지를 어떤 메세지 큐로 전송할지 결정하는 장소이다. 이러한 기능을 라우팅 기능이라고 할 수 있다. 하나의 Exchange에 1개 이상의 Queue가 등록되고 둘 사이에 Routing Key(Binding Key)가 존재한다.
  • Queue : Consumer 가 소비하기 전까지 메세지가 보관되는 장소이다.
  • Binding : Exchange 와 Queue 와의 관계입니다. 즉, 특정 Exchange 가 특정 Queue 에 메세지를 보내도록 정의한다. Binding은 Routing Key를 통해 이루어진다.

특징 및 장점

  • AMQP를 구현해 놓은 메세지 큐이다.
  • 신뢰성, 안정성, 성능을 충족할 수 있도록 다양한 기능 제공한다.
  • 라우팅 기능으로 유연하다.
  • UI를 통해 쉽게 관리할 수 있다.
  • 20kb/sec의 속도를 가지고 있다.
  • 거의 모든 언어와 운영체제를 지원한다.

처리구조

  1. Producer가 Broker로 메세지로 보낸다.
  2. Broker 내 Exchange, Binding을 통해 해당하는 Key에 맞게 Queue에 분배한다.
  3. 해당 Queue를 구독하는 Consumer가 메세지를 소비한다.

단점

  1. 메세지 큐 서버가 종료 후 재가동되면 큐 내용은 삭제된다.
  2. Producer와 Consumer의 결합도가 높다.

RabbitMQ 실습 전

실습하기 전 Binding 전략에 대해 알아보자
Binding이란 Producer로 부터 받은 메세지를 Exchange가 어떤 Queue로 전달할지 라우팅하는 방식이다.

여기에 4가지 전략이 있다. (Direct Exchange, Fanout Exchange, Topic Exchange, Headers Exchange)

Direct Exchange

메세지의 Routing Key와 정확히 일치하는 Binding 된 Queue로 Routing 한다.

Direct Exchange는 Routing key와 동일한 key로 Binding된 Queue에만 전송한다. rabbit이라는 key를 가진 Queue에만 전송한다.

Fanout Exchange

Binding된 모든 Queue에 메세지를 Routing 한다.

Fanout Exchange는 Routing key에 관계없이 Exchange에 등록된 모든 Queue에 메세지를 전송한다.

Topic Exchange

특정 Routing Pattern에 일치하는 Queue로 Routing 한다.

Topic Routing은 Routing key의 패턴을 통해 메세지를 Routing 한다. 여러 Consumer에서 메세지 형식에 따라 선택적으로 수신해야 하는 경우 등에 사용한다.

animal.rabbit이라는 Routing key로 Exchange가 메세지를 발행했고 이 패턴에 일치하는 Routing key로 Binding 된 Queue로 메세지가 전달된다. (#은 모든 걸 일치)

Headers Exchange

Key-Value로 정의된 Header 속성을 통해 Routing 한다.

Headers Exchange는 Topic Exchange와 비슷하지만 header를 이용한다. Producer에서 정의한 header에 Key-Value 상과 Consumer에서 정의된 Argument의 Key-Value 쌍이 일치하면 Binding 된다.

Exchange 속성

  • Name : Exchange의 이름
  • Type : Binding 전략
  • Durability : 메세지 브로커가 시작될 때 남아 있는지 여부, Durable이면 서버가 재시작되어도 기존 Exchange가 남아있고 Transient라면 삭제된다.
  • Auto-delete : 마지막 Queue 연결이 해제되면 Exchange가 삭제된다.

RabbitMQ 실습

Spring Boot를 이용하여 RabbitMQ를 사용할 예정이다.

설정

build.gradle

implementation 'org.springframework.boot:spring-boot-starter-amqp'

application.yml

server:
  port: 8080

# Log Level 설정
logging:
  level:
    root: info

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

# RabbitMQ queue, exchange, routing-key 정보
rabbitmq:
  queue.name: sample.queue
  exchange.name: sample.exchange
  routing.key: sample.key

도커설정

도커에서 rabbitMQ를 실행하는게 편하기 때문에 도커에서 설치하고 실행하자

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped rabbitmq:management

포트 5672는 브로커 연결이고 15672는 우리가 편하게 보기 위한 웹 관리 콘솔에 사용한다.

위의 명령어를 실행하면 설치하고 실행된다. 그럼 docker ps를 입력하면 이렇게 rabbitMQ가 실행된게 보인다.

id와 password는 guest guest로 입력하면 로그인된다.

Spring(Direct Exchange 1개)

RabbitMQConfig.java

@Value를 통해 application.yml에서 설정한 값(Queue, Binding, Exchange)들을 가져와서 사용한다.

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    /** 지정된 큐 이름으로 Queue 빈을 생성
     *
     * @return Queue 빈 객체
     */
    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    /** 지정된 exchange 이름으로 DirectExchange 빈을 생성
     *
     * @return TopicExchange 빈 객체
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    /**
     * 주어진 queue와 exchange를 바인딩하고 라우팅 키를 이용하여 Binding 빈을 생성
     *
     * @param queue    바인딩할 Queue
     * @param exchange 바인딩할 TopicExchange
     * @return Binding 빈 객체
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    /**
     * RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
     *
     * @return ConnectionFactory 객체
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory.getRabbitConnectionFactory();
    }

    /**
     * RabbitTemplate을 생성하여 반환
     *
     * @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
     * @return RabbitTemplate 객체
     */
    @Bean
    public RabbitTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
     *
     * @return MessageConverter 객체
     */
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

MessageDto.java

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageDto {

    private String title;

    private String content;
}

MessageService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageService {

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    private final RabbitTemplate rabbitTemplate;

    /**
     * 1. Queue 로 메세지를 발행
     * 2. Producer 역할 -> Direct Exchange 전략
     **/
    public void sendMessage(MessageDto messageDto) {
        log.info("messagge send: {}",messageDto.toString());
        this.rabbitTemplate.convertAndSend(exchangeName,routingKey,messageDto);
    }

    /**
     * 1. Queue 에서 메세지를 구독
     **/
    @RabbitListener(queues = "${rabbitmq.queue.name}")
    public void receiveMessage(MessageDto messageDto) {
        log.info("Received Message : {}", messageDto.toString());
    }
}

MessageController.java

@RestController
@Slf4j
@RequiredArgsConstructor
public class MessageController {

    private final MessageService messageService;

    /**
     * Queue로 메시지를 발행
     *
     * @param messageDto 발행할 메시지의 Dto 객체
     * @return ResponseEntity 객체로 응답을 반환
     */
    @PostMapping(value = "/send/message")
    public ResponseEntity<?> sendMessage(@RequestBody MessageDto messageDto) {
        messageService.sendMessage(messageDto);
        return ResponseEntity.ok("Message sent to RabbitMQ!🐇");
    }
}

결과

postman

RabbitMQ UI

Spring

Spring(Direct Exchange 2개, Fanout Exchange 1개)

두개 비슷하지만 헷갈리니 조심하자.

application.yml

server:
  port: 8080

# Log Level 설정
logging:
  level:
    root: info

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest


# RabbitMQ queue, exchange, routing-key 정보
rabbitmq:
  queue.name: sample.queue
  exchange.name: sample.exchange
  routing.key: sample.key
  queue1.name: sample.queue1
  exchange1.name: sample.exchange1
  routing1.key: sample.key1
  sample.fanout: sample.fanout

나는 queue1, exchange1, routing1을 만들기 위해 작성했고 추가적으로 fanout방식을 위해 하나 더 만들었다. fanout 방식은 routingkey가 필요없다.

RabbitMQConfig.java

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.port}")
    private int rabbitmqPort;

    @Value("${spring.rabbitmq.username}")
    private String rabbitmqUsername;

    @Value("${spring.rabbitmq.password}")
    private String rabbitmqPassword;

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    // 두번째 큐 만들기
    @Value("${rabbitmq.queue1.name}")
    private String queueName1;

    // 나는 하나의 exchange로 두개의 queue(queue, queue1)을 묶었다.
    // 각각 만들고 싶으면 여기와 아래 주석 풀기
//    @Value("${rabbitmq.exchange1.name}")
//    private String exchangeName1;

    @Value("${rabbitmq.routing1.key}")
    private String routingKey1;

    //Fanout 방식
    @Value("${rabbitmq.sample.fanout}")
    private String sampleFanout;

    /** 지정된 큐 이름으로 Queue 빈을 생성
     *
     * @return Queue 빈 객체
     */
    @Bean
    public Queue queue() {
        return new Queue(queueName);
    }

    /**
     *  두번째 큐
     *
     */
    @Bean
    public Queue queue1() {
        return new Queue(queueName1);
    }

    /** 지정된 exchange 이름으로 DirectExchange 빈을 생성
     *
     * @return TopicExchange 빈 객체
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(exchangeName);
    }

    // 나는 queue와 queue1을 하나의 exchange로 바인딩했다. queue와 queue1의 각각 exchange를 만들어도 된다.
//    @Bean
//    public DirectExchange exchange1() {
//        return new DirectExchange("sample.exchange1");
//    }

    /**
     * 주어진 queue와 exchange를 바인딩하고 라우팅 키를 이용하여 Binding 빈을 생성
     *
     * @param queue    바인딩할 Queue
     * @param exchange 바인딩할 TopicExchange
     * @return Binding 빈 객체
     */
    @Bean
    public Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }

    /**
     * 두번째 Queue
     * @param queue
     * @param exchange
     * @return
     */

    @Bean
    public Binding binding1(@Qualifier("queue1") Queue queue, @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(routingKey1);
    }

    /**
     * RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
     *
     * @return ConnectionFactory 객체
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(rabbitmqHost);
        connectionFactory.setPort(rabbitmqPort);
        connectionFactory.setUsername(rabbitmqUsername);
        connectionFactory.setPassword(rabbitmqPassword);
        return connectionFactory.getRabbitConnectionFactory();
    }

    /**
     * RabbitTemplate을 생성하여 반환
     *
     * @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
     * @return RabbitTemplate 객체
     */
    @Bean
    public RabbitTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    /**
     * Fanout 방식
     * 두개의 Queue(queue, queue1)를 만들어서 두개에 메세지를 보낸다.
     */

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(sampleFanout);
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding3() {
        return BindingBuilder.bind(queue()).to(fanoutExchange());
    }

    /**
     * Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
     *
     * @return MessageConverter 객체
     */
    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

MessageDto.java

위와 동일하다

MessageService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class MessageService {

    @Value("${rabbitmq.queue.name}")
    private String queueName;

    @Value("${rabbitmq.exchange.name}")
    private String exchangeName;

    @Value("${rabbitmq.routing.key}")
    private String routingKey;

    @Value("${rabbitmq.queue1.name}")
    private String queueName1;

    @Value("${rabbitmq.exchange1.name}")
    private String exchangeName1;

    @Value("${rabbitmq.routing1.key}")
    private String routingKey1;

    @Value("${rabbitmq.sample.fanout}")
    private String sampleFanout;

    private final RabbitTemplate rabbitTemplate;

    /**
     * 1. Queue 로 메세지를 발행
     * 2. Producer 역할 -> Direct Exchange 전략
     **/
    public void sendMessage(MessageDto messageDto) {
        log.info("messagge send: {}",messageDto.toString());
        this.rabbitTemplate.convertAndSend(exchangeName,routingKey,messageDto);
    }

    // routingkey1으로 전송한다.(이 routingKey는 queue1과 exchange가 바인딩 되어 만들어진 routingkey이다)
    public void sendMessage1(MessageDto messageDto) {
        log.info("messagge send: {}",messageDto.toString());
        this.rabbitTemplate.convertAndSend(exchangeName,routingKey1,messageDto);
    }

    //Fanout방식
    public void sendMessageAll(MessageDto messageDto) {
        log.info("messagge send: {}",messageDto.toString());
        this.rabbitTemplate.convertAndSend(sampleFanout, "", messageDto);
    }

    /**
     * 1. Queue 에서 메세지를 구독
     **/
    @RabbitListener(queues = "${rabbitmq.queue.name}")
    public void receiveMessage(MessageDto messageDto) {
        log.info("Received Message : {}", messageDto.toString());
    }

    // queue1을 이벤트를 받을 리스너
    @RabbitListener(queues = "${rabbitmq.queue1.name}")
    public void receiveMessage1(MessageDto messageDto) {
        log.info("Received Message1 : {}", messageDto.toString());
    }

}

MessageController.java

@RestController
@Slf4j
@RequiredArgsConstructor
public class MessageController {

    private final MessageService messageService;

    /**
     * Queue로 메시지를 발행
     *
     * @param messageDto 발행할 메시지의 Dto 객체
     * @return ResponseEntity 객체로 응답을 반환
     */
    @PostMapping(value = "/send/message")
    public ResponseEntity<?> sendMessage(@RequestBody MessageDto messageDto) {
        messageService.sendMessage(messageDto);
        return ResponseEntity.ok("Message sent to RabbitMQ!🐇");
    }

    // queue1으로 보낼 api
    @GetMapping(value = "/send/message")
    public ResponseEntity<?> sendMessage1(@RequestBody MessageDto messageDto) {
        messageService.sendMessage1(messageDto);
        return ResponseEntity.ok("Message sent to RabbitMQ1!🐇");
    }

    // fanout방식의 api
    @PostMapping(value = "/send/messageAll")
    public ResponseEntity<?> sendToAllMessage(@RequestBody MessageDto messageDto) {
        messageService.sendMessageAll(messageDto);
        return ResponseEntity.ok("Message sent to All RabbitMQ!🐇");
    }
}

결과

queue(direct exchange)

queue1(direct exchange)

fanout(fanout exchange)

참고

[서버] SpringBoot 을 이용한 RabbitMQ 구축하기

0개의 댓글