[RabbitMQ] Exchange 종류대로 사용해보기

LDB·2025년 12월 6일

RabbitMQ

목록 보기
5/7
post-thumbnail

❓ 들어가기에 앞서

늘 그렇듯이 회사 업무가 바빠서 공부는 고사하고 새벽까지 일하는 경우가 많아져 공부를 소홀히 하게되었습니다. 하지만 틈틈히 조금씩이라도 자료를 수집하고 모으다보니 그래도 조금만 작업하면 되었기에 과거의 저에게 감사하다고 전하고 싶었습니다.

📚 사전준비 및 읽으면 도움되는 글

https://velog.io/@half-phycho/RabbitMQ-SpringBoot로-RabbitMQ-사용해보기
Spring Boot로 RabbitMQ기본 세팅하는 방법 입니다.

https://velog.io/@half-phycho/RabbitMQ-AMQP란-무엇인가
AMQP에 대해 정리한 글 입니다. 이번에는 Exchange를 다양하게 써볼 예정이라 도움이 되겠습니다.


💡RabbitMQ로 다양한 Exchange 써보기

지난번에는 RabbitMQ로 기본적인 Direct Exchange를 보냈습니다. 이번에는 Direct Exchange를 포함한 여러 Exchange를 보내보면서 비교해보는 시간을 가져보도록 하겠습니다.


❗ 변경된 프로젝트 구조

이전 게시글 에서는 RabbitMQ 연결세팅과 Queue, Binding을 모두 하나의 클래스에서 만들었지만 이번에는 역할을 분리하는 등 여러가지 부분이 변경되었습니다.

  • RabbitMQ Connection 파일 별도로 관리
  • Queue, Binding, Exchange에 사용될 이름을 오타 방지를 위해 properties에 등록하고 가져다 사용하는 형식으로 변경
  • 기존 RabbitmqConfig.java 에는 Queue, Binding, Exchange 선언 및 생성을 집중으로 관리
  • RabbitMQ에 Exchange별로 보낸 메세지를 출력하는 코드추가
  • 자체적인 Exception Handler, ApiResponse 구축

RabbitmqConnConfig

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 연결 설정
 */
@Configuration
public class RabbitmqConnConfig {

    @Value("${spring.rabbitmq.host}")
    private String host; // 접속 호스트

    @Value("${spring.rabbitmq.port}")
    private Integer port; // 접속 포트번호

    @Value("${spring.rabbitmq.username}")
    private String username; // 접속 아이디

    @Value("${spring.rabbitmq.password}")
    private String password; // 접속 비밀번호

    /**
     * RabbitMQ와 메시지 통신을 담당하는 클래스
     */
    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(messageConverter());

        return rabbitTemplate;
    }

    /**
     * RabbitMQ와 연결을 관리하는 클래스
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        return connectionFactory;
    }

    /**
     * RabbitMQ 메시지를 JSON형식으로 보내고 받을 수 있다.
     */
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {

        ObjectMapper objectMapper = new ObjectMapper()
                // 날짜 관련 타임스탬프 직렬화를 막고 ISO-8601 형태로 포맷된다.
                .configure(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS, true)
                .registerModule(dateTimeModule()); // Java에서 시간을 처리하기위한 모듈

        return new Jackson2JsonMessageConverter(objectMapper);
    }

    /**
     * 자바 시간 모듈 등록
     */
    @Bean
    public JavaTimeModule dateTimeModule() {
        return new JavaTimeModule();
    }

}

RabbitmqExchangeInfo

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * RabbitMQ에서 사용하는 exchange이름과 Queue정보 모아둔 Component
 */
@Component
public class RabbitmqExchangeInfo {

    @Value("${rabbitmq.direct.exchange.name}")
    private String DIRECT_EXCHANGE_NAME; // direct Exchange

    @Value("${rabbitmq.direct.exchange.key}")
    private String DIRECT_EXCHANGE_KEY; // direct Exchange Key

    @Value("${rabbitmq.direct.queue.name}")
    private String DIRECT_QUEUE_NAME;   // direct Queue name

    public String get_DIRECT_EXCHANGE_NAME() {
        return DIRECT_EXCHANGE_NAME;
    }

    public String get_DIRECT_EXCHANGE_KEY() {
        return DIRECT_EXCHANGE_KEY;
    }

    public String get_DIRECT_QUEUE_NAME() {
        return DIRECT_QUEUE_NAME;
    }

}

