안녕하세요 오늘은 Road To MQ 네번째 아티클인 WebSocket + STOMP + RabbitMQ입니다!
WebSocket + stomp 조합에 외부 메세지 브로커인 RabbitMQ를 적용해서 업그레이드 해보겠습니다!
RabbitMQ에 대한 기본원리 + 사용 이유는 저번 아티클 https://velog.io/@joonoo3/Road-To-MQ-RabbitMQ-기본-개념 에 정리해 두었습니다!
저는 window 환경이라 https://axce.tistory.com/121 의 글을 보고 진행했습니다! 도커로 하기 귀찮으신 분들은 로컬에 설치하셔도 됩니다.
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -p 61613:61613 --restart=unless-stopped rabbitmq:3-management
--name rabbitmq : 컨테이너 이름을 rabbitmq로 지정합니다.
-p 5672:5672 -p 15672:15672 -p 61613:61613 : docker환경의 포트와 제 포트를 맞춰줍니다 (5672는 기본 포트, 15672는 웹 메니지먼트 포트이고 저는 STOMP를 사용하기 때문에 추가로 61613포트도 열어주었습니다.)
--restart=unless-stopped : 컨테이너를 자동으로 재시작합니다.
rabbitmq:3-management : image를 받아옵니다.
잘 실행되고 있습니다!
참고로 STOMP를 호환하기 위해서는 플러그인을 설치해주어야합니다!
rabbitmq-plugins enable rabbitmq_stomp
컨테이너의 EXEC탭에서 입력해줍니다!
localhost:15672에 들어가면 웹 메니지먼트 화면이 나오고 guest / guest 로 로그인 해줍니다
잘 돌아가고 있습니다!🔫🔫
implementation 'org.springframework.boot:spring-boot-starter-amqp'
testImplementation 'org.springframework.amqp:spring-rabbit-test'
implementation 'org.springframework.boot:spring-boot-starter-reactor-netty:3.0.0'
RabbitMQ는 기본적으로 AMQP 프로토콜을 사용하기 때문에 해당 의존성도 추가해줍니다.
@Configuration
@EnableWebSocketMessageBroker
public class StompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/chat")
.setAllowedOrigins("*");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.setApplicationDestinationPrefixes("/pub");
registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue");
}
}
spring:
rabbitmq:
port: 5672
host: localhost
username: guest
password: guest
rabbitmq:
exchange: test.exchange
keroro:
binding-key: keroro
geroro:
binding-key: geroro
garuru:
binding-key: garuru
exchange 이름은 test.exchange로 하고 queue는 3개 만들어줍니다!
@Configuration
@EnableRabbit
public class RabbitConfig {
@Value("${rabbitmq.exchange}")
String CHAT_EXCHANGE_NAME;
@Value("${rabbitmq.keroro.binding-key}")
String KERORO_BINDING_KEY;
@Value("${rabbitmq.geroro.binding-key}")
String GERORO_BINDING_KEY;
@Value("${rabbitmq.garuru.binding-key}")
String GARURU_BINDING_KEY;
//RabbitAdmin을 사용하면 RabbitMQ 서버에 Exchange, Queue, Binding을 등록할 수 있습니다.
//RabbitAdmin은 RabbitTemplate을 사용하여 RabbitMQ 서버에 접근합니다.
@Bean
public AmqpAdmin amqpAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
rabbitAdmin.declareExchange(exchange());
rabbitAdmin.declareQueue(KeroroQueue());
rabbitAdmin.declareQueue(GeroroQueue());
rabbitAdmin.declareQueue(GaruruQueue());
rabbitAdmin.declareBinding(bindingKeroro(exchange(), KeroroQueue()));
rabbitAdmin.declareBinding(bindingGeroro(exchange(), GeroroQueue()));
rabbitAdmin.declareBinding(bindingGaruru(exchange(), GaruruQueue()));
return rabbitAdmin;
}
//Exchange 등록
@Bean
public DirectExchange exchange() {
return new DirectExchange(CHAT_EXCHANGE_NAME);
}
//Queue 등록
@Bean
public Queue KeroroQueue() {
return new Queue(KERORO_BINDING_KEY, false);
}
@Bean
public Queue GeroroQueue() {
return new Queue(GERORO_BINDING_KEY, false);
}
@Bean
public Queue GaruruQueue() {
return new Queue(GARURU_BINDING_KEY, false);
}
//Binding 등록
@Bean
public Binding bindingKeroro(DirectExchange exchange, Queue KeroroQueue) {
return BindingBuilder.bind(KeroroQueue).to(exchange).with(KERORO_BINDING_KEY);
}
@Bean
public Binding bindingGeroro(DirectExchange exchange, Queue GeroroQueue) {
return BindingBuilder.bind(GeroroQueue).to(exchange).with(GERORO_BINDING_KEY);
}
@Bean
public Binding bindingGaruru(DirectExchange exchange, Queue GaruruQueue) {
return BindingBuilder.bind(GaruruQueue).to(exchange).with(GARURU_BINDING_KEY);
}
//RabbitTemplate을 사용하여 RabbitMQ 서버에 메시지를 전송할 수 있습니다.
//RabbitTemplate은 RabbitMQ 서버에 접근하기 위한 클래스입니다.
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setExchange(CHAT_EXCHANGE_NAME);
return template;
}
//ConnectionFactory 등록
//ConnectionFactory는 RabbitMQ 서버에 접근하기 위한 클래스입니다.
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
return factory;
}
}
RabbitMQ를 위한 설정을 해줍니다. exchange와 queue들을 생성해주고 바인딩 & 등록해줍니다.
실행을 해주면 브로커가 활성화됩니다.
exchange와 queue들이 잘 생성된 것을 확인할 수 있습니다!
@Controller
@RequiredArgsConstructor
@Slf4j
public class StompRabbitController {
private final RabbitTemplate template;
@Value("${rabbitmq.exchange}")
String CHAT_EXCHANGE_NAME;
@Value("${rabbitmq.keroro.binding-key}")
String KERORO_BINDING_KEY;
@Value("${rabbitmq.geroro.binding-key}")
String GERORO_BINDING_KEY;
@Value("${rabbitmq.garuru.binding-key}")
String GARURU_BINDING_KEY;
@MessageMapping("keroro")
public void Keroro(String chat) {
log.info("Keroro : " + chat);
template.convertAndSend(CHAT_EXCHANGE_NAME, KERORO_BINDING_KEY, chat);
}
@MessageMapping("geroro")
public void Geroro(String chat) {
log.info("Geroro : " + chat);
template.convertAndSend(CHAT_EXCHANGE_NAME, GERORO_BINDING_KEY, chat);
}
@MessageMapping("Garuru")
public void Garuru(String chat) {
log.info("Garuru : " + chat);
template.convertAndSend(CHAT_EXCHANGE_NAME, GARURU_BINDING_KEY, chat);
}
}
메세지를 받기위한 controller입니다.
@MessageMapping("/keroro")
이전 config에서 발행을 위한 prefix를 /pub로 설정했으므로
ws://localhost:8080/pub/keroro 엔드포인트로 들어오는 요청은 해당 controller에 연결됩니다.template.convertAndSend(EXCHANGE_NAME, BINDING_KEY, message);
exchange에 binding key와 메세지를 전달합니다.
두개의 창을 켜 url입력 & STOMP를 체크 후 연결해줍니다.
창1에서 구독 엔드포인트를 입력해주고 구독해줍니다.
새로운 consumer가 생겼습니다!
창2에서 발행 엔드포인트와 메세지를 입력해주고 send를 눌러줍니다!
로그가 잘 찍히는 것을 볼 수 있습니다!🔫🔫