[Message Queue]RabbitMQ를 사용해 메세지 주고 받기

봄도둑·2022년 8월 4일
1

Spring 개인 노트

목록 보기
7/17
post-thumbnail

앞서 다뤘던 Message Queue의 오픈 소스 중 대중적으로 가장 많이 쓰이는 RabbitMQ를 사용해보는 내용입니다. 사내 후배들이 직접 따라해볼 수 있도록 작성한 가이드를 블로그에 올릴 수 있도록 수정한 내용입니다. 실제 수행해보면서 궁금한 내용, 혹은 추가할만한 내용이 있다면 언제든 말씀 부탁드립니다!

1. RabbitMQ 설치하기

RabbitMQ를 윈도우 환경에서 설치하게 되면 RabbitMQ server부터 Erlang까지 설치를 해야하는 번거로움이 있습니다. 우리는 이러한 번거로운 과정을 진행하지 않기 위해 docker 환경에서 RabbitMQ 컨테이너를 올려 사용할 것입니다. (RabbitMQ를 다루는 글이기 때문에 docker에 관해서는 다루지 않습니다.)

  1. docker hub에 있는 rabbitmq 이미지를 먼저 받아오겠습니다.
docker pull rabbitmq:3-management
  1. 1번 과정에서 받아온 이미지를 컨테이너로 구축합니다.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 --restart=unless-stopped -e RABBITMQ_DEFAULT_USER=username -e RABBITMQ_DEFAULT_PASS=password rabbitmq:management

본격적으로 위의 명령을 실행하기 전에 각각의 옵션들을 살펴봅시다.

우리는 RabbitMQ의 서비스를 이용하기 위해서는 특정 포트들을 열어놔야 합니다. RabbitMQ는 AMQP 사용을 기본값으로 설정하고 있습니다. 5672:5672 여기서 보이는 5672는 AMQP 서비스의 포트로 이 녀석을 열어놔야 합니다.

뒤에 보이는 15672:15672 은 management 서비스 포트입니다. management는 RabbitMQ의 Message Queue를 GUI로 컨트롤, 모니터링 할 수 있도록 도와주는 서비스입니다.

물론 꼭 5672 포트와 15672 포트를 사용해야 하는 것은 아닙니다. 자신의 환경에 맞춰서 원하는 포트로 연결하시면 됩니다. 원하는 포트 번호:15672 이렇게 사용하시면 됩니다.

단, 컨테이너를 구축할 때, RabbitMQ AMQP포트와 management 포트를 상기한대로 직접 설정해주어야 합니다. 간혹 설정해주지 않았을 경우 기본 포트로 연결되지 않으니 꼭 포트를 직접 설정하는 것은 잊으면 안됩니다!

RABBITMQ_DEFAULT_USER 는 RabbitMQ 컨테이너와 management에서 사용할 사용자 이름을, RABBITMQ_DEFAULT_PASS 는 해당 사용자 이름과 같이 사용할 비밀번호를 입력하면 됩니다. (해당 예제에서는 사용의 편의상 username/password를 사용했습니다)

일반 윈도우 환경에서 RabbitMQ 설치하는 가이드를 보면 포트 설정 외에도 RabbitMQ server, erlang 등 그 과정이 더 있지만 docker 컨테이너 환경에서 실행한다면 여기까지 하면 rabbitMQ를 사용하기 위한 기본 환경은 다 갖춰진 것입니다.


2. RabbitMQ management 설정

이제 RabbitMQ 컨테이너를 가동하고 RabbitMQ management로 이동해봅시다.

로컬 환경에서 실행할 것이기 때문에 localhost:15672 또는 15672 포트 대신 컨테이너를 세팅할 때 입력한 포트를 입력합니다.

로그인 화면에서 컨테이너를 세팅할 때 설정한 username과 password를 입력합니다.

2-1. exchange 생성하기

다음은 RabbitMQ management에서 메세지와 관련된 exchange, queue, routing key를 설정해보겠습니다.

Exchange 탭 클릭 후 add a new exchange 클릭합니다. 하단에 exchange에 대한 properties가 쭉 표시되는데 이 때 exchange의 이름을 입력한 후 Add exchange 버튼을 클릭하면 exchange가 생성됩니다.


(이미지 : exchange 생성하기)

