사실 큐를 사용하는 것에는 여러가지 경우가 있을 것입니다.
트래픽이 몰리거나 문제가 생겼을 때 어디에 안전하게 저장한 후 재처리하기 위한 경우도 있을 것이고 동시성 문제를 해결하기 위해 사용하는 경우도 있을 것입니다.
필자는 그 중에서도 비동기적으로 수행하기 위해 큐를 사용하게 되었습니다.(사용량 체크)
우리가 알아야할 것은 딱 3가지입니다.
이렇게 3가지의 주체가 메시지 큐를 운영하게 됩니다. 그 과정을 간단하게 요약하면 아래와 같습니다.
사실상 카프카의 심플 버전이라고 봐도 될 것 같습니다.
NestJS 공식 홈페이지에 나와있는대로 bull 패키지를 설치해줍니다.
npm install --save @nestjs/bull bull
필자는 configService를 이용해 환경변수에서 redis의 설정값을 가져올 것이기 때문에 factory함수를 사용할 수 있는 forRootAsync를 사용하여 모듈을 추가해주었습니다.
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { queueFactory } from './config/queue.config';
@Module({
import: [
ConfigModule.forRoot({ isGlobal: true }),
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: queueFactory,
inject: [ConfigService],
})
]
})
import { ConfigService } from '@nestjs/config';
import { EnvVars } from './env.validation';
export const queueFactory = (configService: ConfigService<EnvVars, true>) => ({
redis: {
host: configService.get<string>('REDIS_HOST', { infer: true }),
port: configService.get<number>('REDIS_PORT', { infer: true }),
db: configService.get<number>('REDIS_DB', { infer: true }),
},
});
Bull를 사용할 레디스를 종속시켜줬으니 이제 큐를 등록해봅시다.
자신이 큐를 사용할 모듈에서 registerQueue 메서드를 사용해 큐를 등록해줍니다.
저는 글로벌 인터셉터에서 사용할 것이기 때문에 app.module 에서 등록해주었습니다.
BullModule.registerQueue({
name: 'use-check',
});
name으로 이름을 등록하여 producer가 알 수 있도록 해줍니다.
서버를 키게 되면 그럼 이렇게 레디스에 큐가 등록 된 것을 볼 수 있습니다.
코드로 알아봅시다.
필자는 인터셉터에 넣었으니 인터셉터를 예시 코드로 보여드리겠습니다.
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull'; // 나머지 import는 생략
@Injectable()
export class ClientInterceptor implements NestInterceptor {
private readonly redisClient: Redis;
constructor(
@InjectQueue('use-check')
private useCheckQueue: Queue, // @InjectQueue('등록한 큐 이름') 을 이용해 종속성 주입
private reflector: Reflector,
private readonly redisService: RedisService,
) {
this.redisClient = redisService.getClient();
}
public async intercept(
context: ExecutionContext,
next: CallHandler,
): Promise<Observable<any>> {
return next.handle().pipe(
catchError(async (err) => {
if (await this.redisClient.get('stop')) { // 간단한 redlock 구현
await this.useCheckQueue.pause(); // consumer 처리 정지
} else {
await this.useCheckQueue.resume(); // consumer 처리 다시 시작
}
await this.useCheckQueue.add(
// 큐에 저장
'use',
`${req.method}>${req.route.path}>fail`,
{ removeOnComplete: true }, // 작업 저장 성공 시 작업 데이터 삭제
);
throw err;
}),
tap(async (n) => {
if (await this.redisClient.get('stop')) {
await this.useCheckQueue.pause();
} else {
await this.useCheckQueue.resume();
}
await this.useCheckQueue.add(
'use',
`${req.method}>${req.route.path}>>success`,
{ removeOnComplete: true },
);
return n;
}),
);
}
}
add 메서드가 실행되면
첫번째 인자로 넣은 값을 key로 하는 작업이 하나 추가되고
두번째 인자로 들어가있는 내용이 레디스의 value로 들어갑니다.
!!! 주의할 점
두번쨰 인자의 값에 따라 레디스의 타입이 지정되니 저장 후 한번씩 확인이 필요합니다.
그리고 메서드에는 여러가지의 옵션값이 있는데 필자는 레디스에 작업이 남는게 싫어서 removeOnComplete 옵션을 켜두었습니다.
기존처럼 내비두면 작업이 hash 타입으로 저장되게 되고 아래 사진처럼 저장되게 됩니다.
마지막으로 저는 사용량 체크이기 떄문에 값을 업데이트할 때는 간단한 lock(레디스에 key 추가)과 pause와 resume 메서드를 이용해
consumer가 큐에서 작업을 꺼내오지 못하도록 조절해주었습니다.
use-check.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { RedisService } from '@liaoliaots/nestjs-redis';
import Redis from 'ioredis';
@Processor('use-check') // 등록한 큐를 보는 데코레이터
export class UseCheckProcessor {
private readonly redisClient: Redis;
constructor(private readonly redisService: RedisService) {
this.redisClient = redisService.getClient();
}
@Process('use') // use 작업을 진행
async transcode(job: Job<string>) {
// job.data로 작업 진행
}
}
필자는 정말 만족하며 사용하고 있고 간단한 큐나 동시성 이슈가 있는 작업에서 사용하면 알맞게 사용할 수 있어 nestJS를 사용하며 애용할 것입니다.
job options 에는 더 많은 옵션값들이 있고 우선순위, delay 등 여러 옵션을 더 사용해보고 나중에 더 좋은 글을 써보겠습니다!