websocket의 저수준 api 활용에 대해서 공부해보려고 한다. 다들 웹소켓에서 지원하는 것들을 얼마나 활용하고 있는가? 만약 제대로 활용하고 있지 못한다면 이번에 다양하게 사용할 수 있는 인터페이스와 클래스들을 알아보고 본인 프로젝트에 적용해보자.
이건 검색하면 다른 블로그에서 많이 봤을 것이다.
WebSocketHandler
인터페이스public interface WebSocketHandler {
void afterConnectionEstablished(WebSocketSession session) throws Exception;
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
boolean supportsPartialMessages();
}
이 인터페이스는 WebSocket 연결의 라이프사이클과 메시지 처리를 담당하는 핵심 인터페이스이다.
위에서 부터 순서대로 연결이 성공되었을 때, 새로운 메시지가 도착했을 때, 전송 중 오류가 발생했을 때, 연결이 종료되었을 때, 부분 메시지를 지원하는지
이 인터페이스는 많이들 사용해 보았을 거라 생각한다. 추가적인 설명은 생략하겠다.
HandShakeInterceptor
인터페이스public interface HandshakeInterceptor {
boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception;
void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, Exception ex) throws Exception;
}
이 인터페이스는 Http에서 websocket으로 연결하기 위해서 핸드쉐이크를 거치는 단계에서 사용되는 인터페이스다 이 인터페이스는 보통 토큰 검증, 인증하는 경우에 사용된다. Spring Boot3.0에서는 DefaultHandshakeHandler를 확장해서 보통 구현한다.
spring security에서 보통 한번에 한번 실행되는 OncePerRequestFilter 이 필터를 사용해서 구현하는 것과 비슷하다.
AbstractWebSocketHandler
클래스public abstract class AbstractWebSocketHandler implements WebSocketHandler {
// 기본 구현 제공
}
WebSocketHandler 인터페이스가 기본 구현을 제공하는 추상 클래스이다. 자세한 내용이 궁금하면 직접 확인해보고 사용해보길 바란다.
TextWebSocketHandler
클래스public class TextWebSocketHandler extends AbstractWebSocketHandler {
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
// 바이너리 메시지를 거부하고 NOT_ACCEPTABLE 상태로 응답
session.close(CloseStatus.NOT_ACCEPTABLE);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 텍스트 메시지 처리 로직 구현
}
}
이는 텍스트 메시지만 처리하는 WebSocketHandler
구현체이다. 텍스트 처리에 대해서 커스텀을 하고 싶다면 TextWebSocketHandler
이를 사용하자.
BinaryWebSocketHandler
클래스public class BinaryWebSocketHandler extends AbstractWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 텍스트 메시지를 거부하고 NOT_ACCEPTABLE 상태로 응답
session.close(CloseStatus.NOT_ACCEPTABLE);
}
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
// 바이너리 메시지 처리 로직 구현
}
}
바이너리 메시지만 처리하는 WebSocketHandler
구현체이다. 필요하다면 BinaryWebSocketHandler
을 받아서 커스텀 하자.
WebSocketConfigurer
인터페이스public interface WebSocketConfigurer {
void registerWebSocketHandlers(WebSocketHandlerRegistry registry);
}
java 구성을 통해서 WebSocket 핸들러를 등록하는 인터페이스이다. stomp를 사용하지 않고 순수 웹소켓만 사용하려면 이 설정을 해주면 된다.
WebSocketMessageBrokerConfigurer
인터페이스public interface WebSocketMessageBrokerConfigurer {
void registerStompEndpoints(StompEndpointRegistry registry);
void configureMessageBroker(MessageBrokerRegistry registry);
// 기타 구성 메서드
}
Stomp 메시징을 사용하는 WebSocket을 구성하기 위한 인터페이스이다.
이렇게 활용할 수 있는 인터페이스와 클래스에 대해서 알아보았는데 이제 이를 가지고 어떻게 더 활용할 수 있는지 알아보자.
웹소켓을 가지고 구현을 할 경우에 세션 관리가 중요하다. 그래서 세션 관리를 위한 유틸리티 클래스를 보통 만들게 되는데 WebSocketSessionManager
클래스를 만들어서 관리하고 메시지 처리를 해보자. 이름은 본인이 막지은거다. 알아서 하고 싶은대로 작성하면 된다.
WebSocketSessionManager
클래스@Component
public class WebSocketSessionManager {
private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
// 세션 등록
public void registerSession(WebSocketSession session) {
sessions.put(session.getId(), session);
}
// 세션 제거
public void removeSession(WebSocketSession session) {
sessions.remove(session.getId());
}
// 특정 사용자에게 메시지 전송
public void sendMessageToUser(String userId, TextMessage message) throws IOException {
for (WebSocketSession session : sessions.values()) {
// 세션에 저장된 사용자 정보 확인
Principal principal = session.getPrincipal();
if (principal != null && principal.getName().equals(userId)) {
session.sendMessage(message);
break;
}
}
}
// 모든 사용자에게 메시지 브로드캐스트
public void broadcastMessage(TextMessage message) throws IOException {
for (WebSocketSession session : sessions.values()) {
if (session.isOpen()) {
session.sendMessage(message);
}
}
}
// 특정 조건의 사용자들에게 메시지 전송
public void sendMessageToUsers(Predicate<Principal> filter, TextMessage message) throws IOException {
for (WebSocketSession session : sessions.values()) {
if (session.isOpen() && session.getPrincipal() != null && filter.test(session.getPrincipal())) {
session.sendMessage(message);
}
}
}
}
위와 같은 방식으로 세션 관리를 할 수 있을 것이고, 이제 이 유틸리티 클래스를 가지고 메시지를 TextWebSocketHandler
클래스를 커스텀하는 클래스에서 메시지를 처리할 수 있을 것이다.
websocket에서는 메시지 가로채기 및 변환을 위해서 제공하는 데코레이터가 존재한다. 데코레이터 패턴에 대해서 알고 있다면 쉽게 사용해 볼 수 있을 것이다. websocket에서 제공하는 데코레이터는 다음과 같다.
WebSocketHandlerDecorator
클래스public class MessageInterceptorDecorator extends WebSocketHandlerDecorator {
private final ObjectMapper objectMapper = new ObjectMapper();
public MessageInterceptorDecorator(WebSocketHandler delegate) {
super(delegate);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
// 필요한 커스텀 내용 구현
}
}
이렇게 사용하면 되고 이 데코데이터를 빈 등록해줘야한다. 앞서 언급한 순수 websocket을 사용할 경우에는 WebSocketConfig
이 설정을 통해서 빈 등록을 해주면 된다.
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// 핸들러를 데코레이터로 감싸서 등록
WebSocketHandler decoratedHandler = new MessageInterceptorDecorator(myWebSocketHandler);
// myWebSocketHandler 이거는 의존성 주입
registry.addHandler(decoratedHandler, "/wsWithInterceptor")
.setAllowedOrigins("*");
}
순수 websocket에서 예외처리는 이것도 기본적으로 제공하는 데코레이터가 존재한다. 이는 연결에서 발생하는 예외를 중앙 집중식으로 처리하는데 유용하다.
ExceptionWebSocketHandlerDecorator
클래스public class CustomExceptionHandler extends ExceptionWebSocketHandlerDecorator {
private static final Logger logger = LoggerFactory.getLogger(CustomExceptionHandler.class);
public CustomExceptionHandler(WebSocketHandler delegate) {
super(delegate);
}
@Override
protected void handleException(WebSocketSession session, Throwable exception) {
logger.error("WebSocket error: " + exception.getMessage(), exception);
try {
// 에러 메시지를 클라이언트에게 전송
if (session.isOpen()) {
Map<String, String> errorInfo = new HashMap<>();
errorInfo.put("error", exception.getMessage());
errorInfo.put("type", "ERROR");
ObjectMapper objectMapper = new ObjectMapper();
String errorJson = objectMapper.writeValueAsString(errorInfo);
session.sendMessage(new TextMessage(errorJson));
}
} catch (IOException e) {
logger.error("Failed to send error message to client", e);
}
// 심각한 오류인 경우 연결 종료
if (exception instanceof WebSocketMessageNotReadableException ||
exception instanceof SecurityException) {
try {
session.close(CloseStatus.PROTOCOL_ERROR);
} catch (IOException e) {
logger.error("Failed to close WebSocket session", e);
}
}
}
}
WebSocketHandler 메서드에서 발생하는 모든 처리되지 않은 예외를 포착하고, 해당 예외를 로깅한 다음, WebSocket 세션을 CloseStatus.SERVER_ERROR(1011) 상태로 닫는다. 물론 이도 헨들러 등록을 해줘야 한다.
이번엔 동기가 아닌 비동기 프로젝트의 경우에 대해서 알아보자. 알고나면 다 비슷하고 크게 다르진 않다. 먼저 웹소켓 헨들러부터 보자.
Reactive WebSocketHandler
클래스@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
// 인바운드 메시지를 처리하고 에코 응답을 보냄
return session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(message -> "ECHO: " + message)
.map(session::textMessage)
.as(session::send);
}
}
함수형 프로그래밍에 대해서 배웠다면 매우 반갑게 보일 것이다. 함수형 프로그래밍 꼭 배워두자. 사용하기 편리하니까..!
ReactiveWebSocketConfig
클래스 구성@Configuration
public class ReactiveWebSocketConfig {
@Autowired
private ReactiveWebSocketHandler reactiveWebSocketHandler;
@Bean
public HandlerMapping webSocketHandlerMapping() {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/reactive-ws", reactiveWebSocketHandler);
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setUrlMap(map);
handlerMapping.setOrder(-1); // 높은 우선순위
return handlerMapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
여기서는 기본적으로 제공하는 설정클래스가 없다. 본인이 직접 설정해줘야 하는데 spring에서 비동기의 경우 보통 webflux를 사용할 것이다. 이를 사용해서 설정 구성을 하게 될 경우 WebSocketHandlerAdapter
클래스를 빈등록 해줘야 한다.
이는 웹 소켓 지원을 활성화 하고 WebSocket 요청을 처리하는데 필요한 어댑터 역할을 하기 때문에 필요하다.
WebSocketClient
인터페이스가 있는데 테스트를 해볼 경우 사용된다. reactive의 경우에는 구현체는 ReactorNettyWebSocketClient
이 클래스이니 테스트가 필요한 경우 사용해보자.마지막으로 스레드 구성에 대해서 작성을 해보려고 한다. 이는 처음에 본인의 로컬 환경에 맞게 구성하고 클라우드 환경에 맞게 수를 조정해보길 바란다. 아직 본인도 "스레드 수 조절" 이에 대해서는 잘 모른다. 차차 알아나가려고 한다.
ThreadPoolTaskExecutor
구성@Configuration
public class WebSocketAsyncConfig {
@Bean(name = "webSocketTaskExecutor")
public TaskExecutor webSocketTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.setThreadNamePrefix("WebSocket-");
executor.initialize();
return executor;
}
}
만약 이런 스레드 구성을 하지 않게 되면 기본 스레드를 사용하니까 트래픽이 많은 환경인 경우 또한 데이터 처리가 많은 경우 주의하자 공용 스레드를 사용하니까 상당히 제한이 걸릴 수 밖에 없다. stream을 공부해봤다면 이 공용 스레드를 사용함으로 인해서 어떠한 문제를 일으킬 수 있을지 알 수 있을 거라 생각한다.
이번 포스팅에서는 저수준의 API를 다루는 내용에 중점을 두려고 했다. 하지만 이 부분까지만 설명하고 마무리 하려고 한다.
WebSocketBrokerConfig
최적화 설정@Configuration
@EnableWebSocketMessageBroker
public class OptimizedWebSocketBrokerConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration
.setSendTimeLimit(15 * 1000) // 15초
.setSendBufferSizeLimit(512 * 1024) // 512KB
.setMessageSizeLimit(128 * 1024); // 128KB
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// 클라이언트로부터 들어오는 메시지를 처리하는 스레드 풀 설정
registration.taskExecutor()
.corePoolSize(4)
.maxPoolSize(8)
.queueCapacity(50);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
// 클라이언트로 나가는 메시지를 처리하는 스레드 풀 설정
registration.taskExecutor()
.corePoolSize(4)
.maxPoolSize(8)
.queueCapacity(50);
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/optimized-ws")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 메시지 브로커 설정
registry.enableSimpleBroker("/topic", "/queue")
.setTaskScheduler(heartbeatScheduler()) // 하트비트 스케줄러
.setHeartbeatValue(new long[] {10000, 10000}); // 10초 하트비트
registry.setApplicationDestinationPrefixes("/app");
}
@Bean
public ThreadPoolTaskScheduler heartbeatScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(1);
scheduler.setThreadNamePrefix("ws-heartbeat-");
return scheduler;
}
}
각각에서 처리하는 스레드 풀과 큐는 본인 상황에 맞게 수정하길 바란다.
이렇게 WebSocket 저수준 API가 어떤게 있고 어떻게 활용할 수 있는지 알아보게 되었는데 많은 도움이 되었길 바란다. 계속 사용해보면서 어떻게 동작하는지 알아야 자신의 상황에 맞게 수정할 수 있을 것이다. 마지막에 최적화에 대해서 언급을 했는데 만약 본인이 채팅 앱을 만드는데 그냥 재미로 만드는거고 사용자가 별로 없다고 한다면 최적화는 신경 안쓰고 만들어도 된다. 채팅 앱을 만들어서 출시를 하는거고 예상 사용자가 천명, 만명이 된다면 그 만큼의 동시 사용자에서 요구되는 속도와 동시성의 문제를 또 생각해봐야 할 것이다. 상황에 맞게 사용하길 바란다.