스프링 stomp 채팅 기능 부하 테스트 및 오류율 개선

이진우·2024년 11월 27일
0

스프링 학습

목록 보기
45/46

초기에는...

지금까지 통상적으로 구독을 완료한 이후에
어떤 로직을 처리할 때 다음과 같은 코드로 처리했다.

@Configuration
@RequiredArgsConstructor
public class StompPreHandler implements ChannelInterceptor {

    private final TokenProvider tokenProvider;
    private final ChattingRoomRepository chattingRoomRepository;
    private static final String CHAT_SUB_PREFIX = "/sub/chat/room";
    private static final String CHAT_SUB_ERROR_PREFIX="/user/queue/error";

    private static final String CHAT_SUB_PREFIX_RABBIT = "/exchange/chat.exchange/room";

    private final ApplicationEventPublisher applicationEventPublisher;

    @Override
    @Transactional
    public Message<?> preSend(Message<?> message, MessageChannel channel) {

        //헤더 접근
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {

            String accessToken = accessor.getFirstNativeHeader("Authorization");
            tokenProvider.validateToken(accessToken);
            Authentication authentication = tokenProvider.getAuthentication(accessToken);


            if (authentication != null) {

                accessor.setUser(authentication);
                return message;
            }

            throw new BadRequestException(ErrorCode.BAD_REQUEST);
        }

        if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();
            // destination을 사용하여 구독 경로를 확인하거나 로깅
            System.out.println("Subscribing to destination: " + destination);

            if(!destination.equals(CHAT_SUB_ERROR_PREFIX)&& !destination.startsWith(CHAT_SUB_PREFIX)){
                throw new BadRequestException(ErrorCode.SUB_URL_NOT_MATCH);
            }

            if(destination.startsWith(CHAT_SUB_PREFIX)){
                Long roomId = Long.valueOf(destination.substring(CHAT_SUB_PREFIX.length()+1));
                ChattingRoom chattingRoom =
                        chattingRoomRepository.findById(21L).orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_CHATTING_ROOM));

                if(!chattingRoom.getReceiver().getLoginId().equals(accessor.getUser().getName())&&
                !chattingRoom.getSender().getLoginId().equals(accessor.getUser().getName())){
                    throw new BadRequestException(ErrorCode.SUB_URL_CANT_ACCESS);
                }

            }


        }


        return message;
    }
@EventListener
    public void handleSubscribe(SessionSubscribeEvent event){
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        final Long chattingRoomId = Long.valueOf(headerAccessor.getSubscriptionId());
        final String loginId = headerAccessor.getUser().getName();

        chattingService.updateCountAllZero(chattingRoomId,loginId);
        chattingRoomConnectService.connectChattingRoom(chattingRoomId,loginId,headerAccessor.getSessionId());

        if(chattingRoomConnectService.isAllConnected(chattingRoomId)){
            chattingService.sendEnterMessage(chattingRoomId,loginId);
        }

        headerAccessor.getSessionAttributes().put(SUB,chattingRoomId);


        log.info("채팅방 입장: chattingRoomId: {}",chattingRoomId);
    }

구독 요청이 온 후를 알리는 event SessionSubscribeEvent 가 발생한다면

위에서 chattingRoomConnectService.isAllConnected 가 실행된다.

이 코드는 만약 채팅방에 1대1 채팅에서 상대방이 채팅방에 존재한다면
카카오톡의 1처럼 1을 제거하기 위해서
채팅방에 enterMessage 를 보낸다.

코드는 정확하게 내가 의도한 대로 동작하였고,
코드를 잘 짠 줄 알았다.

그러나...

부하테스트 시 에러율 증가.

JMeter 를 사용하여,

String STOMP_SEND = "SEND\n" + 
    "destination:/pub/message/${__threadNum}\n" + 
    "content-length:20\n" +  // 메시지 본문의 정확한 바이트 길이
    "\n" +  // 헤더와 본문 사이의 빈 줄
    "{\"message\":\"hihihp\"}\u0000";  // 실제 메시지 본문

   

String STOMP_SUBSCRIBE = "SUBSCRIBE\n" + "id:${__threadNum}\n" + "destination:/sub/chat/room/${__threadNum}\n" + "\n" +
      "\u0000";

String STOMP_UNSUBSCRIBE = "UNSUBSCRIBE\n" + "id:${__threadNum}\n" + "\n" + "\u0000";

String STOMP_CONNECT = "CONNECT\n" + "accept-version:1.2,1.1,1.0\n" + "heart-beat:0,0\n" + 
      "Authorization:eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJleHAiOjE3MzMyMTUwNjcsInN1YiI6ImRpb25pc29zMTk4IiwiYXV0aCI6Ik5PUk1BTCJ9.263me0HITWBLtLsEDvCigL5uaCEMkzIzT0l6HhMutlPGcjivYQdH_bQqlYzrfIixgVeZf3RB9Tun7urKcxm2rw\n" + "\n" +              "\u0000";



