웹소켓(WebSocket)은 실시간 양방향 통신을 가능하게 하는 네트워크 프로토콜입니다. 일반적인 웹 통신에서 사용하는 HTTP 프로토콜은 요청-응답 방식으로 동작하며, 클라이언트가 서버에 요청을 보내고 서버가 그 요청에 대한 응답을 반환하는 방식으로 작동합니다. 하지만, 웹소켓은 이와 다르게 클라이언트와 서버 간에 지속적인 연결을 유지하여 양방향 통신을 실시간으로 가능하게 합니다.
웹소켓은 클라이언트와 서버가 동시에 메시지를 주고받을 수 있는 양방향 통신을 지원합니다. 서버는 클라이언트의 요청을 기다릴 필요 없이 언제든지 메시지를 클라이언트에 보낼 수 있습니다.
웹소켓은 클라이언트와 서버 간의 연결을 지속적으로 유지합니다. 한번 연결이 성립되면, 클라이언트나 서버 중 하나가 명시적으로 연결을 종료할 때까지 유지됩니다. 이로 인해 매번 새로운 연결을 설정할 필요가 없어 오버헤드가 줄어듭니다.
웹소켓 프로토콜은 기존의 HTTP 프로토콜과 다르게 동작합니다. 웹소켓 연결은 초기에는 HTTP 핸드셰이크를 통해 시작되며, 이 과정에서 Upgrade 헤더를 사용해 프로토콜을 HTTP에서 WebSocket으로 업그레이드합니다. 이후로는 웹소켓 프로토콜이 사용됩니다.
웹소켓 통신에서 주고받는 데이터는 "프레임"이라는 단위로 나누어집니다. 프레임은 텍스트, 바이너리, 핑, 퐁, 클로즈 등의 종류가 있으며, 데이터의 유형에 따라 프레임이 결정됩니다.
웹소켓은 HTTP와 비교해 헤더 정보가 적고 통신이 간결하게 이루어지므로, 데이터 전송에 필요한 오버헤드가 줄어듭니다. 이는 실시간 데이터 전송이 중요한 애플리케이션에서 성능 향상에 기여합니다.
Springboot 를 활용하여 간단한 웹소켓 채팅 서버를 구현해보겠습니다.
Springboot project 생성 후 spring-boot-starter-websocket
의존성을 추가합니다. 이외, lombok
과 gson
도 추가합니다.
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
implementation group: 'com.google.code.gson', name: 'gson', version: '2.10.1'
//이하 생략..
WebSocketConfigurer 를 구현합니다. end point 를 application.yml의 spring.application.name 으로 설정하고, 아래서 생성할 WebSocktHandler를 등록해 줍니다. setAllowedOrigins("*")를 설정해서 모든 CORS 요청을 허용합니다.
config.WebSocketConfig
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
private final WebSocketHandler webSocketHandler;
@Value("${spring.application.name}")
private String projectName;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry
.addHandler(webSocketHandler, projectName)
.setAllowedOrigins("*");
//.withSockJS(); // sockJs 활용 시 추가
}
}
application.yml
spring:
application:
name: chat-websocket
이제 ws://localhost:8080/chat-websocket
으로 웹소켓 서버에 접속할 수 있게 됩니다.
클라이언트로부터 받을 record 구조와 MessageType을 정의합니다.
record.ChatMessage
public record ChatMessage(
Long roomNumber,
MessageType messageType,
String userName,
String message
) {
}
enumeration.MessageType
public enum MessageType {
JOIN, CHAT
}
handler.WebSockethandler
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketHandler extends TextWebSocketHandler {
private final ExecutorService executorService
= Executors.newCachedThreadPool();
private final ConcurrentMap<String, WebSocketSession> sessions
= new ConcurrentHashMap<>();
private final Map<Long, Set<WebSocketSession>> chatRoomMap
= new HashMap<>();
private final Gson gson;
//이하 생략
TextWebSocketHandler 를 상속받아 WebSocketHandler 를 구현합니다. 메시지 전송 시 사용할 ExecutorService와 접속 세션을 관리할 ConcurrentMap, 접속자간 채팅룸 관리를 위한 Map, 메시지 파싱을 위한 Gson 을 클래스 변수로 설정합니다. Gson 은 spring bean 에 등록하여 사용합니다.
config.GsonConfig
@Configuration
public class GsonConfig {
@Bean
public Gson gson() {
return new GsonBuilder()
.serializeNulls()
.create();
}
}
접속 이벤트를 처리하는 afterConnectionEstablished 를 override 합니다. 접속한 세션정보를 클래스 변수로 선언한 ConcurrentMap 에 담고 welcome 메시지를 전송합니다.
@Override
public void afterConnectionEstablished(@Nonnull WebSocketSession session) {
this.sessions.put(session.getId(), session);
log.info("Session connected. (" + this.sessions.keySet().size() + ")");
sendMessage("Welcome to the connection.", session);
}
session 으로 message를 전송하거나 chatRoom 에 message 를 전송할 method를 구현합니다.
private void sendMessage(String message, WebSocketSession session) {
executorService.submit(() -> {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error(session.getId() + " fail to send message. -> " + message);
}
});
}
private void sendMessage(Set<WebSocketSession> chatRoomSessionSet, String message) {
executorService.submit(() -> {
chatRoomSessionSet.parallelStream().forEach(session -> {
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
log.error(session.getId() + " fail to send message. -> " + message);
}
});
});
}
클라이언트로 부터 받은 메시지 처리를 위한 handleTextMessage 를 override 합니다. gson 을 활용해 전달받은 string message 를 ChatMessage record 로 컨버팅합니다.
@Override
protected void handleTextMessage(@Nonnull WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
try {
//String message convert to ChatMessage record
ChatMessage chatMessage = gson.fromJson(payload, ChatMessage.class);
Long chatRoomNumber = chatMessage.roomNumber();
// chatRoomMap 에 chatRoomNumber 가 없을 경우 새로 생성
if (!chatRoomMap.containsKey(chatRoomNumber)) {
log.info(chatRoomNumber + " 번 chatRoom 이 생성되었습니다.");
chatRoomMap.put(chatRoomNumber, new HashSet<>());
}
// messageType에 따라 chatRoom에 메시지 전송
Set<WebSocketSession> chatRoomSessionSet = chatRoomMap.get(chatRoomNumber);
if (MessageType.JOIN == chatMessage.messageType()) {
if (!chatRoomSessionSet.contains(session)) {
sendMessage(chatRoomSessionSet, chatMessage.userName() +
" 님이 채팅에 참여하였습니다.");
chatRoomSessionSet.add(session);
}
} else if (MessageType.CHAT == chatMessage.messageType()) {
sendMessage(chatRoomSessionSet, chatMessage.message());
}
} catch (IllegalArgumentException e) {
log.error("Requested Invalid chatMessage. -> " + payload);
}
}
세션 종료 이벤트 캐치 메서드를 Override 합니다. 전체 세션을 관리하는 sessions와 chatRoomMap 에 세션이 있을 경우 제거합니다.
@Override
public void afterConnectionClosed(WebSocketSession session, @Nonnull CloseStatus status) {
if (this.sessions.get(session.getId()) != null
&& this.sessions.get(session.getId()).isOpen()) {
this.sessions.remove(session.getId());
}
this.chatRoomMap.keySet().forEach(roomNumber -> {
Set<WebSocketSession> sessions = chatRoomMap.get(roomNumber);
sessions.removeIf(roomSession -> roomSession.equals(session));
});
log.info("session disconnected. (" + this.sessions.keySet().size() + ")");
}
웹소켓 연결을 직접 설정하고 클라이언트간 메시지를 직접처리하는 위 방식과 달리, MessageBroker 를 사용하면 STOMP 프로토콜을 사용하여 메시지를 라우팅하고 브로드캐스트할 수 있습니다.
WebSocketConfigurer 을 사용한 경우 접속한 세션을 직접 관리해야 하지만 MessageBroker를 사용한 경우는 MessageBroker가 이를 대신합니다. STOMP를 사용하여 메시지 라우팅과 세션 관리를 자동화하고, 사용자 ID 또는 목적지별로 메시지를 분배할 수 있습니다.
웹소켓 기능을 확인할 수 있는 Chrome의 확장 프로그램을 설치 합니다. Chrome 웹 스토어에서 websocket king client
를 검색하여 설치합니다.
설치가 완료되면 ws://localhost:8080/chat-websocket
으로 접속하고 메시지를 전송합니다.
1번 채팅방 참여
{"messageType":"JOIN", "roomNumber":1, "userName":"이건"}
메시지 전송
{"messageType":"CHAT", "roomNumber":1, message: "안녕하세요", "userName":"이건"}
위와 같이 양쪽 브라우저에 전송한 메시지가 표출되는 것을 확인할 수 있습니다.
웹소켓은 효율적인 실시간 양방향 통신을 가능하게 하며, 위에서 구현한 채팅 프로그램이나 실시간 처리가 필요한 상황판, 영상 Streaming 등 많은 웹 어플리케이션에서 활용되고 있습니다. 이러한 용도에 맞게 적절하게 활용하여 최적화된 어플리케이션을 개발합시다!
데이터의 순서를 보장하고, 손실된 패킷을 재전송할 수 있도록 합니다. 이러한 특성은 웹소켓이 양방향, 지속적인 연결을 유지하면서도 신뢰할 수 있는 데이터 통신을 요구하는 이유입니다.
웹소켓 연결은 먼저 HTTP 프로토콜을 통해 핸드셰이크 과정을 거쳐 설정됩니다. 이 단계에서 클라이언트와 서버는 서로 웹소켓 연결을 설정하기로 동의합니다.
핸드셰이크가 완료된 후, 웹소켓은 TCP 연결을 통해 양방향 통신을 시작하며, 이 연결은 지속적입니다.
이후 클라이언트와 서버는 서로에게 메시지를 주고받을 수 있으며, 이는 TCP의 신뢰성 덕분에 순서와 데이터의 무결성이 보장됩니다.
ExecutorService는 Java에서 비동기 작업을 관리하고 실행하는 데 사용되는 프레임워크입니다. 이는 java.util.concurrent
패키지에 속하며, 작업을 스레드 풀에서 관리하여 보다 효율적인 멀티스레딩을 가능하게 합니다.
스레드 풀 관리
ExecutorService는 스레드 풀을 관리합니다. 스레드 풀은 미리 생성된 스레드들의 집합으로, 작업이 들어올 때마다 새로운 스레드를 생성하는 대신, 기존의 스레드를 재사용하여 성능을 최적화합니다.
스레드 풀을 사용하면 스레드 생성 비용을 줄이고, 시스템 자원의 효율성을 높일 수 있습니다.
작업 제출
ExecutorService는 Runnable 또는 Callable 인터페이스를 구현한 작업을 제출받아 실행합니다. 이를 통해 비동기적으로 작업을 처리할 수 있습니다. 작업을 제출하는 주요 메서드는 아래와 같습니다.
submit() : 작업을 제출하고, 작업의 완료 상태나 결과를 나타내는 Future 객체를 반환합니다.
invokeAll() : 여러 작업을 동시에 제출하고, 모든 작업이 완료될 때까지 기다립니다.
invokeAny() : 여러 작업을 제출하고, 그 중 하나가 완료되면 반환합니다.
작업 종료 관리
ExecutorService는 작업이 완료된 후 스레드 풀을 정리하고 종료할 수 있는 메서드를 제공합니다. 주요 종료 메서드는 아래와 같습니다.
shutdown(): 현재 진행 중인 작업을 완료한 후, 더 이상의 새로운 작업을 받지 않고 스레드 풀을 종료합니다.
shutdownNow(): 진행 중인 작업을 중단하고, 스레드 풀을 즉시 종료합니다.
Future 객체
작업을 제출한 후, ExecutorService는 Future 객체를 반환합니다. 이를 통해 작업의 완료 여부를 확인하거나, 작업이 완료될 때까지 기다릴 수 있습니다.
Future.get() 메서드를 호출하면, 작업이 완료될 때까지 대기하고, 완료된 작업의 결과를 반환합니다.
ThreadPoolExecutor
ExecutorService의 가장 일반적인 구현체로, 커스텀 스레드 풀을 생성할 수 있습니다.
다양한 스레드 풀 정책(코어 스레드 수, 최대 스레드 수, 큐의 종류 등)을 설정할 수 있습니다.
ScheduledThreadPoolExecutor
주기적으로 또는 지연된 작업을 실행할 수 있는 스레드 풀을 제공합니다. 주로 타이머나 주기적인 작업을 처리할 때 사용됩니다.
Executors 유틸리티 클래스
다양한 스레드 풀을 쉽게 생성할 수 있는 팩토리 메서드를 제공합니다.
Executors.newFixedThreadPool(int n): 고정 크기의 스레드 풀을 생성합니다.
Executors.newCachedThreadPool(): 필요에 따라 스레드를 생성하거나 재사용하는 스레드 풀을 생성합니다.
Executors.newSingleThreadExecutor(): 단일 스레드를 사용하는 스레드 풀을 생성합니다.
ConcurrentMap은 Java의 java.util.concurrent
패키지에 포함된 인터페이스로, 멀티스레드 환경에서 안전하게 사용할 수 있는 Map 자료구조를 정의합니다. 이 인터페이스는 Map 인터페이스를 확장하며, 여러 스레드가 동시에 맵에 접근하고 수정할 때 발생할 수 있는 동기화 문제를 해결하기 위한 메서드들을 추가로 제공합니다. 멀티스레드 환경에서 맵 자료구조를 사용할 때 매우 유용하며, 특히 ConcurrentHashMap은 대부분의 동시성 요구 사항을 충족할 수 있는 강력한 구현체입니다. ConcurrentMap을 사용하면 복잡한 동기화 문제를 쉽게 해결하면서도 성능을 극대화할 수 있습니다.
동시성 보장
ConcurrentMap은 다수의 스레드가 동시에 데이터에 접근하고 수정하는 것을 안전하게 보장합니다. 이를 통해 동기화 블록을 사용하지 않고도 여러 스레드가 효율적으로 작업할 수 있습니다.
원자적 연산
ConcurrentMap은 원자적으로 실행되는 여러 메서드를 제공합니다. 예를 들어, putIfAbsent(), remove(), replace() 메서드는 특정 조건을 충족하는 경우에만 원자적으로 실행됩니다.
이러한 메서드들은 데이터 경합(Race Condition) 문제를 방지하고, 예상치 못한 데이터 변경을 피할 수 있도록 도와줍니다.
잠금 기법 최소화
ConcurrentMap의 구현체는 대부분의 경우 필요한 부분에만 잠금을 사용하여 동시성을 보장하고, 전체 맵에 대한 잠금을 피함으로써 성능을 극대화합니다. 이는 synchronized 키워드를 사용한 동기화와 비교했을 때 더 나은 성능을 제공합니다.