서비스를 운영하다보면 정산이나 통계의 배치 처리 등의 이유로 대량의 데이터를 삽입해야 하는 경우가 생깁니다. 이때, Prisma의 createMany
메서드를 사용하면 다수의 데이터를 쉽게 삽입할 수 있습니다. 하지만, createMany
메서드는 한 번에 너무 많은 데이터 삽입을 시도하면 성능 이슈가 발생하거나, Node.js 서비스 자체가 멈추는 문제가 발생할 수 있습니다.
문제를 해결하기 위해 아래 코드처럼 적절한 단위를 정해 createMany
메서드를 여러 번 나누어 호출하면서 데이터를 삽입하는 방법을 사용할 수 있습니다.
for await (const chunk of chunks) {
await prisma.model.createMany({ data: chunk });
}
하지만 이 방법은 해당 로직이 필요한 모든 곳에 중복해서 작성해야 하고, 동시성 레벨을 조절하기 어렵다는 단점이 있습니다.
해당 포스트에서는 AsyncIterator
를 활용하여 동시성 레벨을 조절하고, 작업을 환경에 맞게 나누어 처리하면서 안정적으로 데이터를 삽입하는 로직을 Prisma Extension
을 활용하여 모든 모델에 쉽게 적용하는 방법을 알아보겠습니다.
모든 예시 코드는 여기 에서 확인할 수 있습니다.
JobQueue
클래스 구현JobQueue
클래스는 AsyncIterator
를 활용하여 동시성 레벨에 따라 작업을 처리하는 클래스입니다. fp-ts
를 사용하여 함수형 파이프라인으로 각 단계를 단계적으로 구현해보겠습니다.
JobQueue
클래스 선언class JobQueue<T> {
private constructor(private readonly concurrency: number) {}
public static of<T>(concurrency: number): JobQueue<T> {
return new JobQueue<T>(concurrency);
}
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
// 구현할 부분
}
}
해당 클래스는 concurrency
를 인자로 받아 JobQueue
인스턴스를 생성하고, execute
메서드에 전달된 작업(promise
를 반환하는 함수)을 순차적으로 실행하면서 동시성 레벨에 따라 작업을 처리합니다.
concurrency
에 따라 나누기먼저 인자로 전달 받은 작업 배열을 동시성 레벨(concurrency
)에 따라 chunk로 나누어 줍니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
//...
)
}
Task
로 변환하기이전 단계에서 나누어진 작업 청크를 각각 실행가능한 Task
로 변환합니다.
Task
는 실행하기 전까지 실제로 아무 일도 하지 않는 (lazy) 함수형 스타일의 비동기 계산을 표현합니다.Task
의 sequenceArray
함수를 사용합니다.Task
로 변환한 후, 모든 Task
를 순차적으로 실행할 수 있는 Task
로 반환합니다.public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
//...
)
}
먼저 비동기 제너레이터를 순차적으로 실행하는 유틸리티 함수를 작성합니다.
아래 코드는 비동기 작업 목록을 순차적으로 실행하고 방출합니다.
export async function* runSequentially<T>(
tasks: Array<Task<ReadonlyArray<T>>>,
): AsyncGenerator<ReadonlyArray<T>, void, unknown> {
for (const task of tasks) {
yield await task();
}
}
이제 runSequentially
함수를 execute
의 파이프라인에 추가합니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
//...
)
}
이전 단계에서 만들어진 비동기 제너레이터가 방출하는 AsyncIterable을 배열로 변환하는 유틸리티 함수를 작성합니다.
export async function toArrayAsync<T>(iterable: AsyncIterable<T>) {
const result: T[] = [];
for await (const value of iterable) {
result.push(value);
}
return result;
}
이제 toArrayAsync
함수를 execute
의 파이프라인에 추가합니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
toArrayAsync, // 5. AsyncIterable을 배열로 변환
)
}
마지막으로 만들어진 배열을 평탄화하여 반환합니다.
public async execute(jobs: (() => Promise<T>)[]): Promise<ReadonlyArray<T>> {
return await pipe(
jobs, // 1. 작업 목록
chunksOf(this.concurrency), // 2. 동시성 레벨에 따라 작업을 나누기
map(sequenceArray), // 3. 각 청크를 Task로 변환
runSequentially, // 4. 각 청크 Task를 순차적으로 실행하고 결과를 방출
toArrayAsync, // 5. AsyncIterable을 배열로 변환
).then(flatten); // 6. 각 청크 결과를 하나의 배열로 평탄화하여 반환
}
JobQueue
를 사용하여 대용량 데이터를 삽입하는 커스텀 메서드 추가하기Prisma Extension은 Prisma 기본 기능을 Mixin
방식으로 확장하거나 변경할 수 있는 방법을 제공합니다.
Prisma Extension의 기본 사용법은 여기 를 참고해주세요.
Prisma.defineExtension
을 사용하여 모든 모델에 대용량 데이터를 삽입하는 createBulk
메서드를 정의합니다.
기존 prisma의 createManyAndReturn
메서드와 동일한 시그니처를 사용하되, 추가적으로 bulkSize
와 concurrency
를 인자로 받습니다.
bulkSize
: 한 번에 처리할 데이터의 양 (동시에 삽입되는 데이터의 양 = bulkSize * concurrency
)concurrency
: 병렬로 실행할 작업의 수 (동시에 시작하는 트랜잭션의 수, 커넥션 풀의 크기보다 작아야 함)const BulkCreateExtension = Prisma.defineExtension({
name: 'bulk-insert',
model: {
$allModels: {
/**'
* 여러 개의 데이터를 한 번에 삽입하고 삽입된 데이터를 반환
* @param args.bulkSize 한 번에 처리할 데이터의 양 (동시에 삽입되는 데이터의 양 = bulkSize * concurrency)
* @param args.concurrency 병렬로 실행할 작업의 수 (동시에 시작하는 트랜잭션의 수, 커넥션 풀의 크기보다 작아야 함)
*
* ※ 주의: concurrency가 1 이상인 경우, 데이터의 삽입 순서가 보장되지 않음
*
*/
async createBulk<T, D>(
this: T,
args: Prisma.Exact<D, Prisma.Args<T, 'createManyAndReturn'>> & {
concurrency?: number;
bulkSize: number;
},
): Promise<Prisma.Result<T, D, 'createManyAndReturn'>> {
// 구현할 부분
},
},
},
});
bulkSize
에 따라 데이터를 나누어 JobQueue
에 전달할 작업 배열을 생성하는 함수를 작성합니다.
/**
*
* @summary `bulkSize` 만큼의 데이터를 한 번에 삽입하는 작업 배열을 생성
* @param ctx 현제 모델의 컨텍스트
* @param bulkSize 한 번에 처리할 데이터의 양
* @param createManyAndReturnArgs createManyAndReturn 메서드의 인자
* @returns 작업 배열
*/
function _createJobs(
ctx: any,
bulkSize: number,
createManyAndReturnArgs: Prisma.Args<any, 'createManyAndReturn'>,
) {
return pipe(createManyAndReturnArgs.data, chunksOf(bulkSize), (chunks) =>
chunks.map(
(chunk) => () =>
ctx.$parent[ctx.$name].createManyAndReturn({
data: chunk,
select: createManyAndReturnArgs.select,
skipDuplicates: createManyAndReturnArgs.skipDuplicates,
}),
),
);
}
JobQueue
에 전달하여 실행하고 결과를 반환하도록 메서드 구현async createBulk<T, D>(
this: T,
args: Prisma.Exact<D, Prisma.Args<T, 'createManyAndReturn'>> & {
concurrency?: number;
bulkSize: number;
},
): Promise<Prisma.Result<T, D, 'createManyAndReturn'>> {
const ctx = Prisma.getExtensionContext(this); // 현재 모델의 컨텍스트
const { concurrency, bulkSize, ...rest } = args as any;
const jobs = _createJobs(ctx, bulkSize, rest);
const res = await IterableJobQueue.of(concurrency ?? 1)
.execute(jobs)
.then(flatten); // createManyAndReturn의 반환값이 배열의 배열이므로 flatten을 사용하여 1차원 배열로 변환
// 타입 추론을 위해 as any 사용
return res as Prisma.Result<T, D, 'createManyAndReturn'>;
},
createBulk
메서드 추가// Create a new PrismaClient instance
const prisma = new PrismaClient({
log: ['query'],
});
// Extend the PrismaClient instance with the BulkInsertExtension
return prisma.$extends(BulkInsertExtension);
// 각 모델에 createBulk 메서드가 추가됨
const bulkCreateResult = await prisma.user.createBulk({
data: Array.from({ length }).map((_, i) => ({
email: `email ${i}`,
name: `name ${i}`,
password: `password ${i}`,
})),
select: { id: true },
concurrency: 10, // 병렬로 실행할 작업의 수
bulkSize: 100000, // 한 번에 처리할 데이터의 양
});
createBulk
)는 ide에서도 인식됩니다. 10,000,000 건의 삽입 요청 시
concurrency : 10, bulkSize : 100000
- 전체 실행 시간: 1:11.499 (m:ss.mmm)
concurrency: 1, bulkSize: 100000
- 전체 실행 시간: 2:41.523 (m:ss.mmm)
이번 포스트에서는 Prisma
를 활용한 대량 데이터 삽입 문제를 해결하기 위한 방법으로 함수형
방식과 이터러블
을 통해 동시성 작업을 위한 유틸리티 클래스(JobQueue
)를 구현하고, 해당 유틸을 통해 Prisma Extension
을 사용하여 대용량 데이터를 삽입할 수 있는 커스텀 메서드를 추가하는 방법을 알아보았습니다.
이 방법을 통해 다음과 같은 이점을 얻을 수 있습니다