채팅 기능 구현 및 고려 사항들

wellbeing-dough·2024년 9월 4일

1. 문제상황

채팅 기능을 구현하려 한다

우리 서버는 오토스케일링으로 최대 2대까지 ec2가 늘어날 수 있다, 로드밸런서로 부하 분산을 한다 잦은 일은 아니지만 실제로 유저들이 많이 몰리는 시간대에 2대까지 늘어난다

A와 B가 채팅을 하는 상황이라고 생각해보자 A클라이언트는 1번서버에 소켓 연결을 하고 B클라이언트는 2번 서버에 소켓 연결을 하면 A가 보낸 메시지가 1번 서버에만 도착하고 2번 서버에 있는 B에게 전달되지 않는다

2. 해결 방안

1. 스티키 세션 사용

로드 밸런서는 클라이언트의 브라우저에 고유한 식별자가 포함된 쿠키를 설정하고 이후 클라이언트가 해당 쿠키를 포함해 요청을 보내면, 로드 밸런서는 쿠키의 값을 기반으로 클라이언트가 처음 연결된 서버로 트래픽을 라우팅 하는 방식으로 구현하면 된다
NGINX에서 Sticky Cookie 기능을 통해 이를 구현할 수 있다

하지만
서버 간 부하 불균형: 스티키 세션을 사용하면 특정 서버에 특정 사용자의 모든 요청이 집중되므로, 트래픽이 고르게 분산되지 않을 수 있음 예를 들어, 일부 사용자가 과도한 트래픽을 발생시키면 해당 사용자가 연결된 서버에 과부하가 발생할 수 있다 이는 특히 서버의 수가 적을 때 더 심각해질 수 있다고 한다

스케일링의 비효율성: 만약 특정 서버에 많은 세션이 스티키 되어 있다면, 서버의 부하가 커질 때 새로운 서버를 추가해도 기존의 세션은 계속 기존 서버에 남아있기 때문에, 추가된 서버가 충분히 활용되지 못할 수 있다

서버의 재배포로 인해 서버가 재시작되면, 해당 서버에 저장된 모든 세션 데이터가 사라진다 그래서 redis나 RDB같이 외부 세션 저장소를 사용해야 한다.

2. 메시지 브로커 사용

pub/sub 방식으로 서로 직접 통신하는 것이 아닌 메시지 브로커를 통해 메시지를 전달하는 구조이다. 메시지 브로커는 Publisher가 보낸 메시지를 Subscriber로 전달해주는 미들웨어(중간 다리) 역할을 한다.

  • Kafka 는 대량의 데이터를 저장하면서 높은 처리량이 필요에 적합한 메시지 브로커이다. 신뢰성 있는 메시지를 전송할 수 있다.
  • Redis의 Pub/Sub은 간단하게 구현할 수 있지만 다른 메시지 브로커와 다르게 메시지 지속성이 없다. 즉, 메시지를 전송한 후 해당 메시지는 삭제되며 Redis에 저장되지 않는다. 또한, 메시지 전송 신뢰성을 보장하지 않기 때문에 단점을 보완할 별도의 추가 구현이 필요할 수 있다.

결론

일단 세션 방식은 결국엔 스케일링의 비효율성도 있고 배포같이 서버가 재시작 되었을 때를 대비해 redis나 RDB에 외부 세션 저장소를 사용해야 한다는 단점이 있다
나도 구현해보고 테스트 해봐야 알겠지만 일단 redis는 우리 인프라에 이미 elasticache로 구축이 되어있는 상황이다. 그러면 nginx설정이나 스케일링의 비효율성, 서버 간 부하 불균형을 겪을 바엔 redis pub/sub을 사용하는게 맞다는 생각이 들었다.

