SSE(Server-Sent Events)를 추가해 단방향 통신 하기 (feat. cron issue + 추가 EventEmitter)

최봉수·2023년 12월 26일
0

지난 글을 마지막으로 cron을 통한 스케줄링과 로직 리팩토링을 진행한 후
며칠간 다른 일을 하며 배포하고 사무실 사람들한테 공유한 서비스를 모니터링 중이였다.
사용자(임직원)가 서비스를 보고 있는 중에 서버에서 스케줄링이 실행돼서 데이터가 업데이트되어도 사용자가 보고있던 View에 최신 데이터가 적용되지 않는 현상을 개선하고 싶었다.

그래서 바로 webSocket를 사용하려 했지만 정말 예전에 찍먹해본게 전부라 검색하던 중 SSE라는 걸 알게 되었는데 양방향 통신보다 단방향 통신이 더 적합해 보여서 채택하게 되었다.

SSE, WebSocket, Polling, Long Polling

구현하기 전에 간단하게 찾아봤던 내용들을 특징과 장단점으로 간략하게 정리해두겠습니다.

SSE(Server-Sent Events)

  • 특징
    • 이벤트 스트림은 텍스트 포맷으로, 각 이벤트는 data, event, id 등의 필드를 가집니다.
    • 서버에서 클라이언트로만 데이터를 전송하는 단방향 통신입니다.
    • HTTP 기반의 표준 프로토콜을 사용합니다.
    • 연결이 끊기면 브라우저가 자동으로 재연결을 시도합니다.
    • 모던 브라우저에서 기본적으로 지원합니다.
  • 장점
    • 간단한 구현과 사용이 쉽습니다.
    • 브라우저에서 내장 지원합니다.
    • 서버 푸시를 간단히 구현할 수 있습니다.
  • 단점
    • 단방향 통신으로 양방향 통신이 필요한 경우에는 부적합합니다.
    • 일부 브라우저에서는 오래된 버전이나 특정 설정에서 지원하지 않을 수 있습니다.

WebSocket

  • 특징
    • 서버와 클라이언트 간에 양방향 통신이 가능하며, 데이터를 실시간으로 교환할 수 있습니다.
    • 바이너리 데이터를 전송할 수 있고, 메시지에는 프레임이 추가됩니다.
    • 서버와 클라이언트 간에 독립적인 전이중(full-duplex) 통신이 가능합니다.
  • 장점
    • 실시간성이 뛰어납니다.
    • 연결이 계속 유지됩니다.
  • 단점
    • 일부 방화벽에서는 WebSocket 연결을 차단할 수 있습니다.
    • 다른 방법에 비해 더 많은 리소스를 사용할 수 있습니다.

Polling

  • 특징
    • 클라이언트는 일정한 주기로 서버에 요청을 보내 데이터를 가져옵니다.
  • 장점
    • 구현이 간단합니다.
    • 대부분의 브라우저에서 지원합니다.
    • 서버와 클라이언트 간 통신을 쉽게 설정할 수 있습니다.
  • 단점
    • 실시간성이 떨어지며, 불필요한 네트워크 트래픽을 발생시킬 수 있습니다.
    • 서버에 지속적인 요청이 발생하므로 서버 및 네트워크 부하가 높아질 수 있습니다.

Long Polling

  • 특징
    • 클라이언트가 서버에 연결을 유지하고 서버가 새 데이터를 가지고 올 때까지 응답을 지연시킵니다.
    • Polling에 비해 실시간성이 향상됩니다.
  • 장점
    • Polling에 비해 불필요한 트래픽이 감소됩니다.
    • 클라이언트와 서버 간에 양방향 통신이 필요한 경우에 유용합니다.
  • 단점
    • 여전히 연결 유지를 위한 리소스를 사용하므로, 서버 부하가 계속 발생합니다.
    • 일부 브라우저에서는 동시에 여러 개의 연결을 유지할 수 있는 제한이 있을 수 있습니다.