String STOMP_DISCONNECT = "DISCONNECT\n" + "receipt:close-0\n" + "\n" + "\u0000";

vars.put("s", s);
vars.put("STOMP_SEND", STOMP_SEND);
vars.put("STOMP_SUBSCRIBE", STOMP_SUBSCRIBE);
vars.put("STOMP_CONNECT", STOMP_CONNECT);
vars.put("STOMP_DISCONNECT", STOMP_DISCONNECT);
vars.put("STOMP_UNSUBSCRIBE", STOMP_UNSUBSCRIBE);

위 사진과 같은 테스트 flow 를 가지고 테스트를 진행하였다.

테스트 내용은 300명의 유저가 각자의 채팅방에서 채팅을 치는 상황이라고 가정한 것이다.

하지만 이러한 상황에서

위 사진과 같이 평균적으로 26% 의 오류가 발생하였다.

이는 한명의 유저가 300번 같은 채팅방에 메시지를 보냈을 때는

발생하지 않는 오류 였기 때문에 이런 부분을 알아보고자 하였다.

원인

원인은 SessionSubscribeEvent 자체 와
구독 과 메시지 전송 처리를 담당하는 쓰레드가 별개의 쓰레드로 이루어 져 있기
때문이다.

SesssionSubscribeEvent 는 구독이 완료되고 난 후에 호출 되는 것이 아니라 구독 요청이 발생한 이후에 호출 되는 것이다,

그러므로 부하 상황에서는 구독 요청이 완료되었지만 실제로는 구독이 아직 안된 쓰레드와 구독 요청이 이루어 졌으므로 구독한 채팅방에 Enter Type 의 메시지를 보내는 쓰레드 중 메시지를 보내는 쓰레드가 먼저 실행이 되고

이러한 상황이었기 때문에
사용자가 들어왔는데 상대방의 입장에서 1이 사라지지 않는 현상이 발생하는 것이다.

더 상세히 설명하면

사용자가 구독을 요청할 때
ExecutorSubscribableChannel 을 사용하는 ClientInboundChannel 이 3개의 쓰레드를 등록한다.

이 3개의 쓰레드는 아래 3개의 Handler 를 각각 등록한다.

WebSocketAnnotationMethodMessageHandler : 메시지를 적절한 애노테이션 기반 컨트롤러 메서드로 라우팅하는 역할

SimpleBrokerMessageHandler: 메시지 브로커의 역할을 수행하며 사용자의 구독을 관리한다.

UserDestinationMessageHandler :

이다.

이 3개의 쓰레드가 ExecutorSubscribableChannel 에서 아래와 같이 실행된다.

@Override
	public boolean sendInternal(Message<?> message, long timeout) {
		for (MessageHandler handler : getSubscribers()) {
			SendTask sendTask = new SendTask(message, handler);
			if (this.executor != null) {
				try {
					this.executor.execute(sendTask);
				}
				catch (RejectedExecutionException ex) {
					// Probably on shutdown -> run send task locally instead
					sendTask.run();
				}
			}
			else {
				// No executor configured -> always run send tasks locally
				sendTask.run();
			}
		}
		return true;
	}

각각의 역할을 담당하는 쓰레드가 순서를 가지지 않고 비동기적으로 호출되므로 (디버깅 결과 쓰레드 순서는 WebSocket -> Simp -> UserDestination 순 , CONNECT, SUB,MESSAGE 모두 같은 순서 및 쓰레드 생성)

구독 요청이 왔을 때 수행하는 SesssionSubscribeEvent 로는
라우팅을 담당하는 WebSocketAnnotaionMethodMessageHandler 가 수행된 이후 임을 보장할 수 없다.

(부하테스트가 아닌 상황에서는 구독요청이 왔을 때 이것저것 수행을 많이 하므로 WebSocketAnnotationMethodMessageHandler 보다 먼저 수행되는게 거의 대부분이다.)

참고:
https://stackoverflow.com/questions/29194530/stomp-over-websocket-using-spring-and-sockjs-message-lost?rq=4

https://velog.io/@koseungbin/WebSocket

해결 방법

ChannelInterceptor 가 아닌 ExecutorChannelInterceptorStompPreHandle 가 구현하게 처리한다.

이 클래스는 ChannelInterceptor의 확장으로, Executor를 통해 특정 구독자(subscriber)에게 메시지를 비동기로 전송하는 과정을 가로채는 콜백을 제공한다.
이는 Executor로 구성될 수 있는 MessageChannel 구현체에서 지원된다.