또한 redis에 채팅 내역이 저장되지 않는다는 문제점이 있다 이 문제에 대해서 생각해본 결론은
1. 어쩌피 채팅 내역은 MongoDB같은 곳이나 RDB에 저장해야되지 않나? 카카오톡도 푸시 알림이 와서 채팅방에 들어가면 1초동안 멈춰있다가 새로 온 채팅이 한꺼번에 보인다. 이게 카프카에서 웹소켓이 끊겨서 데이터를 저장하고 한꺼번에 보내는걸까? 아니면 MongoDB에서 조회를 하는걸까?
그렇다면 카프카에 partition에 채팅 내역을 저장되더라도 DB에 채팅 내역을 저장 안해도되나?
클라이언트에서 소켓 연결 disconnect가 되면 다시 재연결하고 구독도 다시하면 안될까?

  1. 카프카를 사용한다면, Consumer의 수와 관계없이, 전송당 하나의 메시지만 발행될 것이다. 나중에 기획이 추가 될 때 다대다 미팅, 모임 관련된 기능도 추가를 고려하면 아직은 redis가 맞을 것 같다는 생각이 들었다. 하지만 이 부분도 물론 각 서버 인스턴스마다 다른 group ID를 부여하여 사용할 수도 있다. 다대다 미팅 기능이 생겼을 때는 이미 유저가 많을 상황일 것 같은데 그땐 카프카로 전환하려나?

여러 자문과 구글링으로 알아봤지만 명확한 판단이 서질 않았다. 우리 서비스의 채팅 기능은 만남 전날 몇시간동안 약속 리마인드 용도 그 이상도 이하도 아니다. 채팅기능은 서비스의 메인 기능이 아니기 떄문에 대량의 데이터, 높은 처리량과는 맞지 않는다. 물론 나중에 유저가 엄청나게 많이 생기면 그땐 필요하다. 그럼 그때 행복하게 카프카를 도입하기로 하자

해결, 구현

웹소켓의 특징이나 이론적인 내용은 생략하자(검색하면 너무 쉽게 나오니까...ㅎ)

STOMP라고 이번에 처음 알았다
웹소켓만을 사용해서 채팅 서버를 구현하다면, 메시지 포맷 형식이나 메시지 통신 과정 등을 관리해야 하는 번거로움이 있다. 따라서 이러한 관리를 대신해줄 수 있는 STOMP 프로토콜을 서브 프로토콜로 사용할 수 있다.

STOMP(Simple Text Oriented Messaging Protocol)는 메시지의 형식이나 내용 등을 정의하여 메시징 전송을 효율적으로 도와주는 프로토콜이다. 기본적으로 Pub/Sub 구조로 되어있어 STOMP 규칙에 맞춰 메시징 처리를 재정의하여 사용하면 된다

채팅을 웹소켓과 STOMP 방식으로만 구현할 경우, 두 클라이언트가 같은 서버에 pub/sub을 해야 메세지를 주고 받을 수 있다
그래서 구독 대상(Topic) 을 여러 서버에서 접근이 가능하게 하기 위해 위에 해결방안에서 말했듯이 redis를 메시지 브로커로 쓰자

구현 코드는 나중에 보고 이해부터 하려면

이런 구조와 작동 방식이다

디비스펙은 여기에 올리지 못하니 간단하게 설명하면 채팅방 채팅내역이 일대다 방식으로 설계했다

  1. A와 B는 1대1 채팅을 시작한다 채팅방을 만들면서 채팅방 레코드, 채팅방 pk ex)UUID1234 채널을 구독 한다
  2. 서버는 클라이언트에게 채팅방 uuid와 함께 내가 참여중인 채팅방 리스트를 반환해준다
  3. A가 채팅방을 접속하면 웹 소켓을 연결하고 해당 채팅방 UUID1234 기반으로 만들었던 채널 구독한다
  4. B가 채팅방을 접속하면 웹 소켓을 연결하고 해당 채팁방 UUID1234 기반으로 만들었던 채널 구독한다
  5. A가 채팅방에 메시지를 보내면 redis에 메시지를 UUID1234 토픽에 pub하고 메시지 내역을 저장한다
  6. redis는 메시지를 받고 그 메시지를 같은 토픽(UUID1234) 구독된 사람에게 pub한다 누구? B, 왜와이? 4번에서 구독했잖아
  7. 그 메시지를 받은 서버는 다시 클라이언트는 웹소켓으로 메시지를 전송한다

간단하게 이런 과정이다 물론 A와 B가 동시에 채팅방에 접속했다는 해피케이스고 A가 채팅방에 접속해있다고 B가 접속했을거라는 보장은 없다 이건 푸시알림같은걸 사용해서 알려줄 수 있다(채팅방에 접속해있으면 푸시알림을 보내면 안되겠지? 이건 일단 구현하고 생각하자)

