스프링 stomp 를 이용한 채팅 기능

이진우·2024년 11월 27일
0

스프링 학습

목록 보기
44/46

이전 블로그에서...

이전 블로그에서
WebSocket 만을 이용하여 채팅 기능을 구현하였다.

WebSocket 만을 사용하여 채팅 기능을 사용할 때 특정한 형식이 사용되지 않는
단점이 있었다.

따라서 이번 블로그에서는 stomp 프로토콜을 사용하여 채팅 기능을 구현한다.

상황 설명

  • 1대1 채팅 기능을 구현한다.
  • 상대방이 채팅방에 들어오면 메시지 읽음을 표시하기 위해 1표시를 없앤다.
  • 채팅 메시지는 복잡한 Join 문에 활용되지 않는다.
  • 예상 채팅 메시지를 사용하는 최대 동시 접속자는 100명이라고 예측한다. 이때 전체적인 채팅 flow 가 300ms 안에 완료되는 것이 목표이다.

필요 기능

  • 채팅 메시지는 복잡한 Join 이 필요하지 않으므로, 그리고 Read 와 Write 가 빈번하게 일어나므로 이를 빠르고 효율적이게 저장할 수 있는 mongoDB 를 채택하였다.
    (mongoDB에 대한 내용은 나중에 따로 정리)

  • 상대방이 채팅방에 들어오는 행위를 파악하려면 이 정보를 따로 저장해야 한다.
    이런 정보는 매우 빠르게 삽입과 삭제(채팅방 입장과 탈퇴)가 이루어지며 저장된 정보는 영속적이지 않아도 된다는 특성이 있다. (채팅방을 입장하는 것을 영구적으로 저장할 필요가 없다는 의미)

따라서 이에 적합한 Redis 를 채택한다.
(Redis 에 대한 내용도 나중에 따로 정리)

코드

모든 코드 보다 내가 정리하고 싶은 부분을 우선적으로 작성한다.

만약 전체 코드가 필요하신 분들은 댓글창에 적어주기 바랍니당.

Config

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class StompConfig implements WebSocketMessageBrokerConfigurer {

    private final StompErrorHandler stompErrorHandler;
    private final StompPreHandler stompPreHandler;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // stomp 접속 주소 url => /ws-stomp
        registry.addEndpoint("/ws-stomp")
                .setAllowedOriginPatterns("*");// 연결될 엔드포인트
    //     .withSockJS(); // SocketJS 를 연결한다는 설정

        registry.setErrorHandler(stompErrorHandler);
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {

        ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
        te.setPoolSize(1);
        te.setThreadNamePrefix("wss-heartbeat-thread-");
        te.initialize();

//        // 메시지를 구독하는 요청 url => 즉 메시지 받을 때
        registry.enableSimpleBroker("/sub","/queue")
                .setTaskScheduler(te)
                .setHeartbeatValue(new long[]{20000,20000});

        registry.setApplicationDestinationPrefixes("/pub","/sub");


    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompPreHandler);
    }


}

configureMessageBroker
웹 소켓에서 클라이언트와 서버간의 연결이 끊겼는지 확인하는 작업이 필요하다.
클라이언트에서 서버로 지속적으로 ping 을 보내 연결이 잘 되어 있는지 확인하고 서버에서는 일정 시간 동안 ping 을 못받았다고 인식한다면 연결이 끊긴다.

그 일정시간이 위에서는 20000ms 이다.

후에 보겠지만

이 사진이 다 ping pong 인 것이다.

만약 이 값이 제대로 활성화 되어 있지 못하다면
사용자가 채팅을 지속적으로 치지 않는 이상 세션 연결이 계속 유지되지 않을 것이다 .

ErrorHandler

서버에서 특정 연결을 거부하였을 경우 이 값의 원인을 클라이언트에게 전달하고 싶었습니다.

따라서 아래와 같은 클래스를 만들어 연결 거부 사유를 클라이언트에게 전송합니다.

@RequiredArgsConstructor
@Configuration
public class StompErrorHandler extends StompSubProtocolErrorHandler {



