
AI 시대에 맞추어 2주에 한 프로덕트를 만들어내는, 작지만 빠른 개발을 지향하는 프로젝트입니다.
이전에 간단한 예제를 통해 공부했지만, 아직 리액티브 프로그래밍에 대해 익숙하지 않은 느낌... 무언가 찝찝한 느낌이 들었다.
야생형 개발자 답게 예제를 조금만 더 깊게 파보자.
저번엔 SSE로 채팅방을 만들었는데, 이번에는 많이 쓰는 방식인 웹소켓으로 채팅방을 만들어보겠다.
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping webSocketMapping(ChatHandler chatHandler) {
return new SimpleUrlHandlerMapping(Map.of(
"/chat", chatHandler
), -1);
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
우선 웹소켓으로 접근한 경우 처리해줄 Config 빈을 생성하였다.
뒤에 등장할 ChatHandler를 주입받아 /chat으로 접근한 경우 해당 핸들러를 사용해주도록 설정하였다.
@Component
public class ChatHandler implements WebSocketHandler {
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
@Override
public Mono<Void> handle(WebSocketSession session) {
// 1. 클라이언트 → 서버 메시지 수신
Flux<String> incoming = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(message -> {
System.out.println("수신: " + message);
sink.tryEmitNext(message); // 수신하면 전체에게 브로드캐스트
});
// 2. 서버 → 클라이언트 메시지 송신
Flux<WebSocketMessage> outgoing = sink.asFlux()
.map(session::textMessage);
// 3. 수신과 송신을 모두 연결
return session.send(outgoing)
.and(incoming);
}
}
채팅을 처리해줄 ChatHandler에 대한 로직이다.
Sinks.many().multicast().onBackpressureBuffer();
SSE와 동일하게 공유할 스트림을 만들어준다.
multicast : 여러 클라이언트의 구독이 가능
onBackpressureBuffer: 소비자가 느릴 경우 메시지를 버퍼에 임시 저장함 (유실 방지)
Flux<String> incoming = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.doOnNext(message -> {
System.out.println("수신: " + message);
sink.tryEmitNext(message); // 수신하면 전체에게 브로드캐스트
});
세션에서 메시지를 받아 텍스트로 변환하고, sink에 밀어넣어 준다.
Flux<WebSocketMessage> outgoing = sink.asFlux()
.map(session::textMessage);
Flux를 반환함으로써 클라이언트가 이를 구독하는 스트리밍을 구현한다.
asFlux를 통해 subscriber에게 데이터를 전달한다.
return session.send(outgoing)
.and(incoming);
웹소켓을 통해 스트림을 전달한다. 두 Mono를 결합해 완료할 때까지 실행한다.
1번의 메시지가 화면에 잘 표시된다.
2번의 메시지도 1번의 화면에서 잘 표시된다.
웹소켓으로 바꿔 살짝 복잡해졌지만 여전이 코드는 깔끔하다.
private final Map<String, Sinks.Many<String>> roomSinkMap = new ConcurrentHashMap<>();
여러 방을 연결할 수 있도록 Map을 적용하였다.
동시성 문제가 있을 수 있으니 Concurrent로..!
Sinks.Many<String> sink = roomSinkMap.computeIfAbsent(roomId,
key -> Sinks.many().multicast().onBackpressureBuffer()
);
roomId를 URI에서 꺼내서 없으면 만들고 있으면 사용하도록 짜줬다.
roomSinks.get(roomId).tryEmitNext(new ChatMessage("ENTER", nickname, nickname + "님이 입장했습니다."));
입장처리를 위한 코드.
세션을 연결할 때 Sinks에 해당 메시지를 넣는 것으로 처리하였다.
return session.send(outgoing)
.and(incoming.then())
.doFinally(signalType -> {
// 연결 종료 시 처리
roomUsers.get(roomId).remove(nickname);
roomSinks.get(roomId).tryEmitNext(new ChatMessage("LEAVE", nickname, nickname + "님이 퇴장했습니다."));
});
퇴장 처리는 세션을 종료할 때 Sinks에 메시지를 넣어 처리하였다.
세션, SSE와 같은 통신 관련 기능을 작성할 때 확실히 편리한 것 같다.
아직 실무에서 쓰기에 뭔가 준비된 느낌은 안드는데 다른 예제도 살펴봐야겠다.