이제 코드로 가보자

    // web socket
    implementation 'org.springframework.boot:spring-boot-starter-websocket'
    implementation 'org.webjars:stomp-websocket:2.3.4'
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private final StompInterceptor stompInterceptor;
    private final StompExceptionHandler stompExceptionHandler;

    /**
     * stompInterceptor가 token 을 체크할 수 있도록 인터셉터 설정 소켓은 Webconfig
     * WebSocket 통신에서는 HTTP 요청과 다르게 동작하기 때문에, WebMvcConfigurer에 설정된 인터셉터가 적용되지 않습니다
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompInterceptor);
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry){
        // stomp 접속 주소 url => /ws-stomp
        registry
                .setErrorHandler(stompExceptionHandler)
                .addEndpoint("/ws-stomp")
                .setAllowedOrigins("*");
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry){
        // 메시지를 구독하는 요청 url => 즉 메시지 받을 때
        registry.enableSimpleBroker("/sub");

        // 메시지를 발행하는 요청 url => 즉 메시지 보낼 때
        registry.setApplicationDestinationPrefixes("/pub");
    }

}
  • @EnableWebSocketMessageBroker: 이 어노테이션은 WebSocket 메시징을 활성화하고 STOMP 프로토콜을 사용한 메시지 브로커를 구성할 수 있게 해준다 즉, WebSocket 서버를 설정하고 메시지 브로커를 통해 클라이언트 간 통신을 가능하게 만든다\

  • WebSocketMessageBrokerConfigurer: 이 인터페이스는 WebSocket 메시지 브로커의 구성을 정의할 수 있는 메소드를 제공한다 Spring에서 WebSocket을 설정하기 위해 이 인터페이스를 구현함

  • configureClientInboundChannel(): 이 메소드는 클라이언트로부터 들어오는 메시지를 처리하는 채널을 설정 여기서는 stompInterceptor라는 인터셉터를 설정하여 들어오는 WebSocket 메시지에 대한 추가적인 처리, 즉 토큰 검증과 같은 작업을 수행할 수 있도록 설정한다.

  • stompInterceptor: 우리는 이 인터셉터는 STOMP 메시지에서 클라이언트의 토큰을 검증하게 했다 추후에 코드로 설명 WebSocket 통신에서는 HTTP와 달리 헤더 정보가 제한되기 때문에, WebSocket 메시지의 페이로드에 포함된 토큰을 읽어 인증해야함

  • registerStompEndpoints(): 이 메소드는 STOMP 프로토콜을 사용하기 위한 엔드포인트를 설정하는 곳

  • addEndpoint("/ws-stomp"): 클라이언트는 이 엔드포인트(/ws-stomp)를 통해 WebSocket 서버에 연결할 수 있음

  • setAllowedOrigins(""): 이 메소드는 CORS 설정으로, 모든 도메인에서 WebSocket에 접속할 수 있게 허용하는 설정. 보안상의 이유로 실제 운영에서는 대신 허용된 도메인만 명시할 예정

  • setErrorHandler(stompExceptionHandler): 메시지 처리 중 예외가 발생할 때 이를 처리하는 핸들러 이 핸들러는 WebSocket 연결 도중 발생하는 오류를 처리하고, 클라이언트에 적절한 에러 메시지를 전달하는 역할을 함

  • configureMessageBroker(): 이 메소드는 실제로 메시지를 송신하고 수신하는 브로커를 설정하는 곳

  • enableSimpleBroker("/sub"): 이 부분은 간단한 메시지 브로커를 활성화 /sub로 시작하는 URL을 클라이언트가 구독 가능 즉, 클라이언트가 /sub로 시작하는 경로에 연결하면 서버에서 발행된 메시지를 실시간으로 받을 수 있다 ex) 클라이언트는 /sub~~ 을 구독하면, 서버가 /sub/chat 경로로 메시지를 발행할 때 이를 받을 수 있다

  • setApplicationDestinationPrefixes("/pub"): 클라이언트가 메시지를 서버로 발행할 때 사용하는 URL 프리픽스를 설정. /pub로 시작하는 URL은 클라이언트가 서버에 메시지를 보낼 때 사용된다. ex) 클라이언트는 /pub~~ 경로로 메시지를 전송하면, 이 메시지가 서버에서 처리됩니다.

@Configuration
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.port}")
    private String redisPort;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setHostName(redisHost);
        redisStandaloneConfiguration.setPort(Integer.parseInt(redisPort));
        return new LettuceConnectionFactory(redisStandaloneConfiguration);
    }

    @Bean
    public RedisTemplate<Long, String> redisTemplate() {
        RedisTemplate<Long, String> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new GenericToStringSerializer<>(Long.class));
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }

    @Bean
    public StringRedisTemplate stringRedisTemplate(){
        StringRedisTemplate stringRedisTemplate= new StringRedisTemplate();
        stringRedisTemplate.setKeySerializer(new StringRedisSerializer());
        stringRedisTemplate.setValueSerializer(new StringRedisSerializer());
        stringRedisTemplate.setConnectionFactory(redisConnectionFactory());
        return stringRedisTemplate;
    }

    // redis 에 발행(publish)된 메시지 처리를 위한 리스너 설정
    @Bean
    public RedisMessageListenerContainer redisMessageListener(
            RedisConnectionFactory connectionFactory
    ) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }


    // 실제 메시지를 처리하는 subscriber 설정 추가
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisSubscriber subscriber) { // (2)
        return new MessageListenerAdapter(subscriber, "sendMessage");
    }
}
  • RedisMessageListenerContainer: Redis의 발행/구독(pub/sub) 기능을 사용하는 메시지 리스너 Redis에서 발행된 메시지를 처리할 수 있는 리스너 컨테이너

  • setConnectionFactory(): 리스너 컨테이너가 Redis와 통신할 때 사용할 연결 팩토리를 지정 Redis에 연결된 후, 메시지가 발행되면 해당 메시지를 구독하고 처리하는 작업을 수행

  • MessageListenerAdapter: Redis에서 발행된 메시지를 처리할 때 사용하는 어댑터 이 어댑터는 RedisSubscriber 클래스의 메소드와 연결되어, Redis에서 메시지를 구독할 때 그 메시지를 처리할 방법을 정의

  • subscriber: RedisSubscriber는 실제로 메시지를 처리하는 클래스이고 직접 구현한 클래스이다 메시지를 받으면 sendMessage() 메소드를 호출하여 처리한다

  • sendMessage: 여기서 명시한 "sendMessage"는 RedisSubscriber 클래스 내의 메소드를 가리킨다 추후 코드에 잇음

결론은 메시지가 발행되면 RedisMessageListenerContainer가 이를 구독하고, MessageListenerAdapter를 통해 RedisSubscriber의 sendMessage() 메소드로 메시지를 전달한다
sendMessage() 메소드에서는 메시지를 처리하고, 클라이언트에게 해당 메시지를 전달한다

@RequiredArgsConstructor
@Service
@Slf4j
public class RedisSubscriber {

    private final ObjectMapper objectMapper;
    private final SimpMessageSendingOperations messagingTemplate;

    /**
     * Redis에서 메시지가 발행(publish)되면
     * 대기하고 있던 Redis Subscriber가 해당 메시지를 받아 처리한다.
     */
    public void sendMessage(String jsonMessage) {
        try {

            PublishChattingMessage chattingMessage = objectMapper.readValue(jsonMessage, PublishChattingMessage.class);
            SendChattingMessageWSRequest chattingWsRequest = chattingMessage.toWSRequest();

            log.warn("Redis Subcriber chatMSG : {}", chattingMessage.getMessage());

            // 채팅방을 구독한 클라이언트에게 메시지 발송
            messagingTemplate.convertAndSend(
                    "/sub/chatting/room/chatroom-" + chattingMessage.getRoomId(), chattingWsRequest
            );

        } catch (Exception e) {
            log.error("Exception {}", e);
        }
    }
}
  • messagingTemplate.convertAndSend(): 이 메소드는 WebSocket을 통해 메시지를 클라이언트로 전송한다 WebSocket을 구독 중인 클라이언트들이 메시지를 받을 수 있도록 설정된 채널로 메시지를 메시지를 전달함
    구독 경로는 /sub/chatting/room/chatroom-{roomId}, 특정 채팅방을 구독 중인 클라이언트들에게 해당 메시지를 전달한다.
  • 왜 직렬화/역직렬화를 해서 주고받을까?
    직렬화와 역직렬화를 사용하는 이유는 시스템 간에 데이터를 주고받을 때 효율적이고 표준화된 형식으로 데이터를 전달하기 위함이다 특히 분산 시스템이나 실시간 시스템(예: Redis Pub/Sub)에서 다양한 애플리케이션이 데이터를 주고받을 때, 직렬화는 필수적인 과정이다
    ex) 자바로 작성된 애플리케이션에서 Redis에 메시지를 발행한 후, 다른 프로그래밍 언어(Python, JavaScript 등)로 작성된 애플리케이션에서 이 데이터를 구독할 수 있음
    또한 Redis와 같은 시스템은 바이트 배열 또는 문자열을 기반으로 데이터를 처리한다 자바에서 사용되는 객체들은 매우 복잡한 상태를 가지고 있기 때문에, Redis와 같은 시스템이 직접 자바 객체를 처리가 힘들다 이 때문에 자바 객체를 직렬화해서 Redis에 전송하고, 다시 역직렬화하여 자바 객체로 변환해야 한다