    @Override
    public Message<byte[]> handleClientMessageProcessingError(Message<byte[]> clientMessage,
            Throwable ex) {

        System.out.println("StompErrorHandler.handleClientMessageProcessingError");
        System.out.println(ex.getMessage());
        System.out.println(ex.getCause());
        System.out.println(ex.getCause().getMessage());
        System.out.println(ex.getLocalizedMessage());
        System.out.println(ex.getSuppressed());
        System.out.println(ex.getStackTrace());


        if(ex.getCause().getMessage().equals(ErrorCode.ACCESS_TOKEN_NOT_MATCH.getMessage())) {
            return errorMessage(ErrorCode.ACCESS_TOKEN_NOT_MATCH);
        }

        if(ex.getCause().getMessage().equals(ErrorCode.SUB_URL_CANT_ACCESS.getMessage())){
            return errorMessage(ErrorCode.SUB_URL_CANT_ACCESS);
        }

        if(ex.getCause().getMessage().equals(ErrorCode.CANT_FIND_CHATTING_ROOM.getMessage())){
            return errorMessage(ErrorCode.CANT_FIND_CHATTING_ROOM);
        }

        if(ex.getCause().getMessage().equals(ErrorCode.SUB_URL_NOT_MATCH.getMessage())){
            return errorMessage(ErrorCode.SUB_URL_NOT_MATCH);
        }


        return super.handleClientMessageProcessingError(clientMessage, ex);


    }


    private Message<byte[]> errorMessage(ErrorCode errorCode) {
        String code = errorCode.getMessage();

        System.out.println("code"+code);
        StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.ERROR);

        accessor.setMessage("에러");
        accessor.setLeaveMutable(true);

        Message<byte[]> message = MessageBuilder.createMessage(
                code.getBytes(StandardCharsets.UTF_8), accessor.getMessageHeaders());
        System.out.println(message);

        return message;
    }

이렇게 말입니다.
이는 개발의 편의성을 위해 이렇게 만들었습니다.

StompPrehandler

메시지를 전송하기 전과 후 원하는 처리를 할 수 있는 클래스입니다.

@Configuration
@RequiredArgsConstructor
public class StompPreHandler implements ExecutorChannelInterceptor {

    private final TokenProvider tokenProvider;
    private final ChattingRoomRepository chattingRoomRepository;
    private static final String CHAT_SUB_PREFIX = "/sub/chat/room";
    private static final String CHAT_SUB_ERROR_PREFIX="/user/queue/error";

    private static final String CHAT_SUB_PREFIX_RABBIT = "/exchange/chat.exchange/room";

    private final ApplicationEventPublisher applicationEventPublisher;

    @Override
    @Transactional
    public Message<?> preSend(Message<?> message, MessageChannel channel) {

        //헤더 접근
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {

            String accessToken = accessor.getFirstNativeHeader("Authorization");
            tokenProvider.validateToken(accessToken);
            Authentication authentication = tokenProvider.getAuthentication(accessToken);


            if (authentication != null) {

                accessor.setUser(authentication);
                return message;
            }

            throw new BadRequestException(ErrorCode.BAD_REQUEST);
        }

        if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();
            // destination을 사용하여 구독 경로를 확인하거나 로깅
            System.out.println("Subscribing to destination: " + destination);

            if(!destination.equals(CHAT_SUB_ERROR_PREFIX)&& !destination.startsWith(CHAT_SUB_PREFIX)){
                throw new BadRequestException(ErrorCode.SUB_URL_NOT_MATCH);
            }

            if(destination.startsWith(CHAT_SUB_PREFIX)){
                Long roomId = Long.valueOf(destination.substring(CHAT_SUB_PREFIX.length()+1));
                ChattingRoom chattingRoom =
                        chattingRoomRepository.findById(roomId).orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_CHATTING_ROOM));

