@Inject('REDIS_DATA_CLIENT') private redisDataClient: Redis, // Redis 데이터 클라이언트를 주입
@Inject('REDIS_SUB_CLIENT') private redisSubClient: Redis,
@Inject('REDIS_DATA_CLIENT'): REDIS_DATA_CLIENT라는 토큰(또는 식별자)에 연결된 의존성을 현재 클래스의 redisDataClient 프로퍼티에 주입하라는 지시입니다. 주입되는 객체는 Redis 데이터를 처리하기 위한 클라이언트입니다. 이 클라이언트를 사용하여 Redis 데이터베이스에 데이터를 저장하거나 조회하는 등의 작업을 수행할 수 있습니다.
@Inject('REDIS_SUB_CLIENT'): REDIS_SUB_CLIENT라는 토큰에 연결된 의존성을 현재 클래스의 redisSubClient 프로퍼티에 주입하라는 지시입니다. 이 클라이언트는 Redis의 Publish/Subscribe 기능을 사용하여 메시지를 구독하는 데 사용됩니다. 예를 들어, 특정 채널에서 발행된 메시지를 실시간으로 받아 처리할 수 있습니다.
Redis에서 발행/구독(pub/sub) 기능을 사용할 때, 발행하는 클라이언트(publisher)와 구독하는 클라이언트(subscriber) 사이에는 기능적인 차이가 없습니다. Redis 클라이언트는 발행과 구독 모두를 수행할 수 있으며, 이는 Redis 클라이언트의 인스턴스가 동시에 여러 역할을 할 수 있음을 의미합니다.
redisDataClient
와 redisSubClient
라는 두 개의 클라이언트 인스턴스를 사용하는 이유는, 발행과 구독 작업을 분리하여 처리하기 위함입니다. 이 구분은 다음과 같은 이유로 유용합니다:
역할 분리: 애플리케이션 내에서 데이터 관리(예: 데이터 저장, 조회 등)와 메시지 시스템(발행/구독)을 명확히 분리하고자 할 때, 별도의 클라이언트 인스턴스를 사용하는 것이 좋습니다. 이렇게 하면 코드의 가독성과 유지보수성이 향상됩니다.
성능 최적화: 발행과 구독에 다른 Redis 연결(클라이언트 인스턴스)을 사용하면, 각각의 작업에 최적화된 연결 설정을 적용할 수 있습니다. 예를 들어, 구독자 클라이언트는 주로 메시지 수신에 집중할 수 있도록, 발행자 클라이언트는 데이터 전송에 최적화될 수 있습니다.
자원 관리: Redis 클라이언트가 구독 모드에 들어가면, 해당 연결은 오직 구독 관련 명령어(SUBSCRIBE, UNSUBSCRIBE, PING 등)만 수행할 수 있습니다. 따라서, 데이터 관련 작업과 메시지 시스템을 동시에 처리하려면, 별도의 클라이언트 인스턴스가 필요합니다.
위의 코드에서 redisDataClient
에서 publish
메서드를 사용하는 것은, 이 클라이언트가 데이터 관련 작업뿐만 아니라 메시지 발행 역할도 수행할 수 있기 때문입니다. 반면, redisSubClient
는 구독 작업에 집중되어 있을 가능성이 큽니다. 이렇게 함으로써, 각 클라이언트는 자신의 주 역할에 집중하면서도 필요에 따라 다른 기능을 수행할 수 있는 유연성을 갖게 됩니다.
async publishNotification(message: string) {
const channelName = 'notifications';
await this.redisDataClient.publish(channelName, message);
}
메시지를 지정된 채널에 메시지를 "발행"(publish)합니다. Redis의 발행/구독(Publish/Subscribe, 또는 pub/sub) 시스템을 이용하는 것인데요, 이 시스템은 메시지 브로커 역할을 합니다.
이 시스템은 실시간 알림, 채팅 시스템, 실시간 피드 업데이트 등 다양한 실시간 메시징 요구사항에 활용됩니다. 중요한 점은, 발행된 메시지는 영구 저장되지 않고, 채널을 구독하는 클라이언트에게만 전달된다는 것입니다. 따라서, 메시지를 영구적으로 저장하려면 다른 방법을 사용해야 합니다.
@Interval(1000 * 60)
async handleScheduledTasks() {
// await this.redisConnection();
for (const channelType of Object.values(ChannelType)) {
let cursor = '0';
const tasks = [];
do {
const [nextCursor, keys] = await this.redisDataClient.scan(
cursor,
'MATCH',
`${channelType}:*`,
'COUNT',
100,
);
cursor = nextCursor;
if (keys.length === 0) continue;
for (const key of keys) {
tasks.push(this.processKey(key));
if (tasks.length >= 500) {
await Promise.all(tasks.splice(0, 500));
}
}
} while (cursor !== '0');
await Promise.all(tasks);
}
}
@Interval(1000 * 60)
데코레이터는 이 함수가 60초(1000밀리초 * 60)마다 한 번씩 자동으로 실행되도록 설정합니다. 이는 주기적인 작업을 자동화하는 데 사용됩니다.
코드 내부에서는 특정 조건을 만족하는 Redis 키를 찾는 과정을 반복적으로 수행합니다. 각 채널 유형(ChannelType
)에 대해 다음과 같은 작업을 수행합니다:
redisDataClient.scan
메서드를 사용하여 지정된 패턴(${channelType}:*
)에 맞는 키를 찾습니다. 이 메서드는 커서 기반의 반복을 사용하여 데이터베이스 내에서 키를 검색합니다. MATCH
옵션은 검색할 키의 패턴을 지정하고, COUNT
옵션은 한 번의 호출에서 반환될 수 있는 최대 키의 수를 지정합니다.
검색된 키가 있으면, 각 키에 대해 processKey
함수를 호출하여 처리합니다. 이 함수는 실제 키 처리 로직을 구현할 것으로 추정됩니다.
한 번에 많은 양의 작업(tasks
)을 처리하기 위해, 작업 목록에 500개의 작업이 쌓이면 Promise.all
을 사용하여 병렬로 처리합니다. 이는 성능 최적화를 위한 것으로 보입니다.
모든 키를 처리한 후에 남아 있는 작업들도 마찬가지로 Promise.all
을 사용하여 처리합니다.
모든 채널 유형에 대해 위의 과정을 반복합니다.
Promise.all
은 여러 프로미스를 병렬로 처리할 때 매우 유용하지만 몇 가지 주의해야 할 단점이 있습니다. 가장 큰 단점 중 하나는 모든 프로미스가 성공적으로 이행(resolve)될 때까지 기다리지만, 하나라도 거부(reject)되면 즉시 전체가 거부되고 나머지 성공적인 결과들에 대한 정보를 잃게 된다는 점입니다.
이는 다음과 같은 상황에서 문제가 될 수 있습니다:
완전성: 여러 작업 중 하나라도 실패하면 Promise.all
은 즉시 거부되고, 성공한 나머지 작업의 결과를 얻을 수 없습니다. 즉, 모든 작업의 결과가 중요한 경우 하나의 실패로 인해 유용한 정보를 잃을 수 있습니다.
오류 처리: Promise.all
은 첫 번째 거부된 프로미스만 포착하고 나머지는 무시합니다. 따라서 여러 프로미스가 실패할 경우, 어떤 것들이 실패했는지 알기 어렵습니다.
효율성: 모든 프로미스가 병렬로 실행되기 때문에, 리소스에 대한 부담이 클 수 있습니다. 특히, 네트워크 요청이나 디스크 I/O와 같은 리소스 집약적 작업을 많이 처리해야 할 때 문제가 될 수 있습니다.
이러한 단점을 극복하기 위해 여러 대안이 있습니다:
Promise.allSettled
: ES2020에서 도입된 Promise.allSettled
는 모든 프로미스가 이행되거나 거부될 때까지 기다립니다. 각 프로미스의 결과(성공 또는 실패)를 배열로 반환하여, 어떤 프로미스가 성공했고 어떤 프로미스가 실패했는지 알 수 있게 해줍니다.
개별 오류 처리: 각 프로미스에 대해 개별적으로 오류 처리 로직을 추가하여, 오류가 발생해도 다른 프로미스의 실행에 영향을 미치지 않도록 할 수 있습니다.
분할 실행: 프로미스를 몇 개의 그룹으로 나누어 순차적 또는 부분적으로 병렬로 실행하여, 시스템에 대한 부담을 줄일 수 있습니다.
Promise.all
의 사용에 있어서 이러한 단점과 대안을 고려하는 것이 중요합니다.
async processKey(key: string) {
const retainCount = 50; // 유지할 메시지의 수를 정의합니다. 이 경우는 50개입니다.
// 주어진 key에 해당하는 Redis 리스트에서 0부터 -retainCount-1까지의 범위에 있는 모든 요소를 조회합니다.
// 여기서 -retainCount-1은 끝에서부터 50개의 요소를 제외한 나머지 모든 요소를 의미합니다.
const messages = await this.redisDataClient.lrange(
key,
0,
-retainCount - 1,
);
// 조회한 메시지가 존재하는 경우에만 다음 작업을 수행합니다.
if (messages.length > 0) {
// key를 ':'을 기준으로 분리하여 채널 타입과 방 ID를 추출합니다.
const [channelType, roomId] = key.split(':');
// 조회한 메시지, 채널 타입, 방 ID, 그리고 원본 key를 사용하여 메시지를 데이터베이스에 저장하는 함수를 호출합니다.
await this.saveMessagesToDatabase(messages, channelType, +roomId, key);
// 이후, Redis 리스트에서 끝에서부터 50개를 제외하고 나머지 부분을 삭제합니다.
// 이는 리스트의 크기를 50으로 유지하기 위함입니다.
await this.redisDataClient.ltrim(key, -retainCount, -1);
}
}
리스트를 50개로 유지하는 이유는 다음과 같은 이점들 때문일 수 있습니다:
메모리 관리: Redis는 메모리 내 데이터 스토리지 시스템입니다. 따라서, 데이터의 양을 적절히 제한함으로써 메모리 사용량을 효율적으로 관리할 수 있습니다. 리스트의 크기를 제한함으로써, 메모리 공간을 절약하고 성능 저하를 방지할 수 있습니다.
데이터 관련성 유지: 특히 채팅방 같은 실시간 통신 환경에서는 최신 메시지가 가장 중요할 수 있습니다. 오래된 메시지는 덜 중요하거나 관련성이 낮아질 수 있습니다. 따라서, 최신 50개의 메시지만 유지함으로써 사용자에게 가장 관련성 높고 중요한 정보를 제공할 수 있습니다.
성능 최적화: Redis에서 리스트의 크기가 커지면, 데이터를 읽고 쓰는데 걸리는 시간도 늘어날 수 있습니다. 특정 크기로 리스트를 제한함으로써, 성능을 일정 수준 이상으로 유지할 수 있습니다. 이는 데이터 조회와 처리 속도를 빠르게 유지하는 데 도움이 됩니다.
비용 절감: 클라우드 기반 또는 메모리 기반 데이터베이스의 경우, 사용하는 메모리 양에 따라 비용이 발생할 수 있습니다. 데이터를 효과적으로 관리함으로써, 불필요한 비용을 줄일 수 있습니다.
데이터 보존 정책: 특정 비즈니스 또는 어플리케이션 요구 사항에 따라, 최신 데이터만을 보존하는 정책을 적용할 수 있습니다. 예를 들어, 법적 또는 개인정보 보호 요건으로 인해 데이터 보존 기간을 제한해야 할 수도 있습니다.
// 데이터베이스에 메시지를 저장하는 비동기 함수 정의
async saveMessagesToDatabase(
messages: string[], // 메시지 배열
channelType: string, // 채널 타입
roomId: number, // 방 번호
redisKey: string, // 레디스 키
) {
// 채널 채팅 리포지토리 변수 선언
let channelChatsRepository: Repository<any>;
// 채팅 엔티티 클래스 변수 선언
let ChatEntity:
| typeof TrialsChat
| typeof HumorsChat
| typeof PolticalsChat;
// 채널 타입에 따라 적절한 리포지토리와 엔티티 할당
switch (channelType) {
case 'trials':
channelChatsRepository = this.trialsChatRepository;
ChatEntity = TrialsChat;
break;
case 'humors':
channelChatsRepository = this.humorsChatRepository;
ChatEntity = HumorsChat;
break;
case 'polticals':
channelChatsRepository = this.polticalsChatRepository;
ChatEntity = PolticalsChat;
break;
}
// 채팅 객체를 저장할 배열 선언
const chats = [];
// 메시지 배열을 순회하며 각 메시지를 처리
for (const message of messages) {
const parsedMessage = JSON.parse(message); // 메시지 JSON 파싱
// 사용자 정보 조회
const user = await this.usersInfoRepository.findOne({
where: { id: parsedMessage.userId }, // 사용자 ID로 조회
select: ['nickName'], // 닉네임만 선택
});
// 새 채팅 엔티티 인스턴스 생성
const chat = new ChatEntity();
chat.message = parsedMessage.message; // 메시지 내용 할당
chat.userId = parsedMessage.userId; // 사용자 ID 할당
chat.roomId = roomId; // 방 번호 할당
chat.timestamp = new Date(parsedMessage.timestamp); // 타임스탬프 할당
chat.userName = user ? user.nickName : 'Unknown User'; // 사용자 닉네임 할당 (없을 경우 'Unknown User')
// 처리된 채팅 객체를 배열에 추가
chats.push(chat);
}
// 쿼리 러너 생성 및 연결
const queryRunner = this.dataSource.createQueryRunner();
await queryRunner.connect();
// 트랜잭션 시작
await queryRunner.startTransaction();
try {
// 채팅 데이터 일괄 저장
await channelChatsRepository.save(chats);
// 트랜잭션 커밋
await queryRunner.commitTransaction();
} catch (error) {
// 에러 발생 시 롤백
await queryRunner.rollbackTransaction();
} finally {
// 쿼리 러너 해제
await queryRunner.release();
}
}
이 함수는 주어진 메시지 배열을 처리하여 데이터베이스에 저장하는 역할을 합니다. 각 메시지는 JSON 형태로 되어 있으며, 채널 타입에 따라 다른 테이블(엔티티)에 저장됩니다. 사용자 정보는 별도의 리포지토리를 통해 조회하고, 모든 데이터베이스 작업은 트랜잭션 내에서 수행됩니다. 이를 통해 데이터 일관성과 안정성을 보장합니다.
// 채널 채팅을 생성하는 비동기 함수 정의
async createChannelChat(
channelType: string, // 채널 타입 (예: 'public', 'private' 등)
userId: number, // 사용자 ID
message: string, // 채팅 메시지 내용
roomId: number, // 채팅이 발생하는 방의 ID
) {
try {
// 사용자 정보를 조회하기 위한 데이터베이스 쿼리 실행
const user = await this.usersInfoRepository.findOne({
where: { id: userId }, // 조건: 사용자 ID가 일치하는 사용자
select: ['nickName'], // 조회할 필드: 닉네임
});
// 새로운 채팅 객체 생성
const chat = new Chat();
chat.message = message; // 메시지 내용 설정
chat.userId = userId; // 사용자 ID 설정
chat.RoomId = roomId; // 방 ID 설정
chat.timestamp = new Date(); // 현재 시간으로 타임스탬프 설정
chat.userName = user.nickName; // 조회한 사용자의 닉네임 설정
// 채팅 키 생성 (예: 'public:123')
const chatKey = `${channelType}:${roomId}`;
// 채팅 객체를 문자열로 변환
const chatValue = JSON.stringify(chat);
// Redis 리스트에 채팅 데이터 추가 (오른쪽에 푸시)
await this.redisDataClient.rpush(chatKey, chatValue);
// 해당 채팅 키의 만료 시간을 설정 (48시간 후 만료)
await this.redisDataClient.expire(chatKey, 60 * 60 * 24 * 2);
// 채팅 키에 대해 채팅 데이터를 발행 (구독자에게 메시지 전달)
await this.redisDataClient.publish(chatKey, chatValue);
// 사용자 정보 반환
return user;
} catch (error) {
// 에러 발생 시 에러를 밖으로 던짐
throw error;
}
}
이 함수는 주어진 채널 타입, 사용자 ID, 메시지, 방 ID를 기반으로 채팅 메시지를 생성하고, 이를 Redis에 저장 및 발행하는 역할을 합니다. 이를 통해 실시간으로 채팅 메시지를 다룰 수 있으며, Redis의 만료 기능을 사용해 데이터가 일정 시간 후 자동으로 삭제되도록 설정할 수 있습니다.
// 채널의 채팅 메시지를 조회하는 함수
async getChannel(
channelType: string, // 채널 타입 (예: 'trials', 'humors', 'polticals')
roomId: number, // 방 ID
page: number = 0, // 페이지 번호, 기본값은 0
limit: number = 50, // 페이지 당 메시지 수, 기본값은 50
): Promise<Chat[]> { // 반환 타입은 Chat 객체의 배열
// Redis에서 채널 키를 생성합니다. 예: 'trials:1'
const channelKey = `${channelType}:${roomId}`;
// 해당 채널 키에 대한 메시지의 총 개수를 Redis에서 조회합니다.
const redisMessageCount = await this.redisDataClient.llen(channelKey);
let chats = []; // 조회된 채팅 메시지를 저장할 배열
try {
// 첫 페이지 요청인 경우, Redis에서 모든 메시지를 조회합니다.
if (page === 0) {
const chatMessages = await this.redisDataClient.lrange(
channelKey,
0,
-1,
);
// 조회된 메시지들을 JSON 형태로 파싱하여 chats 배열에 추가합니다.
if (chatMessages.length > 0) {
chats = chatMessages.map((message) => JSON.parse(message));
}
}
// Redis에 저장된 메시지가 페이지 당 제한보다 적거나, 페이지 요청이 첫 페이지가 아닌 경우
if (redisMessageCount < limit || page > 0) {
let channelChatsRepository: Repository<any>;
// 채널 타입에 따라 다른 데이터베이스 리포지토리를 선택합니다.
switch (channelType) {
case 'trials':
channelChatsRepository = this.trialsChatRepository;
break;
case 'humors':
channelChatsRepository = this.humorsChatRepository;
break;
case 'polticals':
channelChatsRepository = this.polticalsChatRepository;
break;
}
// 데이터베이스에서 조회할 시작 인덱스를 계산합니다.
const dbStartIndex =
page > 0 ? page * limit - Math.min(redisMessageCount, limit) : 0;
// 데이터베이스에서 채팅 메시지를 조회합니다.
const dbChatMessages = await channelChatsRepository.find({
where: { roomId: roomId },
order: { timestamp: 'ASC' },
skip: dbStartIndex,
take: limit,
});
// 첫 페이지 요청이고, Redis에서 이미 메시지를 조회했던 경우, 두 결과를 합칩니다.
if (page === 0 && chats.length > 0) {
chats = [...chats, ...dbChatMessages];
} else {
chats = dbChatMessages;
}
}
return chats; // 최종 조회된 채팅 메시지 배열을 반환합니다.
} catch (error) {
throw error; // 에러 발생 시 에러를 밖으로 던집니다.
}
}
// 특정 방에 이벤트를 발행하는 함수
async publishEvent(roomId: number, event: string, data: any) {
// 이벤트 발행을 위한 채널 이름을 생성합니다. 예: 'roomEvents:1'
const channelName = `roomEvents:${roomId}`;
// 생성된 채널 이름으로 이벤트와 데이터를 함께 JSON 형태로 발행합니다.
await this.redisDataClient.publish(
channelName,
JSON.stringify({ event, data }),
);
}
// 비동기 함수로, 특정 방의 이벤트를 Redis 채널에 발행합니다.
async publishEvent(roomId: number, event: string, data: any) {
// Redis에서 사용할 채널 이름을 생성합니다.
// 각 방마다 고유한 채널을 가집니다. 예: "roomEvents:1", "roomEvents:2"
const channelName = `roomEvents:${roomId}`;
// Redis 클라이언트를 사용하여, 생성한 채널에 이벤트 정보를 발행합니다.
// 이벤트 정보는 'event' 이름과 'data'를 포함하는 객체를 JSON 문자열로 변환하여 전달합니다.
// 이를 통해, 이 채널을 구독하는 다른 클라이언트들은 이 정보를 실시간으로 수신할 수 있습니다.
await this.redisDataClient.publish(
channelName,
JSON.stringify({ event, data }),
);
}