SocketIO Adapter (with. nestjs)

Jae Min·2023년 11월 4일
0
post-thumbnail
post-custom-banner

"소켓에 대해서 저 스스로 잘 모른다고 생각하여 정리한 글입니다"

sokcet 은 기본적으로, http 와 같이 프로토콜의 한 종류이며, http는 단방향 통신에 한번 통신을 하고 나면 연결을 끊어버리는 성격과 다르게, socket 은 양방향 통신에 한번 연결을 해도 끊어버리지 않고, 연결을 유지하기 때문에, 실시간으로 소통해야 하는 경우에 자주 사용된다. 예를 들면, 채팅 같이 실시간 통신을 하기 위한 기능에 자주 사용된다.

socket 을 사용되는 기술에는 websocket 과 socketio 가 있는데 차이점은 아래와 같다.

WebSocket 과 socketIO 의 차이점

ws 는 프로토콜이다(웹소켓이 등장하기 전에는 http 로만 통신을 했기 때문에 주로 polling 을 사용함)
io 는 node로 만든 websocket 기반 양방향 통신 모듈(라이브러리)이다.
ws 를 지원하지 못하는 브라우저가 많아서 io가 생김.

그러면 어떻게 구현할 수 있는지 알아보자.

install package

$ npm i --save @nestjs/websockets @nestjs/platform-socket.io
$ npm i --save-dev @types/socket.io

architecture

socket 서버를 열기 위해서는 일단, nest 서버가 있어야 한다.
그리고 nest 서버에서 socket 서버를 주입받아서 사용한다.
어떻게 되어 있는지 코드를 통해 알아보자.

// ws.gateway.ts
@WebSocketGateway(80)
export class WsKorGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
    constructor(private readonly logger: Logger){}
  ... 이하 생략
}

// ws.module.ts
@Module({
    providers: [WsKorGateway, Logger]
})
export class WsKorModule {
}

socket 서버만을 여는 socket gateway 모듈화 시켜준다.

