Prisma Extension을 활용한 대량 데이터 삽입 (feat. AsyncIterator)

June·2024년 6월 16일
0
post-thumbnail

1. 문제 상황

서비스를 운영하다보면 정산이나 통계의 배치 처리 등의 이유로 대량의 데이터를 삽입해야 하는 경우가 생깁니다. 이때, Prisma의 createMany 메서드를 사용하면 다수의 데이터를 쉽게 삽입할 수 있습니다. 하지만, createMany 메서드는 한 번에 너무 많은 데이터 삽입을 시도하면 성능 이슈가 발생하거나, Node.js 서비스 자체가 멈추는 문제가 발생할 수 있습니다.

문제를 해결하기 위해 아래 코드처럼 적절한 단위를 정해 createMany 메서드를 여러 번 나누어 호출하면서 데이터를 삽입하는 방법을 사용할 수 있습니다.

for await (const chunk of chunks) {
  await prisma.model.createMany({ data: chunk });
}

하지만 이 방법은 해당 로직이 필요한 모든 곳에 중복해서 작성해야 하고, 동시성 레벨을 조절하기 어렵다는 단점이 있습니다.

해당 포스트에서는 AsyncIterator를 활용하여 동시성 레벨을 조절하고, 작업을 환경에 맞게 나누어 처리하면서 안정적으로 데이터를 삽입하는 로직을 Prisma Extension 을 활용하여 모든 모델에 쉽게 적용하는 방법을 알아보겠습니다.

모든 예시 코드는 여기 에서 확인할 수 있습니다.

2. 동시성 레벨에 따라 작업을 순차적으로 처리하는 JobQueue 클래스 구현

JobQueue 클래스는 AsyncIterator를 활용하여 동시성 레벨에 따라 작업을 처리하는 클래스입니다. fp-ts를 사용하여 함수형 파이프라인으로 각 단계를 단계적으로 구현해보겠습니다.

2.1. 동시성 레벨에 따라 작업을 순차적으로 처리하는 JobQueue 클래스 선언

class JobQueue<T> {
  private constructor(private readonly concurrency: number) {}
  public static of<T>(concurrency: number): JobQueue<T> {
    return new JobQueue<T>(concurrency);
  }
  public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
    // 구현할 부분
  }
}

해당 클래스는 concurrency를 인자로 받아 JobQueue 인스턴스를 생성하고, execute 메서드에 전달된 작업(promise를 반환하는 함수)을 순차적으로 실행하면서 동시성 레벨에 따라 작업을 처리합니다.

2.2. 작업을 concurrency에 따라 나누기

먼저 인자로 전달 받은 작업 배열을 동시성 레벨(concurrency)에 따라 chunk로 나누어 줍니다.

public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
  return await pipe(
    jobs, // 1. 작업 목록
    chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
    //...
  )
}

2.3. 작업 청크를 각각 실행가능한 Task로 변환하기

이전 단계에서 나누어진 작업 청크를 각각 실행가능한 Task로 변환합니다.

  • fp-ts의 Task는 실행하기 전까지 실제로 아무 일도 하지 않는 (lazy) 함수형 스타일의 비동기 계산을 표현합니다.
  • 변환을 위해 fp-ts TasksequenceArray 함수를 사용합니다.
    해당 함수는 배열을 전달받아 배열의 각 요소를 Task로 변환한 후, 모든 Task를 순차적으로 실행할 수 있는 Task로 반환합니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
  return await pipe(
    jobs, // 1. 작업 목록
    chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
    map(sequenceArray), // 3. 각 청크를 Task로 변환
    //...
  )
}

2.4. Task를 실행하고, 비동기 제너레이터로 변환하기

먼저 비동기 제너레이터를 순차적으로 실행하는 유틸리티 함수를 작성합니다.
아래 코드는 비동기 작업 목록을 순차적으로 실행하고 방출합니다.

export async function* runSequentially<T>(
  tasks: Array<Task<ReadonlyArray<T>>>,
): AsyncGenerator<ReadonlyArray<T>, void, unknown> {
  for (const task of tasks) {
    yield await task();
  }
}

