지금까지 통상적으로 구독을 완료한 이후에
어떤 로직을 처리할 때 다음과 같은 코드로 처리했다.
@Configuration
@RequiredArgsConstructor
public class StompPreHandler implements ChannelInterceptor {
private final TokenProvider tokenProvider;
private final ChattingRoomRepository chattingRoomRepository;
private static final String CHAT_SUB_PREFIX = "/sub/chat/room";
private static final String CHAT_SUB_ERROR_PREFIX="/user/queue/error";
private static final String CHAT_SUB_PREFIX_RABBIT = "/exchange/chat.exchange/room";
private final ApplicationEventPublisher applicationEventPublisher;
@Override
@Transactional
public Message<?> preSend(Message<?> message, MessageChannel channel) {
//헤더 접근
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
String accessToken = accessor.getFirstNativeHeader("Authorization");
tokenProvider.validateToken(accessToken);
Authentication authentication = tokenProvider.getAuthentication(accessToken);
if (authentication != null) {
accessor.setUser(authentication);
return message;
}
throw new BadRequestException(ErrorCode.BAD_REQUEST);
}
if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
// destination을 사용하여 구독 경로를 확인하거나 로깅
System.out.println("Subscribing to destination: " + destination);
if(!destination.equals(CHAT_SUB_ERROR_PREFIX)&& !destination.startsWith(CHAT_SUB_PREFIX)){
throw new BadRequestException(ErrorCode.SUB_URL_NOT_MATCH);
}
if(destination.startsWith(CHAT_SUB_PREFIX)){
Long roomId = Long.valueOf(destination.substring(CHAT_SUB_PREFIX.length()+1));
ChattingRoom chattingRoom =
chattingRoomRepository.findById(21L).orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_CHATTING_ROOM));
if(!chattingRoom.getReceiver().getLoginId().equals(accessor.getUser().getName())&&
!chattingRoom.getSender().getLoginId().equals(accessor.getUser().getName())){
throw new BadRequestException(ErrorCode.SUB_URL_CANT_ACCESS);
}
}
}
return message;
}
@EventListener
public void handleSubscribe(SessionSubscribeEvent event){
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
final Long chattingRoomId = Long.valueOf(headerAccessor.getSubscriptionId());
final String loginId = headerAccessor.getUser().getName();
chattingService.updateCountAllZero(chattingRoomId,loginId);
chattingRoomConnectService.connectChattingRoom(chattingRoomId,loginId,headerAccessor.getSessionId());
if(chattingRoomConnectService.isAllConnected(chattingRoomId)){
chattingService.sendEnterMessage(chattingRoomId,loginId);
}
headerAccessor.getSessionAttributes().put(SUB,chattingRoomId);
log.info("채팅방 입장: chattingRoomId: {}",chattingRoomId);
}
구독 요청이 온 후를 알리는 event SessionSubscribeEvent
가 발생한다면
위에서 chattingRoomConnectService.isAllConnected
가 실행된다.
이 코드는 만약 채팅방에 1대1 채팅에서 상대방이 채팅방에 존재한다면
카카오톡의 1처럼 1을 제거하기 위해서
채팅방에 enterMessage
를 보낸다.
코드는 정확하게 내가 의도한 대로 동작하였고,
코드를 잘 짠 줄 알았다.
그러나...
JMeter 를 사용하여,
String STOMP_SEND = "SEND\n" +
"destination:/pub/message/${__threadNum}\n" +
"content-length:20\n" + // 메시지 본문의 정확한 바이트 길이
"\n" + // 헤더와 본문 사이의 빈 줄
"{\"message\":\"hihihp\"}\u0000"; // 실제 메시지 본문
String STOMP_SUBSCRIBE = "SUBSCRIBE\n" + "id:${__threadNum}\n" + "destination:/sub/chat/room/${__threadNum}\n" + "\n" +
"\u0000";
String STOMP_UNSUBSCRIBE = "UNSUBSCRIBE\n" + "id:${__threadNum}\n" + "\n" + "\u0000";
String STOMP_CONNECT = "CONNECT\n" + "accept-version:1.2,1.1,1.0\n" + "heart-beat:0,0\n" +
"Authorization:eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzUxMiJ9.eyJleHAiOjE3MzMyMTUwNjcsInN1YiI6ImRpb25pc29zMTk4IiwiYXV0aCI6Ik5PUk1BTCJ9.263me0HITWBLtLsEDvCigL5uaCEMkzIzT0l6HhMutlPGcjivYQdH_bQqlYzrfIixgVeZf3RB9Tun7urKcxm2rw\n" + "\n" + "\u0000";
String STOMP_DISCONNECT = "DISCONNECT\n" + "receipt:close-0\n" + "\n" + "\u0000";
vars.put("s", s);
vars.put("STOMP_SEND", STOMP_SEND);
vars.put("STOMP_SUBSCRIBE", STOMP_SUBSCRIBE);
vars.put("STOMP_CONNECT", STOMP_CONNECT);
vars.put("STOMP_DISCONNECT", STOMP_DISCONNECT);
vars.put("STOMP_UNSUBSCRIBE", STOMP_UNSUBSCRIBE);
위 사진과 같은 테스트 flow 를 가지고 테스트를 진행하였다.
테스트 내용은 300명의 유저가 각자의 채팅방에서 채팅을 치는 상황이라고 가정한 것이다.
하지만 이러한 상황에서
위 사진과 같이 평균적으로 26% 의 오류가 발생하였다.
이는 한명의 유저가 300번 같은 채팅방에 메시지를 보냈을 때는
발생하지 않는 오류 였기 때문에 이런 부분을 알아보고자 하였다.
원인은 SessionSubscribeEvent
자체 와
구독 과 메시지 전송 처리를 담당하는 쓰레드가 별개의 쓰레드로 이루어 져 있기
때문이다.
SesssionSubscribeEvent
는 구독이 완료되고 난 후에 호출 되는 것이 아니라 구독 요청이 발생한 이후에 호출 되는 것이다,
그러므로 부하 상황에서는 구독 요청이 완료되었지만 실제로는 구독이 아직 안된 쓰레드와 구독 요청이 이루어 졌으므로 구독한 채팅방에 Enter Type
의 메시지를 보내는 쓰레드 중 메시지를 보내는 쓰레드가 먼저 실행이 되고
이러한 상황이었기 때문에
사용자가 들어왔는데 상대방의 입장에서 1이 사라지지 않는 현상이 발생하는 것이다.
더 상세히 설명하면
사용자가 구독을 요청할 때
ExecutorSubscribableChannel
을 사용하는 ClientInboundChannel
이 3개의 쓰레드를 등록한다.
이 3개의 쓰레드는 아래 3개의 Handler 를 각각 등록한다.
WebSocketAnnotationMethodMessageHandler
: 메시지를 적절한 애노테이션 기반 컨트롤러 메서드로 라우팅하는 역할
SimpleBrokerMessageHandler
: 메시지 브로커의 역할을 수행하며 사용자의 구독을 관리한다.
UserDestinationMessageHandler
:
이다.
이 3개의 쓰레드가 ExecutorSubscribableChannel
에서 아래와 같이 실행된다.
@Override
public boolean sendInternal(Message<?> message, long timeout) {
for (MessageHandler handler : getSubscribers()) {
SendTask sendTask = new SendTask(message, handler);
if (this.executor != null) {
try {
this.executor.execute(sendTask);
}
catch (RejectedExecutionException ex) {
// Probably on shutdown -> run send task locally instead
sendTask.run();
}
}
else {
// No executor configured -> always run send tasks locally
sendTask.run();
}
}
return true;
}
각각의 역할을 담당하는 쓰레드가 순서를 가지지 않고 비동기적으로 호출되므로 (디버깅 결과 쓰레드 순서는 WebSocket -> Simp -> UserDestination 순 , CONNECT, SUB,MESSAGE 모두 같은 순서 및 쓰레드 생성)
구독 요청이 왔을 때 수행하는 SesssionSubscribeEvent
로는
라우팅을 담당하는 WebSocketAnnotaionMethodMessageHandler
가 수행된 이후 임을 보장할 수 없다.
(부하테스트가 아닌 상황에서는 구독요청이 왔을 때 이것저것 수행을 많이 하므로 WebSocketAnnotationMethodMessageHandler
보다 먼저 수행되는게 거의 대부분이다.)
https://velog.io/@koseungbin/WebSocket
ChannelInterceptor
가 아닌 ExecutorChannelInterceptor
를 StompPreHandle
가 구현하게 처리한다.
이 클래스는 ChannelInterceptor
의 확장으로, Executor를 통해 특정 구독자(subscriber)에게 메시지를 비동기로 전송하는 과정을 가로채는 콜백을 제공한다.
이는 Executor로 구성될 수 있는 MessageChannel 구현체에서 지원된다.
(단 메시지 처리에 비동기 Executor가 연관된 경우에만 사용 , (메시지 채널의 모든 종류에 대해 사용할 수는 없는듯?)
구독을 담당하는MessageHandler
가 구독 요청을 처리한 이후에 다음 자체적으로 만든 SessionSubscribedEvent
를 호출하여 구독 이후에 Enter
메시지 전송을 보장하도록 한다.
@Configuration
@RequiredArgsConstructor
public class StompPreHandler implements ExecutorChannelInterceptor {
private final TokenProvider tokenProvider;
private final ChattingRoomRepository chattingRoomRepository;
private static final String CHAT_SUB_PREFIX = "/sub/chat/room";
private static final String CHAT_SUB_ERROR_PREFIX="/user/queue/error";
private static final String CHAT_SUB_PREFIX_RABBIT = "/exchange/chat.exchange/room";
private final ApplicationEventPublisher applicationEventPublisher;
@Override
@Transactional
public Message<?> preSend(Message<?> message, MessageChannel channel) {
//헤더 접근
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
String accessToken = accessor.getFirstNativeHeader("Authorization");
tokenProvider.validateToken(accessToken);
Authentication authentication = tokenProvider.getAuthentication(accessToken);
if (authentication != null) {
accessor.setUser(authentication);
return message;
}
throw new BadRequestException(ErrorCode.BAD_REQUEST);
}
if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
// destination을 사용하여 구독 경로를 확인하거나 로깅
System.out.println("Subscribing to destination: " + destination);
if(!destination.equals(CHAT_SUB_ERROR_PREFIX)&& !destination.startsWith(CHAT_SUB_PREFIX)){
throw new BadRequestException(ErrorCode.SUB_URL_NOT_MATCH);
}
if(destination.startsWith(CHAT_SUB_PREFIX)){
Long roomId = Long.valueOf(destination.substring(CHAT_SUB_PREFIX.length()+1));
ChattingRoom chattingRoom =
chattingRoomRepository.findById(21L).orElseThrow(()->new NotFoundException(ErrorCode.CANT_FIND_CHATTING_ROOM));
if(!chattingRoom.getReceiver().getLoginId().equals(accessor.getUser().getName())&&
!chattingRoom.getSender().getLoginId().equals(accessor.getUser().getName())){
throw new BadRequestException(ErrorCode.SUB_URL_CANT_ACCESS);
}
}
}
return message;
}
@Override
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
if (accessor.getMessageType() == SimpMessageType.SUBSCRIBE && handler instanceof AbstractBrokerMessageHandler) {
applicationEventPublisher.publishEvent(new SessionSubscribedEvent(message));
}
}
구독을 담당하는 메시지 핸들러는 AbstractBrokerMessageHandler
는
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
@Nullable
private PathMatcher pathMatcher;
@Nullable
저 위에서 구독을 담당하는 구현체였던 SimpleBrokerMessageHandler
의 추상화한 형태이므로 instanceOf AbstractBrokerMessageHandler
를 사용한다.
@Getter
public class SessionSubscribedEvent {
private final Message message;
public SessionSubscribedEvent(Message message){
this.message = message;
}
}
@EventListener
public void handleWebSocketAfterSubscribe(SessionSubscribedEvent event){
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
final Long chattingRoomId = Long.valueOf(headerAccessor.getSubscriptionId());
final String loginId = headerAccessor.getUser().getName();
chattingService.updateCountAllZero(chattingRoomId,loginId);
chattingRoomConnectService.connectChattingRoom(chattingRoomId,loginId,headerAccessor.getSessionId());
if(chattingRoomConnectService.isAllConnected(chattingRoomId)){
chattingService.sendEnterMessage(chattingRoomId,loginId);
}
headerAccessor.getSessionAttributes().put(SUB,chattingRoomId);
log.info("채팅방 입장: chattingRoomId: {}",chattingRoomId);
}
그 결과 오류 발생 비율을 개선할 수 있었다.
마지막으로 기존에 최대 100명의 유저가 각자 채팅을 하는 상황에서 원래 목표인 100~300 ms 의 전체적인 흐름을 충족하는지 실험 결과를 공유한다.