// app.module.ts
@Module({
  imports: [WsKorModule, WsUsaModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

socket 모듈(WsKorModule)을 app module 에 주입해준다.

app module 에서는 단순하게 socket 서버를 여는 방법으로 연결을 할 수도 있지만, socket 서버만 달랑 하나 여는 것 보다, adapter 를 이용하면, 여러 socket server 를 통합하여 관리할 수 있다.
아래는 1. socket io adapter 또는 2. web socket adapter 로 관리하는 방법이다.

// main.ts
async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  app.enableCors({
    origin: true,
    credentials: true,
  });

  // 1. io adapter init using socketIO
  const redisIoAdapter = new RedisIoAdapter(app);
  await redisIoAdapter.connectToRedis();
  app.useWebSocketAdapter(redisIoAdapter);

  // 2. ws adapter init using WS (faster then socketIO, lower skill then socketIO)
  app.useWebSocketAdapter(new WsAdapter(app));


  await app.listen(3000);
}
bootstrap();

코드에 대한 설명은 아래에서 더 자세하게 다뤄보겠다.


socket server gateway

Gateway lifeCycle

ws gateway 가 연결이 되고 끊어때까지의 생명주기

implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect

를 통해서 알 수 있다.

export interface OnGatewayInit<T = any> { 
  // ws gateway 시작되는 순간 가장먼저 실행됨
    afterInit(server: T): any;
}
export interface OnGatewayConnection<T = any> { 
  // 소켓 연결되면 실행됨
    handleConnection(client: T, ...args: any[]): any;
}
export interface OnGatewayDisconnect<T = any> { 
  // 소켓 연결 끊어지면 실행됨
    handleDisconnect(client: T): any;
}

OnGatewayInit


ws gateway 에 namespace 속성을 추가하면 해당 namespace 로 방을 만들었음을 알 수 있다. (nsps 는 namespaces 의 축약이다 ^^)

web socket 에서 말하는 namespace 는 하나의 방이라고 생각하면 된다.
(http 통신에서의 방은 엔드포인트라고 볼 수 있듯이)
namespace 이름에 따라 요청을 다르게 처리할 수 있다.

OnGatewayConnection & OnGatewayDisconnect

동일 클라이언트가 연결이 됐다가 끊겼다는 것을 id 로 알 수 있다.

socket adapter

adapter는 쉽게 말해, 여러 소켓 서버들을 연결해주는 모듈이라고 생각하면 된다.
여러 서버들간의 데이터를 주고 받는 다거나 상태 값을 공유할 때 쓰인다.

nestjs 에서는 기본적으로 socket을 socketIO 로 연결한다. ws로 연결시켜주기 위해서는 따로 세팅이 필요하다.

1. socketio with not Adapter

Namespace {
  _events: [Object: null prototype] {},
  _eventsCount: 0,
  _maxListeners: undefined,
  sockets: Map(0) {},
  _fns: [],
  _ids: 0,
  server: Server {
    _events: [Object: null prototype] {},
    _eventsCount: 0,
    _maxListeners: undefined,
    _nsps: Map(1) { '/' => [Circular *1] },
    parentNsps: Map(0) {},
    parentNamespacesFromRegExp: Map(0) {},
    _path: '/socket.io',
    clientPathRegex: /^\/socket\.io\/socket\.io(\.msgpack|\.esm)?(\.min)?\.js(\.map)?(?:\?|$)/,
    _connectTimeout: 45000,
    _serveClient: true,
    _parser: {
      protocol: 5,
      PacketType: [Object],
      Encoder: [class Encoder],
      Decoder: [class Decoder extends Emitter]
    },
    ... 이하 생략
  },
  name: '/',
  adapter: Adapter {
    _events: [Object: null prototype] {},
    _eventsCount: 0,
    _maxListeners: undefined,
    nsp: [Circular *1],
    rooms: Map(0) {},
    sids: Map(0) {},
    encoder: Encoder { replacer: undefined },
    [Symbol(kCapture)]: false
  },
  [Symbol(kCapture)]: false
}

그림과 같이 socketIO 로 연결하고 Adapter 또한 사용하고 있지 않는 것을 볼 수 있다. 하지만 redis 를 이용해 socketIO Adapter 를 이용한다면 (아래 코드가 있음)

2. socketio with Adapter

Namespace {
  _events: [Object: null prototype] {},
  _eventsCount: 0,
  _maxListeners: undefined,
  sockets: Map(0) {},
  _fns: [],
  _ids: 0,
  server: Server {
    _events: [Object: null prototype] {},
    _eventsCount: 0,
    _maxListeners: undefined,
    _nsps: Map(1) { '/' => [Circular *1] },
    parentNsps: Map(0) {},
    parentNamespacesFromRegExp: Map(0) {},
    _path: '/socket.io',
    clientPathRegex: /^\/socket\.io\/socket\.io(\.msgpack|\.esm)?(\.min)?\.js(\.map)?(?:\?|$)/,
   ... 이하 생략
  },
  name: '/',
  adapter: RedisAdapter {
    _events: [Object: null prototype] {},
    _eventsCount: 0,
    _maxListeners: undefined,
    nsp: [Circular *1],
    rooms: Map(0) {},
    sids: Map(0) {},
    encoder: Encoder { replacer: undefined },
    pubClient: Commander {
      _events: [Object: null prototype],
      _eventsCount: 1,
      _maxListeners: undefined,
      commandOptions: [Function: commandOptions],
      select: [AsyncFunction: SELECT],
      subscribe: [Function: SUBSCRIBE],
      unsubscribe: [Function: UNSUBSCRIBE],
      pSubscribe: [Function: PSUBSCRIBE],
      pUnsubscribe: [Function: PUNSUBSCRIBE],
      sSubscribe: [Function: SSUBSCRIBE],
      sUnsubscribe: [Function: SUNSUBSCRIBE],
      ... 이하생략
    },
    subClient: Commander {
      _events: [Object: null prototype],
      _eventsCount: 1,
      _maxListeners: undefined,
      commandOptions: [Function: commandOptions],
      select: [AsyncFunction: SELECT],
      subscribe: [Function: SUBSCRIBE],
      unsubscribe: [Function: UNSUBSCRIBE],
      pSubscribe: [Function: PSUBSCRIBE],
      pUnsubscribe: [Function: PUNSUBSCRIBE],
      sSubscribe: [Function: SSUBSCRIBE],
      sUnsubscribe: [Function: SUNSUBSCRIBE],
      ... 이하생략
    },
    ... 이하생략
    channel: 'socket.io#/#',
    requestChannel: 'socket.io-request#/#',
    responseChannel: 'socket.io-response#/#',
    specificResponseChannel: 'socket.io-response#/#aVBDBs#',
    friendlyErrorHandler: [Function (anonymous)],
    [Symbol(kCapture)]: false
  },
  [Symbol(kCapture)]: false
}

다음과 같이 socketIO 를 사용하면서 Adapter 또한 사용함을 알 수 있다.

코드

// io-adapter.ts
import { IoAdapter } from '@nestjs/platform-socket.io';
import { ServerOptions } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

export class RedisIoAdapter extends IoAdapter {
  private adapterConstructor: ReturnType<typeof createAdapter>;

  async connectToRedis(): Promise<void> {
    const pubClient = createClient({ url: `redis://localhost:6379` });
    const subClient = pubClient.duplicate();

    await Promise.all([pubClient.connect(), subClient.connect()]);

    this.adapterConstructor = createAdapter(pubClient, subClient); // publish client 와 subscribe client 끼리 연결?
  }

  createIOServer(port: number, options?: ServerOptions): any {
    const server = super.createIOServer(port, options);
    server.adapter(this.adapterConstructor);
    return server;
  }
}

3. web socket with Adapter

다음으로 ws adpater 를 사용하도록 설정하면 (아래 코드 있음)

WebSocketServer {
  _events: [Object: null prototype] {
    connection: [Function (anonymous)],
    error: [Function (anonymous)]
  },
  _eventsCount: 2,
  _maxListeners: undefined,
  _server: <ref *1> Server {
    maxHeaderSize: undefined,
    insecureHTTPParser: undefined,
    requestTimeout: 300000,
    headersTimeout: 60000,
    keepAliveTimeout: 5000,
    connectionsCheckingInterval: 30000,
    joinDuplicateHeaders: undefined,
    ... 이하 생략
  _removeListeners: [Function: removeListeners],
  clients: Set(0) {},
  _shouldEmitClose: false,
  options: {
    ... 이하생략
    port: 80,
    WebSocket: <ref *2> [class WebSocket extends EventEmitter] {
      CONNECTING: 0,
      OPEN: 1,
      CLOSING: 2,
      CLOSED: 3,
      createWebSocketStream: [Function: createWebSocketStream],
      Server: [class WebSocketServer extends EventEmitter],
      Receiver: [class Receiver extends Writable],
      Sender: [class Sender],
      WebSocket: [Circular *2],
      WebSocketServer: [class WebSocketServer extends EventEmitter]
    }
  },
  _state: 0,
  [Symbol(kCapture)]: false
}

코드

// ws-adapter.ts
import * as WebSocket from 'ws';
import { WebSocketAdapter, INestApplicationContext } from '@nestjs/common';
import { MessageMappingProperties } from '@nestjs/websockets';
import { Observable, fromEvent, EMPTY } from 'rxjs';
import { mergeMap, filter } from 'rxjs/operators';

export class WsAdapter implements WebSocketAdapter {
  constructor(private app: INestApplicationContext) {}

  create(port: number, options: any = {}): any {
    return new WebSocket.Server({ port, ...options });
  }
  

  bindClientConnect(server, callback: Function) {
    server.on('connection', callback);
  }

  bindMessageHandlers(
    client: WebSocket,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
  ) {
    fromEvent(client, 'message')
      .pipe(
        mergeMap(data => this.bindMessageHandler(data, handlers, process)),
        filter(result => result),
      )
      .subscribe(response => client.send(JSON.stringify(response)));
  }

  bindMessageHandler(
    buffer,
    handlers: MessageMappingProperties[],
    process: (data: any) => Observable<any>,
  ): Observable<any> {
    const message = JSON.parse(buffer.data);
    const messageHandler = handlers.find(
      handler => handler.message === message.event,
    );
    if (!messageHandler) {
      return EMPTY;
    }
    return process(messageHandler.callback(message.data));
  }

  close(server) {
    server.close();
  }
}

얼추 어떻게 다르고 역할을 하는지 알았으니, 이제 adapter (redis io apater) 를 이용해서 테스트를 해보자.
두개의 소켓 서버를 생성한다

port : 80(kor), 81(usa)

테스트는 postman 으로 했다.

postman test video

영상을 보면 한국에서 보낸 메세지를 미국에서도 수신할 수 있는 것으로 보인다.

이 처럼 다른 서버간의 소켓 통신을 위해서는 adapter 로 통신할 수 있음을 확인할 수 있다.


github

https://github.com/jjmmll0727/potential-nestjs-memory/tree/main/socket-kafka

profile
자유로워지고 싶다면 기록하라.
post-custom-banner

0개의 댓글