NestJS - SSE

최제원·2023년 8월 24일
1

SSE

목록 보기
2/2
post-thumbnail

해당 글은 NsetJS SSE 시스템 구축에 대한 예제에 대해서 다뤄볼 예정이다

참고로 본인은 처음 기술에 대해 접하면서 공식문서를 잘 살펴보지는 않는 편이다 (고쳐야됨..)

따라서 처음으로 공식문서를 살펴보면서 추가적으로 왜 공식문서의 예제를 채택하지 않으며 진행하게 되었는지도 함께 작성할 예정이다

NestJS Documents


SSE(Server-Sent Events)는 클라이언트가 HTTP 연결을 통해 서버로부터 자동 업데이트를 받을 수 있도록 하는
서버 푸시 기술입니다.

NestJS 공식홈페이지 SSE는 SSE를 위와 같이 설명하고 있다 하지만 예제 코드를 보면 뭔가 본인은 이상하다는 느낌을 받았다

@Sse('sse')
sse(): Observable<MessageEvent> {
  return interval(1000).pipe(map((_) => ({ data: { hello: 'world' } })));
}

경고
서버에서 보낸 이벤트 경로는 Observable스트림을 반환해야 합니다

해당 소스코드를 보고 바로 나는 예제 코드를 버리고 다른 방식을 찾으러 간 것 같다 내 이해력이 이상한것 일수도 있지만

소스코드를 살펴보면 sse 도메인은 1000ms 즉 1초마다 해당 데이터를 pipe를 통해서 밀어넣고 있다

실제로 해당 데이터를 찍어보면 sse도메인은 { hello: world } 를 무한으로 찍어낸다

나는 해당 방식은 SSE가 아닌 Polling 방식에 가깝다고 생각하고 있다 Http Request를 일정 간격으로 계속해서 요청해서 이벤트를 찍어내기 때문이다

Example Code


해당 코드는 간략하게 줄여낸 SSE 구현 방식이다 해당 코드를 실직적으로 내 server에서 사용하는것은 아니기 때문에
간략하게 controller 하나만을 사용해서 구현했다 소스코드의 주석으로 해당 코드의 동작을 설명할 예정이다
코드를 보기 앞서 RxJS를 알지 못 한다면 RxJS 공식문서를 참고하여 해당 코드를 소화하자

import { Body, Controller, Param, Post, Sse } from '@nestjs/common';
import { filter, map, Observable, Subject } from 'rxjs';
import { Request } from 'express';

// Subject의 타입을 지정하기 위한 class 선언 
class Users {
  id: number;
  nickname: string;
  level: number;
}

@Controller('sse')
export class SseController {
  // [RxJS] Subject 선언 타입은 Users이다 
  private users$: Subject<Users> = new Subject();

  // 앞서 선언한 Subjcet를 Observable한 객체로 선언
  private observer = this.users$.asObservable();

    //접속한 브라우저의 커넥션을 담을 객체
  private stream: {
    id: string;
    subject: ReplaySubject<unknown>;
    observer: Observable<unknown>;
  }[] = [];

  // 예시 데이터 ( DB 데이터라고 생각하자 )
  users = [
    {
      id: 1,
      nickname: 'jewon',
      level: 1,
    },
    { id: 2, nickname: 'je', level: 2 },
    {
      id: 3,
      nickname: 'won',
      level: 3,
    },
  ];

  // [RxJS] User의 레벨 변화를 감시할 함수, 레벨업이 진행되면 Observable한 Subject에 next로 push
  onUserLevelChange(userId: number, nickname: string, level: number) {
    this.users$.next({ id: userId, nickname, level });
  }

 //브라우저가 접속할 때 해당 스트림을 담아 둡니다.
  private addStream(
    subject: ReplaySubject<unknown>,
    observer: Observable<unknown>,
    id: string,
  ): void {
    this.stream.push({
      id,
      subject,
      observer,
    });
  }

  // 본격 sse domain 함수
  @Sse(':userId')
  sse(@Param('userId') userId: string, @Req() req: Request): Observable<any> {
    // stream의 해당 유저를 담아준다, 유저 자신만의 stream 서버가 된다 (본 코드에서는 Req를 이용했다)
    this.addStream(this.users$, this.observer, userId);
    
    // 해당 유저가 브라우저를 종료하면 User id를 갖은 stream 제거
	req.on('close', () => this.removeStream(req['user'].id.toString()));

    return this.observer.pipe(
      // 해당 유저에게만 데이터를 송신할것이기 때문에 filter로 전송 대상 stream을 제한
      filter((user) => user.id === Number(userId)),
      map(
        (user) =>
          ({
            // 수신할 데이터의 형태, 데이터
            data: {
              userId: user.id,
              level: user.level,
              nickname: user.nickname,
            },
          } as MessageEvent),
      ),
    );
  }

  @Post()
  updateUser(@Body('userId') userId: number) {
    const existingUser = this.users.find((user) => user.id === userId);

    existingUser.level++;
    
    // 변화시에 알맞는 데이터를 next (push)
    this.onUserLevelChange(
      existingUser.id,
      existingUser.nickname,
      existingUser.level,
    );

    return existingUser;
  }
}

SSE


생소한 기술이라 제대로 된 구현인지는 파악하기가 힘들었지만 나름 리소스 낭비도 없이 잘 처리 한 것 같다
해당 user에게만 데이터를 송신하며 다른 유저들에겐 어떠한 이벤트도 일어나지 않았다 참고로 ReplaySubject<> 등을 사용하면
가장 최신 데이터를 접속하는 동시에 수신하는 방법이 존재하는둥 여러가지 사용방법이 있지만 나는 유저가 접속해있는 상황에
수신한 쪽지를 실시간으로 가져오는 서비스이기 때문에 전 데이터는 필요로 하지 않아서 채택하지 않았다

profile
Why & How

0개의 댓글