(단 메시지 처리에 비동기 Executor가 연관된 경우에만 사용 , (메시지 채널의 모든 종류에 대해 사용할 수는 없는듯?)

구독을 담당하는MessageHandler 가 구독 요청을 처리한 이후에 다음 자체적으로 만든 SessionSubscribedEvent 를 호출하여 구독 이후에 Enter 메시지 전송을 보장하도록 한다.

@Configuration
@RequiredArgsConstructor
public class StompPreHandler implements ExecutorChannelInterceptor {

    private final TokenProvider tokenProvider;
    private final ChattingRoomRepository chattingRoomRepository;
    private static final String CHAT_SUB_PREFIX = "/sub/chat/room";
    private static final String CHAT_SUB_ERROR_PREFIX="/user/queue/error";

    private static final String CHAT_SUB_PREFIX_RABBIT = "/exchange/chat.exchange/room";

    private final ApplicationEventPublisher applicationEventPublisher;

    @Override
    @Transactional
    public Message<?> preSend(Message<?> message, MessageChannel channel) {

        //헤더 접근
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

        if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {

            String accessToken = accessor.getFirstNativeHeader("Authorization");
            tokenProvider.validateToken(accessToken);
            Authentication authentication = tokenProvider.getAuthentication(accessToken);


            if (authentication != null) {

                accessor.setUser(authentication);
                return message;
            }

            throw new BadRequestException(ErrorCode.BAD_REQUEST);
        }

        if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
            String destination = accessor.getDestination();
            // destination을 사용하여 구독 경로를 확인하거나 로깅
            System.out.println("Subscribing to destination: " + destination);

            if(!destination.equals(CHAT_SUB_ERROR_PREFIX)&& !destination.startsWith(CHAT_SUB_PREFIX)){
                throw new BadRequestException(ErrorCode.SUB_URL_NOT_MATCH);
            }

            if(destination.startsWith(CHAT_SUB_PREFIX)){
                Long roomId = Long.valueOf(destination.substring(CHAT_SUB_PREFIX.length()+1));
                ChattingRoom chattingRoom =
                        chattingRoomRepository.findById(21L).orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_CHATTING_ROOM));

                if(!chattingRoom.getReceiver().getLoginId().equals(accessor.getUser().getName())&&
                !chattingRoom.getSender().getLoginId().equals(accessor.getUser().getName())){
                    throw new BadRequestException(ErrorCode.SUB_URL_CANT_ACCESS);
                }

            }


        }


        return message;
    }

    @Override
    public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {

        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);

        if (accessor.getMessageType() == SimpMessageType.SUBSCRIBE && handler instanceof AbstractBrokerMessageHandler) {

            applicationEventPublisher.publishEvent(new SessionSubscribedEvent(message));
        }
    }

구독을 담당하는 메시지 핸들러는 AbstractBrokerMessageHandler

public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {

	private static final byte[] EMPTY_PAYLOAD = new byte[0];


	@Nullable
	private PathMatcher pathMatcher;

	@Nullable

저 위에서 구독을 담당하는 구현체였던 SimpleBrokerMessageHandler 의 추상화한 형태이므로 instanceOf AbstractBrokerMessageHandler 를 사용한다.

@Getter
public class SessionSubscribedEvent {

     private final Message message;



     public SessionSubscribedEvent(Message message){
          this.message = message;

     }

}
@EventListener
    public void handleWebSocketAfterSubscribe(SessionSubscribedEvent event){

        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        final Long chattingRoomId = Long.valueOf(headerAccessor.getSubscriptionId());
        final String loginId = headerAccessor.getUser().getName();

        chattingService.updateCountAllZero(chattingRoomId,loginId);
        chattingRoomConnectService.connectChattingRoom(chattingRoomId,loginId,headerAccessor.getSessionId());

        if(chattingRoomConnectService.isAllConnected(chattingRoomId)){
            chattingService.sendEnterMessage(chattingRoomId,loginId);
        }

        headerAccessor.getSessionAttributes().put(SUB,chattingRoomId);


        log.info("채팅방 입장: chattingRoomId: {}",chattingRoomId);


    }

참고

https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/messaging/support/ExecutorChannelInterceptor.html

개선

그 결과 오류 발생 비율을 개선할 수 있었다.

목표 달성 여부??

마지막으로 기존에 최대 100명의 유저가 각자 채팅을 하는 상황에서 원래 목표인 100~300 ms 의 전체적인 흐름을 충족하는지 실험 결과를 공유한다.

MongoDB 채팅에 Index 를 적용하지 않았을 때

MongoDB 채팅에 Index 를 적용하였을 때

재미로 보는 10000명 유저의 경우

profile
기록을 통해 실력을 쌓아가자

0개의 댓글