
RabbitMQ로 메세지 큐를 구현하기에 앞서 이전에 작성한 RabbitMQ의 이론지식과 AMQP가 어떤의미인지 읽어보는걸 추천드립니다.
RabbitMQ를 사용하기에 앞서 해당 예제에서 사용한 프레임워크 및 서버입니다.
IDE : intellij Ultimate
Framework : Spring Boot
Language : Java 17
Server : AWS EC2
이번에 만드는 예제는 Github에 등록해서 얼마든지 가져오실 수 있습니다.
만약 본인 PC에 직접 RabbitMQ를 설치하셨으면 상관이 없지만 지금까지 게시글을 따라오셨다면 EC2의 보안 그룹에서 5672포트를 반드시 인바운드 규칙에 포함시켜야 합니다. 처음에 이걸 생각못해서 시간을 많이 낭비했습니다.

(해당 이미지에 나온 인바운드 규칙처럼 다른 포트는 몰라도 5672포트는 추가해야 합니다.)
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-actuator'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
# spring boot port
server.port=8088
spring.application.name=rabbitmq-test
logging.level.root=info
# RabbitMQ setting
spring.rabbitmq.host=<localhost / 서버주소>
spring.rabbitmq.port=5672
spring.rabbitmq.username=<설정한 사용자 이름 / admin>
spring.rabbitmq.password=<설정한 비밀번호 / admin>
spring.rabbitmq.virtual-host=/
# application status check
management.endpoints.web.exposure.include=health,info
여기에서 중요한 부분은 RabbitMQ setting 부분인데 하나하나 해석하면 다음과 같습니다.
| 설정 | 설명 |
|---|---|
| spring.rabbitmq.host | RabbitMQ를 실행시키고 있는 호스트 IP 주소 |
| spring.rabbitmq.port | RabbitMQ에서 사용하는 포트 |
| spring.rabbitmq.username | RabbitMQ의 사용자 이름 |
| spring.rabbitmq.password | RabbitMQ에 설정한 패스워드 |
| spring.rabbitmq.virtual-host | RabbitMQ에서 설정한 가상 호스트 기본 URL (만약 가상 호스트를 추가안해도 기본 주소는 '/' 입니다.) |
RabbitMQ 설정은 별도의 설명없이 주석으로 작성해놓았습니다.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ 설정 파일
*/
@EnableRabbit // RabbitMQ의 설정을 활성화 하기위해 필요
@Configuration
@RequiredArgsConstructor
public class RabbitmqConfig {
@Value("${spring.rabbitmq.host}")
private String host; // 접속 호스트
@Value("${spring.rabbitmq.port}")
private Integer port; // 접속 포트번호
@Value("${spring.rabbitmq.username}")
private String username; // 접속 아이디
@Value("${spring.rabbitmq.password}")
private String password; // 접속 비밀번호
private static final String BINDING_KEY = "test.key"; // 바인딩 키 name
private static final String EXCHANGE_NAME = "test.exchange"; // exchange name
private static final String QUEUE_NAME = "queue"; // 목적지 queue name
/**
* direct Exchange 구성
*
* @return DirectExchange
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME); // Exchange속성을 direct로 설정
}
/**
* Queue 구성
*
* @return Queue
*/
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME);
}
/**
* Queue와 DirectExchange를 바인딩
* test.key라는 이름으로 바인딩 구성
*
* @param directExchange
* @param queue
* @return Binding
*/
@Bean
public Binding binding(DirectExchange directExchange, Queue queue) {
// queue까지 가는 바인딩 Exchange 타입을 directExchange로 지정하고 test.key 이름으로 바인딩 구성
return BindingBuilder
.bind(queue)
.to(directExchange)
.with(BINDING_KEY);
}
/**
* RabbitMQ와 메시지 통신을 담당하는 클래스
*/
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
/**
* RabbitMQ와 연결을 관리하는 클래스
*/
@Bean
public ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
/**
* RabbitMQ 메시지를 JSON형식으로 보내고 받을 수 있다.
*/
@Bean
public Jackson2JsonMessageConverter messageConverter() {
ObjectMapper objectMapper = new ObjectMapper()
// 날짜 관련 타임스탬프 직렬화를 막고 ISO-8601 형태로 포맷된다.
.configure(SerializationFeature.WRITE_DATE_KEYS_AS_TIMESTAMPS, true)
.registerModule(dateTimeModule()); // Java에서 시간을 처리하기위한 모듈
return new Jackson2JsonMessageConverter(objectMapper);
}
/**
* 자바 시간 모듈 등록
*/
@Bean
public JavaTimeModule dateTimeModule() {
return new JavaTimeModule();
}
}
Mapping 설정은 postman혹은 intellij에 내장되어있는 클라이언트 요청 기능으로 직접 메세지를 보내는 용도로 만든 Mapping 입니다.
import com.example.rabbitmqtest.common.codes.SuccessCode;
import com.example.rabbitmqtest.common.response.ApiResponse;
import com.example.rabbitmqtest.dto.MessageDTO;
import com.example.rabbitmqtest.service.MesssageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@RequestMapping("/api/v1/publisher")
public class MessageController {
private final MesssageService messsageService;
public MessageController(MesssageService messsageService) {
this.messsageService = messsageService;
}
@PostMapping("/send")
public ResponseEntity<?> sendMessage(@RequestBody MessageDTO messageDTO) {
messsageService.sendMessage(messageDTO);
ApiResponse ar = ApiResponse.builder()
.resultMsg(SuccessCode.SELECT.getMessage())
.resultCode(SuccessCode.SELECT.getStatus())
.build();
return new ResponseEntity<>(ar, HttpStatus.OK);
}
}
import com.example.rabbitmqtest.dto.MessageDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
/**
* MessageService 서비스 Layer
*/
@Slf4j
@Service
public class MesssageService {
private final RabbitTemplate rabbitTemplate;
public MesssageService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
private static final String BINDING_KEY = "test.key";
private static final String EXCHANGE_NAME = "test.exchange";
/**
* 메세지 전송
*
* @param messageDTO 메세지 DTO
*/
public void sendMessage(MessageDTO messageDTO) {
try{
ObjectMapper objectMapper = new ObjectMapper();
String objectToJson = objectMapper.writeValueAsString(messageDTO);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, BINDING_KEY, objectToJson);
} catch (JsonProcessingException ex) {
log.error("parsing error : {}", ex.getMessage(), ex);
}
}
}
import lombok.Getter;
@Getter
public enum SuccessCode {
SELECT(200,"SELECT SUCCESS");
private final Integer status;
private final String message;
SuccessCode(Integer status, String message){
this.status = status;
this.message = message;
}
}
import lombok.*;
/**
* 메세지 전송하는 DTO
*/
@Getter
public class MessageDTO {
private String title;
private String content;
}
import lombok.Builder;
import lombok.Getter;
/**
* API 결과를 저장하는 Response 객체
*/
@Getter
public class ApiResponse {
private final String resultMsg;
private final Integer resultCode;
@Builder
public ApiResponse(String resultMsg, Integer resultCode) {
this.resultMsg = resultMsg;
this.resultCode = resultCode;
}
}
위와 같이 설정했으면 대부분의 경우 요청이 보내지고 RabbitMQ에 바인딩이 쌓이게 됩니다. 하지만 만일에 대비해 현재 RabbitMQ와의 연결상태를 확인해보겠습니다.
(저의 경우에는 intellij Ultimate에서 제공하는 기능을 사용했습니다.)

해당 주소로 GET요청을 보내면 현재 RabbitMQ가 동작하는지 확인할 수 있습니다, 만약 RabbitMQ가 정상적으로 올라왔으면 UP으로 나오고 아니면 DOWN으로 나옵니다.

POST http://localhost:8088/api/v1/publisher/send
Content-Type: application/json
{
"title": "test",
"content": "test"
}
다음과 같이 주소를 작성하고 파라미터를 입력하여 보내면


요청이 성공하고 RabbitMQ에 Direct속성으로 test.exchange이름의 바인딩이 등록된 것을 확인할 수 있습니다.
회사업무도 많고 예제 프로젝트를 만드느라 시간이 많이 걸렸습니다. 하지만 처음으로 RabbitMQ를 실행시키고 나아가 SpringBoot와 처음 연결해보니 새삼 대학교에서 처음 배우던 시절이 떠올랐습니다. 이 게시글에는 direct 타입 Exchange만 전송했지만 다음 게시글에서는 topic이나 Fanout Exchange를 보내보고 비교해보는 게시글을 작성할 계획입니다.
https://adjh54.tistory.com/292
(항상 감사합니다.)