                if(!chattingRoom.getReceiver().getLoginId().equals(accessor.getUser().getName())&&
                !chattingRoom.getSender().getLoginId().equals(accessor.getUser().getName())){
                    throw new BadRequestException(ErrorCode.SUB_URL_CANT_ACCESS);
                }

            }


        }


        return message;
    }

    @Override
    public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {

        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);

        if (accessor.getMessageType() == SimpMessageType.SUBSCRIBE && handler instanceof AbstractBrokerMessageHandler) {

            applicationEventPublisher.publishEvent(new SessionSubscribedEvent(message));
        }
    }
    if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {

            String accessToken = accessor.getFirstNativeHeader("Authorization");
            tokenProvider.validateToken(accessToken);
            Authentication authentication = tokenProvider.getAuthentication(accessToken);


            if (authentication != null) {

                accessor.setUser(authentication);
                return message;
            }

            throw new BadRequestException(ErrorCode.BAD_REQUEST);
        }

WebSocket 의 핸드 셰이크 이후 클라이언트에서 서버로 가장 먼저 수행하는 요청인 CONNECT 입니다.

이 때 서버에서 메시지를 확인하여 헤더에 Authorization 키가 있는지 , 그 key 에 대한 값이 유효한 토큰인지 확인하는 작업을 거친 후에

accessor.setUser 을 통해 세션에 사용자 정보를 저장합니다.
이를 통해 언제든 세션 내에서 필요할 때 사용자의 정보를 꺼내 쓸 수 있습니다.

if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();
            // destination을 사용하여 구독 경로를 확인하거나 로깅
            System.out.println("Subscribing to destination: " + destination);

            if(!destination.equals(CHAT_SUB_ERROR_PREFIX)&& !destination.startsWith(CHAT_SUB_PREFIX)){
                throw new BadRequestException(ErrorCode.SUB_URL_NOT_MATCH);
            }

            if(destination.startsWith(CHAT_SUB_PREFIX)){
                Long roomId = Long.valueOf(destination.substring(CHAT_SUB_PREFIX.length()+1));
                ChattingRoom chattingRoom =
                        chattingRoomRepository.findById(roomId).orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_CHATTING_ROOM));

                if(!chattingRoom.getReceiver().getLoginId().equals(accessor.getUser().getName())&&
                !chattingRoom.getSender().getLoginId().equals(accessor.getUser().getName())){
                    throw new BadRequestException(ErrorCode.SUB_URL_CANT_ACCESS);
                }

            }


        }

메시지를 전달 받기 위해 채팅방을 구독을 할 때에는
아까 저장한 세션 정보를 통해 자신이 그 채팅방에 실제로 참여하고 있는지 체크하는 로직을 추가하였습니다.

이 과정에서 유효한 구독 경로인지 검사하고 만약 그렇지 않다면 이전과 마찬가지로

이런 메시지가 전달 됩니다.

 @Override
    public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {

        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);

        if (accessor.getMessageType() == SimpMessageType.SUBSCRIBE && handler instanceof AbstractBrokerMessageHandler) {

            applicationEventPublisher.publishEvent(new SessionSubscribedEvent(message));
        }

이 부분은 사용자가 구독을 했다면
이벤트를 발행함으로써 구독 이후 의 처리를 진행하는 코드입니다.

구독 이후의 처리라고 한다면 사용자가 채팅방에 입장했기 때문에
자신이 입장한 정보를 다른 클라이언트에게 알려 1 메시지(읽음여부) 를 삭제하는 등의 처리를 진행할 수 있다.

후에 Event 처리를 다음과 같이 처리할 수 있습니다.