RabbitmqConfig

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 설정 파일
 */
@Configuration
@RequiredArgsConstructor
public class RabbitmqConfig {

    private final RabbitmqExchangeInfo rabbitmqExchangeInfo;
    
    // 공통적으로 RabbitMQ가 재부팅되도 Exchange가 삭제안되고 동시에 대기열에 남도록 설정
    @Bean
    public DirectExchange directExchange() {
        // DIRECT_EXCHANGE_NAME 이름의 direct Exchange 구성
        return ExchangeBuilder
                .directExchange(rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_NAME())
                .build();
    }
    
    // 공통적으로 RabbitMQ가 재부팅되도 Queue 대기열에 남도록 설정
    @Bean
    public Queue directQueue() {
        return new Queue(rabbitmqExchangeInfo.get_DIRECT_QUEUE_NAME(), true);
    }

    /**
     * Queue와 DirectExchange를 바인딩
     */
    @Bean
    public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
        // queue까지 가는 바인딩 Exchange 타입을 directExchange로 지정하고 test.key 이름으로 바인딩 구성
        return BindingBuilder
                .bind(directQueue)
                .to(directExchange)
                .with(rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_KEY());
    }

}

RabbitmqMessage

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 큐에 등록된 메세지 리턴 component
 */
@Slf4j
@Component
public class RabbitmqMessage {

    @RabbitListener(queues = "direct.queue")
    public void directMessage(String message){
        log.info("direct.queue 내의 메시지 반환 : {}", message);
    }

}

(기타 Fanout, Topic, Header 방식은 업로드한 GitHub Repository를 참고하시기 바랍니다. 위의 코드는 게시글이 길어지는 것을 방지하고자 일부러 Direct Exchange관련만 코드에 남겼습니다.)

해당 프로젝트가 있는 Github 주소 입니다.
https://github.com/delight-HK3/rabbitmq-test


▶️ 해당 프로젝트 실행

만약 해당 프로젝트를 실행시키면 자동으로 관련 Exchange와 Queue가 생성되는 것을 확인할 수 있습니다.

여기서 잠깐! 어째서 자동으로 Binding, Queue, Exchange가 생성되는 것 인가?
Spring Boot에서 프로젝트를 만들때 Gradle에서 pring-boot-starter-amqp가 프로젝트에 있는경우 자동으로 AmqpAdmin 이라는 Bean을 구성하고 등록합니다. 그리고 @Configuration 클래스 내에서 @Bean으로 등록된 Queue, Exchange, Binding 설정들이 자동으로 AmqpAdmin의 자동 선언 대상이 됩니다.

📕 Direct Exchange

Direct Exchange는 이전에 간단하게 RabbitMQ에 대해 시연해본 방법으로 요청시 Exchange의 이름과 Exchange에 등록되어 있는 라우팅키가 일치하면 해당 Exchange와 연결된 Queue로 보내는 방식입니다.

Exchange, Queue, Binding 생성 과정

1. Exchange 생성 : Direct 타입의 Exchange를 생성합니다.
2. Queue 생성 : Direct Exchange로 받은 데이터를 적재할 Queue를 생성합니다.
3. Binding 생성 : 앞에서 생성한 Exchange와 Queue간 바인딩을 설정하고 Routing Key 기반으로 바인딩을 수행합니다.

코드예시

private final RabbitmqExchangeInfo rabbitmqExchangeInfo;

@Bean
public DirectExchange directExchange() {
	// DIRECT_EXCHANGE_NAME 이름의 direct Exchange 구성
    return ExchangeBuilder
                .directExchange(rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_NAME())
                .build();
}

@Bean
public Queue directQueue() {
    // 여기서 true는 RabbitMQ가 재부팅되도 Queue 대기열에 남도록 하는 설정입니다.
    return new Queue(rabbitmqExchangeInfo.get_DIRECT_QUEUE_NAME(), true);
}

@Bean
public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
    // queue까지 가는 바인딩 Exchange 타입을 directExchange로 지정하고 test.key 이름으로 바인딩 구성
    return BindingBuilder
                .bind(directQueue)
                .to(directExchange)
                .with(rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_KEY());
}

요청 및 결과

Controller

@PostMapping("/direct")
public ResponseEntity<ApiResponse> sendDirectMessage(@RequestBody MessageDTO messageDTO) {
    String resultMessage = messsageService.sendDirectMessage(messageDTO);

    return ApiResponse.success(resultMessage,200);
}

