MessageQueue + SSE로 알림 서버를 구현해봅시다!

HanSH·2025년 1월 11일

NestJS

목록 보기
29/29

들어가기에 앞서...

kafka를 알고계신다면 kafka를 쓰셔도 무방합니다. 지금 당장 kafka를 구축하고 사용하기에는 토이프로젝트에 저걸? 굳이? 라고 판단하여 좀 더 경량화인 RabbitMQ를 사용하였습니다.
Redis를 사용하는 방법도 있으나, 메시지 큐가 존재하지 않는(!!!!) 단점이 있어 그 대안으로 RabbitMQ를 사용하였습니다.
단순히 댓글이 달렸다고 알려주는것 뿐인데 kafka를 써야할까? ← 이게 가장 큽니다.

프로젝트 구조

  1. 서버는 2개

    main nestjs 서버와 sse nestjs 서버로, sse 서버는 micro service로 동작합니다.
    main 서버는 커뮤니티와 관련된 내용을 담당합니다.
    sse 서버는 main 서버에서 알림 데이터를 받아 sse 서버로 쏴주는 역할을 하죠!

  2. 메시지 큐의 존재

    Message Queue로 RabbitMQ를 사용합니다.
    처음 사용해보지만, kafka 구축을 해본 입장으로 쉬울 것이라 생각이 듭니다. 진짜로 구축"만" 해봤습니다

  3. 도표로 본다면...

    a. sse 연결 요청

    b. 댓글 알림 전달

RabbitMQ

RabbitMQ vs Redis vs kafka

RabbitMQ

  • 장점
    • pub/sub 간 메시지가 보장됩니다.
    • 우선순위를 지정할 수 있습니다.
    • 여러 메시징 프로토콜을 지원합니다.
    • 작은 데이터를 전달하는데 유리합니다.
  • 단점
    • 일반적으로 카프카보다 느립니다.
    • 메시지를 수신하면 메시지 큐에서 삭제됩니다.

Kafka

  • 장점
    • 로그 기반으로, 메시지를 수신한 이후에도 일정 기간동안 재조회가 가능합니다.
      • 이 기능 덕분에 하나의 topic에 여러 consumer가 서로 다른 메시지 위치를 읽을 수 있습니다.
    • 한 파티션 내에서는 항상 순서가 보장됩니다.
    • 대용량 메시지 처리에 용이합니다.
    • RabbitMQ보다 빠릅니다.
  • 단점
    • 구축이 어렵습니다. 단일 클러스터면 쉽지만, 클러스터가 늘어나면... 실제로 구축해봤습니다. 생각해야할 부분이 많습니다.
      • 일단 zookeeper가 추가됩니다. 이것만으로도 사용하기 불편합니다.
    • 파티션을 증가시키기는 쉽지만 감소하기는 불가능에 가깝습니다.
    • 로그 기반이라는 특징 때문에 I/O가 느린 시스템이라면 RabbitMQ보다 느릴 가능성이 있습니다.

Redis

  • 장점
    • 별도로 설치할 것이 없고, Redis를 설치하기만 하면 사용 가능합니다.
    • 지연시간이 낮고 속도가 매우 빠릅니다.
  • 단점
    • consumer가 있든 없든 메시지를 무조건 보냅니다(!!!!).

Docker를 활용한 RabbitMQ

docker를 사용해봅시다!

  1. 최대한 경량화하여 사용합니다. 자원은 한정적이니까요...
  2. env를 설정하지 않았다면 username과 password는 guest 입니다.
services:
    rabbitmq:
        image: rabbitmq:4.0.5-management-alpine
        container_name: rabbitmq
        volumes:
            - ./rabbitmq:/var/lib/rabbitmq
        ports:
            - 5672:5672     # rabbitmq 연결 port
            - 15672:15672   # management 페이지 접속 port
        environments:
        	# 아래의 항목을 적는다면 해당 이름으로 초기값이 설정됩니다.
        	- RABBITMQ_DEFAULT_USER=user
            - RABBITMQ_DEFAULT_PASS=password

NestJS에서의 설정

공식 문서

주의!! pub과 sub의 설정 방식이 다릅니다.
같을수도 있지만, 같게 설정하면 동작을 안하더라고요...

  • pub
    • Module에서 import하여 사용
  • sub
    • microservice로 등록하여 사용

공통 env는 아래와 같습니다.

