STOMP에 RabbitMQ를 추가해보았다.

박준수·2023년 7월 8일
7

[토이프로젝트]

목록 보기
4/5
post-thumbnail

이전 게시글에서는 웹소켓에서 STOMP 프로토콜을 추가해 채팅 서버를 구현해보았습니다. 이번에는 STOMP에 외부 메시지 브로커인 RabbitMQ를 추가해보았습니다.

@Configuration
@EnableWebSocketMessageBroker
public class StompWepSocketConfig implements WebSocketMessageBrokerConfigurer {

    //웹소켓 핸드셰이크 커넥션을 생성할 경로
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/stomp/chat").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {

        registry.setApplicationDestinationPrefixes("/pub");  
        registry.enableSimpleBroker("/sub"); 
}

STOMP 프로토콜로만 채팅서버를 구현했을 때는 다음과 같습니다.

  • Spring 환경에서 STOMP 프로토콜을 사용한다면 메시지 브로커로 In Memory Broker을 사용하게 됩니다.
  • registry.enableSimpleBroker("/sub"); 이 부분이 내장된 인메모리 메시지 브로커를 활성화하는 것입니다.
    • /sub로 시작하는 대상 주제를 가진 클라이언트에게 메시지를 브로드캐스트합니다.
    • 즉 , /sub로 시작하는 대상 주제를 구독하면 해당 주제에 대한 메시지를 수신할 수 있습니다.

인 메모리 브로커의 단점

  • 확장성 : 단일 서버 환경에서 작동하며, 메시지를 메모리에 저장하고 관리합니다. 따라서 메모리 사용량과 동시에 처리할 수 있는 세션 수에는 제한이 있을 수 있습니다.

  • 다음과 같은 경우에는 발행자가 보낸 서버1에 구독중인 사용자1, 2만 메시지를 전달받게 됩니다.
  • 서버1에 발행 메시지를 보냈을 때, 서버1에서 서버2에도 접속중인 사용자를 알 수 있다면, 서버2에 메시지를 보내주도록 알려주면 되는데 이는 세션관리에 빡셀 것 같다는 생각이 듭니다.
  • 결함 허용성 : 장에 발생 시 메세지의 유실될 가능성이 높습니다.
  • 모니터링 : 따로 모니터링 하는 것이 불편합니다.

외부 메시지 브로커가 필요한 이유

  • 확장성 : RabbitMQ같은 메시지 브로커는 클러스터링 및 분산 아키텍처를 지원하여 처리할 수 있는 세션 수를 확장할 수 있습니다. 따라서 세션 수용량의 한계를 극복할 수 있습니다.
  • 결험 허용성 : 메시지를 디스크에 저장하고, 필요한 경우 재전송하며, 메시지 유실을 방지합니다. → 메시지 전송의 안정성과 신뢰성을 향상 시킵니다.
  • 모니터링 : RabbitMQ관리자 도구를 통해 메시지 큐의 상태를 모니터링하고, 웹 기반 인터페이스를 통해 세션 정보, 메시지 전송 및 수신 기록 등을 확인할 수 있습니다.
  • 이 외 등등

결론적으로는 스프링 내부에 있는 인 메모리 브로커로는 사용자의 수가 많아 졌을 때와, 서버가 여러 대 있을 경우 한계가 있어 RabbitMQ같은 외부 메시지 브로커를 적용시킵니다.

  • 외부 메시지 브로커를 적용할 시 이와 같은 방식으로 메시지를 보내줄 수 있습니다.
  • 저는 외부 메시지 브로커로 Rabbitmq를 써보겠습니다.
  • 적용하기 위한 RabbitMQ에 대한 기본 지식은 이 게시글에 정리해두었습니다.

local에서 rabbitmq 실행하는 방법

  • homebrew로 rabbitmq를 설치하고 brew services start rabbitmq (백그라운드) 하면 실행, brew services stop rabbitmq 는 종료
  • 포그라운드에서 실행을 하고 싶은면 rabbitmq 설치 경로 (…/sbin)에 들어가서 ./rabbitmq-server 명령어를 치면 rabbitmq 서버가 실행하게 됩니다.
  • brew info rabbitmq 를 입력하면 설치 경로를 확인할 수 있습니다.

