TIL | [Redis] 팀플 코드 리뷰

bubblegum·2024년 4월 14일
0

Today I learn(TIL)

목록 보기
74/84
post-thumbnail
@Inject('REDIS_DATA_CLIENT') private redisDataClient: Redis, // Redis 데이터 클라이언트를 주입
@Inject('REDIS_SUB_CLIENT') private redisSubClient: Redis,
  • @Inject('REDIS_DATA_CLIENT'): REDIS_DATA_CLIENT라는 토큰(또는 식별자)에 연결된 의존성을 현재 클래스의 redisDataClient 프로퍼티에 주입하라는 지시입니다. 주입되는 객체는 Redis 데이터를 처리하기 위한 클라이언트입니다. 이 클라이언트를 사용하여 Redis 데이터베이스에 데이터를 저장하거나 조회하는 등의 작업을 수행할 수 있습니다.

  • @Inject('REDIS_SUB_CLIENT'): REDIS_SUB_CLIENT라는 토큰에 연결된 의존성을 현재 클래스의 redisSubClient 프로퍼티에 주입하라는 지시입니다. 이 클라이언트는 Redis의 Publish/Subscribe 기능을 사용하여 메시지를 구독하는 데 사용됩니다. 예를 들어, 특정 채널에서 발행된 메시지를 실시간으로 받아 처리할 수 있습니다.

Redis에서 발행/구독(pub/sub) 기능을 사용할 때, 발행하는 클라이언트(publisher)와 구독하는 클라이언트(subscriber) 사이에는 기능적인 차이가 없습니다. Redis 클라이언트는 발행과 구독 모두를 수행할 수 있으며, 이는 Redis 클라이언트의 인스턴스가 동시에 여러 역할을 할 수 있음을 의미합니다.

redisDataClientredisSubClient라는 두 개의 클라이언트 인스턴스를 사용하는 이유는, 발행과 구독 작업을 분리하여 처리하기 위함입니다. 이 구분은 다음과 같은 이유로 유용합니다:

  1. 역할 분리: 애플리케이션 내에서 데이터 관리(예: 데이터 저장, 조회 등)와 메시지 시스템(발행/구독)을 명확히 분리하고자 할 때, 별도의 클라이언트 인스턴스를 사용하는 것이 좋습니다. 이렇게 하면 코드의 가독성과 유지보수성이 향상됩니다.

  2. 성능 최적화: 발행과 구독에 다른 Redis 연결(클라이언트 인스턴스)을 사용하면, 각각의 작업에 최적화된 연결 설정을 적용할 수 있습니다. 예를 들어, 구독자 클라이언트는 주로 메시지 수신에 집중할 수 있도록, 발행자 클라이언트는 데이터 전송에 최적화될 수 있습니다.

  3. 자원 관리: Redis 클라이언트가 구독 모드에 들어가면, 해당 연결은 오직 구독 관련 명령어(SUBSCRIBE, UNSUBSCRIBE, PING 등)만 수행할 수 있습니다. 따라서, 데이터 관련 작업과 메시지 시스템을 동시에 처리하려면, 별도의 클라이언트 인스턴스가 필요합니다.

위의 코드에서 redisDataClient에서 publish 메서드를 사용하는 것은, 이 클라이언트가 데이터 관련 작업뿐만 아니라 메시지 발행 역할도 수행할 수 있기 때문입니다. 반면, redisSubClient는 구독 작업에 집중되어 있을 가능성이 큽니다. 이렇게 함으로써, 각 클라이언트는 자신의 주 역할에 집중하면서도 필요에 따라 다른 기능을 수행할 수 있는 유연성을 갖게 됩니다.

async publishNotification(message: string) {
	const channelName = 'notifications';
	await this.redisDataClient.publish(channelName, message);
}

메시지를 지정된 채널에 메시지를 "발행"(publish)합니다. Redis의 발행/구독(Publish/Subscribe, 또는 pub/sub) 시스템을 이용하는 것인데요, 이 시스템은 메시지 브로커 역할을 합니다.

Redis의 발행/구독(pub/sub) 시스템의 작동 방식:

  • 발행(Publish): 특정 채널에 메시지를 발행합니다. 이 예시에서는 notifications 채널에 문자열 message를 발행하고 있습니다. 발행된 메시지는 이 채널을 구독하고 있는 모든 클라이언트에게 전달됩니다.
  • 구독(Subscribe): 하나 또는 여러 채널을 구독하고, 해당 채널에 발행되는 메시지를 실시간으로 받습니다. 이 경우 다른 클라이언트(예: redisSubClient)가 notifications 채널을 구독하고 있다면, publishNotification 함수를 통해 발행된 메시지를 받을 수 있습니다.