@RequiredArgsConstructor
@Component
public class StompInterceptor implements ChannelInterceptor {

    private final AuthService authService;
    private final UserChattingSessionManager userChattingSessionManager;
    private final ChattingValidator chattingValidator;
    private final ChattingRoomReader chattingRoomReader;
    private final UserReader userReader;

    private static final String SUB_PREFIX = "/sub/chatting/room/chatroom-";

    /**
     * websocket을 통해 들어온 요청이 처리 되기전 실행된다.
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        //웹 소켓 연결할 때, 세션 생성 -> 그냥 jwt 검증만
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            UUID userId = authenticate(authService.getAuthorizationToken(
                    accessor.getFirstNativeHeader("Authorization")));

            return message;
        }

        // 구독할때 -> jwt검증하고 이 유저가 해당 채팅방을 구독할 수 있는건지 검증, 채널 구독과 세션 관리는 추후 서비스 로직에서 관
        if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();

            UUID roomId = UUID.fromString((destination.substring(SUB_PREFIX.length() + 1)));
            UUID userId = authenticate(authService.getAuthorizationToken(
                    accessor.getFirstNativeHeader("Authorization")));
            ChattingRoom chattingRoom = chattingRoomReader.readById(roomId);
            User user = userReader.readById(userId);
            chattingValidator.validateChattingRoomAndUserExist(user, chattingRoom);
        }

        // DISCONNECT 요청 처리 세션 삭제
        if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
            UUID userId = authenticate(authService.getAuthorizationToken(
                    accessor.getFirstNativeHeader("Authorization")));
            // Redis에서 상태 제거
            userChattingSessionManager.removeOnlineStatus(userId);
        }

        if (StompCommand.SEND.equals(accessor.getCommand())) {
            UUID userId = authenticate(authService.getAuthorizationToken(accessor.getFirstNativeHeader("Authorization")));
//        accessor.setNativeHeader("userId", userId.toString());
            Objects.requireNonNull(accessor.getSessionAttributes()).put("userId", userId);
            return message;
        }
        return message;
    }

    private UUID authenticate(String token) {
        authService.isValidToken(token);  // 토큰 유효성 검사
        return extractUserIdFromJwtToken(token);
    }

    private UUID extractUserIdFromJwtToken(final String token) {
        // JWT 토큰을 파싱하여 Claims 객체를 얻음
        HashMap<String, Object> parseJwtTokenMap = authService.parseJwtToken(token);
        Claims claims = (Claims) parseJwtTokenMap.get("claims");

        // 토큰 타입이 "NORMAL"인지 확인
        validateIsTokenTypeNormal(claims);

        // 토큰에서 사용자 ID를 추출
        return getUserIdFromToken(claims);
    }

    private void validateIsTokenTypeNormal(Claims claims) {
        if (!authService.checkTokenType(claims, UserType.NORMAL)) {
            throw new AuthenticationException(ErrorCode.INVALID_USER_TYPE, ErrorCode.INVALID_USER_TYPE.getStatusMessage());
        }
    }

    private UUID getUserIdFromToken(final Claims claims) {
        Object userId = claims.get("userId");
        String userIdString = userId.toString();
        // UUID 형식인지 확인
        try {
            UUID.fromString(userIdString);
        } catch (IllegalArgumentException e) {
            throw new InvalidJWTUserIdException(
                    ErrorCode.INVALID_JWT_USER_ID_ERROR,
                    ErrorCode.INVALID_JWT_USER_ID_ERROR.getStatusMessage()
            );
        }
        return UUID.fromString(userIdString);
    }
}
  • preSend(): WebSocket을 통해 들어오는 메시지 요청이 처리되기 전에 실행된다 이 메소드는 WebSocket 요청의 헤더를 분석하고, JWT 토큰을 인증한 후 사용자의 userId를 세션에 저장하는 역할을 한다

  • StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message): STOMP 메시지의 헤더 정보를 쉽게 다룰 수 있도록 StompHeaderAccessor로 래핑한다 이를 통해 메시지의 다양한 헤더 값과 STOMP 명령을 처리할 수 있다

  • 나머지는 JWT 검증, userId 추출

  • Objects.requireNonNull(accessor.getSessionAttributes()).put("userId", userId): 검증된 userId를 WebSocket 세션에 저장한다. 이 userId는 이후 메시지 처리 과정에서 메시지를 보낸 사용자를 추적하거나 인증된 사용자임을 확인하는 데 사용된다

  • 만약에 COMMAND가 SUBSCRIBE(구독)일 경우에 구독하는 사람이 과연 이 채팅방을 접속할 수 잇는지 검증을 한번 때린다 그리고 채널을 구독하기 위해 controller로 넘긴다 (코드는 아래에 있음)

  • COMMAND가 SEND일 경우에 jwt만 검증하고(왜? 아까 SUBSCRIBE에서 각종 validation을 했으니까) controller 로 보낸다

  • CONNECTION일 경우에도 일단 JWT만 검증한다

  • DISCONNECT일 경우에는 jwt를 검증하고 세션을 삭제한다

@RequiredArgsConstructor
@Controller
@Slf4j
public class ChattingWSController {

    private final ChattingService chattingService;

    /**
     * websocket "/pub/chatting/message"로 들어오는 메시징을 처리한다.
     */
    @MessageMapping("/chatting/message")
    public ResponseEntity<HttpStatus> message(ChattingMessageWSRequest message,
                                              @Header("simpSessionAttributes") Map<String, Object> sessionAttributes) {
        UUID sendUserId = (UUID) sessionAttributes.get("userId");
        chattingService.sendChattingMessage(message, sendUserId); //RedisPublisher 호출;
        return ResponseEntity.ok().build();
    }