@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketEventListener {

   private final ChattingRoomConnectService chattingRoomConnectService;
   private final ChattingService chattingService;
   private static final String SUB = "SUB";

   @EventListener
   public void handleWebSocketConnectListener(SessionConnectedEvent event) {

       log.info("connection 생성");
   }

   @EventListener
   public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {

       StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

       chattingRoomConnectService.disconnectChattingRoom(headerAccessor.getSessionId());
       log.info("disconnect 끊김");
   }


   @EventListener
   public void handleWebSocketUnsubscribeListener(SessionUnsubscribeEvent event) {

       StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

       final Long chattingRoomId = Long.valueOf(headerAccessor.getSubscriptionId());

       chattingRoomConnectService.disconnectChattingRoom(headerAccessor.getSessionId());
       log.info("채팅방 퇴장: chattingRoomId: {}",chattingRoomId);

   }

   @EventListener
   public void handleWebSocketAfterSubscribe(SessionSubscribedEvent event){

       StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

       final Long chattingRoomId = Long.valueOf(headerAccessor.getSubscriptionId());
       final String loginId = headerAccessor.getUser().getName();

       chattingService.updateCountAllZero(chattingRoomId,loginId);
       chattingRoomConnectService.connectChattingRoom(chattingRoomId,loginId,headerAccessor.getSessionId());

       if(chattingRoomConnectService.isAllConnected(chattingRoomId)){
           chattingService.sendEnterMessage(chattingRoomId,loginId);
       }

       headerAccessor.getSessionAttributes().put(SUB,chattingRoomId);


       log.info("채팅방 입장: chattingRoomId: {}",chattingRoomId);


   }



}

connectChattingRoomdisconnectChattingRoom 은 Redis 에서 관리한다고 이해해주시면 됩니다.

메시지 전송

@MessageMapping("/message/{roomId}")
    public ResponseEntity<Void> sendMessage(@DestinationVariable("roomId")final Long roomId, @Payload  @Valid final ChatDto chatDto,
            final SimpMessageHeaderAccessor simpMessageHeaderAccessor){

        chattingService.sendMessage(roomId,chatDto,simpMessageHeaderAccessor);

        return ResponseEntity.ok().build();
    }
@Service
@RequiredArgsConstructor
@Slf4j
public class ChattingService {

    private final MemberRepository memberRepository;
    private final ChattingRoomRepository chattingRoomRepository;
    private final MessagingService messagingService;
    //private final MessageRepository messageRepository;
    private final ChattingMessageRepository chattingMessageRepository;
    private final ChattingRoomConnectService chattingRoomConnectService;
    private final MongoTemplate mongoTemplate;
    private static final String SUB = "SUB";



    public void sendMessage(final Long roomId,final ChatDto chatDto,final SimpMessageHeaderAccessor simpMessageHeaderAccessor){

        ChattingRoom chattingRoom = chattingRoomRepository.findById(roomId).orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_CHATTING_ROOM));
        Member sender = getCurrentLoginMember(simpMessageHeaderAccessor);

        checkCanPublishMessage(chattingRoom,sender);

        int readCount = chattingRoomConnectService.isAllConnected(chattingRoom.getId())?0:1;


        LocalDateTime messageCreatedTime = LocalDateTime.now();

        messagingService.sendMessage("/sub/chat/room/"+roomId,
                MessageResponseDto.ofPayLoad(sender.getId(),chatDto,messageCreatedTime,readCount,roomId));

      //  kafkaSender.send("chattest",MessageResponseDto.ofPayLoad(sender.getId(),chatDto,messageCreatedTime,readCount,roomId));


         chattingMessageRepository.save(ChattingMessage.builder()
                 .chattingRoomId(roomId)
                 .senderId(sender.getId())
                 .contents(chatDto.getMessage())
                 .createdAt(messageCreatedTime)
                 .readCount(readCount)
                 .build());


    }

    public void updateCountAllZero(Long chattingRoomId, String loginId) {

        Member findMember = memberRepository.findMemberByLoginId(loginId)
                .orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_MEMBER));

        Update update = new Update().set("readCount", 0);
        Query query = new Query(Criteria.where("chattingRoomId").is(chattingRoomId)
                .and("senderId").ne(findMember.getId()));

        mongoTemplate.updateMulti(query, update, ChattingMessage.class);
    }

    

    public void sendEnterMessage(Long chattingRoomId, String loginId) {

        messagingService.sendMessage("/sub/chat/room/"+chattingRoomId,MessageResponseDto.ofEnter("접속하였습니다"+loginId,chattingRoomId));

    }



    private Member getCurrentLoginMember(final SimpMessageHeaderAccessor simpMessageHeaderAccessor){

        if(simpMessageHeaderAccessor.getUser()==null){
            throw new NotFoundException(ErrorCode.CANT_FIND_MEMBER);
        }

        return memberRepository.findMemberByLoginId(simpMessageHeaderAccessor.getUser().getName()).orElseThrow(
                ()->new NotFoundException(ErrorCode.CANT_FIND_MEMBER));
    }

    private void checkCanPublishMessage(ChattingRoom chattingRoom, Member sender){

        if(!chattingRoom.getSender().getId().equals(sender.getId()) &&
           !chattingRoom.getReceiver().getId().equals(sender.getId())){

            throw new BadRequestException(ErrorCode.PUB_URL_CANT_ACCESS);
        }
    }