이 시스템은 실시간 알림, 채팅 시스템, 실시간 피드 업데이트 등 다양한 실시간 메시징 요구사항에 활용됩니다. 중요한 점은, 발행된 메시지는 영구 저장되지 않고, 채널을 구독하는 클라이언트에게만 전달된다는 것입니다. 따라서, 메시지를 영구적으로 저장하려면 다른 방법을 사용해야 합니다.

@Interval(1000 * 60)
  async handleScheduledTasks() {
    // await this.redisConnection();

    for (const channelType of Object.values(ChannelType)) {
      let cursor = '0';
      const tasks = [];

      do {
        const [nextCursor, keys] = await this.redisDataClient.scan(
          cursor,
          'MATCH',
          `${channelType}:*`,
          'COUNT',
          100,
        );
        cursor = nextCursor;

        if (keys.length === 0) continue;

        for (const key of keys) {
          tasks.push(this.processKey(key));

          if (tasks.length >= 500) {
            await Promise.all(tasks.splice(0, 500));
          }
        }
      } while (cursor !== '0');

      await Promise.all(tasks);
    }
  }

@Interval(1000 * 60) 데코레이터는 이 함수가 60초(1000밀리초 * 60)마다 한 번씩 자동으로 실행되도록 설정합니다. 이는 주기적인 작업을 자동화하는 데 사용됩니다.

코드 내부에서는 특정 조건을 만족하는 Redis 키를 찾는 과정을 반복적으로 수행합니다. 각 채널 유형(ChannelType)에 대해 다음과 같은 작업을 수행합니다:

  1. redisDataClient.scan 메서드를 사용하여 지정된 패턴(${channelType}:*)에 맞는 키를 찾습니다. 이 메서드는 커서 기반의 반복을 사용하여 데이터베이스 내에서 키를 검색합니다. MATCH 옵션은 검색할 키의 패턴을 지정하고, COUNT 옵션은 한 번의 호출에서 반환될 수 있는 최대 키의 수를 지정합니다.

  2. 검색된 키가 있으면, 각 키에 대해 processKey 함수를 호출하여 처리합니다. 이 함수는 실제 키 처리 로직을 구현할 것으로 추정됩니다.

  3. 한 번에 많은 양의 작업(tasks)을 처리하기 위해, 작업 목록에 500개의 작업이 쌓이면 Promise.all을 사용하여 병렬로 처리합니다. 이는 성능 최적화를 위한 것으로 보입니다.

  4. 모든 키를 처리한 후에 남아 있는 작업들도 마찬가지로 Promise.all을 사용하여 처리합니다.

  5. 모든 채널 유형에 대해 위의 과정을 반복합니다.

Promise.all은 여러 프로미스를 병렬로 처리할 때 매우 유용하지만 몇 가지 주의해야 할 단점이 있습니다. 가장 큰 단점 중 하나는 모든 프로미스가 성공적으로 이행(resolve)될 때까지 기다리지만, 하나라도 거부(reject)되면 즉시 전체가 거부되고 나머지 성공적인 결과들에 대한 정보를 잃게 된다는 점입니다.

이는 다음과 같은 상황에서 문제가 될 수 있습니다:

  1. 완전성: 여러 작업 중 하나라도 실패하면 Promise.all은 즉시 거부되고, 성공한 나머지 작업의 결과를 얻을 수 없습니다. 즉, 모든 작업의 결과가 중요한 경우 하나의 실패로 인해 유용한 정보를 잃을 수 있습니다.

  2. 오류 처리: Promise.all은 첫 번째 거부된 프로미스만 포착하고 나머지는 무시합니다. 따라서 여러 프로미스가 실패할 경우, 어떤 것들이 실패했는지 알기 어렵습니다.

  3. 효율성: 모든 프로미스가 병렬로 실행되기 때문에, 리소스에 대한 부담이 클 수 있습니다. 특히, 네트워크 요청이나 디스크 I/O와 같은 리소스 집약적 작업을 많이 처리해야 할 때 문제가 될 수 있습니다.

