RabbitMQ는 AMQP를 구현한 오픈소스 메세지 브로커로서 시스템 간의 데이터를 안전하고 효율적으로 전달하는 데 사용된다. 다양한 분산 시스템에서 메시지 큐를 통해 데이터를 교환하고 비동기적으로 처리된다.
AMQP는 메시지 지향 미들웨어(MOM) 시스템 간에 통신하기 위한 개방형 네트워크 프로토콜이다. 간단히 말해서, 송신자(Producer)와 수신자(Consumer) 사이에서 메시지를 안전하게 교환하는 표준 프로토콜이다.
// RabbitMQ
implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
implementation 'org.springframework.boot:spring-boot-starter-reactor-netty:3.0.0'
Spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
@EnableRabbit
@RequiredArgsConstructor
@Configuration
public class RabbitMQConfig {
private static final String CHAT_QUEUE_NAME = "chat.queue";
private static final String CHAT_EXCHANGE_NAME = "chat.exchange";
private static final String CHAT_ROUTING_KEY = "room.*";
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
// Queue 등록
@Bean
public Queue queue() {
return new Queue(CHAT_QUEUE_NAME);
}
// Exchange 설정 - TopicExchange
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(CHAT_EXCHANGE_NAME);
}
// Exchange와 Queue바인딩
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(topicExchange()).with(CHAT_ROUTING_KEY);
}
// RabbitMQ 와의 연결을 관리하는 클래스
@Bean
public CachingConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
// RabbitMQ 와의 메시지 통신을 담당하는 클래스
@Bean
public RabbitTemplate rabbitTemplate(){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
// RabbitMQ 메시지를 JSON 형식으로 보내고 받을 수 있음
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/chat")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/pub");
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/queue");
}
}
@MessageMapping("chat.message.{roomId}") //여기로 전송되면 메서드 호출 -> WebSocketConfig prefixes 에서 적용한건 앞에 생략
public ResponseEntity<ChatResponse> saveChat(@DestinationVariable("roomId") Long roomId, ChatRequest chatRequest){
rabbitTemplate.convertAndSend("chat.exchange", "room."+roomId, chatRequest);
return ResponseEntity.ok(chatService.saveChatHistory(roomId, chatRequest));
}
주의 사항
RabbitMQ가 기본적으로 STOMP를 지원하지 않기 때문에 STOMP 플러그인을 활성화해야 합니다.rabbitmq-plugins enable rabbitmq_stomp
https://velog.io/@black_han26/AMQPAdvanced-Message-Queuing-Protocol