구현

Client

function createServerSentEvents() {
	const eventSource = new EventSource(SSE_URL);

  	// 서버에서 보낸 데이터를 받을 때 실행
	eventSource.addEventListener('message', function (e) {
		const updatedMenuData = JSON.parse(e.data);
		updateMenuData(updatedMenuData);
		console.log('Receive Server-Sent-Events');
	});

  	// 서버와 연결이 됐을 때 실행
	eventSource.addEventListener('open', function (e) {
		console.log('Connected to Server-Sent-Events');
	});

  	// 서버와 연결에서 에러가 발생했을 때 실행
	eventSource.addEventListener('error', function (e) {
		console.log('Error in Server-Sent-Events');
	});

	// 페이지 이탈 시 SSE 연결 종료
	window.addEventListener('beforeunload', (e) => {
		e.preventDefault();
		eventSource.close();
	});
}

서버에서 데이터를 받기 위한 클라이언트 구현은 위 코드가 전부일 정도로 간단합니다.
한가지 옵션으로는 withCredentials option을 통해서 CORS 설정이 가능하고, 기본 값은 false 입니다.
더욱 자세한 내용은 MDN

Server

const menuDataSSEHandler = {
	tasks: [],
	addTask(clientIp, task) {
		this.tasks.push({ [clientIp]: task });
	},
	getAllTasks() {
		return this.tasks;
	},
	removeTask(clientIp) {
		const removeTask = this.tasks.filter((task) => !task[clientIp]);
		this.tasks = removeTask;
	},
};

app.get('/menuDataSSE', (req, res) => {
	res.writeHead(200, {
		'Content-Type': 'text/event-stream',
		'Cache-Control': 'no-cache',
		Connection: 'keep-alive',
	});

	const clientIp = req.ip.replace(/^::ffff:/, '');

	menuDataSSEHandler.addTask(clientIp, async () => {
		try {
			const latestData = await getParsedLunchJson();
			res.write(`data: ${JSON.stringify(latestData)}\n\n`);
		} catch (error) {
			handleServerError(error);
		}
	});

	req.on('close', () => {
		menuDataSSEHandler.removeTask(clientIp);
	});
});

cron.schedule(
	'30-59/5 11 * * 1-5',
	async () => {
		console.log('업데이트 스케줄링 실행');

		await lunchMenuUpdateTask();

		const sseTasks = menuDataSSEHandler.getAllTasks();

		if (sseTasks.length > 0) {
			const taskPromises = sseTasks.map((task) => Object.values(task)[0]());
			await Promise.all(taskPromises);
		}
	},
	{
		name: 'lunch-menu-update-task',
		timezone: 'Asia/Seoul',
	}
);

위 코드는
1. 서버에서 cron을 통해 스케줄링을 예약해둔다
2. 유저가 접속하면 SSE가 연결되어 스케줄링 실행 시 클라이언트에 전송할 작업을 배열에 담아둔다.
3. 데이터가 업데이트 되면, SSE 작업이 있는지 배열을 체크하여 정해진 로직을 수행한다.

이슈가 발생했었는데 이 부분은 마지막에 적도록 하겠습니다.
이제 맨 위에 코드만 뜯어보면

뜯어보기

app.get('/menuDataSSE', (req, res) => {
  // ...
});

엔드포인트 입니다.

res.writeHead(200, {
	'Content-Type': 'text/event-stream',
	'Cache-Control': 'no-cache',
	Connection: 'keep-alive',
});

헤더입니다.
Content-Type : 'text/event-stream'가 표준으로 정의되어 있습니다.
Cache-Control : 계속 새로운 데이터가 올 것이기 때문에 'no-cache'로 설정합니다.
Connection : 연결을 해제하지 말고 유지해야 하기 때문에 'keep-alive'로 설정합니다.

res.write(`data: ${JSON.stringify(latestData)}\n\n`);

클라이언트에게 보낼 메세지입니다.
필드의 사양은