@Service
@RequiredArgsConstructor
public class MessagingService {

   private final SimpMessageSendingOperations simpMessageSendingOperations;

   public void sendMessage(final String destination, final MessageResponseDto messageResponseDto){

       simpMessageSendingOperations.convertAndSend(destination,messageResponseDto);

   }

}

로 메시지를 전송할 수 있습니다.

시연 영상

참고

WebSocket 기반의 프로토콜을 사용하려면 http 에서 upgrade 되는 형식으로 사용합니다.

만약 nginx 를 사용한다면

다음과 같은 내용을 추가하여 http 를 upgrade 할 수 있게 합니다.

참고 2

https://docs.spring.io/spring-framework/docs/4.3.x/spring-framework-reference/html/websocket.html#websocket-stomp-authentication

를 읽고 새로 알게된 내용을 적습니다.

  • WebSocket의 브라우저 미지원에 대한 fall back option 으로 SockJS 를 사용
  • HTTP 와 별도로 초기 http 핸드 셰이크 에만 단일 URL 을 적용하고 모든 메시지는 동일한 TCP 연결을 통해 공유된다. 이는 JMS ,AMQP 와 가까운 완전히 다른 비동기적이고, 이벤트 중심의 메시지 아키텍처를 나타낸다.
  • Message, MessageChannel, MessageHandler 을 Spring 에서 주축으로 가짐
  • WebSocket 은 TCP 위에서 매우 얇은 계층으로 바이트 스트림을 텍스트 또는 바이너리 메시지 스트림으로 변환하는 역할만을 수행
  • 웹소켓 만으로 구현할 때 아래와 같이 HTTP handshake request 시 customize 가 가능하다.
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new MyHandler(), "/myHandler")
            .addInterceptors(new HttpSessionHandshakeInterceptor());
    }

}
  • SockJS 프로토콜은 프록시( 리버스 프록시 , Nginx? ) 가 연결이 중단된 것으로 판단하지 않도록 서버가 주기적으로 하트비트 메시지를 보내야 한다고 요구합니다.

  • STOMP 는 simple text oriented messaging protocol 이다. 루비, 파이썬 , perl 과 같은 스크립팅 언어가 엔터프라이즈 메시지 브로커에 연결할 수 있도록 만들어졌다.

  • stomp 프로토콜은 일반적으로 사용되는 메시징 패턴중 일부를 처리하도록 설계

  • TCP 나 WebSocket 과 같은 신뢰할 수 있는 양방향 스트리밍 네트워크 프로토콜을 통해 사용 가능

  • STOMP 가 텍스트 기반 프로토콜이지만, 메시지의 페이로드는 텍스트일 수도 바이너리 일 수도 있다.

  • SEND , SUBSCRIBE 에서 destination 헤더와 함께 사용할 수 있다.

  • Spring 의 STOMP 를 사용할 떄 Spring WebSocket application 은 클라이언트에게 STOMP 브로커로써 동작한다.

  • RabbitMQ 와 같은 외부 메시지 브로커를 사용할 경우 Spring 은 브로커와의 TCP 연결을 유지하고, 메시지를 브로커로 전달하며 , 브로커로부터 받은 메시지를 연결된 WebSocket 클라이언트로 전달한다.

MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