이제 runSequentially 함수를 execute의 파이프라인에 추가합니다.

public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
  return await pipe(
    jobs, // 1. 작업 목록
    chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
    map(sequenceArray), // 3. 각 청크를 Task로 변환
    runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
    //...
  )
}

2.5 AsyncIterable을 배열로 변환하기

이전 단계에서 만들어진 비동기 제너레이터가 방출하는 AsyncIterable을 배열로 변환하는 유틸리티 함수를 작성합니다.

export async function toArrayAsync<T>(iterable: AsyncIterable<T>) {
  const result: T[] = [];
  for await (const value of iterable) {
    result.push(value);
  }
  return result;
}

이제 toArrayAsync 함수를 execute의 파이프라인에 추가합니다.

public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
  return await pipe(
    jobs, // 1. 작업 목록
    chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
    map(sequenceArray), // 3. 각 청크를 Task로 변환
    runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
    toArrayAsync, // 5. AsyncIterable을 배열로 변환
  )
}

2.6. 결과를 평탄화하여 반환

마지막으로 만들어진 배열을 평탄화하여 반환합니다.

public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
  return await pipe(
    jobs, // 1. 작업 목록
    chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
    map(sequenceArray), // 3. 각 청크를 Task로 변환
    runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
    toArrayAsync, // 5. AsyncIterable을 배열로 변환
  ).then(flatten); // 6. 각 청크 결과를 하나의 배열로 평탄화하여 반환
}

3. Prisma Extension으로 JobQueue를 사용하여 대용량 데이터를 삽입하는 커스텀 메서드 추가하기

Prisma Extension은 Prisma 기본 기능을 Mixin 방식으로 확장하거나 변경할 수 있는 방법을 제공합니다.

Prisma Extension의 기본 사용법은 여기 를 참고해주세요.

3.1. 대용량 데이터를 삽입하는 메서드 시그니처 정의

Prisma.defineExtension을 사용하여 모든 모델에 대용량 데이터를 삽입하는 createBulk 메서드를 정의합니다.

기존 prisma의 createManyAndReturn 메서드와 동일한 시그니처를 사용하되, 추가적으로 bulkSizeconcurrency를 인자로 받습니다.

  • bulkSize: 한 번에 처리할 데이터의 양 (동시에 삽입되는 데이터의 양 = bulkSize * concurrency)
  • concurrency: 병렬로 실행할 작업의 수 (동시에 시작하는 트랜잭션의 수, 커넥션 풀의 크기보다 작아야 함)
const BulkCreateExtension = Prisma.defineExtension({
  name: 'bulk-insert',
  model: {
    $allModels: {
      /**'
       * 여러 개의 데이터를 한 번에 삽입하고 삽입된 데이터를 반환
       * @param args.bulkSize 한 번에 처리할 데이터의 양 (동시에 삽입되는 데이터의 양 = bulkSize * concurrency)
       * @param args.concurrency 병렬로 실행할 작업의 수 (동시에 시작하는 트랜잭션의 수, 커넥션 풀의 크기보다 작아야 함)
       *
       * ※ 주의: concurrency가 1 이상인 경우, 데이터의 삽입 순서가 보장되지 않음
       *
       */
      async createBulk<T, D>(
        this: T,
        args: Prisma.Exact<D, Prisma.Args<T, 'createManyAndReturn'>> & {
          concurrency?: number;
          bulkSize: number;
        },
      ): Promise<Prisma.Result<T, D, 'createManyAndReturn'>> {
        // 구현할 부분
      },
    },
  },
});

3.2. bulkSize에 따라 데이터를 나누어 job을 생성하는 함수 작성

bulkSize에 따라 데이터를 나누어 JobQueue에 전달할 작업 배열을 생성하는 함수를 작성합니다.

/**
 *
 * @summary `bulkSize` 만큼의 데이터를 한 번에 삽입하는 작업 배열을 생성
 * @param ctx 현제 모델의 컨텍스트
 * @param bulkSize 한 번에 처리할 데이터의 양
 * @param createManyAndReturnArgs createManyAndReturn 메서드의 인자
 * @returns 작업 배열
 */