event : 이벤트 타입입니다. 이 필드가 지정돼 있는 경우에 이벤트는 브라우저 내에서 이벤트명에 해당하는 이벤트 리스너로 전달되고, 따로 지정되어있지 않은 경우 'onmessage' 핸들러가 호출됩니다.

data : 메시지의 데이터 필드입니다. 복수의 행을 전달 할 경우에 \n을 통해서 개행 문자를 삽입해야하며, 마지막 행에는 \n\n을 사용해 끝을 알려야합니다.

// 예시
event: test\n
data: {\n
"user_id": 1\n
"text": "test textd",\n
}\n\n

id : EventSource 객체의 마지막 이벤트 ID 값을 설정할 이벤트 ID입니다. 해당 값을 설정하면 ID를 통해 마지막 이벤트를 추적하여 서버 연결이 끊어지면 특수한 HTTP 헤더 (Last-Event-ID)가 새 요청으로 설정됩니다.

retry : 재연결 시간입니다. 서버에 대한 연결이 끊어지면 브라우저는 지정된 시간 동안 기다렸다가 다시 연결을 시도합니다. 단위는 밀리초

더욱 더 자세한 내용은 MDN

발생했던 이슈 기록

SSE 적용 초기엔 코드가 지금과 조금은 달랐는대 해당 버전에서 cron 업데이트 시 SSE와 서버의 동작에 대해서 익숙치 않아 데이터가 꼬이는 + 인스타그램 Block 이슈가 발생했었다.

원인은 쉽게 파악이 가능했다.

단순히 데이터를 가져와서 클라이언트에 보내주는 것이면 이슈는 없었겠지만, 당시 코드에서는 데이터 패칭도 함께 진행을 했었다.

그렇기 때문에 여러 pc에서 접속 시 데이터 패칭 및 업데이트 로직이 동시에 여러번 실행되어서 이미 이전 로직이 실행 중이고 종료되지 않았는데 새로운 클라이언트의 로직이 중복 실행되면서 오류 발생 하였고, 그로인해 과도한 요청으로 인스타그램에 일정시간 Block을 당한 것으로 유추했다.

(지금은 Jest를 공부하며 통해 테스트 코드를 짜고있다. 어느정도 안정화가 되었다고 자만해 static mock 데이터를 그냥 날려버리고 테스트한 멍청한 나를 반성하면서 말이다.)

찾아보니 위와 같은 상태를 경쟁 상태라고 칭하며 상호배제라는 프로세스가 서로 간에 공유 자원의 동시 접근을 허용하지 않기 위한 기술이있다고 하며, mutex semaphore monitor 등이 대표적이라고 한다.

더 찾아보다 보니까 내 상황에서는 솔루션이 될 수 없었다.
왜냐하면, 현재 서비스는 다수의 유저가 들어와 요청을 해서 데이터를 수정 할 일이 없다.
유저는 그저 최신 데이터를 보기만 하면 된다. 마치 실시간 주가를 보는 것처럼

그러면 어떻게 해야될까.

업데이트 스케줄링은 원래대로 서버에서 독립적으로 실행이 되고, 스케줄링 작업이 완료될 때 SSE를 통하여 클라이언트에 업데이트가 완료된 데이터만 보내주면 될 거 같다.

  1. 연결된 클라이언트의 res를 저장할 객체를 만든다. (class로 만들까 했으나 객체를 재사용 필요가 없고, 간단한 로직이라 더 직관적인 객체리터럴 방식을 사용했다.)

  2. 클라이언트가 접속 시 해당 클라이언트 고유의 task(res)를 만들어 저장해둔다.

  3. 업데이트 스케줄링 실행

  4. 스케줄링 종료 후 저장된 모든 res를 가져와 실행 (클라이언트로 데이터 전달)

  5. 클라이언트 연결 해제 시 저장해뒀던 해당 클라이언트의 req 제거