Service

/**
  * Direct Exchange 방식 메세지 전송
  *
  * @param messageDTO 메세지 DTO
  * @return 성공 시 "success_direct" 리턴
  */
public String sendDirectMessage(MessageDTO messageDTO) {
    try{
        ObjectMapper objectMapper = new ObjectMapper();
        String objectToJson = objectMapper.writeValueAsString(messageDTO);

        rabbitTemplate.convertAndSend(rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_NAME()
                                        , rabbitmqExchangeInfo.get_DIRECT_EXCHANGE_KEY()
                                        , objectToJson);
    } catch (JsonProcessingException ex) {
        log.error("parsing error : {}", ex.getMessage(), ex);
    }

    return "success_direct";
}

요청결과

  1. 먼저 API 요청결과를 리턴 받습니다.
  1. 해당 로그는 사전에 @RabbitListener로 Direct Exchange를 통해 direct.queue에 데이터가 들어오면 발동하는 이벤트인데 해당 로그가 나왔다는 의미는 데이터가 Queue 까지 전달이 잘되었다는 의미입니다.

📙 Fanout Exchange

Fanout Exchange는 Exchange에 라우팅 키와 관계없이 Exchange에 연결된 모든 Queue에 메세지를 보내는 방식입니다.

Exchange, Queue, Binding 생성 과정

1. Exchange 생성 : Fanout 타입의 Exchange를 생성합니다.
2. Queue 생성 : Fanout Exchange로 받은 데이터를 적재할 Queue를 2개 생성합니다.
3. Binding 생성 : 앞에서 생성한 Exchange와 Queue간 바인딩을 설정하고 Exchange는 연결설정된 Queue에게만 메세지를 보냅니다.

코드예시

private final RabbitmqExchangeInfo rabbitmqExchangeInfo;

@Bean
public FanoutExchange fanoutExchange() {
    // FANOUT_EXCHANGE_NAME 이름의 fanout Exchange 구성
    return ExchangeBuilder
                .fanoutExchange(rabbitmqExchangeInfo.get_FANOUT_EXCHANGE_NAME())
                .build();
}


@Bean
public Queue fanoutQueueOne() {
    return new Queue(rabbitmqExchangeInfo.get_FANOUT_QUEUE_NAME_ONE(), true);
}

@Bean
public Queue fanoutQueueTwo() {
    return new Queue(rabbitmqExchangeInfo.get_FANOUT_QUEUE_NAME_TWO(), true);
}


/**
  * Queue(fanoutQueueOne)와 FanoutExchange 바인딩
  * Fanout 방식은 Exchange와 연결된 모든 Queue에 보내는 방식으로
  * FanoutExchange와 연결된 fanoutQueueOne, fanoutQueueTwo에게 메세지를 보낸다.
  */
@Bean
public Binding fanoutBindingOne(FanoutExchange fanoutExchange, Queue fanoutQueueOne) {
    return BindingBuilder
                .bind(fanoutQueueOne)
                .to(fanoutExchange);
}

/**
  * Queue(fanoutQueueTwo)와 FanoutExchange 바인딩
  * Fanout 방식은 Exchange와 연결된 모든 Queue에 보내는 방식으로
  * FanoutExchange와 연결된 fanoutQueueOne, fanoutQueueTwo에게 메세지를 보낸다.
  */
@Bean
public Binding fanoutBindingTwo(FanoutExchange fanoutExchange, Queue fanoutQueueTwo) {
    return BindingBuilder
                .bind(fanoutQueueTwo)
                .to(fanoutExchange);
}

요청 및 결과

Controller

@PostMapping("/fanout")
public ResponseEntity<ApiResponse> sendFanoutMessage(@RequestBody MessageDTO messageDTO){
    String resultMessage = messsageService.sendFanoutMessage(messageDTO);

    return ApiResponse.success(resultMessage, 200);
}

Service

/**
  * Fanout Exchange 방식 메세지 전송
  *
  * @param messageDTO 메세지 DTO
  * @return 성공 시 "success_fanout" 리턴
  */
public String sendFanoutMessage(MessageDTO messageDTO) {
    try{
        ObjectMapper objectMapper = new ObjectMapper();
        String objectToJson = objectMapper.writeValueAsString(messageDTO);

        rabbitTemplate.convertAndSend(rabbitmqExchangeInfo.get_FANOUT_EXCHANGE_NAME()
                                        , ""
                                        , objectToJson);
    } catch (JsonProcessingException ex) {
        log.error("parsing error : {}", ex.getMessage(), ex);
    }

    return "success_fanout";
}