다음은 메세지를 저장하고 consumer에게 전달할 queue를 생성해보겠습니다. management를 통한 생성 방식은 exchange와 크게 다르지 않습니다.

Queues 탭을 선택한 후 Add a new queue 토글을 클릭합니다. 이후 Name을 지정하고 Add queue 를 클릭하면 됩니다.


(이미지 : queue 생성하기)

이제 위에서 생성한 queue와 exchange를 바인딩하겠습니다. queue와 exchange를 바인딩해야 메세지를 적재하는 기능을 수행할 수 있습니다.

  1. 먼저 바인딩할 exchange를 선택합니다.


2. Bindings 토글을 클릭합니다. To queue 옵션을 선택한 후 생성한 queue의 이름을 넣습니다. 다음 바인딩할 때 메세지를 routing 할 때 사용할 routing key를 입력한 후, Bind 버튼을 클릭합니다.


이렇게 되면 exchage와 queue가 우리가 지정한 routing key에 의해 정상적으로 바인딩된 것을 확인할 수 있습니다.

아래는 제가 예제에서 설정한 항목들의 이름입니다.(반드시 똑같이 이름을 설정할 필요는 없습니다)

  • exchange : sample.exchange
  • queue : sample.queue
  • routing key : sample.routing.#

※ rabbitMQ를 사용할 때 management를 통해 queue와 exchange를 설정하면 간편하다는 것을 보여주기 위해 해당 항목을 작성했습니다만, 보다 상세한 설정은 코드 레벨에서 queue와 exchange를 설정해주는 것이 좋다고 생각합니다. 코드 레벨에서 생성해줄 경우, 맨처음 rabbitMQ에서 선언한 exchange와 queue가 없다면 생성해주고, 이미 있다면 해당 exchage와 queue를 바라보도록 처리합니다. 무엇보다 코드 레벨에서는 우리가 설정한 properties를 한 번에 확인할 수 있기 때문에 management를 통한 생성보다 코드 레벨에서의 생성을 추천합니다.


3. 프로젝트(서비스) 설정

메세지의 송, 수신을 위해 2개의 프로젝트를 운영하며 메시지를 주고 받는 간단한 동작을 구현해보겠습니다. 메세지를 발행해주는 녀석을 publisher, 메세지를 받아서 처리할 녀석을 consumer라는 프로젝트로 생성해봅시다.

3-1. 공동 설정

예제 환경은 manven에서 진행합니다. amqp에 대해 dependency를 추가해야 하는데 코드는 아래와 같습니다.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml은 아래와 같이 설정합니다. 우리가 사용할 server의 포트 번호는 예제 코드 기준 8081이지만 자신이 사용하고자 하는 포트 번호로 사용해도 무방합니다. 이 때 두 개의 프로젝트(publisher, consumer)는 서로 다른 포트를 사용해야 하기 때문에 server 의 포트는 다른 값으로 지정해주고 rabbitmq 에 대한 properties는 동일하게 사용하면 됩니다. rabbitMQ를 docker 컨테이너로 생성할 때 설정해주었던 username과 password를 입력합시다. 실행은 local 레벨에서 진행할 것이기 때문에 host는 localhost로 지정했습니다.

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: username
    password: password

server:
  port: 8081

※만약 yml로 설정하지 않겠다면 아래의 application.properties로 설정을 지정해주셔야 합니다. (쉽게 생각하자면 yml은 properties를 계층적 구조로 편리하게 다루기 위한 형식으로 보면 됩니다)

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=username
spring.rabbitmq.password=password

3-2. consumer 프로젝트

consumer에서 메세지를 받기 위한 SampleListener 클래스를 생성해봅시다.

package com.c.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SampleListener {

    private static final Logger log = LoggerFactory.getLogger(SampleListener.class);

    @RabbitListener(queues = "sample.queue")
    public void reciveMessage(final Message message) {
        log.info(message.toString());
    }
}

메세지를 받아서 처리할 메소드에 @RabbitListener 어노테이션을 사용합니다. 이 어노테이션이 있으면 메소드에게 메세지가 push됩니다. 반대로 @RabbitTemplate 를 사용하면 메세지가 pull됩니다.

이 때 해당 어노테이션에서 사용할 property는 queues 로 조금 전 생성했던 queue의 이름을 넣습니다. 만약 해당 queue가 없다면 consumer가 빌드될 때 해당 queue를 생성하게 됩니다.

