왜 PUB/SUB을 또 도입해야 할까?
- 트랙픽 관리 서버는 클라이언트가 소켓 서버에 연결되기 전에 어떤 소켓 서버로 가면 좋을지 알려주는 서버입니다.
- 하지만 트래픽 관리 서버의 문제는 Nest.js의 멀티 클러스터 상황에서 문제가 발생합니다.
멀티 클러스터링이 뭘까요?
- Nest.js를 PM2로 배포하게 되면 여러 프로세스로 구성이 됩니다.
- 즉 같은 소켓 URL로 접근하더라도 다른 프로세스로 할당될 수 있고 이는 같은 ROOM에 조인할 수 없게 될 수 있음을 의미합니다.
PUB/SUB이 무엇일까?
- 😾 프로세스
- A하고 B서버가 채팅 채널을 SUB합니다.
- B가 채팅 채널에 메시지를 PUB합니다.
- A하고 B서버가 채팅 채널을 SUB하고 있었으므로 메시지를 받습니다.
ROOM 이름으로 PUB/SUB을 하자
- ROOM 이름으로 PUB/SUB 하기
- ROOM 이름이 UNIQUE하다면 ROOM 이름으로 PUB/SUB을 할 수 있다고 생각했습니다.
- 사용자가 각자의 서버에서 UNIQUE한 ROOM에 JOIN하고, 서버는 ROOM에 대한 SUB을 합니다.
- 이후 다른 서버에서 ROOM에 대한 PUB이 일어났을때 각 서버는 메시지를 SUB합니다.
- 마지막으로 메시지를 ROOM에 JOIN된 사용자들에게 메시지를 보냅니다.
ROOM 이름으로 PUB/SUB을 하는 한계점…
- 😾 문제점
- ROOM 이름으로 이벤트를 발행하고 구독을 하니 문제가 하나 생겼습니다.
- 일단 너무 많은 룸이 생성이 될 수 있는데 이 이벤트를 다 관리하는 것에서 오는 문제입니다.
각 서버마다 채널을 만들고 여기에 PUB/SUB하자
- 😾 문제점
- 각 서버마다 채널을 만들고 PUB/SUB합니다
- 이렇게 하면 각 서버의 멀티 클러스터는 해당 채널의 이름으로 PUB/SUB이 이뤄집니다.
- 만약 새로운 채팅 소켓 서버가 생성되면 이를 트래픽 관리 서버를 통해 관리됩니다.
최종 아키텍처
- 😾 아키텍처
- 트래픽 관리 서버를 통해 어떤 소켓 서버로 갈지 URL을 받습니다.
- 각 소켓 서버는 각 서버의 이름으로 PUB/SUB이 이뤄집니다.
- 즉 각 서버마다 PUB/SUB이 이뤄지므로 멀티 클러스터링 환경에서도 채팅이 가능합니다.
그림으로 이해하자!
멀티 클러스터 상황에서는 어떤 문제가 발생할까?
- 같은 채팅방의 클라이언트 1,2,3,4,5가 채팅을 주고 받기 위해 소켓 서버로 연결합니다.
- 클라이언트가 트래픽 관리 서버를 통해 특정 서버로 이동해도 그 안에서 어떤 프로세스로 이동하게 될지 알 수 없습니다.
- 즉 또 다시 서로 메시지를 주고 받을 수 없는 문제가 발생합니다.
PUB/SUB을 활용하자
- 트래픽 관리 서버를 통해 같은 채팅방 유저는 같은 서버로 연결했습니다.
- 하지만 그럼에도 각각 프로세스가 나뉩니다.
- 이런 문제를 해결하기 위해 Redis PUB/SUB을 활용하였고 메시지를 각 서버 채널로 PUB합니다.
- 각 서버의 프로세스들은 같은 채널을 구독하므로 SUB을 통해 메시지를 클라이언트들에게 전달합니다.
- 즉 같은 서버로 이동하게 되면 프로세스가 다르더라도 메시지를 주고 받을 수 있는 것입니다.
채팅 구현 3단계
일반적인 채팅 기능을 구현해보자
- 사실 단순히 채팅 기능만 구현하는 것은 너무 쉽다. 아래의 코드가 그것입니다.
- 하지만 이런 경우 멀티 클러스터 환경에서 같은 소켓 URL로 접근하더라도, 다른 클러스터로 배정될 수 있기 때문에 같은 ROOM에 조인할 수 없다. 즉 메시지를 주고 받을 수 없습니다.
@WebSocketGateway({ cors: true })
export class ChatGateway implements OnGatewayConnection {
private readonly logger = new Logger();
@WebSocketServer()
server: Server;
handleConnection(socket: Socket) {
this.logger.log(`on connect called : ${socket.id}`);
}
@SubscribeMessage('joinRoom')
handleJoin(
@MessageBody() data: JoinRoomDto,
@ConnectedSocket() socket: Socket,
) {
const { room } = data;
socket.join(room);
}
@SubscribeMessage('leaveRoom')
async handleLeave(
@MessageBody() data: LeaveRoomDto,
@ConnectedSocket() socket: Socket,
) {
const { room } = data;
socket.leave(room);
}
@SubscribeMessage('sendMessage')
async sendMessage(
@MessageBody() data: MessageDto,
@ConnectedSocket() socket: Socket,
) {
const { room, message } = data;
socket.to(room).emit('newMessage', message);
}
}
ROOM 이름으로 PUB/SUB 구현하기
- ROOM 이름을 통해 이벤트를 발행하고 서버가 만약 해당 ROOM을 SUB했을때 해당 ROOM에 메시지를 전달합니다.
- A 서버의 ROOM1에 사람이 있고 B 서버의 ROOM1에 사람이 있다고 해보겠습니다.
- 각 사람이 ROOM에 조인할때 각 사람이 참여한 서버가 ROOM1에 대한 이벤트를 구독합니다.
- A 서버에서 ROOM1에 대한 PUB을 하면 A 서버와 B 서버가 ROOM1에 대한 메시지를 SUB합니다.
- 이 후 해당 메시지를 해당 ROOM의 클라이언트들에게 전달합니다.
- 이를 통해 멀티 클러스터링 상황에서도 메시지를 주고받을 수 있었습니다.
@WebSocketGateway({ namespace: SOCKET.NAME_SPACE, cors: true })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private readonly logger = new Logger();
private rooms: Map<string, boolean> = new Map();
private roomToCount: Map<string, number> = new Map();
private instanceId = process.env.NODE_APP_INSTANCE || os.hostname();
private subscriberClient: Redis;
private publisherClient: Redis;
constructor(
@InjectRedis() private readonly client: Redis,
private readonly chatService: ChatService,
private readonly configService: ConfigService,
) {
this.subscriberClient = client.duplicate();
this.publisherClient = client.duplicate();
}
handleConnection(socket: Socket) {
this.logger.log(`Instance ${this.instanceId} - connected: ${socket.id}`);
}
handleDisconnect(socket: Socket) {
this.logger.log(`Instance ${this.instanceId} - disconnected: ${socket.id}`);
}
@SubscribeMessage(SOCKET_EVENT.JOIN_ROOM)
handleJoin(
@MessageBody() data: JoinRoomDto,
@ConnectedSocket() socket: Socket,
) {
this.logger.log(`Instance ${this.instanceId} - joinRoom: ${socket.id}`);
const { room } = data;
this.chatService.validateRoom(room);
socket.join(room);
const isRoom = this.rooms.get(room);
const count = this.roomToCount.get(room) || 0;
this.roomToCount.set(room, count + 1);
if (!isRoom) {
this.subscriberClient.subscribe(room);
this.subscriberClient.on('message', (channel, message) => {
if (channel === room) {
this.server.to(room).emit(SOCKET_EVENT.NEW_MESSAGE, message);
}
});
this.rooms.set(room, true);
}
}
@SubscribeMessage(SOCKET_EVENT.LEAVE_ROOM)
async handleLeave(
@MessageBody() data: LeaveRoomDto,
@ConnectedSocket() socket: Socket,
) {
this.logger.log(`Instance ${this.instanceId} - leaveRoom: ${socket.id}`);
const { room } = data;
this.chatService.validateRoom(room);
socket.leave(room);
const count = (this.roomToCount.get(room) || 1) - 1;
this.roomToCount.set(room, count);
if (count === SOCKET.EMPTY_ROOM) {
this.subscriberClient.unsubscribe(room);
await this.chatService.deleteByRoom(room);
this.rooms.delete(room);
this.roomToCount.delete(room);
}
}
@SubscribeMessage(SOCKET_EVENT.SEND_MESSAGE)
async handleMessage(
@MessageBody() data: MessageDto,
@ConnectedSocket() socket: Socket,
) {
this.logger.log(`Instance ${this.instanceId} - sendMessage: ${socket.id}`);
this.chatService.validateSendMessage(data);
const { room, message, nickname } = data;
const response = {
message: message,
nickname: nickname,
socketId: socket.id,
};
try {
await this.publisherClient.publish(room, JSON.stringify(response));
} catch (error) {
throw new WsException({
statusCode: ERRORS.FAILED_PUBLISHING.statusCode,
message: ERRORS.FAILED_PUBLISHING.message,
});
}
}
}
ROOM 이름이 아닌 서버 단위를 통해 이벤트 발행 및 구독
- ROOM 이름으로 이벤트를 발행하고 구독을 하니 문제가 하나 생겼습니다.
- 일단 너무 많은 룸이 생성이 될 수 있는데 이 이벤트를 다 관리하는 것에서 오는 문제입니다.
(node:39) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 message listeners added to [Commander]. Use emitter.setMaxListeners() to increase limit
- 위에가 그 에러(?)까지는 아니고 경고인데 10개 이상 만들어서 나는 경고이다.
- 사실 한 천개 만들었는데 이런 경고가 나면 이해가 가겠는데... 10개에서 이런 경고가 난다는것은 설계가 잘못되었다고 느꼈다.
- 따라서 클러스터 프로세스들이 초기화될때 채팅이라는 이벤트를 구독하고 채팅이라는 채널에 PUB을 하는 방식으로 바꿨습니다.
- 이러면 Map을 통해 ROOM에 대한 정보를 관리할 필요도 없으므로 메모리 관리에도 좋습니다.
@WebSocketGateway({ namespace: SOCKET.NAME_SPACE, cors: true })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server;
private readonly logger = new Logger();
private roomToCount: Map<string, number> = new Map();
private instanceId = process.env.NODE_APP_INSTANCE || os.hostname();
private subscriberClient: Redis;
private publisherClient: Redis;
constructor(
@InjectRedis() private readonly client: Redis,
private readonly chatService: ChatService,
private readonly configService: ConfigService,
) {
this.subscriberClient = client.duplicate();
this.publisherClient = client.duplicate();
this.subscriberClient.subscribe(SOCKET.REDIS_CHAT_CHANEL);
this.subscriberClient.on('message', this.handleChatMessage.bind(this));
}
private handleChatMessage(channel: string, message: string) {
if (channel === SOCKET.REDIS_CHAT_CHANEL) {
const { room, ...messageData } = JSON.parse(message);
this.server.to(room).emit(SOCKET_EVENT.NEW_MESSAGE, messageData);
}
}
handleConnection(socket: Socket) {
this.logger.log(`Instance ${this.instanceId} - connected: ${socket.id}`);
}
handleDisconnect(socket: Socket) {
this.logger.log(`Instance ${this.instanceId} - disconnected: ${socket.id}`);
}
@SubscribeMessage(SOCKET_EVENT.JOIN_ROOM)
handleJoin(
@MessageBody() data: JoinRoomDto,
@ConnectedSocket() socket: Socket,
) {
this.logger.log(`Instance ${this.instanceId} - joinRoom: ${socket.id}`);
const { room } = data;
this.chatService.validateRoom(room);
socket.join(room);
const count = this.roomToCount.get(room) || 0;
this.roomToCount.set(room, count + 1);
}
@SubscribeMessage(SOCKET_EVENT.LEAVE_ROOM)
async handleLeave(
@MessageBody() data: LeaveRoomDto,
@ConnectedSocket() socket: Socket,
) {
this.logger.log(`Instance ${this.instanceId} - leaveRoom: ${socket.id}`);
const { room } = data;
this.chatService.validateRoom(room);
socket.leave(room);
const count = (this.roomToCount.get(room) || 1) - 1;
this.roomToCount.set(room, count);
if (count === SOCKET.EMPTY_ROOM) {
this.roomToCount.delete(room);
this.chatService.deleteByRoom(room);
}
}
@SubscribeMessage(SOCKET_EVENT.SEND_MESSAGE)
async handleMessage(
@MessageBody() data: MessageDto,
@ConnectedSocket() socket: Socket,
) {
this.logger.log(`Instance ${this.instanceId} - sendMessage: ${socket.id}`);
this.chatService.validateSendMessage(data);
const { room, message, nickname, ai } = data;
const response = {
room: room,
message: message,
nickname: nickname,
socketId: socket.id,
};
try {
await this.publisherClient.publish(
SOCKET.REDIS_CHAT_CHANEL,
JSON.stringify(response),
);
} catch (error) {
throw new WsException({
statusCode: ERRORS.FAILED_PUBLISHING.statusCode,
message: ERRORS.FAILED_PUBLISHING.message,
});
}
}
}