kafka를 알고계신다면 kafka를 쓰셔도 무방합니다. 지금 당장 kafka를 구축하고 사용하기에는 토이프로젝트에 저걸? 굳이? 라고 판단하여 좀 더 경량화인 RabbitMQ를 사용하였습니다.
Redis를 사용하는 방법도 있으나, 메시지 큐가 존재하지 않는(!!!!) 단점이 있어 그 대안으로 RabbitMQ를 사용하였습니다.
단순히 댓글이 달렸다고 알려주는것 뿐인데 kafka를 써야할까? ← 이게 가장 큽니다.
서버는 2개
main nestjs 서버와 sse nestjs 서버로, sse 서버는 micro service로 동작합니다.
main 서버는 커뮤니티와 관련된 내용을 담당합니다.
sse 서버는 main 서버에서 알림 데이터를 받아 sse 서버로 쏴주는 역할을 하죠!
메시지 큐의 존재
Message Queue로 RabbitMQ를 사용합니다.
처음 사용해보지만, kafka 구축을 해본 입장으로 쉬울 것이라 생각이 듭니다. 진짜로 구축"만" 해봤습니다
도표로 본다면...
a. sse 연결 요청
b. 댓글 알림 전달
docker를 사용해봅시다!
services:
rabbitmq:
image: rabbitmq:4.0.5-management-alpine
container_name: rabbitmq
volumes:
- ./rabbitmq:/var/lib/rabbitmq
ports:
- 5672:5672 # rabbitmq 연결 port
- 15672:15672 # management 페이지 접속 port
environments:
# 아래의 항목을 적는다면 해당 이름으로 초기값이 설정됩니다.
- RABBITMQ_DEFAULT_USER=user
- RABBITMQ_DEFAULT_PASS=password
주의!! pub과 sub의 설정 방식이 다릅니다.
같을수도 있지만, 같게 설정하면 동작을 안하더라고요...
공통 env는 아래와 같습니다.
`amqp://${server_url}:${port}` 형식을 사용합니다.
RABBITMQ_URL: amqp://localhost:5672
추가로 아래의 패키지를 설치해줍시다!
npm i --save amqplib amqp-connection-manager @nestjs/microservices
새로운 Module을 하나 만들어줍니다.
@Global() <-- Global 데코레이터를 통해 다른 module에서 import 없이 사용할 수 있도록 설정
@Module({
imports: [
ClientsModule.registerAsync([
{
name: 'RABBITMQ_SERVICE',
inject: [ConfigService],
useFactory: async (configService: ConfigService): Promise<ClientProvider> => ({
transport: Transport.RMQ,
options: {
urls: [configService.get<string>('RABBITMQ_URL')],
queue: configService.get<string>('RABBITMQ_QUEUE'),
},
}),
},
]),
],
exports: [ClientsModule],
})
export class RabbitmqModule {}
app의 microservice로 동작을 한다고 위에서 말했습니다. 따라서 microservice로 연결을 해줍니다.
// main.ts
const configService: ConfigService = app.get(ConfigService);
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.RMQ,
options: {
urls: [configService.get<string>('RABBITMQ_URL')],
queue: configService.get<string>('RABBITMQ_QUEUE'),
},
});
await app.startAllMicroservices();
사용 자체는 간단합니다. pub에서는 emit을 하여 특정 pattern에 값을 전달하고, sub에서는 MessagePattern 데코레이터를 통하여 해당 parttern을 구독하면 됩니다.
pattern을 환경변수화 하거나 enum으로 설정하면 더 좋겠네요!
// service
constructor(@Inject('RABBITMQ_SERVICE') private readonly rabbitClient: ClientProxy) {}
...
function ...() {
...
// this.rabbitClient.emit(PATTERN: string, data: any);
this.rabbitClient.emit('notification', { data: commentMessageQueueDto });
...
}
// controller
// @MessagePattern(PATTERN)
@MessagePattern('notification')
getNotification(@Payload() payload: any, @Ctx() context: RmqContext) {
// const msg = context.getMessage();
// console.log(payload, msg);
this.appService.handleMessage(payload);
}
Server Sent Event로, 양방향인 Socket과는 달리 단방향이 특징입니다. 따라서 단순한 알림에는 적당하죠.
WebSocket은 설정이 조금 복잡하지만(Nginx에서 헤더가 사라진다든가...), SSE는 http를 사용하다보니 별다른 설정 없이도 사용할 수 있다는 것이 장점입니다.
별도의 모듈을 설치할 필요 없이 사용이 가능합니다.
1) HTTP를 사용
2) @nest/common에 존재
공식문서의 예제는 조금 부적절합니다. 서버에서 원할 때 전송하는게 목적인데 1초마다 전송하고있고...
잘 되는지 테스트 용도로는 쓸만하지만 실사용에서는 조금 지양하는 편이 좋습니다.
실제로 이렇게 간편하게 쓸 순 있지만... 뭔가 잘 안되는 것이 첫번째고, 내가 원할때 전달하는 것은 조금 어렵다 판단하여 다른 방법을 사용하였습니다.
@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
}
위에서 말했다시피 SSE는 HTTP를 사용합니다. 따라서 res 객체와 http 연결이 지속된다면? 괜찮지 않을까요?
GET으로 연결하고 연결을 지속한다는 헤더를 추가하여 전달하면 문제없을겁니다!
Response 객체도 관리할 수 있어 계속 보낼수 있는 장점도 있네요!
// controller
@Get('connect')
sse(@Req() req, @Res() res: Response) {
const user: User = req.user;
this.appService.onConnect(res, user.id);
this.appService.onDisconnect(res, user.id);
}
// service
onConnect(res: Response, uuid: string) {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// user의 response session을 map에 저장
this.connections.set(uuid, res);
console.log(`connected - ${uuid}`);
const message = this.getAllMessage(uuid);
const sseData = this.generateSseData(message);
res.write(sseData);
}
onDisconnect(res: Response, uuid: string) {
res.on('close', () => {
this.connections.delete(uuid);
console.log(`connection stopped - ${uuid}`);
res.end();
});
}
프로젝트에서는 아래와 같이 사용하였습니다.
connection 관리
sse 연결이 되었다면 connection map에 저장합니다.
추후 redis에 저장하면 문제 없을것으로 생각이 되지만, 현재 그렇게까지 할 필요가 없다 판단하였습니다.
sse 연결 -> cookie에서 uuid를 분리 -> uuid: Response 쌍을 map에 저장
message 관리
sse 연결이 되어있는 경우에는 그냥 값을 전달하면 되지만, 그렇지 않다면 buffer가 필요합니다.
이를 구현하기 위해 messages map을 두어 관리해줍시다!
메시지 구조
아래의 형식을 따릅니다.
무조건 data: 로 시작하고 \n\n 으로 끝나야합니다. 안그러면 sse 메시지라고 인식을 못해요...
data: ${text...}\n\n
// service
private connections = new Map<string, Response>();
private messages = new Map<string, CommentMessageData[]>();
private saveOrSendData(data: any) {
const timestamp = new Date();
Object.keys(data).forEach(key => {
const opt = data[key] as CommentMessageQueueDetail;
const conn = this.connections.get(opt.targetUUID);
let message;
if (key === 'comment') message = `게시글 ${opt.title}에 작성한 댓글에 대댓글이 달렸습니다|${opt.comment}`;
else if (key === 'article') message = `게시글 ${opt.title}에 댓글이 달렸습니다|${opt.comment}`;
if (!conn) {
this.appendMessage(opt.targetUUID, { message, articleId: opt.id, timestamp, type: key });
} else {
const sseData = this.generateSseData([{ message, articleId: opt.id, timestamp }]);
conn.write(sseData);
}
});
}
private generateSseData(data: any): string {
// sse data는 \n\n 2개가 무조건 있어야 한다
// 없어서 왜 전송이 안되나.. 했다
return `data: ${JSON.stringify(data)}\n\n`;
}
각각을 어떻게 설정하고 사용하는지 간단하게 알아봤으니, 이를 결합해봅시다.
producer 부분은 크게 바뀌는 부분이 없습니다!
이곳은 두 부분으로 나뉩니다.
Message Queue endpoint에서 handleMessage 메서드를 사용합니다. 이를 확인해봅시다.
@MessagePattern('notification')
getNotification(@Payload() payload: RabbitMQMessageDto, @Ctx() context: RmqContext) {
// const msg = context.getMessage();
// console.log(payload, msg);
this.appService.handleMessage(payload);
}
실제 동작은 더 복잡하지만, 최대한 간단한 데이터로 나타내었습니다.
private saveOrSendData(data: any) {
const timestamp = new Date();
const conn = this.connections.get(data.targetUUID); 1. connection(Request)를 가져옵니다.
const message = `게시글 ${data.title}에 댓글이 달렸습니다|${data.comment}`;
if (!conn) {
this.appendMessage(opt.targetUUID, { message, articleId: data.id, timestamp, type: key }); 2-1. connection이 없다면 message buffer에 등록합니다.
} else {
const sseData = this.generateSseData([{ message, articleId: data.id, timestamp }]); 2-2-1. connection이 있다면 MQ에서 받은 데이터를 가공합니다.
conn.write(sseData); 2-2-2. sse에 전달합니다.
}
}
// 데이터는 아래와 같다고 가정합니다.
data: {
id: number;
targetUUID: string;
title: string;
comment: string;
}
아래의 3줄이 핵심입니다. 이 중 getAllMessage 를 알아봅시다.
const message = this.getAllMessage(uuid);
const sseData = this.generateSseData(message);
res.write(sseData);
SSE 연결 시 자신 message queue에 있는 모든 데이터를 가져옵니다.
private getAllMessage(uuid: string) {
const result = this.messages.get(uuid).map(res => {
delete res.type;
return res;
});
this.messages.delete(uuid);
return result;
}
이후 Response.write 를 이용하여 sse 메시지를 전달합니다.
주의!
Response.send를 쓰면 연결이 종료됩니다.
Response.send는Response.write+Response.end의 조합이라고 생각하세요!
main -> sse로 가는 MQ는 있지만 sse -> main으로 전달하는 코드는 없습니다.
단순 댓글 알림은 재송신의 중요도가 떨어지는것으로 판단되어 해당 결정을 내렸습니다.
만약 결제 등 트랜잭션이 필요한 경우에는 일정 시간 후 resonse message가 오지 않는다면 재전송하는 로직이 있어야 했을겁니다. 하지만 댓글은 달렸더라도 삭제되는 경우가 많고, 달렸는지 궁금하면 직접 게시글에 들어가는 경우가 많아 noti가 정상적으로 전송되었는지 확인하는 로직은 따로 작성하지 않았습니다.
Message Queue와 SSE를 사용하여 실시간 알림을 구현하였습니다.
Message Queue는 아래의 과정을 거쳐 선정하였습니다.
1번 항목은 아래의 상황을 검토하여 Redis와 RabbitMQ를 사용하기로 하였습니다.
2번 항목은 모든 기술들이 사용은 편합니다. 사용 자체만 보면요.
NestJS의 Adpater Pattern을 이용하여 어느 제공자를 쓸 것인지만 밝힌다면 큰 무리 없이 사용할 수 있습니다.
publisher는 emit 메서드를, subscribe는 @MessagePattern 데코레이터를 쓰면 되거든요!
3번 항목은 아래의 상황을 검토하여 RabbitMQ를 사용하기로 하였습니다.
1~3의 항목에 따라 최종적으로 RabbitMQ를 사용하게 되었습니다.
SSE, WebSocket, Polling 중 어느 기술을 사용할 것인지 검토하였습니다.
프로젝트의 댓글 알림 조건을 다시 보면
이외에도 여러 부가 조건을 판단하여 SSE 기술을 사용하는것이 좋다 판단하였습니다.
공식 문서에서는 프로젝트에 적용하는 방법만 유용하였습니다. 대부분은 검색 엔진에 적용하는 방법을 물어보았습니다.
공식 문서를 보는 것이 좋긴 하지만 유튜브 등 다른 매체를 사용하는것이 도입 입장에서는 더 편한점이 많더라고요...