StompWepSocketConfig

@Configuration
@EnableWebSocketMessageBroker
public class StompWepSocketConfig implements WebSocketMessageBrokerConfigurer {

    //웹소켓 핸드셰이크 커넥션을 생성할 경로
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {

        registry.addEndpoint("/stomp/chat").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher(".")); // URL을 / -> .으로
        registry.setApplicationDestinationPrefixes("/pub");  //  @MessageMapping 메서드로 라우팅된다.  Client에서 SEND 요청을 처리
        registry.enableStompBrokerRelay("/queue", "/topic", "/exchange", "/amq/queue");
    }
}

  • 공식 자료에 enableStompBrokerRelay메서드의 인자값에 대한 특징들이 나와있습니다.
  • 저는 /exchange를 썼습니다.
    • SUBSCRIBE - 임의의 바인딩 패턴으로 구독
    • SEND - 임의의 라우팅키와 함께 전송

RabbitConfig

@Configuration
@EnableRabbit
public class RabbitConfig {

    private static final String CHAT_QUEUE_NAME = "chat.queue";
    private static final String CHAT_EXCHANGE_NAME = "chat.exchange";
    private static final String ROUTING_KEY = "*.room.*";

    //Queue 등록
    @Bean
    public Queue queue() {
        return new Queue(CHAT_QUEUE_NAME, true);
    }

    //Exchange 등록
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange(CHAT_EXCHANGE_NAME);
    }

    // Exchange와 Queue바인딩
    @Bean
    public Binding binding(Queue queue, TopicExchange exchange){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTING_KEY);
    }

    // RabbitMQ와의 메시지 통신을 담당하는 클래스
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    // RabbitMQ와의 연결을 관리하는 클래스
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    // 메시지를 JSON형식으로 직렬화하고 역직렬화하는데 사용되는 변환기
    // RabbitMQ 메시지를 JSON형식으로 보내고 받을 수 있음
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
  • connectionFactory() 함수에서 factory.setHost("localhost"); 은 현재 실행 중인 로컬 시스템의 RabbitMQ 서버에 연결하려는 것을 의미하고 factory.setVirtualHost("/");는 가상 호스트(Virtual Host)를 설정합니다,. RabbitMQ는 가상 호스트를 사용하여 여러 개의 독립적인 브로커 환경을 구성할 수 있습니다. 각 가상 호스트는 독립적인 메시지 브로커로 간주되며, 큐, 익스체인지, 바인딩, 사용자 권한 등이 해당 가상 호스트 내에서 관리됩니다.
  • 참고 영상이나 블로그에서 큐를 등록하고 Exchange와 바인딩을 하길레 저도 똑같이 따라한거지만 Exchange 등록만 등록해놔도 채팅 메시지는 잘 송수신이 되었습니다. 그럼에도 불구하고 큐를 따로 바인딩하는 큰 이유는 (제가 느꼈을 때는) 메세지 보존 이라고 생각했습니다. 메시지가 소비자(Consumer)가 없는 경우에도 메시지를 보존할 수 있어 중요한 메시지의 유실을 방지할 수 있습니다.

다음 사진에서 볼 수 있듯이 apic(STOMP 툴)을 통해 STOMP 연결을 하면 각 연결마다 큐(stomp-subscription-…)가 생성된 것을 확인할 수 있고 이때 RabbitConfig에 등록한 큐(chat.queue)에 메세지들을 보존 할 수 있습니다.

  • (내가 apic을 통해 보냈던 json 형식의 메세지들이 보존된 것을 확인할 수 있었다.)

  • RabbitConfig 설정으로 chat.exchange도 등록된 것을 확인할 수 있습니다.
  • 나머지 exchange는 기본 exchange인 것 같습니다.

RabbitController

@Controller
@RequiredArgsConstructor
public class MessageController {

    private final MessageService messageService;
    private final RabbitTemplate rabbitTemplate;

    // 채팅방 입장
    @MessageMapping("chat.enter.{roomId}")
    public MessageInfo enterUser(@DestinationVariable("roomId") Long roomId, @Payload MessageCreateRequest message) {
        message.setMessage(message.getSender() + "님이 채팅방에 입장하였습니다.");
        rabbitTemplate.convertAndSend("chat.exchange", "enter.room." + roomId, message);
        return messageService.saveMessage(message);
    }

