StompSession BlockingQueue 메시지 전달 안되는 문제

hyng·2023년 1월 25일
0

smilegate-winter-dev-camp

목록 보기
8/15

발단

 @BeforeEach
  void init() throws ExecutionException, InterruptedException, TimeoutException {
    webSocketStompClient = new WebSocketStompClient(
      new SockJsClient(
        List.of(new WebSocketTransport(new StandardWebSocketClient()))));
    webSocketStompClient.setMessageConverter(new MappingJackson2MessageConverter());

    session = webSocketStompClient.connectAsync(
                                    String.format("ws://localhost:%d/ws", port),
                                    new StompSessionHandlerAdapter() {
                                    })
                                  .get(1000, TimeUnit.SECONDS);
  }
@Nested
  @DisplayName("send 메서드는")
  class DescribeSend {

    @Nested
    @DisplayName("유효한 값이 전달되면")
    class ContextWithValidData {

      @Test
      @DisplayName("해당 토픽을 구독한 사용자들에게 메시지를 발행한다")
      void ItPublishesMessage() throws InterruptedException {
        // given
        String topicId = "topicId";
        String subUri = String.format("/topic/%s", topicId);
        topicService.create(topicId);

        ChatMessage message = ChatMessage.from("hi~");
        BlockingQueue<ChatMessage> queue = new ArrayBlockingQueue<>(1);
        session.subscribe(subUri, new StompFrameHandler() {
          @Override
          public Type getPayloadType(StompHeaders headers) {
            return ChatMessage.class;
          }

          @Override
          public void handleFrame(StompHeaders headers, Object payload) {
            queue.add((ChatMessage) payload);
          }
        });

        // when
        session.send(String.format("/chat/%s/send", topicId), message);
        // then
        ChatMessage poll = queue.poll(10, TimeUnit.SECONDS);
        Assertions.assertThat(poll).isNotNull();
//        await()
//          .atMost(5, TimeUnit.SECONDS)
//          .untilAsserted(() -> assertEquals(message.getMessage(), queue.poll()));
      }
    }

해당 테스트 코드로 다음의 컨트롤러를 테스트하려고 했다.

  @MessageMapping("/{id}/send")
  public void send(@DestinationVariable @NotEmpty String id, @Valid ChatMessage message) {
    chatService.send(id, message);
  }

먼저 컨트롤러부터 자세히 설명하면,
topic id와 메시지 정보를 받아서 chatService.send()를 호출하고

chatService.send()에서는 topic id를 이용해서 Topic을 찾아와서 RedisPublisher를 이용해 메시지를 publish 한다.


그러면 RedisSubscriber는 메시지를 받아서 simpMessageSendingOperations.convertAndSend("/topic/" + id, chatMessage); 로 브로드캐스트 한다.

다음으로 이제 테스트코드를 봐보자. 자세한 테스트 과정은 다음 링크 참조

connect -> subscribe -> send 과정을 거치고
서버로부터 오는 메시지는 BlockingQueue queue를 통해 받는다.

문제는 분명 서버 코드에서 simpMessageSendingOperations.convertAndSend("/topic/" + id, chatMessage);로 메시지를 전달하는데 테스트 코드에는 아무런 메시지가 전달되지 않았다.

아무런 메시지가 전달되지 않아 테스트가 실패하는 것을 볼 수 있다.

그래서 connect, subscribe가 제대로 된 것은 맞는지를 의심했다.

이를 테스트해보기 위해

@Component
@Slf4j
public class WebSocketEventListener {

@EventListener
public void handle1(SessionConnectedEvent event) {
  log.info("session-id:{}", event.getMessage().getHeaders().get("simpSessionId"));
  log.info("session-destination:{}", event.getMessage().getHeaders().get("simpDestination"));
  log.info("connect event");
}
@EventListener
public void handle2(SessionSubscribeEvent event) {
  log.info("session-id:{}", event.getMessage().getHeaders().get("simpSessionId"));

  log.info("session-destination:{}", event.getMessage().getHeaders().get("simpDestination"));
  log.info("subscribe event");
}
}

connect, subscribe 이벤트 발생시 로그를 찍는 코드를 만들고 다시 테스트를 실행해봤다.

로그가 찍힌 걸 확인할 수 있었고 connect, subscribe에는 문제가 없다는 것을 알게 되었다.

해결

엄청난 삽질 끝에 서버에서 전달한 메시지가 제대로 받아지기는 하는 건지 확인 하기 위해 StompFrameHandler 내부 코드를 디버깅해보았다.

DefaultStompSession.invokeHandler() 가 메시지를 전달받았을 때 실행되는 메서드라는 느낌이 강하게왔다. 🤔
디버깅을 해보니


메시지는 제대로 전달이 되고 있었고 파란 박스 부분에서 메시지 컨버터가 제대로 동작하기 못해 예외가 발생하고 있었다.
Cannot construct instance of com.example.chat.dto.ChatMessage (although at least one Creator exists): cannot deserialize from Object value (no delegate- or property-based Creator)

ChatMessage 를 생성할 수 없다는 것이다.

@Getter
public class ChatMessage implements Serializable {

private static final long serialVersionUID = 6494678977089006639L;
@Size(min = 1, max = 100, message = "메시지 글자 수는 최대 100자입니다.")
private String message;

@JsonCreator
private ChatMessage(String message) {
  this.message = message;
}

public static ChatMessage from(String message) {
  return new ChatMessage(message);
}
}

@Getter와 생성자 위에 @JsonCreator가 있어 역직렬화할때 문제가 되지 않겠지 생각했는데 그때 사용하는 메시지 컨버터는 MappingJackson2HttpMessageConverter이고 현재 사용하는 메시지 컨버터는 MappingJackson2MessageConverter였다. (처리 방식이 달라서 MappingJackson2MessageConverter의 경우는 기본 생성자가 있어야 되는 것 아닌가 생각하고 있는데 모르겠다🤔)

기본 생성자를 넣어 문제를 해결했다!

@Getter
@NoArgsConstructor(access = PRIVATE)
public class ChatMessage implements Serializable {

private static final long serialVersionUID = 6494678977089006639L;
@Size(min = 1, max = 100, message = "메시지 글자 수는 최대 100자입니다.")
private String message;

@JsonCreator
private ChatMessage(String message) {
  this.message = message;
}

public static ChatMessage from(String message) {
  return new ChatMessage(message);
}
}


원인을 찾기가 너무 힘들었다. 비동기 테스트는 처음이라 RedisSubscriber가 메시지를 받기도 전에 테스트 끝난다던지, timeout으로 실패, embedded redis 실패.. 등등 🥲

profile
공부하고 알게 된 내용을 기록하는 블로그

0개의 댓글