설치
yarn add @nestjs/microservices kafkajs
Module 설정
- Kafka Producer, Consumer 앱 설정
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import {ClientsModule, Transport} from '@nestjs/microservices';
@Module({
imports: [
ClientsModule.register([
{
name: 'my_client_name',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'my_client_id',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my_consumger_string_id',
},
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
- hybrid 애플리케이션으로 구현할 경우
- microservice 여럿을 시작하거나, 웹과 같이 사용하기
connectMicroservice(..)
로 각 microservice 를 등록, startAllMicroServices()
로 시작한다.
import { ConfigService } from '@nestjs/config'
import { NestFactory } from '@nestjs/core'
import { Transport } from '@nestjs/microservices'
import { AppModule } from '~/src/app.module'
async function bootstrap() {
const app = await NestFactory.create(AppModule)
const configService = app.get(ConfigService)
const port = configService.get('NODE_SERVER_PORT')
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
brokers: [configService.get('KAFKA_BROKERS')],
},
},
})
await app.startAllMicroservices()
await app.listen(port)
}
bootstrap()
- 단일 Microservice 애플리케이션으로 구현할 시
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
})
await app.listen()
}
bootstrap()
Producer
- Module 에서 Client 설정
- configService 는 dotenv 로 구현하므로
.env
에서 설정 지정
import { Module } from '@nestjs/common'
import { MongooseModule } from '@nestjs/mongoose'
import { CacheModule } from '@nestjs/cache-manager'
import { ConfigModule, ConfigService } from '@nestjs/config'
import { ClientsModule, Transport } from '@nestjs/microservices'
@Module({
imports: [
ClientsModule.registerAsync({
isGlobal: false,
clients: [{
inject: [ConfigService],
name: 'my_kafka_client',
useFactory: async (configService: ConfigService) => (
{
transport: Transport.KAFKA,
options: {
client: {
clientId: configService.get('KAFKA_CLIENT_ID'),
brokers: configService.get('KAFKA_BROKERS').split(','),
},
consumer: {
groupId: configService.get('KAFKA_CONSUMER_GROUP_ID'),
sessionTimeout: configService.get('KAFKA_SESSION_TIMEOUT'),
rebalanceTimeout: configService.get('KAFKA_REBALANCE_TIMEOUT')
},
},
})
}]
}),
],
controllers: [],
providers: [MyProducerService],
exports: []
})
export class MyKafkaModule { }
@Injectable()
export class MyProducerService {
constructor(
@Inject('my_kafka_client') private readonly kafkaClient: ClientKafka
){ }
sendMessage(message) {
try {
this.kafkaClient.emit('my_topic_name', message)
} catch {
}
}
}
Consumer
import { Controller } from '@nestjs/common'
import { Ctx, KafkaContext, MessagePattern, Payload } from '@nestjs/microservices'
@Controller()
export class MyConsumerController {
@MessagePattern('my_topic_name')
readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage()
const response = originalMessage.value
console.log(originalMessage.value)
console.log(message)
console.log(context.getTopic())
console.log(context.getArgs())
console.log(context.getPartition())
return response
}
}
Module
에서 import 하여 사용
- CLI
nest g controller <name>
으로 생성시 자동으로 import
@Module({
imports: [...],
providers: [...],
controllers: [MyConsumerController]
})
export class MyTestModule { }
reference