    // 채팅방 대화
    @MessageMapping("chat.talk.{roomId}")
    public MessageInfo talkUser(@DestinationVariable("roomId") Long roomId, @Payload MessageCreateRequest message) {
        rabbitTemplate.convertAndSend("chat.exchange", "*.room." + roomId, message);
        return messageService.saveMessage(message);
    }

    // 채팅방 퇴장
    @MessageMapping("chat.exit.{roomId}")
    public MessageInfo exitUser(@DestinationVariable("roomId") Long roomId, @Payload MessageCreateRequest message){
        message.setMessage(message.getSender() + "님이 채팅방에 퇴장하였습니다.");
        rabbitTemplate.convertAndSend("chat.exchange", "exit.room." + roomId, message);
        return messageService.saveMessage(message);
    }
}
  • 채팅메시지 controller을 다음과 같이 작성했습니다.
    - 이렇게 되면 Producer은 채팅방 입장, 대화, 퇴장 3가지이고 클라이언트마다 입장, 대화, 퇴장 3가지의 큐가 생성됩니다.
  • convertAndSend() 메서드는 내가 등록한 exchange에 Routing Key에 맞는 큐를 바인딩하고 메시지를 전달해주는 역할을 합니다.
  • 채팅방 대화 controller중 convertAndSend()메서드에서는 Routing key가 "*.room." + roomI 로 설정해놨는데, 예를들어 x.room.1을 구독한 사람은 enter.room.1 , talk.room.1 으로 보넨 메시지를 모두 받을 수 있습니다. (밑에 예시 사진) ⬇️ 채팅 중에 다른 사람이 채팅방에 들어왔을 경우 입장 메시지를 받는 경우입니다.
  • 참고로 "*.room." + roomI 로 설정하면 Subscription URL을 ~/enter.room.1 or ~/talk.room.1으로 하면 알아서 라우팅키가 enter.room.1 , talk.room.1이 됩니다.

  • Routing Key가 enter.room.1인 경우에는 exchange를 topic으로 생성했지만 실제로는 direct 타입의 exchange와 유사하게 동작한다는 것입니다. (*.room.1은 topic 타입입니다.)

마무리

apic을 통해 채팅 시스템이 잘되는 것을 확인하였고 데이터베이스에도 채팅 데이터가 잘 저장됩니다!

채팅 서버를 구현해보기 위해 Web socket → + STOMP → + RabbitMQ를 적용해보았습니다. 이번 기회에 메시지 브로커인 Rabbitmq와 좀 더 친해졌다고 생각했고, 스마트폰을 가지고 있는 대한민국 국민들이 대부분 사용하는 채팅 서비스인 카카오톡은 정말 대단한 것 같습니다.

이 채팅기능에 서비스에 대한 추가적인 아이디어를 부여하고 그것에 맞는 기능들을 즉, 살을 붙여가며 다양한 기술들도 접목시키며 시스템을 확장시켜보면 좋을 것 같습니다.

참고 :
STOMP Plugin — RabbitMQ
WebSocket Support
Spring Websocket & STOMP
[Spring Boot] WebSocket과 채팅 (4) - RabbitMQ
WebSocket - In Memory 대신 외부 브로커 사용하는 이유

profile
방구석개발자

2개의 댓글

comment-user-thumbnail
2024년 4월 15일

상세한 설명 글 덕분에 많은 도움이 되었습니다.
질문이 하나 있는데요
// 채팅방 대화
@MessageMapping("chat.talk.{roomId}")
public MessageInfo talkUser(@DestinationVariable("roomId") Long roomId, @Payload MessageCreateRequest message) {
rabbitTemplate.convertAndSend("chat.exchange", "*.room." + roomId, message);
return messageService.saveMessage(message);
}

=> 여기서 rabbitTemplate.convertAndSend("chat.exchange", "*.room." + roomId, message);
이 코드를 실행하면 stomp-subscription-임의의문자열 의 큐가 자동으로 생성되는 건가요?

아무리 찾아봐도 이해가 안가서요 ㅜㅜ

1개의 답글