이러한 단점을 극복하기 위해 여러 대안이 있습니다:

  • Promise.allSettled: ES2020에서 도입된 Promise.allSettled는 모든 프로미스가 이행되거나 거부될 때까지 기다립니다. 각 프로미스의 결과(성공 또는 실패)를 배열로 반환하여, 어떤 프로미스가 성공했고 어떤 프로미스가 실패했는지 알 수 있게 해줍니다.

  • 개별 오류 처리: 각 프로미스에 대해 개별적으로 오류 처리 로직을 추가하여, 오류가 발생해도 다른 프로미스의 실행에 영향을 미치지 않도록 할 수 있습니다.

  • 분할 실행: 프로미스를 몇 개의 그룹으로 나누어 순차적 또는 부분적으로 병렬로 실행하여, 시스템에 대한 부담을 줄일 수 있습니다.

Promise.all의 사용에 있어서 이러한 단점과 대안을 고려하는 것이 중요합니다.

async processKey(key: string) {
    const retainCount = 50; // 유지할 메시지의 수를 정의합니다. 이 경우는 50개입니다.

    // 주어진 key에 해당하는 Redis 리스트에서 0부터 -retainCount-1까지의 범위에 있는 모든 요소를 조회합니다.
    // 여기서 -retainCount-1은 끝에서부터 50개의 요소를 제외한 나머지 모든 요소를 의미합니다.
    const messages = await this.redisDataClient.lrange(
      key,
      0,
      -retainCount - 1,
    );

    // 조회한 메시지가 존재하는 경우에만 다음 작업을 수행합니다.
    if (messages.length > 0) {
      // key를 ':'을 기준으로 분리하여 채널 타입과 방 ID를 추출합니다.
      const [channelType, roomId] = key.split(':');

      // 조회한 메시지, 채널 타입, 방 ID, 그리고 원본 key를 사용하여 메시지를 데이터베이스에 저장하는 함수를 호출합니다.
      await this.saveMessagesToDatabase(messages, channelType, +roomId, key);

      // 이후, Redis 리스트에서 끝에서부터 50개를 제외하고 나머지 부분을 삭제합니다.
      // 이는 리스트의 크기를 50으로 유지하기 위함입니다.
      await this.redisDataClient.ltrim(key, -retainCount, -1);
    }
  }

리스트를 50개로 유지하는 이유는 다음과 같은 이점들 때문일 수 있습니다:

  1. 메모리 관리: Redis는 메모리 내 데이터 스토리지 시스템입니다. 따라서, 데이터의 양을 적절히 제한함으로써 메모리 사용량을 효율적으로 관리할 수 있습니다. 리스트의 크기를 제한함으로써, 메모리 공간을 절약하고 성능 저하를 방지할 수 있습니다.

  2. 데이터 관련성 유지: 특히 채팅방 같은 실시간 통신 환경에서는 최신 메시지가 가장 중요할 수 있습니다. 오래된 메시지는 덜 중요하거나 관련성이 낮아질 수 있습니다. 따라서, 최신 50개의 메시지만 유지함으로써 사용자에게 가장 관련성 높고 중요한 정보를 제공할 수 있습니다.

  3. 성능 최적화: Redis에서 리스트의 크기가 커지면, 데이터를 읽고 쓰는데 걸리는 시간도 늘어날 수 있습니다. 특정 크기로 리스트를 제한함으로써, 성능을 일정 수준 이상으로 유지할 수 있습니다. 이는 데이터 조회와 처리 속도를 빠르게 유지하는 데 도움이 됩니다.

  4. 비용 절감: 클라우드 기반 또는 메모리 기반 데이터베이스의 경우, 사용하는 메모리 양에 따라 비용이 발생할 수 있습니다. 데이터를 효과적으로 관리함으로써, 불필요한 비용을 줄일 수 있습니다.

  5. 데이터 보존 정책: 특정 비즈니스 또는 어플리케이션 요구 사항에 따라, 최신 데이터만을 보존하는 정책을 적용할 수 있습니다. 예를 들어, 법적 또는 개인정보 보호 요건으로 인해 데이터 보존 기간을 제한해야 할 수도 있습니다.