를 사용하면 구독한 모든 클라이언트에게 Message 를 전달할 수 있다.

  • WebSocket 과 비교한 stopm 의 장점

  • STOMP endpoint 노출 시 스프링 Application 은 connected client 를 위한 STOMP Broker 가 된다.

  • Message: 헤더와 페이로드를 포함하는 메시지의 간단한 표현

  • MessageHandler : 메시지를 처리하기 위한 contract

  • MessageChannel - producer 와 consumer 간의 느슨한 결합을 가능하게 하는 메시지를 전송하기 위한 contract

  • SubscriableChannel - MessageHandler 구독자가 있는 MessageChannel

  • ExecutorSubscribableChannel - 메시지를 전달하기 위해 Executor 를 사용하는 SubscribableChannel

  • ClientInboundChannel(messageChannel 의 종류) : WebSocket client 로 서버로 메시지를 전달하기 위한 채널 : 메시지 인터셉터 등 메시지 전처리

(RequestChannel 과 차이점 : RequestChannel 이 더 상위 개념)

  • ClientOutboundChannel (messageChannel 의 종류) : 서버에서 웹소켓 클라이언트로 메시지를 보내는 채널

(ResponseChannel 과 차이점: ResponseChannel 이 더 상위 개념)

  • brokerChannel (messageChannel 의 종류) : 서버 측 애플리케이션 코드 내에서 메시지 브로커로 메시지를 보내는 채널

  • In Memory Message Broker 사용 시

흐름

  1. Client 의 Send 메시지 서버에 전송

  2. 클라이언트가 보낸 메세지는 requestChannel 에서 처리 : 서버로 들어오는 메시지를 받아들이는 역할 , destination 이 app일 경우에는 SimpAnnotationMethod 로 destination 이 /topic 일 경우에는 SimpleBrokerMessageHandler로 전달.

  1. SimpAnnotationMehtod 에서는 @MessageMapping 과 같은 어노테이션 처리, 클라이언트의 요청을 처리하고 응답을 준비

  2. SimpleBrokerMessageHandler : 클라이언트 간의 메시지 브로드 캐스팅을 관리하는 역할, 해당 topic 에 가입한 모든 클라이언트에게 메시지 전송

  3. ResponseChannel 메시지 처리 이후 responseChannel 을 통해 결과 메시지가 클라이언트에 전송 이때 클라이언트가 구독한 destination(ex: /topic/a) 로 결과 메시지 전송

  4. @MessageMapping 으로 한번 받고 broker channel 로 이동. broker channel 은 주제(/topic/room) 에 대한 채널을 의미 , 클라이언트들이 구독한 경로에 메시지를 전달하는 역할 ,
    broker channel은 메시지가 특정 주제에 구독한 클라이언트들에게 전달되도록 라우팅하는 역할 ,
    messagingTemplate.convertAndSend 는 메시지를 broker channel로 보내는 코드입니다. 이 코드를 실행하면 broker channel이 메시지를 해당 경로로 라우팅,

  1. broker channel 에서 broker 로 전달.
    (broker: 메시지 라우팅과 배달을 담당하는 중앙컴포넌트, 클라이언트가 구독한 경로에 기반으로 메시지를 적절한 구독자에게 전달) (내장 브로커는 메모리 기반으로 동작) 클라이언트가 /topic/greetings를 구독하면 브로커는 해당 클라이언트에게 메시지를 전달.

brokerChannel은 브로커 자체가 아니라 메시지가 브로커에 전달되기 전에 거치는 채널
(/sub/chat/room 같은 것은 브로커나 브로커 채널이 아니라 , 브로커가 관리하는 메시지의 목적지, 구독자들이 메시지를 받을 수 있는 경로)

  1. broker 가 /topic/greetings 를 구독중인 클라이언트를 찾아 메시지 배달

  2. 메시지는 ClientOutboundChannel 통해 전달.

<외부 메시지 브로커 사용할 때>

위 다이어그램에서 주요 차이점은 "broker relay"를 사용하여 메시지를 외부 STOMP 브로커로 전달하고, 브로커에서 구독된 클라이언트로 메시지를 전달하는 방식입니다.

