다중 소켓 관리하기(2): PUB/SUB

JSM·2023년 11월 19일
1

프로젝트

목록 보기
1/10
post-thumbnail

왜 PUB/SUB을 또 도입해야 할까?

  • 트랙픽 관리 서버는 클라이언트가 소켓 서버에 연결되기 전에 어떤 소켓 서버로 가면 좋을지 알려주는 서버입니다.
  • 하지만 트래픽 관리 서버의 문제는 Nest.js의 멀티 클러스터 상황에서 문제가 발생합니다.

멀티 클러스터링이 뭘까요?

  • Nest.js를 PM2로 배포하게 되면 여러 프로세스로 구성이 됩니다.
  • 즉 같은 소켓 URL로 접근하더라도 다른 프로세스로 할당될 수 있고 이는 같은 ROOM에 조인할 수 없게 될 수 있음을 의미합니다.

PUB/SUB이 무엇일까?

  • 😾 프로세스
  • A하고 B서버가 채팅 채널을 SUB합니다.
  • B가 채팅 채널에 메시지를 PUB합니다.
  • A하고 B서버가 채팅 채널을 SUB하고 있었으므로 메시지를 받습니다.

ROOM 이름으로 PUB/SUB을 하자

  • ROOM 이름으로 PUB/SUB 하기
  • ROOM 이름이 UNIQUE하다면 ROOM 이름으로 PUB/SUB을 할 수 있다고 생각했습니다.
  • 사용자가 각자의 서버에서 UNIQUE한 ROOM에 JOIN하고, 서버는 ROOM에 대한 SUB을 합니다.
  • 이후 다른 서버에서 ROOM에 대한 PUB이 일어났을때 각 서버는 메시지를 SUB합니다.
  • 마지막으로 메시지를 ROOM에 JOIN된 사용자들에게 메시지를 보냅니다.

ROOM 이름으로 PUB/SUB을 하는 한계점…

  • 😾 문제점
  • ROOM 이름으로 이벤트를 발행하고 구독을 하니 문제가 하나 생겼습니다.
  • 일단 너무 많은 룸이 생성이 될 수 있는데 이 이벤트를 다 관리하는 것에서 오는 문제입니다.

각 서버마다 채널을 만들고 여기에 PUB/SUB하자

  • 😾 문제점
  • 각 서버마다 채널을 만들고 PUB/SUB합니다
  • 이렇게 하면 각 서버의 멀티 클러스터는 해당 채널의 이름으로 PUB/SUB이 이뤄집니다.
  • 만약 새로운 채팅 소켓 서버가 생성되면 이를 트래픽 관리 서버를 통해 관리됩니다.

최종 아키텍처

  • 😾 아키텍처
  • 트래픽 관리 서버를 통해 어떤 소켓 서버로 갈지 URL을 받습니다.
  • 각 소켓 서버는 각 서버의 이름으로 PUB/SUB이 이뤄집니다.
  • 즉 각 서버마다 PUB/SUB이 이뤄지므로 멀티 클러스터링 환경에서도 채팅이 가능합니다.

그림으로 이해하자!

멀티 클러스터 상황에서는 어떤 문제가 발생할까?

  • 같은 채팅방의 클라이언트 1,2,3,4,5가 채팅을 주고 받기 위해 소켓 서버로 연결합니다.
  • 클라이언트가 트래픽 관리 서버를 통해 특정 서버로 이동해도 그 안에서 어떤 프로세스로 이동하게 될지 알 수 없습니다.
  • 즉 또 다시 서로 메시지를 주고 받을 수 없는 문제가 발생합니다.

PUB/SUB을 활용하자

  • 트래픽 관리 서버를 통해 같은 채팅방 유저는 같은 서버로 연결했습니다.
  • 하지만 그럼에도 각각 프로세스가 나뉩니다.
  • 이런 문제를 해결하기 위해 Redis PUB/SUB을 활용하였고 메시지를 각 서버 채널로 PUB합니다.
  • 각 서버의 프로세스들은 같은 채널을 구독하므로 SUB을 통해 메시지를 클라이언트들에게 전달합니다.
  • 즉 같은 서버로 이동하게 되면 프로세스가 다르더라도 메시지를 주고 받을 수 있는 것입니다.

채팅 구현 3단계

일반적인 채팅 기능을 구현해보자

  • 사실 단순히 채팅 기능만 구현하는 것은 너무 쉽다. 아래의 코드가 그것입니다.
  • 하지만 이런 경우 멀티 클러스터 환경에서 같은 소켓 URL로 접근하더라도, 다른 클러스터로 배정될 수 있기 때문에 같은 ROOM에 조인할 수 없다. 즉 메시지를 주고 받을 수 없습니다.
