[토킹테일] Spring WebFlux 채팅 예제로 연습하기 (1-1)

Choi Wontak·2025년 4월 27일

비바이빙

목록 보기
7/8
post-thumbnail

비바이빙 - 빠르게 개발하기

AI 시대에 맞추어 2주에 한 프로덕트를 만들어내는, 작지만 빠른 개발을 지향하는 프로젝트입니다.


아직 WebFlux 이해가 부족하다 (문제 인식 단계)

이전에 간단한 예제를 통해 공부했지만, 아직 리액티브 프로그래밍에 대해 익숙하지 않은 느낌... 무언가 찝찝한 느낌이 들었다.
야생형 개발자 답게 예제를 조금만 더 깊게 파보자.

공부 내용 정리

좀더 발전된 채팅 기능 만들기

저번엔 SSE로 채팅방을 만들었는데, 이번에는 많이 쓰는 방식인 웹소켓으로 채팅방을 만들어보겠다.

@Configuration
public class WebSocketConfig {

    @Bean
    public HandlerMapping webSocketMapping(ChatHandler chatHandler) {
        return new SimpleUrlHandlerMapping(Map.of(
                "/chat", chatHandler
        ), -1);
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

우선 웹소켓으로 접근한 경우 처리해줄 Config 빈을 생성하였다.
뒤에 등장할 ChatHandler를 주입받아 /chat으로 접근한 경우 해당 핸들러를 사용해주도록 설정하였다.

@Component
public class ChatHandler implements WebSocketHandler {

    private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // 1. 클라이언트 → 서버 메시지 수신
        Flux<String> incoming = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(message -> {
                    System.out.println("수신: " + message);
                    sink.tryEmitNext(message); // 수신하면 전체에게 브로드캐스트
                });

        // 2. 서버 → 클라이언트 메시지 송신
        Flux<WebSocketMessage> outgoing = sink.asFlux()
                .map(session::textMessage);

        // 3. 수신과 송신을 모두 연결
        return session.send(outgoing)
                .and(incoming);
    }
}

채팅을 처리해줄 ChatHandler에 대한 로직이다.

Sinks.many().multicast().onBackpressureBuffer();

SSE와 동일하게 공유할 스트림을 만들어준다.
multicast : 여러 클라이언트의 구독이 가능
onBackpressureBuffer: 소비자가 느릴 경우 메시지를 버퍼에 임시 저장함 (유실 방지)

Flux<String> incoming = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .doOnNext(message -> {
                    System.out.println("수신: " + message);
                    sink.tryEmitNext(message); // 수신하면 전체에게 브로드캐스트
                });

세션에서 메시지를 받아 텍스트로 변환하고, sink에 밀어넣어 준다.

Flux<WebSocketMessage> outgoing = sink.asFlux()
                .map(session::textMessage);

Flux를 반환함으로써 클라이언트가 이를 구독하는 스트리밍을 구현한다.
asFlux를 통해 subscriber에게 데이터를 전달한다.

return session.send(outgoing)
                .and(incoming);

웹소켓을 통해 스트림을 전달한다. 두 Mono를 결합해 완료할 때까지 실행한다.

1번의 메시지가 화면에 잘 표시된다.

2번의 메시지도 1번의 화면에서 잘 표시된다.

웹소켓으로 바꿔 살짝 복잡해졌지만 여전이 코드는 깔끔하다.

방을 추가해보기

private final Map<String, Sinks.Many<String>> roomSinkMap = new ConcurrentHashMap<>();

여러 방을 연결할 수 있도록 Map을 적용하였다.
동시성 문제가 있을 수 있으니 Concurrent로..!

Sinks.Many<String> sink = roomSinkMap.computeIfAbsent(roomId,
                key -> Sinks.many().multicast().onBackpressureBuffer()
        );

roomId를 URI에서 꺼내서 없으면 만들고 있으면 사용하도록 짜줬다.

roomSinks.get(roomId).tryEmitNext(new ChatMessage("ENTER", nickname, nickname + "님이 입장했습니다."));

입장처리를 위한 코드.
세션을 연결할 때 Sinks에 해당 메시지를 넣는 것으로 처리하였다.

return session.send(outgoing)
                .and(incoming.then())
                .doFinally(signalType -> {
                    // 연결 종료 시 처리
                    roomUsers.get(roomId).remove(nickname);
                    roomSinks.get(roomId).tryEmitNext(new ChatMessage("LEAVE", nickname, nickname + "님이 퇴장했습니다."));
                });

퇴장 처리는 세션을 종료할 때 Sinks에 메시지를 넣어 처리하였다.


후기

세션, SSE와 같은 통신 관련 기능을 작성할 때 확실히 편리한 것 같다.
아직 실무에서 쓰기에 뭔가 준비된 느낌은 안드는데 다른 예제도 살펴봐야겠다.

profile
백엔드 주니어 주니어 개발자

0개의 댓글