// 데이터베이스에 메시지를 저장하는 비동기 함수 정의
async saveMessagesToDatabase(
    messages: string[], // 메시지 배열
    channelType: string, // 채널 타입
    roomId: number, // 방 번호
    redisKey: string, // 레디스 키
  ) {
    // 채널 채팅 리포지토리 변수 선언
    let channelChatsRepository: Repository<any>;
    // 채팅 엔티티 클래스 변수 선언
    let ChatEntity:
      | typeof TrialsChat
      | typeof HumorsChat
      | typeof PolticalsChat;

    // 채널 타입에 따라 적절한 리포지토리와 엔티티 할당
    switch (channelType) {
      case 'trials':
        channelChatsRepository = this.trialsChatRepository;
        ChatEntity = TrialsChat;
        break;
      case 'humors':
        channelChatsRepository = this.humorsChatRepository;
        ChatEntity = HumorsChat;
        break;
      case 'polticals':
        channelChatsRepository = this.polticalsChatRepository;
        ChatEntity = PolticalsChat;
        break;
    }

    // 채팅 객체를 저장할 배열 선언
    const chats = [];
    // 메시지 배열을 순회하며 각 메시지를 처리
    for (const message of messages) {
      const parsedMessage = JSON.parse(message); // 메시지 JSON 파싱
      // 사용자 정보 조회
      const user = await this.usersInfoRepository.findOne({
        where: { id: parsedMessage.userId }, // 사용자 ID로 조회
        select: ['nickName'], // 닉네임만 선택
      });

      // 새 채팅 엔티티 인스턴스 생성
      const chat = new ChatEntity();
      chat.message = parsedMessage.message; // 메시지 내용 할당
      chat.userId = parsedMessage.userId; // 사용자 ID 할당
      chat.roomId = roomId; // 방 번호 할당
      chat.timestamp = new Date(parsedMessage.timestamp); // 타임스탬프 할당
      chat.userName = user ? user.nickName : 'Unknown User'; // 사용자 닉네임 할당 (없을 경우 'Unknown User')

      // 처리된 채팅 객체를 배열에 추가
      chats.push(chat);
    }

    // 쿼리 러너 생성 및 연결
    const queryRunner = this.dataSource.createQueryRunner();
    await queryRunner.connect();
    // 트랜잭션 시작
    await queryRunner.startTransaction();

    try {
      // 채팅 데이터 일괄 저장
      await channelChatsRepository.save(chats);
      // 트랜잭션 커밋
      await queryRunner.commitTransaction();
    } catch (error) {
      // 에러 발생 시 롤백
      await queryRunner.rollbackTransaction();
    } finally {
      // 쿼리 러너 해제
      await queryRunner.release();
    }
  }

이 함수는 주어진 메시지 배열을 처리하여 데이터베이스에 저장하는 역할을 합니다. 각 메시지는 JSON 형태로 되어 있으며, 채널 타입에 따라 다른 테이블(엔티티)에 저장됩니다. 사용자 정보는 별도의 리포지토리를 통해 조회하고, 모든 데이터베이스 작업은 트랜잭션 내에서 수행됩니다. 이를 통해 데이터 일관성과 안정성을 보장합니다.

// 채널 채팅을 생성하는 비동기 함수 정의
async createChannelChat(
    channelType: string, // 채널 타입 (예: 'public', 'private' 등)
    userId: number, // 사용자 ID
    message: string, // 채팅 메시지 내용
    roomId: number, // 채팅이 발생하는 방의 ID
  ) {
    try {
      // 사용자 정보를 조회하기 위한 데이터베이스 쿼리 실행
      const user = await this.usersInfoRepository.findOne({
        where: { id: userId }, // 조건: 사용자 ID가 일치하는 사용자
        select: ['nickName'], // 조회할 필드: 닉네임
      });
      // 새로운 채팅 객체 생성
      const chat = new Chat();
      chat.message = message; // 메시지 내용 설정
      chat.userId = userId; // 사용자 ID 설정
      chat.RoomId = roomId; // 방 ID 설정
      chat.timestamp = new Date(); // 현재 시간으로 타임스탬프 설정
      chat.userName = user.nickName; // 조회한 사용자의 닉네임 설정

      // 채팅 키 생성 (예: 'public:123')
      const chatKey = `${channelType}:${roomId}`;
      // 채팅 객체를 문자열로 변환
      const chatValue = JSON.stringify(chat);

      // Redis 리스트에 채팅 데이터 추가 (오른쪽에 푸시)
      await this.redisDataClient.rpush(chatKey, chatValue);
      // 해당 채팅 키의 만료 시간을 설정 (48시간 후 만료)
      await this.redisDataClient.expire(chatKey, 60 * 60 * 24 * 2);

      // 채팅 키에 대해 채팅 데이터를 발행 (구독자에게 메시지 전달)
      await this.redisDataClient.publish(chatKey, chatValue);

      // 사용자 정보 반환
      return user;
    } catch (error) {
      // 에러 발생 시 에러를 밖으로 던짐
      throw error;
    }
  }