@WebSocketGateway({ cors: true })
export class ChatGateway implements OnGatewayConnection {
  private readonly logger = new Logger();
  @WebSocketServer()
  server: Server;

  handleConnection(socket: Socket) {
    this.logger.log(`on connect called : ${socket.id}`);
  }

  @SubscribeMessage('joinRoom')
  handleJoin(
    @MessageBody() data: JoinRoomDto,
    @ConnectedSocket() socket: Socket,
  ) {
    const { room } = data;
    socket.join(room);
  }

  @SubscribeMessage('leaveRoom')
  async handleLeave(
    @MessageBody() data: LeaveRoomDto,
    @ConnectedSocket() socket: Socket,
  ) {
    const { room } = data;
    socket.leave(room);
  }

  @SubscribeMessage('sendMessage')
  async sendMessage(
    @MessageBody() data: MessageDto,
    @ConnectedSocket() socket: Socket,
  ) {
    const { room, message } = data;
    socket.to(room).emit('newMessage', message);
  }
}

ROOM 이름으로 PUB/SUB 구현하기

  • ROOM 이름을 통해 이벤트를 발행하고 서버가 만약 해당 ROOM을 SUB했을때 해당 ROOM에 메시지를 전달합니다.
  • A 서버의 ROOM1에 사람이 있고 B 서버의 ROOM1에 사람이 있다고 해보겠습니다.
  • 각 사람이 ROOM에 조인할때 각 사람이 참여한 서버가 ROOM1에 대한 이벤트를 구독합니다.
  • A 서버에서 ROOM1에 대한 PUB을 하면 A 서버와 B 서버가 ROOM1에 대한 메시지를 SUB합니다.
  • 이 후 해당 메시지를 해당 ROOM의 클라이언트들에게 전달합니다.
  • 이를 통해 멀티 클러스터링 상황에서도 메시지를 주고받을 수 있었습니다.
@WebSocketGateway({ namespace: SOCKET.NAME_SPACE, cors: true })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger();
  private rooms: Map<string, boolean> = new Map();
  private roomToCount: Map<string, number> = new Map();
  private instanceId = process.env.NODE_APP_INSTANCE || os.hostname();
  private subscriberClient: Redis;
  private publisherClient: Redis;

  constructor(
    @InjectRedis() private readonly client: Redis,
    private readonly chatService: ChatService,
    private readonly configService: ConfigService,
  ) {
    this.subscriberClient = client.duplicate();
    this.publisherClient = client.duplicate();
  }

  handleConnection(socket: Socket) {
    this.logger.log(`Instance ${this.instanceId} - connected: ${socket.id}`);
  }

  handleDisconnect(socket: Socket) {
    this.logger.log(`Instance ${this.instanceId} - disconnected: ${socket.id}`);
  }

  @SubscribeMessage(SOCKET_EVENT.JOIN_ROOM)
  handleJoin(
    @MessageBody() data: JoinRoomDto,
    @ConnectedSocket() socket: Socket,
  ) {
    this.logger.log(`Instance ${this.instanceId} - joinRoom: ${socket.id}`);

    const { room } = data;
    this.chatService.validateRoom(room);

    socket.join(room);

    const isRoom = this.rooms.get(room);
    const count = this.roomToCount.get(room) || 0;
    this.roomToCount.set(room, count + 1);

    if (!isRoom) {
      this.subscriberClient.subscribe(room);
      this.subscriberClient.on('message', (channel, message) => {
        if (channel === room) {
          this.server.to(room).emit(SOCKET_EVENT.NEW_MESSAGE, message);
        }
      });
      this.rooms.set(room, true);
    }
  }

  @SubscribeMessage(SOCKET_EVENT.LEAVE_ROOM)
  async handleLeave(
    @MessageBody() data: LeaveRoomDto,
    @ConnectedSocket() socket: Socket,
  ) {
    this.logger.log(`Instance ${this.instanceId} - leaveRoom: ${socket.id}`);

    const { room } = data;
    this.chatService.validateRoom(room);

    socket.leave(room);

    const count = (this.roomToCount.get(room) || 1) - 1;
    this.roomToCount.set(room, count);

    if (count === SOCKET.EMPTY_ROOM) {
      this.subscriberClient.unsubscribe(room);
      await this.chatService.deleteByRoom(room);
      this.rooms.delete(room);
      this.roomToCount.delete(room);
    }
  }

  @SubscribeMessage(SOCKET_EVENT.SEND_MESSAGE)
  async handleMessage(
    @MessageBody() data: MessageDto,
    @ConnectedSocket() socket: Socket,
  ) {
    this.logger.log(`Instance ${this.instanceId} - sendMessage: ${socket.id}`);

    this.chatService.validateSendMessage(data);

    const { room, message, nickname } = data;

    const response = {
      message: message,
      nickname: nickname,
      socketId: socket.id,
    };

    try {
      await this.publisherClient.publish(room, JSON.stringify(response));
    } catch (error) {
      throw new WsException({
        statusCode: ERRORS.FAILED_PUBLISHING.statusCode,
        message: ERRORS.FAILED_PUBLISHING.message,
      });
    }
  }
}

