WebSocket과 Stomp를 이용해서 채팅 구현하기

byeol·2024년 2월 10일

"모두의 텃밭"이라는 텃밭 중개 플랫폼에 채팅 기능을 추가하게 되면서 겪었던 문제들과
그 방법들을 정리해보려고 합니다.

WebSocket이란?

HTTP와 비교해서 정리합니다.

  • HTTP
    • 비연결성
    • 매번 연결 맺고 끊는 과정의 비용
      (3way hand shake, 4way hand shake)
    • 요청 - 응답 구조
    • HTTP에서도 실시간성을 보장하는 기법이 존재한다. (Polling, Long Polling, Streaming)
  • WebSocket
    • 연결지향
    • 한 번 연결 맺은 뒤 유지
    • 양방향 통신
    • 누군가 연결을 끊자고 말하지 않은 이상 계속 유지
    • 물론 처음에는 Http 연결을 통해서 연결을 수립한다. 그러나 그 이후에는 간단한 연결만으로 통신

🤔 WebSocket을 지원하지 않은 브라우저 환경이 존재한다면?

(그림 출처 : https://caniuse.com/?search=%20WebSockets)
대부분 지원해주는 것 같지만 위 그림의 빨간 색 버전에서 WebSocket을 지원하지 않습니다. 그럴 때는 실시간 통신처럼 보이도록 하는 HTTP의 Polling, Streaming 등의 기법을 사용하면 됩니다. 하지만 이를 관리하기 번거롭기 때문에 SockJs와 Socket.io 라이브러리만 추가하여 사용하면 개발자가 브라우저 환경을 고려하지 않고 개발할 수 있습니다.

🤔 스프링에서 어떻게 웹소켓을 사용할까요?

  • WebSocketConfigurer interface의 registerWebSocketHandler를 통해서 handler와 handshake 주소를 설정합니다. Cors 설정, 웹소켓 미지원 브라우저에 관한 SockJs설정도 할 수 있습니다.
  • TextWebSocketHandler는 WebSocketSession을 메서드의 파라미터로 받아서 같은 채팅방을 구독하는 사람들을 생성(afterConnectionEstablished)하고 메세지를 보내고(handleTextMessage) 커넥션을 종료(afterConnectionClosed)하는 메서드를 가지고 있습니다.

하지만 WebSocket은 HTTP와 다르게 프레임(헤더, 바디)이 정해져 있지 않습니다. 그리고 위와 같이 발행과 구독을 개발자가 직접 자료구조를 고려하는 등의 추가적인 작업이 필요합니다. 위 그림처럼 Set으로 관리된다면 다른 채팅방의 세션들을 관리할 수 없어 여러 채팅방을 생성하는 경우 적합하지 않습니다. 이렇게 개발자가 고려해야할 부분이 많습니다.

STOMP란

Spring WebSocket은 프레임이 없고 구독과 발행 구조를 개발자가 구현해야하는 불편하다는 점이 존재했는데 이런 단점을 보완하여 Spring에서는 spring messaging을 통해 STOMP를 지원합니다.

🔖 STOMP(Simple Text Oriented Messaging Protocol)란?

  • 메세지 브로커를 활용하여 쉽게 메세지를 주고 받을 수 있는 프로토콜
    • Pub - Sub (발생 - 구독) : 발신자가 메세지를 발행하며 수진자는 그것을 수신하는 메세징 패러다임
    • 메세지 브로커 : 발신자의 메세지를 받아와서 수신자들에게 메세지를 전달하는 어떤 것
  • WebSocket 위에 얹어 함께 사용할 수 있는 하위 프로토콜
  • 이거 왜 사용하지? 웹소켓만으로 가능한데?
    • 웹소켓만으로 가능하나 웹소켓은 주고 받는 메세지에 대한 어떤 표준 형식이 없습니다. 그래서 서비스가 커짐에 따라 이런 양식이 없다보니 소통의 어려움이 생깁니다.
    • 양식이나 파싱에 대한 고민없이 사용가능하도록 합니다.

🔖 STOMP를 사용하면 좋은 점

  • 하위 프로토콜 혹은 컨벤션을 따로 정의할 필요가 없습니다.
  • 연결 주소마다 새로 핸들러를 구현하고 설정해줄 필요가 없습니다.
  • 외부 Messagin Queue를 사용할 수 있습니다. (RabbitMQ, 카프카 ..)
  • Spring Security를 사용할 수 있습니다.

🔖 WebSocket + STOMP 구조

구조는 위와 같습니다. 하지만 몇 가지 헷갈리는 부분이 존재하기 때문에 제대로 정리해보겠습니다.
STOMP는 요청을 CONNECT, SUBSCRIBE, SEND, DISCONNECT 등의 종류가 있습니다. 마치 HTTP가 POST, GET, DELETE가 있는 것과 같습니다. 이 부분을 생각해보고 각각의 요청의 종류를 요청 순서에 따라 정리해보겠습니다.

  • GET CONNECTION UPGRADE : 처음에는 Client가 HTTP의 GET 요청을 보내는데 이 요청의 CONNECTION : UPGRADE를 보내 WEBSOCKET 연결로 업그레이드 시켜달라는 요청을 보내게 됩니다. 저는 웹소켓 연결 요청을 ws://localhost:8080/ws/connect로 설정해 놓았기 때문에 GET 요청이 /ws/connect로 가게 됩니다.
    GET /ws/connect HTTP/1.1
    Host: localhost:8080
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Key: x3JJHMbDL1EzLkh9GBhXDw==
    Sec-WebSocket-Version: 13
    Origin: https://https://jxy.me
    참고로 위 요청은 제가 https://jxy.me/websocket-debug-tool/이라고 웹소켓 디버깅 툴을 이용해 보냈기 때문에 Origin에 이 주소가 등장하네요. 만약에 위 디버깅 툴을 이용하고 Spring Security를 사용한다면 CORS의 AllowedOrigin에 저 디버킹 툴 주소를 추가해주시기 바랍니다.
  • 101 Switching Protocols : 그러면 서버는 응답으로 101 Switching Protocols를 줘서 Upgrade 요청에 성공했다고 응답합니다. 즉 ws://localhost:8080/ws/connect을 통한 CONNECT 요청을 성공합니다.

  • SUBSCRIBE : 사실 위에 등장하는 UPGRADE 요청과 101 응답은 웹소켓 연결을 하기 위해 등장하는 개념이지만 이제부터는 STOMP라는 하위 프로토콜에 의해서 등장하게 되는 개념이라고 생각하면 될 거 같습니다. SUBSCRIBE를 통해서 내가 존재하고 있는 하나의 채팅방을 구독하게 됩니다. 만약에 상대방도 나와 같은 방을 구독했다면 내가 보낸 메세지를 실시간으로 받아볼 수 있게 됩니다. 위 그림에서는 /topic이 SUBSCRIBE 요청이 되겠네요
  • SEND : 이제 메세지를 보내야하는데 STOMP는 이를 SEND로 받습니다. SEND로 받은 요청은 Controller와 연결되어 저장되어져야 합니다. 추가로 같은 방을 구독하는 사용자들에게 실시간으로 보내져야 하는데요 이를 위해 @MessageMapping이라는 어노테이션을 Controller에 붙여 가능하게 만들면 됩니다. SimpleAnnotaionMethodMessageHandler가 MessageMapping이 붙은 컨트롤러를 웹소켓과 관련된 것으로 인식하고 "/app"으로 받은 SEND요청을 MESSAGE로 바꿔 구독하고 있는 사람들에게 발행하게 되는 것이죠

🔖 WebSocket + STOMP 코드

코드를 통해서 더 구체적으로 살펴보려고 합니다.
제가 구현한 코드의 UML을 위 형태와 같고 코드는 아래와 같습니다.

@Configuration
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
@EnableWebSocketMessageBroker
public class SocketConfig implements WebSocketMessageBrokerConfigurer {

    private final ChatPreHandler chatPreHandler; // Spring Security를 위한 핸들러
    private final ChatErrorHandler chatErrorHandler; // 웹소켓에서 발생하는 에러를 위한 핸들러

    public SocketConfig(ChatPreHandler chatPreHandler, ChatErrorHandler chatErrorHandler) {
        this.chatPreHandler = chatPreHandler;
        this.chatErrorHandler = chatErrorHandler;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry
                .setErrorHandler(chatErrorHandler)
                .addEndpoint("/ws/connect") // 연결을 위한 Connect 주소
                .setAllowedOriginPatterns("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue"); // 구독
        registry.setApplicationDestinationPrefixes("/app"); // SEND 발행
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(chatPreHandler);
    }

}

추가로 @MessaggeMapping이 붙은 컨트롤러는 아래와 같습니다.

@RestController
public class GardenChatController {

    private final GardenChatService gardenChatService;
    private final ChatRoomFacade chatRoomFacade;

    public GardenChatController(GardenChatService gardenChatService, ChatRoomFacade chatRoomFacade) {
        this.gardenChatService = gardenChatService;
        this.chatRoomFacade = chatRoomFacade;
    }

    @MessageMapping("/garden-chats/{roomId}")
    @SendTo("/queue/garden-chats/{roomId}")
    public GardenMessageSendResponse sendMessage(
        @DestinationVariable("roomId") Long roomId,
        Authentication  authentication,
        @Payload GardenMessageSendRequest request,
        @Header("simpSessionId") String sessionId) {

        return GardenMessageSendResponse.to(gardenChatService.saveMessage(
            request.to(
                authentication.getName(),
                roomId,
                sessionId)));
    }
  }

✅ 채팅을 구현하며 마추친 문제들

채팅을 구현하면서 겪은 하나의 문제가 있습니다.
읽음 처리 여부를 어떻게 해결할까 였습니다.
이 문제를 바탕으로 발생했던 꼬리 질문들을 바탕으로 내용을 정리해보려고 합니다.
완성된 코드는 여기에 있습니다.

그리고 한 가지 의문이 있었는데 SecurityContextHolder에 유저 정보를 어떻게 담을 것인가였습니다.
이는 포스팅을 따로 분리하였습니다.
여기에서 확인가능합니다.

📌 읽음 처리 여부를 어떻게 해결할 것인가?

현재 같은 방에 접속 중이라면 내가 보낸 메세지는 읽음 처리가 될 것이다.
그러면 나와 같은 방에 있는지 어떻게 증명할 것인가?

저는 웹소켓의 세션 아이디가 해당 채팅방에 존재하는가를 바탕으로 읽음 처리 여부를 해결하기로 하였습니다.
Map<SessionId, ChatRoomEntry.ChatRoomEntryInfo> chatEntries = new ConcurrentHashMap<>();로 선언하였고 session id를 키로 가지고 있고 rooom id와 member id의 필드를 갖는 ChatRoomEntryInfo를 value로 갖는 Map을 로컬메모리에 저장하기로 했습니다.

public record ChatRoomEntry(
        SessionId sessionId,
        ChatRoomEntryInfo chatRoomEntryInfo
) {
    public record ChatRoomEntryInfo(
            Long roomId,
            Long memberId
    ){}
}
/**
 * Chat room entry repository implementation using local storage.
 */
@Component
public class GardenChatRoomRoomEntryLocalRepository {
    private final Map<SessionId, ChatRoomEntry.ChatRoomEntryInfo> chatEntries = new ConcurrentHashMap<>();

    public void addMemberToRoom(ChatRoomEntry chatRoomEntry) {
        chatEntries.put(chatRoomEntry.sessionId(), chatRoomEntry.chatRoomEntryInfo());
    }

    public void removeMemberFromRoom(SessionId sessionId) {
        chatEntries.remove(sessionId);
    }

    public boolean isMemberInRoom(ChatRoomEntry chatRoomEntry) {
        return chatEntries.get(chatRoomEntry.sessionId()) != null;
    }

    public boolean isContainsRoomIdAndMember(Long roomId, Long memberId) {
        return chatEntries.values()
                .stream()
                .anyMatch(entryInfo -> Objects.equals(entryInfo.roomId(), roomId)
                        && Objects.equals(entryInfo.memberId(), memberId));
    }

    public void deleteChatRoomEntryByRoomId(SessionId sessionId) {
        chatEntries.remove(sessionId);
    }

}

그러면 상대방과 나의 session id를 저장해야 합니다.
어떻게 session id에 접근해야할까요 처음에 저는 프론트 측에서 session id를 알 것이라고 생각했습니다.
하지만 프론트 측에서 알 수 없었던 상황이었습니다.
이를 어떻게 해결했는지 다음 질문으로 넘어가 보겠습니다.

📌 프론트 측에서 session id를 모르는데 서버에서 어떻게 관리하지?

결론은 WebSocket 측에서 만든 이벤트 리스너를 사용하기로 하였습니다.
AbstractSubProtocolEvent 중에서 header에 session id를 가지고 있는 구현체들이 있습니다.
이를 활용하기로 하였습니다.

그래서 로직은 아래와 같습니다.

  • Connect 되는 순간 session id와 member id를 저장한다. (이를 Session id를 key로 member id를 value인 Map에 저장)
  • Subscribe 되는 순간 Connect 시 저장되었던 map에서 session id를 바탕으로 member id를 찾는다. 왜냐하면 Subscribe할 때는 header에 access token이 없어서 member id를 얻을 수 없기 때문이다. 그리고 Subscribe의 destination에서 room id를 분리한다. 이렇게 session id, member id, room id 3가지 정보를 얻었다. 이를 바탕으로 앞 서 언급했던 Map<SessionId, ChatRoomEntry.ChatRoomEntryInfo> chatEntries = new ConcurrentHashMap<>();에 저장하는 것이다.
@Component
public class WebSocketListenerConfig {

    private static final String SUBSCRIBE_URL = "/queue/garden-chats/";
    private final GardenChatService gardenChatService;
    private final GardenChatRoomService gardenChatRoomService;

    public WebSocketListenerConfig(GardenChatService gardenChatService, GardenChatRoomService gardenChatRoomService) {
        this.gardenChatService = gardenChatService;
        this.gardenChatRoomService = gardenChatRoomService;
    }

    @EventListener
    public void onDisconnectEvent(SessionDisconnectEvent event) {
        gardenChatService.leaveChatRoom(SessionId.of(event.getSessionId()));
    }

    @EventListener(SessionConnectEvent.class)
    public void onConnect(SessionConnectEvent event) {
        String sessionId = getSessionId(event);
        Long memberId = getMemberId(event);
        gardenChatService.saveSocketInfo(sessionId, memberId);
    }

    @EventListener(SessionSubscribeEvent.class)
    public void onSubscribe(SessionSubscribeEvent event) {
        String sessionId = getSessionId(event);
        Long roomId = getRoomId(event);
        Long memberId = gardenChatService.getWebSocketInfo(sessionId);

        gardenChatRoomService.createSessionInfo(
            new GardenSessionCreateParam(
                SessionId.of(sessionId),
                roomId,
                memberId
            )
        );
    }

    private String getSessionId(AbstractSubProtocolEvent event) {
        Object sessionIdObj = event.getMessage().getHeaders().get("simpSessionId");
        if (sessionIdObj == null) {
            throw new IllegalArgumentException("Session Id null일 수 없습니다.");
        }
        return sessionIdObj.toString();
    }

    private Long getMemberId(AbstractSubProtocolEvent event) {
        Map<String, List<String>> nativeHeaders = event.getMessage().getHeaders().get("nativeHeaders", Map.class);
        if (nativeHeaders == null) {
            throw new IllegalArgumentException("Native header는 null일 수 없습니다.");
        }

        List<String> memberIdList = nativeHeaders.get("memberId");
        if (memberIdList == null || memberIdList.isEmpty()) {
            throw new IllegalArgumentException("Member Id는 null이거나 빈 값일 수 없습니다.");
        }

        String memberId = memberIdList.get(0);
        return Long.parseLong(memberId.replaceAll("[\\[\\]]", ""));
    }


    private Long getRoomId(AbstractSubProtocolEvent event) {
        String destination = event.getMessage().getHeaders().get("simpDestination", String.class);
        if (destination == null) {
            throw new IllegalArgumentException("Destination는 null일 수 없습니다.");
        }
        return Long.parseLong(destination.replace(SUBSCRIBE_URL, ""));
    }

}

그래서 이렇게 저장된 정보를 어떻게 읽음 처리 여부에 활용했을까요?
아래 코드를 통해 살펴보겠습니다.

    @Transactional
    public GardenChatMessageSendResult saveMessage(GardenChatMessageSendParam param) {
        gardenChatRoomEntryRepository.isMemberInRoom(param.toChatRoomEntry());

        GardenChatMessageDomainParam gardenChatMessageDomainParam = param.toGardenChatMessageDomainParam();
        Long partnerId = gardenChatRoomInfoRepository.findPartnerId(param.roomId(), param.memberId()).getMemberId();
        if (gardenChatRoomEntryRepository.isContainsRoomIdAndMember(param.roomId(), partnerId)) {
            return GardenChatMessageSendResult.to(
                gardenChatMessageRepository.save(
                    GardenChatMessage.toReadGardenChatMessage(gardenChatMessageDomainParam))
            );
        }

        return GardenChatMessageSendResult.to(
            gardenChatMessageRepository.save(
                GardenChatMessage.toNotReadGardenChatMessage(gardenChatMessageDomainParam)
            )
        );

    }
  • 먼저 해당 채팅방에 존재하는 member인지 확인합니다.
  • 그 다음으로 해당 채팅방에 존재하는 상대방의 id를 얻습니다.
  • 상대방의 id의 정보가 ChatRoomEntry에 존재한다면 같이 채팅방에 접속중이므로 읽은 메세지로 저장합니다.
  • 그 반대라면 읽지 않은 메세지로 저장합니다.

느낀점

채팅 구현은 사실 당근 마켓 클론 코딩을 하면서 구현해 본 적이 있습니다.
하지만 그 때 제가 맡았던 역할은 메세지 목록과 채팅방 입장과 생성으로 HTTP 요청이었습니다.
그래서 다음에 WebSocket과 관련 기능을 구현해보고 싶었고 이번 모두의 텃밭 프로젝트에 채팅 기능이 신규 기능으로 생기면서 구현하게 되었습니다.
기존 당큰 클론 프로젝트에서 발견하지 못했던 문제점들을 해결하고 제대로 알지 못했던 웹소켓의 개념을 확립할 수 있었습니다.

하지만 몇 가지 아쉬운 점이 있다면 내장 메세지 브로커와 Message의 저장을 RDBMS에 했다는 것입니다. 사실 프로젝트의 유저가 10명 미만인 상태에서 외부 메세지 브로커를 사용하는게 무거워 보였기 때문이라는 나름의 이유가 있었으나 외부 메세지 브로커도 사용할 만큼 서비스가 커졌으면 좋겠습니다. 또한 Message의 형태가 정형화 되어져 있다고 생각했습니다. 중복을 허용하고 join을 회피하도록 하는 NoSQL을 사용해야할까 라는 의문점도 있었습니다. 하지만 이 글을 작성하고 한 달이 지난 지금 시점에서 생각해보니 RDBMS는 조회의 성능을 높이기 위한 것으로 읽기와 쓰기가 1:1로 발생하는 채팅 메세지를 저장하기에 좋지 않다는 것을 깨달았습니다. 그래서 현재 NoSQL인 MongoDB로 바꾸려고 시도하고 있습니다.

구현하면서 크게 배웠던 부분은 디버깅을 하자였습니다.
디버깅 레벨을 DEBUG로 설정해서 로그를 꼭 확인해야 한다는 것입니다.(종운님께 감사드립니다)
가장 기본적인 것인데 INFO로 설정해서 로그의 양이 너무 많아 중요했던 로그를 확인하지 못했고 그래서 많은 시간을 허비했습니다.
(그 때 발견한 것이 CORS에 AllowedOrigin에 디버킹 툴 url이 포함되어 있지 않아 발생했던 것이었습니다.)

부록

profile
꾸준하게 Ready, Set, Go!

0개의 댓글