요청결과

  1. 먼저 API 요청결과를 리턴 받습니다.
  1. 마찬가지로 해당 로그도 해당 Queue에게 전달 되었기 때문에 출력되는 로그 입니다. 다른점은 Direct Exchange와 다르게 Exchange와 바인딩 된 Queue에게 일괄적으로 메세지를 전송했다는 점 입니다.

📗 Topic Exchange

Topic Exchange 방식은 라우팅 키에 등록된 패턴에 따라 메세지를 보내는 방식으로 보통 키의 구분자를 *를 주로 사용하는 것 같은데 경우에 따라 !,#등 방법은 다양합니다.

Exchange, Queue, Binding 생성 과정

1. Exchange 생성 : Topic 타입의 Exchange를 생성합니다.
2. Queue 생성 : Topic Exchange로 받은 데이터를 적재할 Queue를 2개 생성합니다.
3. Binding 생성 : 앞에서 생성한 Exchange와 Queue간 바인딩을 설정하고 Exchange는 라우팅키 규칙과 맞는 Queue에게만 메세지를 보냅니다.

코드예시

private final RabbitmqExchangeInfo rabbitmqExchangeInfo;

@Bean
public TopicExchange topicExchange() {
    // TOPIC_EXCHANGE_NAME 이름의 topic Exchange 구성
    return ExchangeBuilder
                .topicExchange(rabbitmqExchangeInfo.get_TOPIC_EXCHANGE_NAME())
                .build();
}


@Bean
public Queue topicQueue() {
    return new Queue(rabbitmqExchangeInfo.get_TOPIC_QUEUE_NAME(), true);
}

// 편의상 fanout 전용으로 사용한 Queue를 사용하겠습니다
@Bean
public Queue fanoutQueueTwo() {
    return new Queue(rabbitmqExchangeInfo.get_FANOUT_QUEUE_NAME_TWO(), true);
}


/**
  * topic Exchange 와 topicQueue간 바인딩
  * producer에서 topic.send. 으로 시작하는 라우팅 키를 보내주면 라우팅 키 규칙과 같은 Exchange와 연결
  */
@Bean
public Binding topicBinding(TopicExchange topicExchange, Queue topicQueue){
        return BindingBuilder
                .bind(topicQueue)
                .to(topicExchange)
                .with("topic.send.*");
}

@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue fanoutQueueTwo){
        return BindingBuilder
                .bind(fanoutQueueTwo)
                .to(topicExchange)
                .with("topic.send.tester");
}

요청 및 결과

Controller

@PostMapping("/topic")
public ResponseEntity<ApiResponse> sendTopicMessage(@RequestBody MessageDTO messageDTO){
    String resultMessage = messsageService.sendTopicMessage(messageDTO);

    return ApiResponse.success(resultMessage, 200);
}

Service

/**
  * Topic Exchange 방식 메세지 전송
  *
  * @param messageDTO 메세지 DTO
  * @return 성공 시 "success_topic" 리턴
  */
public String sendTopicMessage(MessageDTO messageDTO) {
    try{
        ObjectMapper objectMapper = new ObjectMapper();
        String objectToJson = objectMapper.writeValueAsString(messageDTO);

        rabbitTemplate.convertAndSend(rabbitmqExchangeInfo.get_TOPIC_EXCHANGE_NAME()
                                        , "topic.send.test"
                                        , objectToJson);
    } catch (JsonProcessingException ex) {
        log.error("parsing error : {}", ex.getMessage(), ex);
    }

    return "success_topic";
}

요청결과

  1. 먼저 API 요청결과를 리턴 받습니다.
  1. 마찬가지로 해당 로그도 해당 Queue에게 전달되었기 때문에 출력되는 로그입니다. 다른 점은 분명 2개의 Queue를 만들고 바인딩까지 해주었는데 topic.queue에만 메세지가 들어왔고 fanout.queue.two 에는 데이터가 안 들어왔습니다. 그 이유는 Service에서 설정한 라우팅 키 때문인데 sendTopicMessage에는 routingKey를 topic.send.test로 설정했습니다. Topic Exchange는 특성상 라우팅 키의 규칙을 따르거나 라우팅 키가 정확히 일치해야 통신이 됩니다. 그런데 fanoutQueueTwo에 설정된 라우팅키는 topic.send.tester입니다. 그래서 topicBinding2으로는 메세지가 전송이 안 된 것입니다.