`amqp://${server_url}:${port}` 형식을 사용합니다.
RABBITMQ_URL: amqp://localhost:5672

추가로 아래의 패키지를 설치해줍시다!

npm i --save amqplib amqp-connection-manager @nestjs/microservices

publisher

새로운 Module을 하나 만들어줍니다.

@Global()  <-- Global 데코레이터를 통해 다른 module에서 import 없이 사용할 수 있도록 설정
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        name: 'RABBITMQ_SERVICE',
        inject: [ConfigService],
        useFactory: async (configService: ConfigService): Promise<ClientProvider> => ({
          transport: Transport.RMQ,
          options: {
            urls: [configService.get<string>('RABBITMQ_URL')],
            queue: configService.get<string>('RABBITMQ_QUEUE'),
          },
        }),
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class RabbitmqModule {}

subscriber

app의 microservice로 동작을 한다고 위에서 말했습니다. 따라서 microservice로 연결을 해줍니다.

// main.ts
  const configService: ConfigService = app.get(ConfigService);
  app.connectMicroservice<MicroserviceOptions>({
    transport: Transport.RMQ,
    options: {
      urls: [configService.get<string>('RABBITMQ_URL')],
      queue: configService.get<string>('RABBITMQ_QUEUE'),
    },
  });
  await app.startAllMicroservices();

NestJS에서의 사용

사용 자체는 간단합니다. pub에서는 emit을 하여 특정 pattern에 값을 전달하고, sub에서는 MessagePattern 데코레이터를 통하여 해당 parttern을 구독하면 됩니다.
pattern을 환경변수화 하거나 enum으로 설정하면 더 좋겠네요!

publisher

// service

  constructor(@Inject('RABBITMQ_SERVICE') private readonly rabbitClient: ClientProxy) {}
  ...
  
  function ...() {
    ...
    // this.rabbitClient.emit(PATTERN: string, data: any);
    this.rabbitClient.emit('notification', { data: commentMessageQueueDto });
    ...
  }

subscriber

// controller

  // @MessagePattern(PATTERN)
  @MessagePattern('notification')
  getNotification(@Payload() payload: any, @Ctx() context: RmqContext) {
    // const msg = context.getMessage();
    // console.log(payload, msg);
    this.appService.handleMessage(payload);
  }

SSE

Server Sent Event로, 양방향인 Socket과는 달리 단방향이 특징입니다. 따라서 단순한 알림에는 적당하죠.
WebSocket은 설정이 조금 복잡하지만(Nginx에서 헤더가 사라진다든가...), SSE는 http를 사용하다보니 별다른 설정 없이도 사용할 수 있다는 것이 장점입니다.

별도의 모듈을 설치할 필요 없이 사용이 가능합니다.
1) HTTP를 사용
2) @nest/common에 존재

NestJS에서의 사용

공식문서

공식문서의 예제는 조금 부적절합니다. 서버에서 원할 때 전송하는게 목적인데 1초마다 전송하고있고...
잘 되는지 테스트 용도로는 쓸만하지만 실사용에서는 조금 지양하는 편이 좋습니다.

실제로 이렇게 간편하게 쓸 순 있지만... 뭔가 잘 안되는 것이 첫번째고, 내가 원할때 전달하는 것은 조금 어렵다 판단하여 다른 방법을 사용하였습니다.

@Sse('sse')
sse(): Observable<MessageEvent> {
  return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
}

위에서 말했다시피 SSE는 HTTP를 사용합니다. 따라서 res 객체와 http 연결이 지속된다면? 괜찮지 않을까요?
GET으로 연결하고 연결을 지속한다는 헤더를 추가하여 전달하면 문제없을겁니다!
Response 객체도 관리할 수 있어 계속 보낼수 있는 장점도 있네요!

연결

// controller
  @Get('connect')
  sse(@Req() req, @Res() res: Response) {
    const user: User = req.user;

    this.appService.onConnect(res, user.id);
    this.appService.onDisconnect(res, user.id);
  }


// service
  onConnect(res: Response, uuid: string) {
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');

    // user의 response session을 map에 저장
    this.connections.set(uuid, res);
    console.log(`connected - ${uuid}`);

    const message = this.getAllMessage(uuid);
    const sseData = this.generateSseData(message);
    res.write(sseData);
  }

  onDisconnect(res: Response, uuid: string) {
    res.on('close', () => {
      this.connections.delete(uuid);
      console.log(`connection stopped - ${uuid}`);
      res.end();
    });
  }