아쉬운 점은 각 클라이언트의 req객체를 저장하지 말고 스케줄링이 끝난 시점을 감지하여 SSE 코드 블럭 내부에서 클라이언트로 보내줄 순 없을까? useEffect와 useState처럼

아이디어는 있지만 아직은 아무리 생각하고 검색해봐도 좀 처럼 깔끔한 해결 방안이 떠오르지 않는다.
이 부분 역시 노트해두고 해결 방법이 떠오르거나 찾게 된다면 리팩토링을 진행해야겠다.

@추가
Node.js의 EventEmitter라는 것을 이용해 클라이언트에서 사용하던 커스텀 이벤트와 유사한 동작을 수행할 수 있다고 한다. 뭐가 더 적합한지 조금 더 생각해보고 적용하도록 해야겠다.

워 server쪽 구현 코드의 설명인데, 나는 알겠지만 다른 사람이 볼 때 잘 풀어 설명했는지는 모르겠다..

*EventEmitter로 변경

서버에서도 이벤트를 만들어서 발생시킬 수 있다는 것을 발견하고, 이게 더 낫겠다 싶었습니다.
그래서 간단한 사용법 검색과 공식문서를 통해 기본 이벤트 객체를 확장하여 변경해봤습니다.

// server
// import도 가능
const EventEmitter = require('events');

const scheduleEvent = new EventEmitter();

app.get('/menuDataSSE', (req, res) => {
	res.writeHead(200, {
		'Content-Type': 'text/event-stream',
		'Cache-Control': 'no-cache',
		Connection: 'keep-alive',
	});

	async function sendLunchDataToClient() {
		try {
			const latestData = await getParsedLunchJson();
			res.write(`data: ${JSON.stringify(latestData)}\n\n`);
		} catch (error) {
			handleServerError(error, `'${req.ip}'와의 통신 중 오류가 발생했습니다.`);
		}
	}

	scheduleEvent.addListener('complete', sendLunchDataToClient);

	req.on('close', () => {
		scheduleEvent.removeListener('complete', sendLunchDataToClient);
	});
});

cron.schedule(
	'*/1 * * * 1-5',
	async () => {
		console.log('업데이트 스케줄링 실행');
		await lunchMenuUpdateTask();
		scheduleEvent.emit('complete');
	},
	{
		name: 'lunch-menu-update-task',
		timezone: 'Asia/Seoul',
	}
);

사용자가 브라우저에 접속하여 SSE 연결이 될 때 생성한 이벤트 객체로 리스너를 등록합니다.
새로고침, 브라우저 닫기 등 SSE 연결이 끊길 때 등록했던 리스너를 제거합니다.
제거하는 이유는 해당 코드는 클라이언트 코드가 아니라 사용자가 브라우저를 닫아도 리스너는 이벤트 큐(Event Queue)에 쌓이게 됩니다.
해당 큐를 관리하지 않으면 MaxListenersExceededWarning 경고를 뿜게됩니다.

const EventEmitter = require('events');

class CustomEmitter extends EventEmitter {
	constructor() {
		super();
	}

	something(params) {
	}
}

const customEmitter = new CustomEmitter();

위 처럼 기본 Class를 확장시켜서 사용할 수도 있습니다.
해당 Class의 자세한 정보는 공식문서에서 확인이 가능하며, Node의 Core에서 확인할 수 있습니다. (node_modules/@types/node/events.d.ts)

처음 구현했던 방식과 큰 맥락은 비슷하지만 SSE 연결 시 등록해놨던 모든 Task를 스케줄링이 끝난 후 Promise.all을 통해서 처리하는 부분이 SSE 연결 시 EventListener를 등록해두고 스케줄링이 끝난 후 emit을 통해 이벤트를 발생시켜 등록된 모든 이벤트를 한번에 실행시키는 점이 달라졌습니다.

현재는 틈틈히 jest를 공부하고 있는데 앞으로는 테스트 코드를 추가해 볼 생각입니다.

profile
돈이 좋아

0개의 댓글