메시지가 WebSocket 연결을 통해 수신되면, 메시지는 STOMP 프레임으로 디코딩되고, Spring Message 표현으로 변환된 후 "clientInboundChannel"로 전송되어 추가 처리됩니다. 예를 들어, "/app"으로 시작하는 대상 헤더를 가진 STOMP 메시지는 @MessageMapping 메서드가 있는 주석이 달린 컨트롤러로 라우팅될 수 있고, "/topic" 및 "/queue" 메시지는 메시지 브로커로 직접 라우팅될 수 있습니다.

클라이언트로부터 STOMP 메시지를 처리하는 주석이 달린 @Controller는 "brokerChannel"을 통해 메시지를 메시지 브로커로 전송할 수 있으며, 브로커는 구독 중인 클라이언트들에게 일치하는 메시지를 브로드캐스트합니다. 동일한 컨트롤러는 HTTP 요청에 응답하여 동일한 작업을 할 수도 있습니다. 예를 들어, 클라이언트가 HTTP POST를 수행한 후 @PostMapping 메서드는 메시지를 메시지 브로커로 전송하여 구독 중인 클라이언트들에게 브로드캐스트할 수 있습니다.

클라이언트가 "http://localhost:8080/portfolio"에 접속하고 WebSocket 연결이 설정되면, STOMP 프레임이 이 연결을 통해 흐르기 시작합니다.
클라이언트는 "/topic/greeting"을 대상 헤더로 하는 SUBSCRIBE 프레임을 전송합니다. 이 프레임이 수신되어 디코딩되면, 메시지는 "clientInboundChannel"로 전달되고, 메시지 브로커로 라우팅되어 클라이언트의 구독 정보를 저장합니다.
그 후 클라이언트는 "/app/greeting"으로 SEND 프레임을 보냅니다. "/app" 접두사는 해당 메시지가 주석이 달린 컨트롤러로 라우팅되도록 도와줍니다. "/app"이 제거된 후 남은 "/greeting" 부분은 GreetingController의 @MessageMapping 메서드에 매핑됩니다.
GreetingController에서 반환된 값은 Spring 메시지로 변환되고, 페이로드는 반환 값에 기반하여 기본 대상 헤더 "/topic/greeting"을 가집니다. 이 메시지는 "brokerChannel"로 전송되어 메시지 브로커에서 처리됩니다.
메시지 브로커는 모든 일치하는 구독자를 찾아, 각 구독자에게 MESSAGE 프레임을 "clientOutboundChannel"을 통해 보냅니다. 이 메시지는 STOMP 프레임으로 인코딩되어 WebSocket 연결을 통해 전송됩니다.

  • @SubscribeMapping 의 return value 는 broker channel 이 아닌 clientOutboundChannel 에 직접 매핑되기에 일회성 요청-응답 메시지에 편리, 애플리케이션 초기화시 데이터 로드 및 표시가 필요한 경우에 사용

  • @SubscribeMapping@SendTo 를 같이 사용함으로써 return Value 를 broker channel 에 전달 할 수 있다.

  • 외부 메시지를 브로커를 사용하면 내부 메시지 브로커를 사용하는 것보다 클러스터링에 효율적이다.

  • Spring Security의 메시지 권한 부여를 사용할 때, 현재로서는 인증용 ChannelInterceptor 설정이 Spring Security 설정보다 앞서도록 순서를 보장해야 한다는 점에 유의해야 합니다. 이를 가장 잘 수행하는 방법은 사용자 정의 인터셉터를 @Order(Ordered.HIGHEST_PRECEDENCE + 99)로 표시된 AbstractWebSocketMessageBrokerConfigurer의 하위 클래스에서 선언하는 것입니다.

  • 구독한 채널이 아닌 유저 각각에게 메시지를 전달하고 싶다면 @SendToUser 를 사용하는 것이 바람직 할 수도 있다.

만일 여러 세션이 아닌 메시지가 핸들되는 단일 세션에만 메시지를 전달하고 싶다면 broadcast 를 false 로 설정 가능

  • sendTimeLimit" 및 "sendBufferSizeLimit" 이 속성들은 클라이언트로 메시지를 전송할 때 허용되는 최대 시간과 버퍼에 저장될 수 있는 데이터의 최대 크기를 설정하는 데 사용.
profile
기록을 통해 실력을 쌓아가자

0개의 댓글