어떤 리스트를 불러와 랭킹 알고리즘을 바탕으로 정렬을 할 때, 보통 이 정렬하는 로직은 동기적으로 이루어진 코드 비중이 높을 것이며 리스트의 길이가 길어질수록 cpu의 연산량도 많아진다. 따라서, 이런 로직은 설령 리스트의 길이가 작을 때 처리시간이 작게 걸린다하더라도 api 서버의 메인 스레드를 블로킹하지 않도록 미리 설계하는 것이 안전할 것이다.
@Get('/')
async galleryImageList(
@Query('page') _page: number,
@Query('row') _row: number,
@Req() req: Request,
) {
const page = _page || -1;
const row = _row || 7;
if (!isUserPayload(req.user)) {
throw new UnauthorizedException();
}
const cachedList = await this.cache.checkCache(req.user.userId, page, row);
if (cachedList.length > 0) {
return cachedList;
}
const imageList = await this.imageService.findForGallery();
console.time('sort');
const sortedList = this.ranking.sort(imageListWithComments);
console.timeEnd('sort');
await this.cache.store(req.user.userId, sortedList);
return sortedList.slice(0, row * 10);
}
위 코드는 sort에 약 4ms의 처리 시간을 필요로 하고 있으며, api의 응답 속도는 130~140ms가 나오고 있다.
우선 @nestjs/bull과 node:worker_threads를 이용해 bull의 processor에서 워커 스레드를 생성하도록 코드를 작성한다.
// ranking.service.ts
@Injectable()
export class RankingService {
private readonly workerPath = path.join(__dirname, 'ranking-worker.js')
async sort(imageList: ImageExpanded[]) {
const worker = new Worker(this.workerPath, {
workerData: imageList,
});
return new Promise((resolve, reject) => {
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`));
});
});
}
}
// worker.ts
if (isMainThread) {
throw new Error('This file should be run as a worker thread');
}
if (parentPort === null) {
throw new Error('parentPort is null');
}
parentPort.postMessage(sort(workerData));
function sort(imageList: ImageExpanded[]) {
imageList.forEach((image) => calculateScore(image));
return quickSort(imageList, 0, imageList.length - 1);
}
이렇게 단순한 워커스레드를 구현하였을 때 다시 처리 시간을 측정하면 sort에 50ms가, 그리고 전체 api 응답 시간도 약 180ms로 크게 증가한다. 이는 다른 스레드와의 통신하는만큼 지연되는 시간과, 워커 스레드를 처음 구성하는데 추가적인 시간이 들기 때문이다.
만일 이 작업이 자주 실행되는 작업이 아니거나, 아니면 처음 연산 시간이 4ms가 아닌 40ms가 들었다면 크게 상관 없을수도 있으나, 지금과 같아서는 배보다 배꼽이 더 큰 격이다.
문제는 또 있다. 위 코드에서는 매 api 요청마다 랭킹 정렬을 위해 새로운 워커 스레드가 생성되게 된다. 자칫 잘못하여 api 요청이 폭주하여 워커 스레드가 지나치게 많이 생성될 경우 서버 리소스가 남아나지 않을 수 있으며, 최악의 경우 서버가 다운될 수 있다.
따라서, 워커스레드 풀을 구성하고 풀 안에서 생성된 스레드를 종료하지 않은 채 유지하며 랭킹 정렬을 위해 수시로 사용하기로 한다.
먼저, 단 하나의 워커스레드만을 가지는 단일 풀을 생성해보자.
// ranking.service.ts
@Injectable()
export class RankingService {
private readonly pool: Worker[] = [];
private readonly workerPath = path.join(__dirname, 'ranking-worker.js')
async sort(imageList: ImageExpanded[]) {
const worker = this.getWorker();
const result = await this.assignWorker(worker, imageList);
return result;
}
private getWorker() {
if (this.pool.length === 0) {
this.createWorker();
}
return this.pool[0];
}
private createWorker() {
const worker = new Worker(this.workerPath);
worker.once('exit', (code) => {
if (code !== 0) console.error(`Worker stopped with exit code ${code}`);
});
this.pool.push(worker);
}
private async assignWorker(worker: Worker, imageList: ImageExpanded[]): Promise<ImageExpanded[]> {
worker.postMessage(imageList);
return new Promise((resolve, reject) => {
const messageHandler = (result: ImageExpanded[]) => {
resolve(result);
worker.removeListener('error', errorHandler);
};
const errorHandler = (error: Error) => {
reject(error);
worker.removeListener('message', messageHandler);
}
worker.once('message', messageHandler);
worker.once('error', errorHandler);
});
}
}
이전 코드와 달라진 점은 두 가지다.
먼저, 워커 풀을 리스트로 만들고 리스트를 참조하여 생성된 워커스레드를 재사용한다. 지금은 단 하나의 워커스레드만을 사용할 것이기에 풀이 비었을 경우에만 새로 워커스레드를 생성한다.
그리고 워커에 붙은 이벤트 리스너가 중복으로 생성되지 않도록 주의가 필요하다. assignWorker가 실행될 때마다 리스너가 추가되기 때문에 일회성으로 수명을 조절하거나, 지워지지 않을 경우 직접 지워주어야만 한다.
또, 워커 코드도 수정이 필요하다.
if (isMainThread) {
throw new Error('This file should be run as a worker thread');
}
if (parentPort === null) {
throw new Error('parentPort is null');
}
parentPort.on('message', (imageList: ImageExpanded[]) => {
parentPort!.postMessage(sort(imageList));
});
function sort(imageList: ImageExpanded[]) {
imageList.forEach((image) => calculateScore(image));
return quickSort(imageList, 0, imageList.length - 1);
}
생성된 워커는 워커 코드가 동작을하는 한 종료되지 않는다. 따라서, 이벤트 리스너가 추가될 경우 해당 리스너가 제거되기 전까지 워커스레드는 계속 살아있는다.
이제 다시 sort 처리 시간을 측정해보면 처음에는 똑같이 약 50ms가 나오지만, 두번째부터는 13~16ms로 처리에 소요되는 시간이 크게 감소한다.
이제 제대로 풀을 갖추고 여러개의 워커 스레드를 사용할 차례다.
// ranking.service.ts
@Injectable()
export class RankingService {
private poolCount: number = 0;
private readonly readyPool: Worker[] = [];
private readonly workingPool: Set<Worker> = new Set();
private readonly isReady: EventEmitter = new EventEmitter();
private readonly workerPath = path.join(__dirname, 'ranking-worker.js')
async sort(imageList: ImageExpanded[]) {
const worker = await this.getWorker();
const result = await this.assignWorker(worker, imageList);
return result;
}
private getWorker(): Promise<Worker> {
return new Promise((resolve) => {
if (this.readyPool.length > 0) {
return resolve(this.popWorker());
}
this.isReady.once('next', () => resolve(this.popWorker()));
if (this.poolCount < RankingConfig.POOL_SIZE) {
this.createWorker();
}
});
}
private popWorker() {
const worker = this.readyPool.pop()!;
this.workingPool.add(worker);
return worker;
}
private createWorker() {
const worker = new Worker(this.workerPath);
worker.once('exit', (code) => {
if (code !== 0) console.error(`Worker stopped with exit code ${code}`);
});
this.readyPool.push(worker);
this.poolCount += 1;
this.isReady.emit('next');
}
private async assignWorker(worker: Worker, imageList: ImageExpanded[]): Promise<ImageExpanded[]> {
worker.postMessage(imageList);
return new Promise((resolve, reject) => {
const next = () => {
this.readyPool.push(worker);
this.workingPool.delete(worker);
this.isReady.emit('next');
}
const messageHandler = (result: ImageExpanded[]) => {
resolve(result);
worker.removeListener('error', errorHandler);
next();
};
const errorHandler = (error: Error) => {
reject(error);
worker.removeListener('message', messageHandler);
next();
}
worker.once('message', messageHandler);
worker.once('error', errorHandler);
});
}
}
우선, 풀은 대기중인 워커스레드를 담는 readyPool과 작업중인 워커스레드를 담는 workingPool을 나눈다. readyPool에 워커스레드가 있으면 bull에서 요청한 작업을 워커스레드에 넘겨주고, 없으면 풀 크기를 넘지 않는 선에서 워커스레드를 새로 만들거나 이전 작업이 완료되기를 기다린다. 이 대기 작업은 커스텀 이벤트리스너를 하나 만들어서 간단하게 언제까지 대기할지 비동기적으로 알려줄 수 있다.
여기까지 했을 때도 코드 자체는 원하는대로 잘 동작한다고 볼 수 있다. 하지만, 기왕 Nestjs를 쓸거라면 가능하면 Nestjs와 integration이 되어있는 라이브러리를 쓰는 것이 더 안전할 것이라고 생각한다.
bull은 redis 기반의 메시지 브로커이며 큐로서 사용할 수 있다. 또, 큐에서 작업을 꺼내서 수행할 때 자체적으로 관리하는 워커스레드 풀에서 작업을 하나씩 수행하기 때문에, 결국 위에서 작성한 로직이 유사하게 bull 내부적으로 돌아간다고 볼 수 있을 것이다.
// image.module.ts
@Module({
imports: [
BullModule.registerQueue({
name: 'rank',
}),
],
providers: [
RankingService,
RankingProcessor,
],
})
export class ImageModule {}
// ranking.service.ts
@Injectable()
export class RankingService {
constructor(
@InjectQueue('rank') private readonly queue: Queue,
) {}
async sort(imageList: ImageExpanded[]) {
const job = await this.queue.add(RankingConfig.JOB_NAME, imageList);
return job.finished();
}
}
// ranking-processor.ts
@Processor('rank')
export class RankingProcessor {
@Process({
name: RankingConfig.JOB_NAME,
concurrency: RankingConfig.POOL_SIZE,
})
async transcode(job: Job<ImageExpanded[]>) {
const result = this.sort(job.data);
await job.progress(100);
return result;
}
sort(imageList: ImageExpanded[]) {
imageList.forEach((image) => this.calculateScore(image));
return this.quickSort(imageList, 0, imageList.length - 1);
}
}
bull 큐에 작업을 추가하기만하면, 큐에 작업이 있을 때 프로세서의 transcode함수가 자동으로 실행되면서 작업을 처리한다. 이때, 프로세서를 등록할 때 concurrency 옵션을 통해서 동시에 처리할 작업의 개수, 즉 워커 풀 사이즈를 정할 수 있다.
코드도 보다 단순해졌고, 최적화도 더 잘되어있는만큼 sort 처리 시간도 10~13ms로 소폭 빨라졌다.
위 예시에서는 고작 4ms의 작업을 예시로 들었지만, 동기 작업량이 많아질수록 큐를 통한 작업의 관리와 메인스레드로부터의 작업 분리는 더욱 중요하다.
하지만 워커스레드는 결코 만능이 아니다. 특히, 위 예시처럼 워커 스레드 풀의 개수를 늘릴 때는 서버의 리소스 사용량에 주의해아한다. 워커스레드가 새로 추가될 때마다 각각의 워커스레드를 유지하기 위해 생각외로 큰 리소스가 사용되기 때문이다. 아니면 AWS Lambda나 Google Cloud Functions등을 사용해 서버리스로 구성하는 것도 좋은 방법일 것이다.
안녕하세요.
좋은 글 써주셔서 감사합니다 :)
그런데 @nest/bull 를 이용해서 병렬 처리하신게 정말로 병렬 실행되는지 확인해보셔야 할거 같아요...
parallel 과 concurrency 는 다른 개념입니다.
https://docs.bullmq.io/guide/parallelism-and-concurrency#concurrency