Microservices
는 HTTP 를 제외한 transport layer 를 사용하는 애플리케이션transporter
: 서로 다른 인스턴스간 message 전송을 제공하는 built-in transport layer 구현체npm i --save @nestjs/microservices
createMicroservice()
를 통해 인스턴스화transport
는 TCP
, KAFKA
, REDIS
, GRPC
등options
에는 위 transport
에 필요한 옵션 지정import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
transport: Transport.TCP,
options: {
...
}
},
);
await app.listen();
}
bootstrap();
connectMicroService()
사용import { ConfigService } from '@nestjs/config'
import { NestFactory } from '@nestjs/core'
import { Transport } from '@nestjs/microservices'
import { AppModule } from '~/src/app.module'
async function bootstrap() {
const app = await NestFactory.create(AppModule)
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
})
await app.startAllMicroservices()
await app.listen(3000)
}
bootstrap()
@MessagePattern
데코레이터를 통해 import { Controller } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
@Controller()
export class MathController {
@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
}
async
를 지원하여 비동기로 사용할 수도 있다.@MessagePattern({ cmd: 'sum' })
async accumulate(data: number[]): Promise<number> {
return (data || []).reduce((a, b) => a + b);
}
Observerble
을 통해 스트림 완료시 알림을 받을 수도 있다.@MessagePattern({ cmd: 'sum' })
accumulate(data: number[]): Observable<number> {
return from([1, 2, 3]);
}
@EventPattern
데코레이터를 사용하여 event handler 를 등록@Ctx
데코레이터로 Context 접근@EventPattern('user_created')
async handleUserCreated(data: Record<string, unknown>) {
// business logic
}
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
console.log(`Subject: ${context.getSubject()}`); // e.g. "time.us.east"
return new Date().toLocaleTimeString(...);
}
ClientProxy
클래스를 통해 message send, event publishsend()
emit()
ClientModule.register()
를 통해 등록하는 방법
name
: injection token 역할transport
, options
: 상기 내용 동일DI 로 injection 하는 방법
constructor(
@Inject('MATH_SERVICE') private client: ClientProxy,
) {}
@Module({
providers: [
{
provide: 'MATH_SERVICE',
useFactory: (configService: ConfigService) => {
const mathSvcOptions = configService.getMathSvcOptions();
return ClientProxyFactory.create(mathSvcOptions);
},
inject: [ConfigService],
}
]
...
})
@Client
데코레이터 사용하는 방법 (권장하지 않음)@Client({ transport: Transport.TCP })
client: ClientProxy;
@MessagePattern
, @EventPattern
데코레이터와 매칭되어야 받을 수 있다.send()
로 message sendaccumulate(): Observable<number> {
const pattern = { cmd: 'sum' };
const payload = [1, 2, 3];
return this.client.send<number>(pattern, payload);
}
async publish() {
this.client.emit<number>('user_created', new UserCreatedEvent());
}
rxjs
패키치 설치 후 pipe 를 통해 timeout 설정this.client
.send<TResult, TInput>(pattern, data)
.pipe(timeout(5000));