[NestJS] Socket.io 4 ver.

shw·2021년 4월 28일
0

새로운 SocketIoAdapter 작성

NestJs에서 Websocet-Socket.io 2ver를 사용한다.
하지만 Flutter에서는 Socket.io 2ver에서 커넥션이 계속 끊기는 문제가 있어서 최신버전에 4버전으로 업그레이드해야만 했다.

다음의 코드는 NestJs에서 SocketIoAdapter에 대한 코드이다.

//main.ts 
app.useWebSocketAdapter(new SocketIoAdapter(app));

//socket-io-adater.ts
export class SocketIoAdapter extends AbstractWsAdapter {
  constructor(
    appOrHttpServer?: INestApplicationContext | any,
    private readonly corsOrigins = [],
  ) {
    super(appOrHttpServer);
  }

  public create(
    port: number,
    options?: any & { namespace?: string; server?: any },
  ): any {
    if (!options) {
      return this.createIOServer(port);
    }
    const { namespace, server, ...opt } = options;
    return server && isFunction(server.of)
      ? server.of(namespace)
      : namespace
        ? this.createIOServer(port, opt).of(namespace)
        : this.createIOServer(port, opt);
  }

  public createIOServer(port: number, options?: any): any {
    const pubClient = new RedisClient({ host: process.env.REDIS_HOST, port: 6379 });
    const subClient = pubClient.duplicate();
    const redisAdapter = createAdapter({ pubClient, subClient });
    if (this.httpServer && port === 0) {
      const s = new Server(this.httpServer, {
        cors: {
          origin: this.corsOrigins,
          methods: ['GET', 'POST'],
          credentials: true,
          allowedHeaders: ['Content-Type', 'authorization'],
        },
        cookie: {
          // name: 'test',
          httpOnly: true,
          path: '/',
        },
        // Allow 1MB of data per request.
        maxHttpBufferSize: 1e6,
      });


      s.adapter(redisAdapter);
      return s;
    }
    const s = new Server(port, options);
    s.adapter(redisAdapter);
    return s;
  }

  public bindMessageHandlers(
    client: any,
    handlers: MessageMappingProperties[],
    transform: (data: any) => Observable<any>,
  ) {
    const disconnect$ = fromEvent(client, DISCONNECT_EVENT).pipe(
      share(),
      first(),
    );

    handlers.forEach(({ message, callback }) => {
      const source$ = fromEvent(client, message).pipe(
        mergeMap((payload: any) => {
          const { data, ack } = this.mapPayload(payload);
          return transform(callback(data, ack)).pipe(
            filter((response: any) => !isNil(response)),
            map((response: any) => [response, ack]),
          );
        }),
        takeUntil(disconnect$),
      );
      source$.subscribe(([response, ack]) => {
        if (response.event) {
          return client.emit(response.event, response.data);
        }
        isFunction(ack) && ack(response);
      });
    });
  }

  public mapPayload(payload: any): { data: any; ack?: Function } {
    if (!Array.isArray(payload)) {
      return { data: payload };
    }
    const lastElement = payload[payload.length - 1];
    const isAck = isFunction(lastElement);
    if (isAck) {
      const size = payload.length - 1;
      return {
        data: size === 1 ? payload[0] : payload.slice(0, size),
        ack: lastElement,
      };
    }
    return { data: payload };
  }
}

Socket.io에 Redis Adater추가

Socket.io를 단일 서버로만 구성하면 문제없겠지만 클러스터로 구성하면 다른 서버와 메세지 교환에 문제가 있다. 이런문제를 해결하기 위해 일반적으로 Redis Pub/Sub을 이용하여 문제를 해결한다.

기본적으로 Socket.io에서는 adater(Redis)를 추가할 수 있다.

const pubClient = new RedisClient({ host: process.env.REDIS_HOST, port: 6379 });
const subClient = pubClient.duplicate();
const redisAdapter = createAdapter({ pubClient, subClient });

const io = new Server(port, options);
io.adapter(redisAdapter);

위와 같이 Redis adater를 Socket.io에 추가하면 위의 그림과 같은 구성으로 서버를 만들 수 있게된다.

Web에서는 Socket.io 2버전도 문제없이 잘되었지만 Flutter의 socket io 라이브러리에선 최신버전을 사용해야 문제 없이 잘돌아간다. 다음의 라이브러리는 Flutter에서 사용한 라이브러리이다.

import 'package:socket_io/socket_io.dart';

0개의 댓글