function _createJobs(
  ctx: any,
  bulkSize: number,
  createManyAndReturnArgs: Prisma.Args<any, 'createManyAndReturn'>,
) {
  return pipe(createManyAndReturnArgs.data, chunksOf(bulkSize), (chunks) =>
    chunks.map(
      (chunk) => () =>
        ctx.$parent[ctx.$name].createManyAndReturn({
          data: chunk,
          select: createManyAndReturnArgs.select,
          skipDuplicates: createManyAndReturnArgs.skipDuplicates,
        }),
    ),
  );
}

3.3. 나누어진 작업을 JobQueue에 전달하여 실행하고 결과를 반환하도록 메서드 구현

async createBulk<T, D>(
    this: T,
    args: Prisma.Exact<D, Prisma.Args<T, 'createManyAndReturn'>> & {
        concurrency?: number;
        bulkSize: number;
    },
    ): Promise<Prisma.Result<T, D, 'createManyAndReturn'>> {
    const ctx = Prisma.getExtensionContext(this); // 현재 모델의 컨텍스트
    const { concurrency, bulkSize, ...rest } = args as any;

    const jobs = _createJobs(ctx, bulkSize, rest);

    const res = await IterableJobQueue.of(concurrency ?? 1)
        .execute(jobs)
        .then(flatten); // createManyAndReturn의 반환값이 배열의 배열이므로 flatten을 사용하여 1차원 배열로 변환

    // 타입 추론을 위해 as any 사용
    return res as Prisma.Result<T, D, 'createManyAndReturn'>;
},

3.4. Prisma Extension을 적용하여 모든 모델에 createBulk 메서드 추가

// Create a new PrismaClient instance
const prisma = new PrismaClient({
  log: ['query'],
});

// Extend the PrismaClient instance with the BulkInsertExtension
return prisma.$extends(BulkInsertExtension);

4. 사용 예시

4.1. 예시 코드

// 각 모델에 createBulk 메서드가 추가됨
const bulkCreateResult = await prisma.user.createBulk({
  data: Array.from({ length }).map((_, i) => ({
    email: `email ${i}`,
    name: `name ${i}`,
    password: `password ${i}`,
  })),
  select: { id: true },
  concurrency: 10, // 병렬로 실행할 작업의 수
  bulkSize: 100000, // 한 번에 처리할 데이터의 양
});

  • 위 사진처럼 prisma extension을 통해 추가된 메서드(createBulk)는 ide에서도 인식됩니다.

4.2. 실험 결과

10,000,000 건의 삽입 요청 시
concurrency : 10, bulkSize : 100000

  • 전체 실행 시간: 1:11.499 (m:ss.mmm)

concurrency: 1, bulkSize: 100000

  • 전체 실행 시간: 2:41.523 (m:ss.mmm)

5. 결론

이번 포스트에서는 Prisma를 활용한 대량 데이터 삽입 문제를 해결하기 위한 방법으로 함수형 방식과 이터러블을 통해 동시성 작업을 위한 유틸리티 클래스(JobQueue)를 구현하고, 해당 유틸을 통해 Prisma Extension을 사용하여 대용량 데이터를 삽입할 수 있는 커스텀 메서드를 추가하는 방법을 알아보았습니다.

이 방법을 통해 다음과 같은 이점을 얻을 수 있습니다

  • 성능 향상: 동시성 레벨을 조절하여 데이터 삽입 시 성능을 최적화할 수 있습니다.
  • 안정성: 대량의 데이터를 한 번에 삽입하는 대신 적절한 단위로 나누어 삽입함으로써 Node.js 서비스의 안정성을 높일 수 있습니다.
  • 재사용성: Prisma Extension을 사용하여 공통 로직을 모든 모델에 쉽게 적용할 수 있어 코드의 중복을 줄이고 유지보수성을 높일 수 있습니다.

0개의 댓글

관련 채용 정보