지난 글을 마지막으로 cron을 통한 스케줄링과 로직 리팩토링을 진행한 후
며칠간 다른 일을 하며 배포하고 사무실 사람들한테 공유한 서비스를 모니터링 중이였다.
사용자(임직원)가 서비스를 보고 있는 중에 서버에서 스케줄링이 실행돼서 데이터가 업데이트되어도 사용자가 보고있던 View에 최신 데이터가 적용되지 않는 현상을 개선하고 싶었다.
그래서 바로 webSocket를 사용하려 했지만 정말 예전에 찍먹해본게 전부라 검색하던 중 SSE라는 걸 알게 되었는데 양방향 통신보다 단방향 통신이 더 적합해 보여서 채택하게 되었다.
구현하기 전에 간단하게 찾아봤던 내용들을 특징과 장단점으로 간략하게 정리해두겠습니다.
SSE(Server-Sent Events)
data
, event
, id
등의 필드를 가집니다.WebSocket
Polling
Long Polling
function createServerSentEvents() {
const eventSource = new EventSource(SSE_URL);
// 서버에서 보낸 데이터를 받을 때 실행
eventSource.addEventListener('message', function (e) {
const updatedMenuData = JSON.parse(e.data);
updateMenuData(updatedMenuData);
console.log('Receive Server-Sent-Events');
});
// 서버와 연결이 됐을 때 실행
eventSource.addEventListener('open', function (e) {
console.log('Connected to Server-Sent-Events');
});
// 서버와 연결에서 에러가 발생했을 때 실행
eventSource.addEventListener('error', function (e) {
console.log('Error in Server-Sent-Events');
});
// 페이지 이탈 시 SSE 연결 종료
window.addEventListener('beforeunload', (e) => {
e.preventDefault();
eventSource.close();
});
}
서버에서 데이터를 받기 위한 클라이언트 구현은 위 코드가 전부일 정도로 간단합니다.
한가지 옵션으로는 withCredentials
option을 통해서 CORS 설정이 가능하고, 기본 값은 false
입니다.
더욱 자세한 내용은 MDN
const menuDataSSEHandler = {
tasks: [],
addTask(clientIp, task) {
this.tasks.push({ [clientIp]: task });
},
getAllTasks() {
return this.tasks;
},
removeTask(clientIp) {
const removeTask = this.tasks.filter((task) => !task[clientIp]);
this.tasks = removeTask;
},
};
app.get('/menuDataSSE', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
const clientIp = req.ip.replace(/^::ffff:/, '');
menuDataSSEHandler.addTask(clientIp, async () => {
try {
const latestData = await getParsedLunchJson();
res.write(`data: ${JSON.stringify(latestData)}\n\n`);
} catch (error) {
handleServerError(error);
}
});
req.on('close', () => {
menuDataSSEHandler.removeTask(clientIp);
});
});
cron.schedule(
'30-59/5 11 * * 1-5',
async () => {
console.log('업데이트 스케줄링 실행');
await lunchMenuUpdateTask();
const sseTasks = menuDataSSEHandler.getAllTasks();
if (sseTasks.length > 0) {
const taskPromises = sseTasks.map((task) => Object.values(task)[0]());
await Promise.all(taskPromises);
}
},
{
name: 'lunch-menu-update-task',
timezone: 'Asia/Seoul',
}
);
위 코드는
1. 서버에서 cron을 통해 스케줄링을 예약해둔다
2. 유저가 접속하면 SSE가 연결되어 스케줄링 실행 시 클라이언트에 전송할 작업을 배열에 담아둔다.
3. 데이터가 업데이트 되면, SSE 작업이 있는지 배열을 체크하여 정해진 로직을 수행한다.
이슈가 발생했었는데 이 부분은 마지막에 적도록 하겠습니다.
이제 맨 위에 코드만 뜯어보면
app.get('/menuDataSSE', (req, res) => {
// ...
});
엔드포인트 입니다.
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
헤더입니다.
Content-Type
: 'text/event-stream'가 표준으로 정의되어 있습니다.
Cache-Control
: 계속 새로운 데이터가 올 것이기 때문에 'no-cache'로 설정합니다.
Connection
: 연결을 해제하지 말고 유지해야 하기 때문에 'keep-alive'로 설정합니다.
res.write(`data: ${JSON.stringify(latestData)}\n\n`);
클라이언트에게 보낼 메세지입니다.
필드의 사양은
event
: 이벤트 타입입니다. 이 필드가 지정돼 있는 경우에 이벤트는 브라우저 내에서 이벤트명에 해당하는 이벤트 리스너로 전달되고, 따로 지정되어있지 않은 경우 'onmessage' 핸들러가 호출됩니다.
data
: 메시지의 데이터 필드입니다. 복수의 행을 전달 할 경우에 \n
을 통해서 개행 문자를 삽입해야하며, 마지막 행에는 \n\n
을 사용해 끝을 알려야합니다.
// 예시
event: test\n
data: {\n
"user_id": 1\n
"text": "test textd",\n
}\n\n
id
: EventSource 객체의 마지막 이벤트 ID 값을 설정할 이벤트 ID입니다. 해당 값을 설정하면 ID를 통해 마지막 이벤트를 추적하여 서버 연결이 끊어지면 특수한 HTTP 헤더 (Last-Event-ID)가 새 요청으로 설정됩니다.
retry
: 재연결 시간입니다. 서버에 대한 연결이 끊어지면 브라우저는 지정된 시간 동안 기다렸다가 다시 연결을 시도합니다. 단위는 밀리초
더욱 더 자세한 내용은 MDN
SSE 적용 초기엔 코드가 지금과 조금은 달랐는대 해당 버전에서 cron 업데이트 시 SSE와 서버의 동작에 대해서 익숙치 않아 데이터가 꼬이는 + 인스타그램 Block 이슈가 발생했었다.
원인은 쉽게 파악이 가능했다.
단순히 데이터를 가져와서 클라이언트에 보내주는 것이면 이슈는 없었겠지만, 당시 코드에서는 데이터 패칭도 함께 진행을 했었다.
그렇기 때문에 여러 pc에서 접속 시 데이터 패칭 및 업데이트 로직이 동시에 여러번 실행되어서 이미 이전 로직이 실행 중이고 종료되지 않았는데 새로운 클라이언트의 로직이 중복 실행되면서 오류 발생 하였고, 그로인해 과도한 요청으로 인스타그램에 일정시간 Block을 당한 것으로 유추했다.
(지금은 Jest를 공부하며 통해 테스트 코드를 짜고있다. 어느정도 안정화가 되었다고 자만해 static mock 데이터를 그냥 날려버리고 테스트한 멍청한 나를 반성하면서 말이다.)
찾아보니 위와 같은 상태를 경쟁 상태라고 칭하며 상호배제라는 프로세스가 서로 간에 공유 자원의 동시 접근을 허용하지 않기 위한 기술이있다고 하며, mutex
semaphore
monitor
등이 대표적이라고 한다.
더 찾아보다 보니까 내 상황에서는 솔루션이 될 수 없었다.
왜냐하면, 현재 서비스는 다수의 유저가 들어와 요청을 해서 데이터를 수정 할 일이 없다.
유저는 그저 최신 데이터를 보기만 하면 된다. 마치 실시간 주가를 보는 것처럼
그러면 어떻게 해야될까.
업데이트 스케줄링은 원래대로 서버에서 독립적으로 실행이 되고, 스케줄링 작업이 완료될 때 SSE를 통하여 클라이언트에 업데이트가 완료된 데이터만 보내주면 될 거 같다.
연결된 클라이언트의 res를 저장할 객체를 만든다. (class로 만들까 했으나 객체를 재사용 필요가 없고, 간단한 로직이라 더 직관적인 객체리터럴 방식을 사용했다.)
클라이언트가 접속 시 해당 클라이언트 고유의 task(res)를 만들어 저장해둔다.
업데이트 스케줄링 실행
스케줄링 종료 후 저장된 모든 res를 가져와 실행 (클라이언트로 데이터 전달)
클라이언트 연결 해제 시 저장해뒀던 해당 클라이언트의 req 제거
아쉬운 점은 각 클라이언트의 req객체를 저장하지 말고 스케줄링이 끝난 시점을 감지하여 SSE 코드 블럭 내부에서 클라이언트로 보내줄 순 없을까? useEffect와 useState처럼
아이디어는 있지만 아직은 아무리 생각하고 검색해봐도 좀 처럼 깔끔한 해결 방안이 떠오르지 않는다.
이 부분 역시 노트해두고 해결 방법이 떠오르거나 찾게 된다면 리팩토링을 진행해야겠다.
@추가
Node.js의 EventEmitter라는 것을 이용해 클라이언트에서 사용하던 커스텀 이벤트와 유사한 동작을 수행할 수 있다고 한다. 뭐가 더 적합한지 조금 더 생각해보고 적용하도록 해야겠다.
워 server쪽 구현 코드의 설명인데, 나는 알겠지만 다른 사람이 볼 때 잘 풀어 설명했는지는 모르겠다..
서버에서도 이벤트를 만들어서 발생시킬 수 있다는 것을 발견하고, 이게 더 낫겠다 싶었습니다.
그래서 간단한 사용법 검색과 공식문서를 통해 기본 이벤트 객체를 확장하여 변경해봤습니다.
// server
// import도 가능
const EventEmitter = require('events');
const scheduleEvent = new EventEmitter();
app.get('/menuDataSSE', (req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
async function sendLunchDataToClient() {
try {
const latestData = await getParsedLunchJson();
res.write(`data: ${JSON.stringify(latestData)}\n\n`);
} catch (error) {
handleServerError(error, `'${req.ip}'와의 통신 중 오류가 발생했습니다.`);
}
}
scheduleEvent.addListener('complete', sendLunchDataToClient);
req.on('close', () => {
scheduleEvent.removeListener('complete', sendLunchDataToClient);
});
});
cron.schedule(
'*/1 * * * 1-5',
async () => {
console.log('업데이트 스케줄링 실행');
await lunchMenuUpdateTask();
scheduleEvent.emit('complete');
},
{
name: 'lunch-menu-update-task',
timezone: 'Asia/Seoul',
}
);
사용자가 브라우저에 접속하여 SSE 연결이 될 때 생성한 이벤트 객체로 리스너를 등록합니다.
새로고침, 브라우저 닫기 등 SSE 연결이 끊길 때 등록했던 리스너를 제거합니다.
제거하는 이유는 해당 코드는 클라이언트 코드가 아니라 사용자가 브라우저를 닫아도 리스너는 이벤트 큐(Event Queue)에 쌓이게 됩니다.
해당 큐를 관리하지 않으면 MaxListenersExceededWarning
경고를 뿜게됩니다.
const EventEmitter = require('events');
class CustomEmitter extends EventEmitter {
constructor() {
super();
}
something(params) {
}
}
const customEmitter = new CustomEmitter();
위 처럼 기본 Class를 확장시켜서 사용할 수도 있습니다.
해당 Class의 자세한 정보는 공식문서에서 확인이 가능하며, Node의 Core에서 확인할 수 있습니다. (node_modules/@types/node/events.d.ts)
처음 구현했던 방식과 큰 맥락은 비슷하지만 SSE 연결 시 등록해놨던 모든 Task
를 스케줄링이 끝난 후 Promise.all
을 통해서 처리하는 부분이 SSE 연결 시 EventListener
를 등록해두고 스케줄링이 끝난 후 emit
을 통해 이벤트를 발생시켜 등록된 모든 이벤트를 한번에 실행시키는 점이 달라졌습니다.
현재는 틈틈히 jest를 공부하고 있는데 앞으로는 테스트 코드를 추가해 볼 생각입니다.