📘 Header Exchange

Header Exchange는 요청 시 HTTP Header에 지정된 value와 키가 있으면 메세지를 보내는 Exchange 방식입니다. 지금까지는 Routing Key가 해왔던 역할을 Header가 대신해준다고 생각하면 됩니다. 그래서 RabbitMQ 관리자 페이지에서 Header Exchange를 조회하면 다음과 같이 조회됩니다.

여기서 (key):true 이렇게 되어있는게 이건 해당 키가 있으면 어떤 value여도 통과 시켜준다는 의미입니다.

Exchange, Queue, Binding 생성 과정

1. Exchange 생성 : Header 타입의 Exchange를 생성합니다.
2. Queue 생성 : Header Exchange로 받은 데이터를 적재할 Queue를 1개 생성합니다.
3. Binding 생성 : 앞에서 생성한 Exchange와 Queue 간 바인딩을 설정하고 Header Key를 지정해 줍니다, 만약 요청 시해당하는 key와 value 값이 있으면 연결된 Queue로 메시지를 전송합니다.

코드예시

private final RabbitmqExchangeInfo rabbitmqExchangeInfo;

@Bean
public HeadersExchange headersExchange() {
    // HEADER_EXCHANGE_NAME 이름의 header Exchange 구성
    return ExchangeBuilder
                .headersExchange(rabbitmqExchangeInfo.get_HEADER_EXCHANGE_NAME())
                .build();
}


@Bean
public Queue headersQueue() {
    return new Queue(rabbitmqExchangeInfo.get_HEADER_QUEUE_NAME(), true);
}


/**
  * headers Exchange 와 headersQueue간 바인딩
  * headersExchange 방식으로 headersQueue와 Header값을 조건으로 바인딩 수행
  */
@Bean
public Binding headerBinding(HeadersExchange headersExchange, Queue headersQueue){
        return BindingBuilder
                .bind(headersQueue)
                .to(headersExchange)
                .where("x-execute-key").matches(true);
                // x-execute-key에 들어온 모든 값을 허용한다는 의미
}

요청 및 결과

Controller

@PostMapping("/header")
public ResponseEntity<ApiResponse> sendHeaderMessage(@RequestBody MessageDTO messageDTO){
    String resultMessage = messsageService.sendHeaderMessage(messageDTO);

    return ApiResponse.success(resultMessage, 200);
}

Service

/**
  * Header Exchange 방식 메세지 전송
  *
  * @param messageDTO 메세지 DTO
  * @return 성공 시 "success_header" 리턴
  */
public String sendHeaderMessage(MessageDTO messageDTO){
	try{
        ObjectMapper objectMapper = new ObjectMapper();
        String objectToJson = objectMapper.writeValueAsString(messageDTO);

        rabbitTemplate.convertAndSend(rabbitmqExchangeInfo.get_HEADER_EXCHANGE_NAME()
                    ,""
                    ,objectToJson);
    } catch (JsonProcessingException ex) {
    	log.error("parsing error : {}", ex.getMessage(), ex);
    }

	return "success_header";
}

요청결과

(Header Exchange는 Header Key 값이 들어가야 해서 요청형태가 다릅니다.)

  1. 먼저 API 요청결과를 리턴 받습니다.
  1. 마찬가지로 해당 로그도 해당 Queue에게 전달 되었기 때문에 출력되는 로그 입니다.

😊 직접 경험해보고 느낀점

이 글을 쓰기까지 비록 긴 시간이 걸렸지만 도움은 많이 되었습니다. RabbitMQ를 간단하게나마 써보니 왜 대용량 트래픽 제어에 사용하는지 알 수 있었습니다, 아직도 생소하지만, 이론적 지식과 실제로 프로젝트를 만들어 보면서 여러 가지 프로세스를 생각할 수 있게 해주었던 시간이었습니다, 만약 여러 요청이 들어와도 각기 다른 서버에서 병렬로 데이터를 처리할 수만 있다면 처리시간이 대폭 줄일 수 있겠다고 생각했습니다.


📋 참고 사이트

https://hoestory.tistory.com/85

https://adjh54.tistory.com/497?category=1187853#3

(항상 감사합니다.)

profile
가끔은 정신줄 놓고 멍 때리는 것도 필요하다.

0개의 댓글