회사에서 맡고 있던 프로젝트에서 WebSocket으로 통신해야 하는 Part를 할당받았다. 처음 접하는 작업이기에 다소 미숙한 점이 많았지만 나만의 방식대로 정리를 해보고자 한다.
WebSocket은 주로 실시간으로 실시간 양방향 통신이 필요한 기능에서 사용되는데 그 중에서도 나는 무수히 많은 데이터를 주고받는 주식 시장을 대상으로 하는 기능을 구현해야 했다.
지속적인 통신이 필요했기에 전역으로 WebSocket 상태를 관리했고 현 시점에서 가장 간편하고 편리한 Zustand를 사용했다.
Zustand로 관리하는 상태와 함수 아래와 같다.
State
worker
: WebSocket 인스턴스isConnectd
: 연결 상태 관리messageQueue
: 미전송 메세지 큐Funtions
connect
: 연결 관리disconnect
: 연결 해제sendMessage
: 서버로부터 요청받을 데이터 메세지 전송 (클라이언트 --> 서버)setOnMessage
: 응답받은 데이터 처리 (서버 --> 클라이언트)// useWebSocketStore.ts
export const useWebSocketStore = create<WebSocketStore>((set, get) => ({
worker: null,
isConnected: false,
messageQueue: [],
connect: (url: string) => {
const { worker, messageQueue } = get();
if (worker) return;
const newWorker = new WebSocketWorker();
newWorker.onmessage = (event) => {
const { type, message, error } = event.data;
const onMessageCallback = get().onMessage;
switch (type) {
case "CONNECTED":
console.log("WebSocket connected");
set({ isConnected: true });
messageQueue.forEach((msg) => {
newWorker.postMessage({
type: "SEND",
payload: { message: JSON.stringify(msg) },
});
});
set({ messageQueue: [] });
break;
case "DISCONNECTED":
console.log("WebSocket disconnected");
set({ isConnected: false });
break;
case "MESSAGE":
if (onMessageCallback) {
onMessageCallback(message);
}
break;
case "ERROR":
console.error("WebSocket error:", error);
break;
default:
console.error("Unknown message type:", type);
}
};
newWorker.postMessage({ type: "CONNECT", payload: { url } });
set({ worker: newWorker });
},
disconnect: () => {
const { worker } = get();
if (worker) {
worker.postMessage({ type: "DISCONNECT" });
worker.terminate();
set({ worker: null, isConnected: false });
}
},
sendMessage: (message: any) => {
const { worker, isConnected } = get();
if (!isConnected) {
set((state) => ({
messageQueue: [...state.messageQueue, message],
}));
return;
}
if (worker) {
worker.postMessage({
type: "SEND",
payload: { message: JSON.stringify(message) },
});
}
},
setOnMessage: (callback) => {
set({ onMessage: callback });
},
}));
재연결 해주는 reconnect 함수를 왜 Store에서 정의하지 않았냐면 WebWorker에서 동작하도록 구현했기 때문이다.
WebWorker를 import 할 때 경로 뒤에 ?worker
를 붙여줘야 제대로 기능이 활성화된다.
WebWorker란?
메인 스레드와 분리된 별도의 스레드에서 WebSocket 통신 처리하기 위한 방법이다.
쉽게 설명하면 다른 브라우저 탭을 보고 있어도 백그라운드에서 동작하고 있다는 뜻이다.
WebSocket을 연결하고 해제하고 메세지를 요청하는 로직은 대부분 WebWorker에서 구현했다.
Store에서는 onmessage를 통해서 type만 전달하고 있다.
// websocket-worker.js
const connect = (url) => {
if (
socket &&
(socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING)
) {
return;
}
socket = new WebSocket(url);
socket.onopen = () => {
isReconnecting = false;
self.postMessage({ type: "CONNECTED" });
};
socket.onclose = () => {
self.postMessage({ type: "DISCONNECTED" });
if (navigator.onLine) return;
reconnect(url);
};
socket.onerror = (error) => {
self.postMessage({ type: "ERROR", error });
reconnect(url);
};
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
self.postMessage({ type: "MESSAGE", message: data });
};
};
self.onmessage = function (event) {
const { type, payload } = event.data;
switch (type) {
case "CONNECT":
connect(payload.url);
break;
case "DISCONNECT":
disconnect();
break;
case "SEND":
sendMessage(payload.message);
break;
default:
console.error("Unknown message type:", type);
}
};
보다시피 굉장히 간단한 코드이다. 연결 여부를 확인하는 방어 코드를 넣고 안 되어 있으면 연결해주는 게 끝이다.
reconnect
의 경우 setTimeout으로 1초의 간격을 두고 connect 함수를 재호출 하도록 구현했지만 너무 과도한 재연결이 이뤄지면 과부하 되기 때문에 지수 백오프를 적용할 예정이다.
self
는 Web Worker 내부에서 Worker의 전역 스코프를 가리키는 객체이고 일반 브라우저 환경의 window 객체와 유사한 역할을한다.
가장 중요한 서버 연결 상태를 확인하기 위해 5초 마다 ping을 요청하면 바로 서버로부터 pong 응답을 받는다.
pong 응답받을 때는 pingCount 초기화
만약 ping 요청을 3회 이상했는데 서버로부터 pong 응답이 오지 않을 시, 연결을 바로 해제하고 1초 마다 재연결을 시도 하도록 했다.
위 connect 함수에 적용해보면?
const startPing = () => {
if (pingInterval) return;
pingInterval = setInterval(() => {
if (socket?.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify({ cmd: "ping" }));
pingCount++;
if (pingCount >= 3) disconnect();
} else if (socket?.readyState === WebSocket.CLOSING) {
disconnect();
}
}, 5000);
};
const connet = () => {
socket.onopen = () => {
isReconnecting = false;
self.postMessage({ type: "CONNECTED" });
startPing();
};
}
먼저 모든 페이지에서 통신이 이루어져야 해서 custom hook을 이용하여 useEffect 형태로 구현했고 Route 별로 가장 최상위 컴포넌트에서 호출을 진행했다. 왜냐하면 각 페이지별로 요청해야 하는 데이터가 달랐기 때문이다.
// useWebSocket.ts
const useWebSocketWorker = (
options: WebSocketParams,
callback?: (socketData: SocketDataType) => void,
) => {
const { connect, disconnect, sendMessage, setOnMessage, isConnected } = useWebSocketStore();
const wsUrl = import.meta.env.VITE_WEBSOCKET_URL;
const sendTimer = useRef<number | null>(null);
const prevParams = useRef<string | null>(null);
const defaultCodes = [...];
const defaultTypes = [...];
const extendsOptions = useMemo(() => {
return {
...options,
iscdList: [...(options.iscdList ?? []), ...defaultCodes],
typeList: [...(options.typeList ?? []), ...defaultTypes],
};
}, [options]);
const sendSocketMessage = (oneMoreSend = false) => {
const currentParams = JSON.stringify(extendsOptions.iscdList);
if (!oneMoreSend && prevParams.current === currentParams) return;
prevParams.current = currentParams;
if (sendTimer.current) clearTimeout(sendTimer.current);
sendTimer.current = setTimeout(() => {
sendMessage(extendsOptions);
sendTimer.current = null;
}, 400);
};
useEffect(() => {
if (hasActualStockData) {
connect(wsUrl);
}
}, [hasActualStockData]);
useEffect(() => {
if (isEmpty(extendsOptions.iscdList) || !isConnected) return;
sendSocketMessage();
}, [isConnected, extendsOptions.iscdList]);
useEffect(() => {
if (isConnected && callback) setOnMessage(callback);
if (isConnected && !isEmpty(extendsOptions.iscdList)) {
sendSocketMessage(true);
}
const handleVisibilityChange = () => {
if (document.visibilityState === "visible") {
if (!isConnected) {
disconnect();
connect(wsUrl);
sendMessage(extendsOptions);
}
}
};
document.addEventListener("visibilitychange", handleVisibilityChange);
return () => {
document.removeEventListener("visibilitychange", handleVisibilityChange);
};
}, [isConnected]);
}
그리고 데이터 요청과 요청했던 이전 값을 추적할 sendTimer와 prevParams가 필요했다.
왜냐하면 주식 종목을 4000개 이상을 리스트에서 보여줘야 했고 스크롤을 할 때 마다 서버에 요청을 하기에는 서버 과부화 문제로 인해 성능이 낮아질 우려가 있기 때문이었다. (그래서 sendTimer로 일정 간격을 둠)
그러기에 리스트에는 Virtual Scroll을 이미 적용한 상태여서 보여지는 영역에 종목 코드만 요청하도록 설계해 놓았고 이전에 보냈던 요청 파라미터와 현재 보낸 파라미터와 비교하는 방어 코드를 구현했다.
그리고 가장 중요한 가시성을 잃게되면 브라우저에서 메모리 최적화를 하기 위해 백그라운드 동작을 줄이게 되는데 이 때 소켓 연결이 해제되는 현상이 발생되었다. 그래서 visibilitychange 이벤트를 등록해서 다시 화면으로 돌아올 때 연결 상태 여부를 확인 후 재연결 하도록 설계했다. 이러면 유연하게 소켓을 지속적으로 유지할 수 있게된다.
useWebSocket({ cmd: "요청 값", iscdList, typeList: ["코드 값"] }, (socketData) => {
if (!socketData) return;
if (socketData.type === "1") {
...setState(...);
} else if (socketData.type === "2") {
...setState(...);
} else {
...setState(...);
}
});
사용방법은 간단하다.
useEffect 형태로 호출하고 콜백으로 응답 값을 전달받아 분기처리 하면 된다.
첫 번째, 현재는 수 많은 종목들이 실시간으로 체결된 내역(현재가, 등락률, 등락폭)을 같은 컴포넌트 사용해서 화면에 업데이트 하다 보니까 하나의 종목이 체결돼도 다 같이 컴포넌트가 리렌더링 되고 있는데 체결된 종목만 리렌더링이 이루어지도록 최적화 작업이 필요하다.
두 번째, 가장 고려해야 될 부분은 가장 많은 메모리를 차지하고 있는 호가창에서 체결량이 제일 많은 종목을 선택한 후 다른 사이트에 있다가 다시 원래 화면으로 전환하게 되면 일시적으로 멈추는 현상이 발생한다.
그래서 useRerender hooks를 별도로 만들어서 가시성 여부를 판단하여 컴포넌트에 key값으로 리렌더링 되도록 처리를 하였으나 부드럽게 처리되지는 않았다. 이 점도 유의해서 후처리를 진행해야 할 것 같다.