이 함수는 주어진 채널 타입, 사용자 ID, 메시지, 방 ID를 기반으로 채팅 메시지를 생성하고, 이를 Redis에 저장 및 발행하는 역할을 합니다. 이를 통해 실시간으로 채팅 메시지를 다룰 수 있으며, Redis의 만료 기능을 사용해 데이터가 일정 시간 후 자동으로 삭제되도록 설정할 수 있습니다.

// 채널의 채팅 메시지를 조회하는 함수
async getChannel(
    channelType: string, // 채널 타입 (예: 'trials', 'humors', 'polticals')
    roomId: number, // 방 ID
    page: number = 0, // 페이지 번호, 기본값은 0
    limit: number = 50, // 페이지 당 메시지 수, 기본값은 50
  ): Promise<Chat[]> { // 반환 타입은 Chat 객체의 배열
    
    // Redis에서 채널 키를 생성합니다. 예: 'trials:1'
    const channelKey = `${channelType}:${roomId}`;
    // 해당 채널 키에 대한 메시지의 총 개수를 Redis에서 조회합니다.
    const redisMessageCount = await this.redisDataClient.llen(channelKey);

    let chats = []; // 조회된 채팅 메시지를 저장할 배열

    try {
      // 첫 페이지 요청인 경우, Redis에서 모든 메시지를 조회합니다.
      if (page === 0) {
        const chatMessages = await this.redisDataClient.lrange(
          channelKey,
          0,
          -1,
        );
        // 조회된 메시지들을 JSON 형태로 파싱하여 chats 배열에 추가합니다.
        if (chatMessages.length > 0) {
          chats = chatMessages.map((message) => JSON.parse(message));
        }
      }

      // Redis에 저장된 메시지가 페이지 당 제한보다 적거나, 페이지 요청이 첫 페이지가 아닌 경우
      if (redisMessageCount < limit || page > 0) {
        let channelChatsRepository: Repository<any>;
        // 채널 타입에 따라 다른 데이터베이스 리포지토리를 선택합니다.
        switch (channelType) {
          case 'trials':
            channelChatsRepository = this.trialsChatRepository;
            break;
          case 'humors':
            channelChatsRepository = this.humorsChatRepository;
            break;
          case 'polticals':
            channelChatsRepository = this.polticalsChatRepository;
            break;
        }

        // 데이터베이스에서 조회할 시작 인덱스를 계산합니다.
        const dbStartIndex =
          page > 0 ? page * limit - Math.min(redisMessageCount, limit) : 0;

        // 데이터베이스에서 채팅 메시지를 조회합니다.
        const dbChatMessages = await channelChatsRepository.find({
          where: { roomId: roomId },
          order: { timestamp: 'ASC' },
          skip: dbStartIndex,
          take: limit,
        });

        // 첫 페이지 요청이고, Redis에서 이미 메시지를 조회했던 경우, 두 결과를 합칩니다.
        if (page === 0 && chats.length > 0) {
          chats = [...chats, ...dbChatMessages];
        } else {
          chats = dbChatMessages;
        }
      }

      return chats; // 최종 조회된 채팅 메시지 배열을 반환합니다.
    } catch (error) {
      throw error; // 에러 발생 시 에러를 밖으로 던집니다.
    }
  }

// 특정 방에 이벤트를 발행하는 함수
async publishEvent(roomId: number, event: string, data: any) {
    // 이벤트 발행을 위한 채널 이름을 생성합니다. 예: 'roomEvents:1'
    const channelName = `roomEvents:${roomId}`;
    // 생성된 채널 이름으로 이벤트와 데이터를 함께 JSON 형태로 발행합니다.
    await this.redisDataClient.publish(
      channelName,
      JSON.stringify({ event, data }),
    );
}
// 비동기 함수로, 특정 방의 이벤트를 Redis 채널에 발행합니다.
async publishEvent(roomId: number, event: string, data: any) {
  // Redis에서 사용할 채널 이름을 생성합니다. 
  // 각 방마다 고유한 채널을 가집니다. 예: "roomEvents:1", "roomEvents:2"
  const channelName = `roomEvents:${roomId}`;

  // Redis 클라이언트를 사용하여, 생성한 채널에 이벤트 정보를 발행합니다.
  // 이벤트 정보는 'event' 이름과 'data'를 포함하는 객체를 JSON 문자열로 변환하여 전달합니다.
  // 이를 통해, 이 채널을 구독하는 다른 클라이언트들은 이 정보를 실시간으로 수신할 수 있습니다.
  await this.redisDataClient.publish(
    channelName,
    JSON.stringify({ event, data }),
  );
}
profile
황세민

0개의 댓글

관련 채용 정보