위의 예제에서는 메세지를 가지고 특별한 로직을 수행하는 것이 아닌 받은 메세지를 로그로 찍어보는 코드입니다.

이제 프로젝트를 빌드하고 rabbitMQ와 connection이 되었는지 확인해봅시다.


Connections 탭에 들어가서 위의 사진과 같이 표시되면 rabbitMQ와 정상적으로 연결이 된 것입니다.

그렇다면 이제 queue에 메세지를 넣어놓고 consumer가 이를 받아서 처리하고 있는지 확인해 봅시다.

Exchange 탭을 클릭한 후 binding한 exchange를 선택합니다. routing key를 입력하고 payload에 받을 메세지 내용을 입력한 후 Publish message를 클릭합니다.


메세지를 발행한 후 다시 consumer 프로젝트의 console 창을 확인해 아래와 같은 로그가 있다면 queue에서 메세지를 꺼내 정상적으로 처리했음을 확인할 수 있습니다.

redelivered=false, receivedExchange=sample.exchange, receivedRoutingKey=sample.routing.#, deliveryTag=2, consumerTag=amq.ctag-T_8qLZjsSRq_L2cwr6BHpQ, consumerQueue=sample.queue])

3-2. publisher 설정

이제 메세지를 발행해 처리할 publisher 서비스를 만들어봅시다.

메세지를 발급하기 위해서는 config 설정이 필요합니다. SampleConfig 를 생성해봅시다.

package com.c.publisher;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;

public class SampleConfig {

    private static final String EXCAHGE_NAME = "sample.exchange";
    private static final String QUEUE_NAME = "sample.queue";
    private static final String ROUTING_KEY = "sample.routing.#";

    @Bean
    TopicExchange exchange() {
        return  new TopicExchange(EXCAHGE_NAME);
    }

    @Bean
    Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }
}

앞서 RabbitMQ management에서 설정한 exchange, queue, routing key 정보를 바탕으로 bean 생성합니다.

이러한 config가 무엇을 말하고 설정하는지 간단히 살펴봅시다.

  • Binding은 exchange와 routing key의 패턴이 일치하는 queue에 메시지를 전달하겠다는 일종의 규칙입니다.
  • rabbitTemplate.setMessageConverter를 통해 메세지에 담을 Object를 rabbitmq의 메시지 형식으로 변환합니다.
  • 이 때 Jackson2JsonMessageConverter 가 변환한 메세지는 body 형식의 JSON message로 변환이 됩니다. 만약 Jackson2JsonMessageConverter 를 사용하지 않는다면 우리는 ObjectMapper를 통해 내부에서 한 번 더 변환 작업을 수행하고 메세지를 발행해야 하는 추가 작업이 발생하게 됩니다.
    (Jackson2JsonMessageConverter를 사용하지 않고 ObjectMapper를 통해 메세지를 전달할 경우, 이미지 출처 : 바보개발 - [RabbitMQ] Jackson2JsonMessageConvertor)
    (Jackson2JsonMessageConverter를 사용해 메세지를 발행하는 경우, 이미지 출처 : 바보개발 - [RabbitMQ] Jackson2JsonMessageConvertor)

이제 메세지 발행을 해줄 메소드를 생성해봅시다. 우리는 controller를 통해 url로 메세지 전송을 호출하는 동작을 구현해보겠습니다.

package com.c.publisher;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SampleController {

    private static final String EXCAHGE_NAME = "sample.exchange";

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("sample/queue")
    public String samplePublish() {
        rabbitTemplate.convertAndSend(EXCAHGE_NAME, "sample.routing.#", "RabbitMQ + SpringBoot = Success");
        return "Message seding!";
    }
}

우리가 주목할 부분은 앞서 been으로 생성한 RabbitTemplate를 주입하는 것입니다. (예제에서는 @Autowired 를 사용했지만, 생성자 주입을 통해 사용해야 합니다.)

consumer가 연결되어 있는지 확인하기 위해 들어갔던 management에서 잘 연결되어 있나 확인해봅시다. 앞서 확인 했던 consumer만 연결되어 있는 것으로 보입니다.


