NestJS 공식문서에 있는 Queue 예제를 따라해봅니다.
메시지 큐를 이용해 컴퓨터에게 일을 시킬 때 Produecer, Queue, Consumer 이렇게 3개의 주체(!?)가 상호작용합니다. 그 과정을 간단하게 요약하면 아래와 같습니다.
1) Producer가 할 일을 정의한 메시지를 Queue에 저장
2) Consumer가 Queue에서 메시지를 가져와서 작업을 수행
(reference: https://heowc.tistory.com/35)
@nestjs/bull 패키지를 사용할껀데 redis 연동해서 사용한다고 하네요.
그래서 저는 간단하게 docker로 redis를 설치했습니다.
$ docker pull redis
$ docker network create redis-net
$ docker run --name redis -p 6379:6379 --network redis-net -d redis redis-server --appendonly yes
(reference: https://dingrr.com/blog/post/redis-%EB%8F%84%EC%BB%A4docker%EB%A1%9C-redis-%EC%84%A4%EC%B9%98%ED%95%98%EA%B8%B0)
실습을 위해 필요한 패키지를 아래와 같이 설치합니다.
$ yarn add @nestjs/bull bull
$ yarn add @types/bull --dev
NestJS 공식 문서의 예제처럼 AudioModule을 하나 생성해서 BullModule을 import해줍니다.
import { Module } from '@nestjs/common';
import { AudioService } from './audio.service';
import { AudioController } from './audio.controller';
import { AudioConsumer } from './audio.consumer';
import { BullModule } from '@nestjs/bull';
@Module({
imports: [BullModule.registerQueue({
name: 'audio',
redis: {
host: 'localhost',
port: 6379,
},
})],
providers: [AudioService, AudioConsumer],
controllers: [AudioController],
})
export class AudioModule {}
요렇게 셋팅하면 'audio'라는 메시지큐가 생성됩니다.
redis-cli를 통해 아래와 같이 bull:audio:XXX 형식으로 데이터가 생성된 것을 확인할 수 있어요.
NestJS 공식문서를 참고하여 'transcode'라는 작업을 생성하는 AudioService를 추가했습니다.
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
@Injectable()
export class AudioService {
constructor(@InjectQueue('audio') private audioQueue: Queue) {}
async addJob() {
const job = await this.audioQueue.add('transcode', {
foo: 'bar',
});
return job.id;
}
}
AudioService의 addJob() 메소드가 실행되면 'transcode' 작업이 하나 추가되고, 이 작업을 수행하는 데 필요한 데이터 ({foo:'bar'})가 전달됩니다.
Consumer가 없을 때는 작업이 처리되지 않은 체, Queue(Redis)에 저장되어 있습니다.
일을 시키는 주체를 만들었으니, 실제 일을 하는 주체를 만들어봅시다. (아~ 일하기 싫다.)
import { Process, Processor } from '@nestjs/bull';
import { MyLogger } from '../../logger/logger.service';
import { Job } from 'bull';
@Processor('audio')
export class AudioConsumer {
private readonly logger: MyLogger = new MyLogger(this.constructor.name);
@Process('transcode')
handleTranscode(job: Job) {
this.logger.debug('Start transcoding...');
this.logger.debug(job.data);
this.logger.debug('Transcoding completed');
}
}
이렇게 Consumer를 작성해두면, 'audio' 메시지 큐에 'transcode' 작업이 추가되었을 때 handleTranscode 메소드가 실행됩니다.
Producer가 전달한 data는 Job.data 프로퍼티를 통해서 가져올 수 있습니다.
테스트 해보면서 redis-cli를 통해 메시지큐에 저장된 데이터 상태가 어떻게 변하는지 확인하는 것도 재미있었습니다.
그런데 redis를 다뤄보지 않아서 살짝 해맸네요.. ㅠ
redis에 저장된 데이터는 type에 따라 조회해오는 방법이 달라서 명령어를 잘 찾아봐야해요.
그래도.. 우린 "검색"을 할 수 있기에 저장된 데이터들의 key와 데이터 type만 알아낼 수 있다면 값을 조회할 수 있습니다!
(reference: http://redisgate.kr/redis/command/zsets.php)
# 모든 key들을 조회합니다.
redis:6379> keys *
# "bull:audio:1"를 key로 갖는 데이터의 type을 조회합니다.
redis:6379> type bull:audio:1 ## => hash
# 데이터 type이 hash이길래 검색해보니 hgetall라는 명령어가 있네요.
redis:6379> hgetall bull:audio:1 ## 모든 key, value가 조회됩니다.
NestJS 공식 문서를 보면 Message Queue는 서로 다른 프로세스 간에 메시지를 전송할 수 있다고 나와 있습니다.
그래서 NestJS 프로젝트를 두 개(A,B라고 합시다.) 만들어서 실험을 해봤습니다.
둘 중 하나는 redis 관련 설정을 명시할 필요 없다고 공식 문서에 나와 있어서 A에는 제대로 설정해놓고 B는 아래와 같이 설정했습니다.
... 생략
@Module({
imports: [BullModule.registerQueue({
name: 'audio',
// redis: {
// host: 'localhost',
// port: 6379,
// },
}), AudioConsumer],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
그리고 A에서 작업을 생성했더니 B에서 실행이 됨을 확인 할 수 있었습니다.
그런데 A, B 양쪽에 Consumer를 뒀더니 랜덤하게 A,B 둘 중 한곳에서 작업이 실행되었습니다.
병렬로 처리해도 되며 시간이 다소 오래 걸리는 작업들이 있다면 이렇게 Consumer를 여러 개 띄워놓고 처리하도록 한다면 상당히 효율적일 것 같습니다!