사용

프로젝트에서는 아래와 같이 사용하였습니다.

  1. connection 관리
    sse 연결이 되었다면 connection map에 저장합니다.
    추후 redis에 저장하면 문제 없을것으로 생각이 되지만, 현재 그렇게까지 할 필요가 없다 판단하였습니다.

    sse 연결 -> cookie에서 uuid를 분리 -> uuid: Response 쌍을 map에 저장
  2. message 관리
    sse 연결이 되어있는 경우에는 그냥 값을 전달하면 되지만, 그렇지 않다면 buffer가 필요합니다.
    이를 구현하기 위해 messages map을 두어 관리해줍시다!

  3. 메시지 구조
    아래의 형식을 따릅니다.
    무조건 data: 로 시작하고 \n\n 으로 끝나야합니다. 안그러면 sse 메시지라고 인식을 못해요...

    data: ${text...}\n\n
// service
  private connections = new Map<string, Response>();
  private messages = new Map<string, CommentMessageData[]>();

  private saveOrSendData(data: any) {
    const timestamp = new Date();
    Object.keys(data).forEach(key => {
      const opt = data[key] as CommentMessageQueueDetail;
      const conn = this.connections.get(opt.targetUUID);
      let message;
      if (key === 'comment') message = `게시글 ${opt.title}에 작성한 댓글에 대댓글이 달렸습니다|${opt.comment}`;
      else if (key === 'article') message = `게시글 ${opt.title}에 댓글이 달렸습니다|${opt.comment}`;

      if (!conn) {
        this.appendMessage(opt.targetUUID, { message, articleId: opt.id, timestamp, type: key });
      } else {
        const sseData = this.generateSseData([{ message, articleId: opt.id, timestamp }]);
        conn.write(sseData);
      }
    });
  }

  private generateSseData(data: any): string {
    // sse data는 \n\n 2개가 무조건 있어야 한다
    // 없어서 왜 전송이 안되나.. 했다
    return `data: ${JSON.stringify(data)}\n\n`;
  }

RabbitMQ + SSE 결합

각각을 어떻게 설정하고 사용하는지 간단하게 알아봤으니, 이를 결합해봅시다.
producer 부분은 크게 바뀌는 부분이 없습니다!

이곳은 두 부분으로 나뉩니다.

  1. RabbitMQ에서 데이터를 전달받아 SSE로 전송
  2. SSE 연결 시 기존의 message들을 전달

1. RabbitMQ subscribe

Message Queue endpoint에서 handleMessage 메서드를 사용합니다. 이를 확인해봅시다.

  @MessagePattern('notification')
  getNotification(@Payload() payload: RabbitMQMessageDto, @Ctx() context: RmqContext) {
    // const msg = context.getMessage();
    // console.log(payload, msg);
    this.appService.handleMessage(payload);
  }

실제 동작은 더 복잡하지만, 최대한 간단한 데이터로 나타내었습니다.

  private saveOrSendData(data: any) {
    const timestamp = new Date();
    const conn = this.connections.get(data.targetUUID);                                                 1. connection(Request)를 가져옵니다.
    const message = `게시글 ${data.title}에 댓글이 달렸습니다|${data.comment}`;

    if (!conn) {
      this.appendMessage(opt.targetUUID, { message, articleId: data.id, timestamp, type: key });        2-1. connection이 없다면 message buffer에 등록합니다.
    } else {
      const sseData = this.generateSseData([{ message, articleId: data.id, timestamp }]);               2-2-1. connection이 있다면 MQ에서 받은 데이터를 가공합니다.
      conn.write(sseData);                                                                              2-2-2. sse에 전달합니다.
    }
  }

// 데이터는 아래와 같다고 가정합니다.
data: {
  id: number;
  targetUUID: string;
  title: string;
  comment: string;
}

2. On SSE Connect

아래의 3줄이 핵심입니다. 이 중 getAllMessage 를 알아봅시다.

    const message = this.getAllMessage(uuid);
    const sseData = this.generateSseData(message);
    res.write(sseData);

SSE 연결 시 자신 message queue에 있는 모든 데이터를 가져옵니다.

  private getAllMessage(uuid: string) {
    const result = this.messages.get(uuid).map(res => {
      delete res.type;
      return res;
    });
    this.messages.delete(uuid);
    return result;
  }

이후 Response.write 를 이용하여 sse 메시지를 전달합니다.