publisher는 서비스를 실행해도 connection에 바로 표시가 안됩니다. (이 부분에 대해서는 아직 해결하지 못했습니다ㅠㅠ) 아무 걱정 말고 메세지를 발행하기 위해 지정해준 주소값을 실행합니다.


위의 사진처럼 결과 화면이 나온 것을 확인한 후, 다시 management를 확인해보면 이제 publisher가 연결되어 있는 것을 확인할 수 있습니다.


이 때 우리는 메세지 발행을 진행했기 때문에 consumer의 console창을 확인해보면 메세지가 정상 수신되었음을 확인할 수 있습니다.


4. RabbitListener 활용

메세지를 수신하기 위해 사용한 @RabbitListener 의 properties를 자세히 살펴봅시다.

RabbitListener 어노테이션을 잘 사용하면 management에서 각각의 exchange와 queue를 binding하지 않아도 되며, management보다 상세한 설정으로 binding 관리를 할 수 있게 됩니다.

아래는 @RabbitListener 를 사용할 때 기본 틀이며, 사용 환경에 따라 다른 property와 인자값을 사용할 수 있습니다.

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = 큐이름, durable = "true"),
            exchange = @Exchange(value = 익스체인지이름, type = "fanout"),
            key = 라우팅키
  ))
public void method1(SagaEventMessage message, Channel channel) throws IOException {
	
}

RabbitListener의 property는 아래와 같이 계층 구조로 되어 있습니다.

  • @RabbitListener 의 attributes
    • queues = String : 메시지를 가져올 큐 지정, 값은 지정할 큐의 이름을 String으로 입력
      @RabbitListener(queues = "sample.queue")
    • bindings : listener에게 binding할 queue, exchange 등 binding에 대한 정보를 배열로 받음. 대체로 @QueueBinding 을 사용
    • QueueBinding : queue와 exchange, 선택적으로 routing key를 정의할 수 있음
      • value : queue를 정의하는 attribute → @Queue 를 이용해 정의
        • @Queue 의 value : String으로 큐의 이름 지정 ⇒ 만약 지정한 이름의 queue가 존재하지 않는다면 해당 queue를 지정한 attribute의 값들에 따라 생성!(management를 쓰지 않고도 queue 생성 가능)
        • @Queue 의 durable : consumer의 처리를 기다리고 있는 메시지를 담고 있는 queue가 어떠한 이상으로 인해 죽어버리거나 재시작할 때 메시지가 소실될 수 있음 → 이 때 durable 속성을 true로 주면 이러한 상황이 발생했을 때 메시지를 유지할 수 있음(메시지 지속성 durablity 지정)
        • @Queue 의 exclusive : 큐 이름을 지정해주지 않았을 때, rabbitMQ에서 임의의 큐에 binding을 시킴. exclusive를 ture로 주면 임의의 큐와 consumer의 연결이 끊어졌을 때 해당 큐를 자동으로 제거함
        • @Queue 의 autoDelete : true를 주게 되면 consumer와 연결이 끊긴 큐를 자동 삭제(exclusive는 지정되지 않은 임의의 큐를 삭제함)
      • exchange : exchange를 정의하는 attribute → @Exchange 를 이용해 정의
        • @Exchange 의 value : String으로 exchange의 이름 지정
        • @Exchange 의 type : exchange type을 지정, default 값은 “direct”
        • @Exchange 의 delayed : 메시지를 바로 큐에 보내는 게 아니라 일정 시간 후에 보내고자 할 때 사용. 값이 true라면 ‘x-delayed-message’ exchange에 메시지가 보내지고 일정 시간 후 큐로 메시지가 이동함. default 값은 false
      • key : routing key 값 지정

REFERENCE

호형 - Springboot + RabbitMQ 연동 및 초간단 샘플 프로젝트 만들기
바보개발 - [RabbitMQ] Jackson2JsonMessageConvertor
https://blog.leocat.kr/notes/2018/07/31/rabbitmq-delayed-queue
https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/Exchange.html#delayed()
https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/QueueBinding.html
https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/annotation/RabbitListener.html
teragon - [RabbitMQ] Message durability(메시지 잃어버리지 않기 : durable=true, props=PERSISTENT_TEXT_PLAIN) -2

profile
Java Spring 백엔드 개발자입니다. java 외에도 다양하고 흥미로운 언어와 프레임워크를 학습하는 것을 좋아합니다.

0개의 댓글