    @MessageMapping("/chatting/room/{roomId}")
    public void handleRoomSubscription(@DestinationVariable UUID roomId,
                                       @Header("simpSessionAttributes") Map<String, Object> sessionAttributes) {
        UUID userId = (UUID) sessionAttributes.get("userId");
        chattingService.enterChattingRoom(roomId, userId);
        log.info("User {} subscribed to room {}", userId, roomId);
    }

}
  • @MessageMapping: WebSocket에서 특정 경로로 들어오는 메시지를 처리하는 어노테이션이다 여기서는 클라이언트가 /pub/chatting/message 경로로 WebSocket을 통해 메시지를 보내면, 이 메소드가 호출된다
    이 경로는 클라이언트가 메시지를 서버로 발행할 때 사용되는 경로이다 STOMP 프로토콜을 사용하여 메시지를 전송할 때, 클라이언트는 이 경로에 메시지를 발송하게 된다

  • @Header("simpSessionAttributes") Map<String, Object> sessionAttributes: WebSocket 세션에서 전송된 세션 정보를 가져오는 어노테이션 이다. 여기서 simpSessionAttributes라는 헤더에서 세션 정보를 가져오며, 이는 세션에 저장된 사용자 정보를 포함한다 원래는 세션에 담지 않고 @Header("userId")로 하려 했는데 컨트롤러까지 페이로드가 넘어오지 않아서 일단 이렇게 구현했다 ㅠ (개선 사항 1순위)

  • 근데 왜 url이 pub/chatting/message가 아닐까??: 이전에 WebSocketConfig에서 setApplicationDestinationPrefixes("/pub")을 사용했었다 클라이언트가 메시지를 서버로 보낼 때 사용할 경로에 /pub이라는 프리픽스를 추가한다는 건데 클라이언트는 실제로 /pub/chatting/message 경로로 메시지를 보내지만, 서버에서는 @MessageMapping("/chatting/message")와 매칭된다 이때 @MessageMapping 경로는 프리픽스 없이 설정한다
    즉, /pub은 메시지를 발행할 때 자동으로 붙는 경로의 프리픽스일 뿐, 실제로 서버에서는 이를 고려하지 않고 핸들러에서 처리할 경로만 지정하는 것이다. 그래서 컨트롤러 메소드에서는 /pub이 포함되지 않은 경로를 설정한다
