
서비스를 운영하다보면 정산이나 통계의 배치 처리 등의 이유로 대량의 데이터를 삽입해야 하는 경우가 생깁니다. 이때, Prisma의 createMany 메서드를 사용하면 다수의 데이터를 쉽게 삽입할 수 있습니다. 하지만, createMany 메서드는 한 번에 너무 많은 데이터 삽입을 시도하면 성능 이슈가 발생하거나, Node.js 서비스 자체가 멈추는 문제가 발생할 수 있습니다.
문제를 해결하기 위해 아래 코드처럼 적절한 단위를 정해 createMany 메서드를 여러 번 나누어 호출하면서 데이터를 삽입하는 방법을 사용할 수 있습니다.
for await (const chunk of chunks) {
await prisma.model.createMany({ data: chunk });
}
하지만 이 방법은 해당 로직이 필요한 모든 곳에 중복해서 작성해야 하고, 동시성 레벨을 조절하기 어렵다는 단점이 있습니다.
해당 포스트에서는 AsyncIterator를 활용하여 동시성 레벨을 조절하고, 작업을 환경에 맞게 나누어 처리하면서 안정적으로 데이터를 삽입하는 로직을 Prisma Extension 을 활용하여 모든 모델에 쉽게 적용하는 방법을 알아보겠습니다.
모든 예시 코드는 여기 에서 확인할 수 있습니다.
JobQueue 클래스 구현JobQueue 클래스는 AsyncIterator를 활용하여 동시성 레벨에 따라 작업을 처리하는 클래스입니다. fp-ts를 사용하여 함수형 파이프라인으로 각 단계를 단계적으로 구현해보겠습니다.
JobQueue 클래스 선언class JobQueue<T> {
private constructor(private readonly concurrency: number) {}
public static of<T>(concurrency: number): JobQueue<T> {
return new JobQueue<T>(concurrency);
}
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
// 구현할 부분
}
}
해당 클래스는 concurrency를 인자로 받아 JobQueue 인스턴스를 생성하고, execute 메서드에 전달된 작업(promise를 반환하는 함수)을 순차적으로 실행하면서 동시성 레벨에 따라 작업을 처리합니다.
concurrency에 따라 나누기먼저 인자로 전달 받은 작업 배열을 동시성 레벨(concurrency)에 따라 chunk로 나누어 줍니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
//...
)
}
Task로 변환하기이전 단계에서 나누어진 작업 청크를 각각 실행가능한 Task로 변환합니다.
Task는 실행하기 전까지 실제로 아무 일도 하지 않는 (lazy) 함수형 스타일의 비동기 계산을 표현합니다.Task의 sequenceArray 함수를 사용합니다.Task로 변환한 후, 모든 Task를 순차적으로 실행할 수 있는 Task로 반환합니다.public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
//...
)
}
먼저 비동기 제너레이터를 순차적으로 실행하는 유틸리티 함수를 작성합니다.
아래 코드는 비동기 작업 목록을 순차적으로 실행하고 방출합니다.
export async function* runSequentially<T>(
tasks: Array<Task<ReadonlyArray<T>>>,
): AsyncGenerator<ReadonlyArray<T>, void, unknown> {
for (const task of tasks) {
yield await task();
}
}
이제 runSequentially 함수를 execute의 파이프라인에 추가합니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
//...
)
}
이전 단계에서 만들어진 비동기 제너레이터가 방출하는 AsyncIterable을 배열로 변환하는 유틸리티 함수를 작성합니다.
export async function toArrayAsync<T>(iterable: AsyncIterable<T>) {
const result: T[] = [];
for await (const value of iterable) {
result.push(value);
}
return result;
}
이제 toArrayAsync 함수를 execute의 파이프라인에 추가합니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
toArrayAsync, // 5. AsyncIterable을 배열로 변환
)
}
마지막으로 만들어진 배열을 평탄화하여 반환합니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
toArrayAsync, // 5. AsyncIterable을 배열로 변환
).then(flatten); // 6. 각 청크 결과를 하나의 배열로 평탄화하여 반환
}
JobQueue를 사용하여 대용량 데이터를 삽입하는 커스텀 메서드 추가하기Prisma Extension은 Prisma 기본 기능을 Mixin 방식으로 확장하거나 변경할 수 있는 방법을 제공합니다.
Prisma Extension의 기본 사용법은 여기 를 참고해주세요.
Prisma.defineExtension을 사용하여 모든 모델에 대용량 데이터를 삽입하는 createBulk 메서드를 정의합니다.
기존 prisma의 createManyAndReturn 메서드와 동일한 시그니처를 사용하되, 추가적으로 bulkSize와 concurrency를 인자로 받습니다.
bulkSize: 한 번에 처리할 데이터의 양 (동시에 삽입되는 데이터의 양 = bulkSize * concurrency)concurrency: 병렬로 실행할 작업의 수 (동시에 시작하는 트랜잭션의 수, 커넥션 풀의 크기보다 작아야 함)const BulkCreateExtension = Prisma.defineExtension({
name: 'bulk-insert',
model: {
$allModels: {
/**'
* 여러 개의 데이터를 한 번에 삽입하고 삽입된 데이터를 반환
* @param args.bulkSize 한 번에 처리할 데이터의 양 (동시에 삽입되는 데이터의 양 = bulkSize * concurrency)
* @param args.concurrency 병렬로 실행할 작업의 수 (동시에 시작하는 트랜잭션의 수, 커넥션 풀의 크기보다 작아야 함)
*
* ※ 주의: concurrency가 1 이상인 경우, 데이터의 삽입 순서가 보장되지 않음
*
*/
async createBulk<T, D>(
this: T,
args: Prisma.Exact<D, Prisma.Args<T, 'createManyAndReturn'>> & {
concurrency?: number;
bulkSize: number;
},
): Promise<Prisma.Result<T, D, 'createManyAndReturn'>> {
// 구현할 부분
},
},
},
});
bulkSize에 따라 데이터를 나누어 JobQueue에 전달할 작업 배열을 생성하는 함수를 작성합니다.
/**
*
* @summary `bulkSize` 만큼의 데이터를 한 번에 삽입하는 작업 배열을 생성
* @param ctx 현제 모델의 컨텍스트
* @param bulkSize 한 번에 처리할 데이터의 양
* @param createManyAndReturnArgs createManyAndReturn 메서드의 인자
* @returns 작업 배열
*/
function _createJobs(
ctx: any,
bulkSize: number,
createManyAndReturnArgs: Prisma.Args<any, 'createManyAndReturn'>,
) {
return pipe(createManyAndReturnArgs.data, chunksOf(bulkSize), (chunks) =>
chunks.map(
(chunk) => () =>
ctx.$parent[ctx.$name].createManyAndReturn({
data: chunk,
select: createManyAndReturnArgs.select,
skipDuplicates: createManyAndReturnArgs.skipDuplicates,
}),
),
);
}
JobQueue에 전달하여 실행하고 결과를 반환하도록 메서드 구현async createBulk<T, D>(
this: T,
args: Prisma.Exact<D, Prisma.Args<T, 'createManyAndReturn'>> & {
concurrency?: number;
bulkSize: number;
},
): Promise<Prisma.Result<T, D, 'createManyAndReturn'>> {
const ctx = Prisma.getExtensionContext(this); // 현재 모델의 컨텍스트
const { concurrency, bulkSize, ...rest } = args as any;
const jobs = _createJobs(ctx, bulkSize, rest);
const res = await IterableJobQueue.of(concurrency ?? 1)
.execute(jobs)
.then(flatten); // createManyAndReturn의 반환값이 배열의 배열이므로 flatten을 사용하여 1차원 배열로 변환
// 타입 추론을 위해 as any 사용
return res as Prisma.Result<T, D, 'createManyAndReturn'>;
},
createBulk 메서드 추가// Create a new PrismaClient instance
const prisma = new PrismaClient({
log: ['query'],
});
// Extend the PrismaClient instance with the BulkInsertExtension
return prisma.$extends(BulkInsertExtension);
// 각 모델에 createBulk 메서드가 추가됨
const bulkCreateResult = await prisma.user.createBulk({
data: Array.from({ length }).map((_, i) => ({
email: `email ${i}`,
name: `name ${i}`,
password: `password ${i}`,
})),
select: { id: true },
concurrency: 10, // 병렬로 실행할 작업의 수
bulkSize: 100000, // 한 번에 처리할 데이터의 양
});

createBulk)는 ide에서도 인식됩니다. 10,000,000 건의 삽입 요청 시
concurrency : 10, bulkSize : 100000
- 전체 실행 시간: 1:11.499 (m:ss.mmm)
concurrency: 1, bulkSize: 100000
- 전체 실행 시간: 2:41.523 (m:ss.mmm)
이번 포스트에서는 Prisma를 활용한 대량 데이터 삽입 문제를 해결하기 위한 방법으로 함수형 방식과 이터러블을 통해 동시성 작업을 위한 유틸리티 클래스(JobQueue)를 구현하고, 해당 유틸을 통해 Prisma Extension을 사용하여 대용량 데이터를 삽입할 수 있는 커스텀 메서드를 추가하는 방법을 알아보았습니다.
이 방법을 통해 다음과 같은 이점을 얻을 수 있습니다