주의! Response.send를 쓰면 연결이 종료됩니다.
Response.sendResponse.write + Response.end 의 조합이라고 생각하세요!

noti가 갔는지 확인하는 로직이 없는데요?

main -> sse로 가는 MQ는 있지만 sse -> main으로 전달하는 코드는 없습니다.
단순 댓글 알림은 재송신의 중요도가 떨어지는것으로 판단되어 해당 결정을 내렸습니다.

만약 결제 등 트랜잭션이 필요한 경우에는 일정 시간 후 resonse message가 오지 않는다면 재전송하는 로직이 있어야 했을겁니다. 하지만 댓글은 달렸더라도 삭제되는 경우가 많고, 달렸는지 궁금하면 직접 게시글에 들어가는 경우가 많아 noti가 정상적으로 전송되었는지 확인하는 로직은 따로 작성하지 않았습니다.


회고

메시지 큐 선택 과정

Message Queue와 SSE를 사용하여 실시간 알림을 구현하였습니다.
Message Queue는 아래의 과정을 거쳐 선정하였습니다.

  1. 구축이 용이한가?
  2. 사용하기 편한가?
  3. 자원 소모는 적은가?

1번 항목은 아래의 상황을 검토하여 Redis와 RabbitMQ를 사용하기로 하였습니다.

  • 이미 설치되어있는 Redis
  • docker compose만 하면 되는 RabbitMQ
  • zookeeper까지 사용해야하는 kafka
    • 운용 단계에서 적용하기 위해서는 각각 최소 3개의 노드 필요

2번 항목은 모든 기술들이 사용은 편합니다. 사용 자체만 보면요.
NestJS의 Adpater Pattern을 이용하여 어느 제공자를 쓸 것인지만 밝힌다면 큰 무리 없이 사용할 수 있습니다.
publisher는 emit 메서드를, subscribe는 @MessagePattern 데코레이터를 쓰면 되거든요!

3번 항목은 아래의 상황을 검토하여 RabbitMQ를 사용하기로 하였습니다.

  • 이미 cache로 쓰고 있는 redis
  • 테스트 단계에서는 2개면 되지만 운용 단계에서는 6개의 서버를 돌려야하는 kafka
  • 하나의 서버만 쓰면 되는 RabbitMQ

1~3의 항목에 따라 최종적으로 RabbitMQ를 사용하게 되었습니다.

메시지 전달 기술 선정 과정

SSE, WebSocket, Polling 중 어느 기술을 사용할 것인지 검토하였습니다.

  • SSE
    • 대부분의 웹 브라우저에서 표준으로 지원
    • 단방향 메시지 전송에 유리
    • HTTP 프로토콜을 사용하므로 별도의 정책 수정이 필요 없음
    • but, 서버에 session을 저장해야 함
  • WebSocket
    • 대부분의 웹 브라우저에서 표준으로 지원
    • 양방향 메시지 전송에 유리
    • but, Nginx 등 앞단의 설정이 어려움
    • but, 서버에 session을 저장해야 함
  • Polling
    • 매우 간단한 구현
      • interval을 통한 fetch 요청
    • but, 사용자와 서버 모두 네트워크 사용량 증가
  • FCM
    • 외부 서비스 사용으로 서버의 HTTP 네트워크 부하 감소
    • 강력한 백그라운드 푸시 알림
    • 다양한 플랫폼에 일관된 방식으로 메시지 전달 가능
    • but, 일부 브라우저(IE, Safari)에서 미흡한 지원
    • but, 사용자가 알림을 차단한다면 제공자에서 다시 켤 수 있는 방법이 없음

프로젝트의 댓글 알림 조건을 다시 보면

  1. 서버에서 클라이언트로 알림을 보내기만 하면 됨
    • SSE, FCM
  2. 포그라운드 알림만 필요함
    • SSE

이외에도 여러 부가 조건을 판단하여 SSE 기술을 사용하는것이 좋다 판단하였습니다.

이를 적용하며

공식 문서에서는 프로젝트에 적용하는 방법만 유용하였습니다. 대부분은 검색 엔진에 적용하는 방법을 물어보았습니다.
공식 문서를 보는 것이 좋긴 하지만 유튜브 등 다른 매체를 사용하는것이 도입 입장에서는 더 편한점이 많더라고요...

profile
저는 말하는 싹 난 감자입니다

0개의 댓글