지난 포스팅에서 메시지 큐를 공부하면서 Kafka와 RabbitMQ를 비교했었다. 오늘은 Spring Boot와 RabbitMQ를 연동해보고자 한다.
brew update
brew install rabbitmq
위 명령어를 입력하면 RabbitMQ를 설치해주고 RabbitMQ를 실행하기 위해 해야할 것들을 알려준다.
brew services start rabbitmq
기본 포트 번호
는 15672이다.
서버 실행 후 localhost:15672에 접속하면 RabbitMQ 로그인 화면이 나온다.
초기 Username과 Password는 각각 guest
로 접속할 수 있다.
접속하면 위와 같은 화면이 나오는데, 직접 Exchang와 Queue를 추가할 수 있지만 Spring과 연동해서 Producer(생산자), Consumer(소비자)를 연동해보겠다.
Spring Initializr를 이용한다면 프로젝트 생성 시 Dependencies에 Spring for RabbitMQ
를 추가하거나 아래 코드를 직접 추가해준다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
포트 번호 및 RabbitMQ 접속 정보를 작성한다.
server:
port: 8080
# Log Level 설정
logging:
level:
root: info
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# DB
datasource:
driver-class-name: org.h2.Driver
url:
username: sa
password:
# RabbitMQ queue, exchange, routing-key 정보
rabbitmq:
queue.name: sample.queue
exchange.name: sample.exchange
routing.key: sample.key
RabbitMQ와의 연결을 설정하고, 메시지 전송을 위한 Queue, DirectExchange, Binding 등의 구성 요소를 추가한다.
//import 생략
@Configuration
public class RabbitmqConfig {
@Value("${spring.rabbitmq.host}")
private String rabbitmqHost;
@Value("${spring.rabbitmq.port}")
private int rabbitmqPort;
@Value("${spring.rabbitmq.username}")
private String rabbitmqUsername;
@Value("${spring.rabbitmq.password}")
private String rabbitmqPassword;
@Value("${rabbitmq.queue.name}")
private String queueName;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
@Value("${rabbitmq.routing.key}")
private String routingKey;
/** 지정된 큐 이름으로 Queue 빈을 생성
*
* @return Queue 빈 객체
*/
@Bean
public Queue queue() {
return new Queue(queueName);
}
/** 지정된 exchange 이름으로 DirectExchange 빈을 생성
*
* @return TopicExchange 빈 객체
*/
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchangeName);
}
/**
* 주어진 queue와 exchange를 바인딩하고 라우팅 키를 이용하여 Binding 빈을 생성
*
* @param queue 바인딩할 Queue
* @param exchange 바인딩할 TopicExchange
* @return Binding 빈 객체
*/
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
/**
* RabbitMQ 연결을 위한 ConnectionFactory 빈을 생성하여 반환
*
* @return ConnectionFactory 객체
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(rabbitmqHost);
connectionFactory.setPort(rabbitmqPort);
connectionFactory.setUsername(rabbitmqUsername);
connectionFactory.setPassword(rabbitmqPassword);
return connectionFactory.getRabbitConnectionFactory();
}
/**
* RabbitTemplate을 생성하여 반환
*
* @param connectionFactory RabbitMQ와의 연결을 위한 ConnectionFactory 객체
* @return RabbitTemplate 객체
*/
@Bean
public RabbitTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// JSON 형식의 메시지를 직렬화하고 역직렬할 수 있도록 설정
rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* Jackson 라이브러리를 사용하여 메시지를 JSON 형식으로 변환하는 MessageConverter 빈을 생성
*
* @return MessageConverter 객체
*/
@Bean
public MessageConverter jackson2JsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}
Queue로 메시지를 발행할 때는 RabbitTemplate 클래스에 convertAndSend 메서드를 사용하고, Queue에서 메시지를 구독할 때는 @RabbitListener 어노테이션을 사용한 메서드를 추가한다.
//import 생략
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageService {
private final RabbitTemplate rabbitTemplate;
@Value("${rabbitmq.exchange.name}")
private String exchangeName;
@Value("${rabbitmq.routing.key}")
private String routingKey;
/**
* Queue로 메시지를 발행
*
* @param messageDto 발행할 메시지의 DTO 객체
*/
public void sendMessage(MessageDto messageDto) {
log.info("message sent: {}", messageDto.toString());
rabbitTemplate.convertAndSend(exchangeName, routingKey, messageDto);
}
/**
* Queue에서 메시지를 구독
*
* @param messageDto 구독한 메시지를 담고 있는 MessageDto 객체
*/
@RabbitListener(queues = "${rabbitmq.queue.name}")
public void reciveMEssage(MessageDto messageDto) {
log.info("Received message: {}", messageDto.toString());
}
}
메시지를 보내기 위한 Dto를 추가한다.
//import 생략
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageDto {
private String title;
private String content;
}
//import 생략
@RestController
@Slf4j
@RequiredArgsConstructor
public class MessageController {
private final MessageService messageService;
/**
* Queue로 메시지를 발행
*
* @param messageDto 발행할 메시지의 Dto 객체
* @return ResponseEntity 객체로 응답을 반환
*/
@RequestMapping(value = "/send/message", method = RequestMethod.POST)
public ResponseEntity<?> sendMessage(@RequestBody MessageDto messageDto) {
messageService.sendMessage(messageDto);
return ResponseEntity.ok("Message sent to RabbitMQ!🐇");
}
}
포스트맨으로 간단하게 테스트를 진행해본다.
localhost:8080/send/message
로 POST
요청을 보낸다.
위 사진과 같이 response가 정상적으로 오는 것을 확인할 수 있다.
로그에도 정상적으로 찍힌다.
RabbitMQ 웹 콘솔에서도 메시지가 정상적으로 발행된 것을 확인할 수 있다.
이번 포스팅에서는 RabbitMQ와 Spring Boot를 연동해보는 시간을 가졌다. 물론 더 다양한 기능과 코드가 있겠지만 기본적인 개념과 프로세스를 익히는데는 충분히 좋았던 것 같다.
잘 봤습니다. 좋은 글 감사합니다.