ROOM 이름이 아닌 서버 단위를 통해 이벤트 발행 및 구독

  • ROOM 이름으로 이벤트를 발행하고 구독을 하니 문제가 하나 생겼습니다.
  • 일단 너무 많은 룸이 생성이 될 수 있는데 이 이벤트를 다 관리하는 것에서 오는 문제입니다.
(node:39) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 message listeners added to [Commander]. Use emitter.setMaxListeners() to increase limit
  • 위에가 그 에러(?)까지는 아니고 경고인데 10개 이상 만들어서 나는 경고이다.
  • 사실 한 천개 만들었는데 이런 경고가 나면 이해가 가겠는데... 10개에서 이런 경고가 난다는것은 설계가 잘못되었다고 느꼈다.
  • 따라서 클러스터 프로세스들이 초기화될때 채팅이라는 이벤트를 구독하고 채팅이라는 채널에 PUB을 하는 방식으로 바꿨습니다.
  • 이러면 Map을 통해 ROOM에 대한 정보를 관리할 필요도 없으므로 메모리 관리에도 좋습니다.
@WebSocketGateway({ namespace: SOCKET.NAME_SPACE, cors: true })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private readonly logger = new Logger();
  private roomToCount: Map<string, number> = new Map();
  private instanceId = process.env.NODE_APP_INSTANCE || os.hostname();
  private subscriberClient: Redis;
  private publisherClient: Redis;

  constructor(
    @InjectRedis() private readonly client: Redis,
    private readonly chatService: ChatService,
    private readonly configService: ConfigService,
  ) {
    this.subscriberClient = client.duplicate();
    this.publisherClient = client.duplicate();

    this.subscriberClient.subscribe(SOCKET.REDIS_CHAT_CHANEL);

    this.subscriberClient.on('message', this.handleChatMessage.bind(this));
  }

  private handleChatMessage(channel: string, message: string) {
    if (channel === SOCKET.REDIS_CHAT_CHANEL) {
      const { room, ...messageData } = JSON.parse(message);
      this.server.to(room).emit(SOCKET_EVENT.NEW_MESSAGE, messageData);
    }
  }

  handleConnection(socket: Socket) {
    this.logger.log(`Instance ${this.instanceId} - connected: ${socket.id}`);
  }

  handleDisconnect(socket: Socket) {
    this.logger.log(`Instance ${this.instanceId} - disconnected: ${socket.id}`);
  }

  @SubscribeMessage(SOCKET_EVENT.JOIN_ROOM)
  handleJoin(
    @MessageBody() data: JoinRoomDto,
    @ConnectedSocket() socket: Socket,
  ) {
    this.logger.log(`Instance ${this.instanceId} - joinRoom: ${socket.id}`);

    const { room } = data;
    this.chatService.validateRoom(room);

    socket.join(room);

    const count = this.roomToCount.get(room) || 0;
    this.roomToCount.set(room, count + 1);
  }

  @SubscribeMessage(SOCKET_EVENT.LEAVE_ROOM)
  async handleLeave(
    @MessageBody() data: LeaveRoomDto,
    @ConnectedSocket() socket: Socket,
  ) {
    this.logger.log(`Instance ${this.instanceId} - leaveRoom: ${socket.id}`);

    const { room } = data;
    this.chatService.validateRoom(room);

    socket.leave(room);

    const count = (this.roomToCount.get(room) || 1) - 1;
    this.roomToCount.set(room, count);

    if (count === SOCKET.EMPTY_ROOM) {
      this.roomToCount.delete(room);
      this.chatService.deleteByRoom(room);
    }
  }

  @SubscribeMessage(SOCKET_EVENT.SEND_MESSAGE)
  async handleMessage(
    @MessageBody() data: MessageDto,
    @ConnectedSocket() socket: Socket,
  ) {
    this.logger.log(`Instance ${this.instanceId} - sendMessage: ${socket.id}`);

    this.chatService.validateSendMessage(data);

    const { room, message, nickname, ai } = data;

    const response = {
      room: room,
      message: message,
      nickname: nickname,
      socketId: socket.id,
    };

    try {
      await this.publisherClient.publish(
        SOCKET.REDIS_CHAT_CHANEL,
        JSON.stringify(response),
      );
    } catch (error) {
      throw new WsException({
        statusCode: ERRORS.FAILED_PUBLISHING.statusCode,
        message: ERRORS.FAILED_PUBLISHING.message,
      });
    }
  }
}
profile
내 기술적 고민들을 모은 곳...

0개의 댓글