@Service
@RequiredArgsConstructor
@Slf4j
public class ChattingService {

    private final RedisTemplate<String, String> redisTemplate;
    private final ObjectMapper objectMapper;
    private final UserReader userReader;
    private final ChattingValidator chattingValidator;
    private final ChattingContentWriter chattingContentWriter;
    private final RedisMessageListenerContainer redisMessageListenerContainer;
    private final MessageListenerAdapter messageListenerAdapter;
    private final ChattingRoomReader chattingRoomReader;
    private final ChattingRoomWriter chattingRoomWriter;
    private final ChattingContentReader chattingContentReader;

    public void sendChattingMessage(ChattingMessageWSRequest request, UUID sendUserId) {
        User user = userReader.readById(sendUserId);
        ChattingRoom chattingRoom = chattingValidator.validateChattingRoomAndUserExist(user, request.getRoomId());
        ChattingContent chattingContent = chattingContentWriter.write(createChattingContent(user, request.getRoomId(), request.getMessage()));
        publishMessage(chattingContent);
    }

    private void publishMessage(ChattingContent chattingContent) {
        PublishChattingMessage chattingMessage = PublishChattingMessage.builder()
                .roomId(chattingContent.getChattingRoomId())
                .sendAt(chattingContent.getCreatedDate())
                .message(chattingContent.getContent())
                .sendUserName(chattingContent.getSendUserName())
                .build();
        try {
            // ChatMessageDto를 JSON 문자열로 변환
            String messageJson = objectMapper.writeValueAsString(chattingMessage);
            String topic = "chatroom-" + chattingContent.getChattingRoomId().toString();
            // Redis에 JSON 문자열을 발행
            redisTemplate.convertAndSend(topic, messageJson);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    private ChattingContent createChattingContent(User user, UUID roomId, String message) {
        return ChattingContent.builder()
                .chattingRoomId(roomId)
                .sendUserName(user.getUserProfile().getFullName())
                .content(message)
                .build();
    }
    public void subscribeToTopic(UUID roomId) {
        ChattingRoom chattingRoom = chattingRoomReader.readById(roomId);
        ChannelTopic topic = new ChannelTopic("chatroom-" + chattingRoom.getId());
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, topic);
    }
    
    public void unsubscribeFromTopic(UUID roomId) {
        ChattingRoom chattingRoom = chattingRoomReader.readById(roomId);
        chattingRoomWriter.delete(chattingRoom);
        ChannelTopic topic = new ChannelTopic("chatroom-" + chattingRoom.getId());
        redisMessageListenerContainer.removeMessageListener(messageListenerAdapter, topic);
    }
}
  • validateChattingRoomAndUserExist: 채팅을 보낸 유저가 요청한 채팅방 id와 실제 그 유저가 그 채팅방에 존재하는지 검증 해준다.
  • 채팅 내역을 저장한다
  • publishMessage: 채팅 내역을 발행한다 채팅방 생성할때 topic을 채팅방의 uuid로 설정했기 때문에 (설정한부분은 subscribeToTopic 메서드)
  • 그리고 아까 말했듯이 json으로 변환하고 redis에 발행하면 된다

참고 자료:
https://redis.io/docs/latest/develop/interact/pubsub/
https://medium.com/frientrip/pub-sub-%EC%9E%98-%EC%95%8C%EA%B3%A0-%EC%93%B0%EC%9E%90-de9dc1b9f739
https://nebulaisme.tistory.com/147
https://docs.spring.io/spring-framework/reference/web/websocket/stomp.html

0개의 댓글