Nestjs 마이크로서비스에 Kafka 구성하기

atesi·2023년 9월 15일
2

Retrograde

목록 보기
3/3

Retrograde 세 번째 이야기입니다. 마이크로서비스 아키텍처의 구성으로 해결해야 할 문제가 하나 생겼습니다. 독립적인 서비스를 구축함으로써 각 서비스 간 연결은 끊어져 있지만 역설적으로 데이터 전달을 통해 통합을 해야 할 필요가 있습니다.

Kafka

message queue

카프카는 메시지 큐 모델을 통해 각 서비스를 분리하는 이 특정 문제에 대한 해결책을 제공합니다. 카프카는 메시지 브로커로 데이터를 전송하는 서비스와 데이터를 수신하는 서비스 간에 명시적으로 설정하지 않고도 메시지를 전송할 수 있는 시스템입니다.

메시지를 생성하고 전송하는 서비스, 즉 생산자는 메시지를 카프카에게 전달하기만 하면 됩니다. 이러한 패턴은 기본적으로 발행-구독(PUB-SUB) 패턴을 따르며, 메시지를 수신하는 서비스, 즉 소비자는 메시지를 구독하여 처리합니다.

이를 누가 메시지를 받을지 '상관하지 않는' 특정 장소로 메시지를 보낼 방법이라고 비유한다면 message를 넣는 이러한 특정 위치를 topic이라고 하며, 토픽에 메시지를 추가할 때 내가 Producer가 되고 이 정보를 받는 사람은 Consumer로 볼 수 있습니다.

event streaming

이벤트 스트리밍은 데이터를 실시간으로 캡처하는 방법으로, 데이터베이스, 센서, 모바일 장치, 클라우드 서비스 및 소프트웨어 응용 프로그램과 같은 이벤트 소스에서 발생하는 사건을 캡처합니다.(1)^{(1)}

이벤트란 어떤 사건이 발생했음을 기록하는 데이터의 한 형태입니다. 이벤트는 주로 키, 값, 타임스탬프, 메타데이터 헤더로 이루어져 있습니다.

이벤트는 토픽에 영구적으로 저장되며 토픽은 데이터의 카테고리 또는 주제를 나타냅니다. 예를 들어, "결제"라는 토픽은 결제 관련 이벤트를 저장합니다. 이벤트는 보존 기간 동안 삭제되지 않고 보관되며, 이를 통해 데이터를 장기적으로 보관하고 검색할 수 있습니다.

카프카의 토픽은 여러 브로커에 분산되어 있어 확장성을 제공하며, 이벤트는 파티션에 기록됩니다. 이벤트의 키에 따라 동일한 파티션에 저장되며, 이로써 이벤트의 순서가 보장됩니다. 이러한 특징들은 카프카를 데이터 스트림 처리 및 이벤트 기반 아키텍처에 이상적인 선택지로 만듭니다.

setup

Kafka Cluster를 Docker-Compose로 구축하는 예제입니다. 이곳을 참고해 docker-compose를 작성해줍니다.

카프카의 경우에 공식적으로 제공하는 이미지가 없기에 bitnami, confluentinc, Strimzi등 추천하는 이미지가 다양합니다. 로컬에서 카프카를 구성하여 진행하므로 confluentinc를 사용하는 것에 문제가 없어 보입니다. 개발자들의 기여나 수정이 활발하고 Apache licensed라는 이야기가 있습니다. 이후 k8s편에서 Strimzi를 이용할지도 모르겠습니다.

시작 전에 kafka 흐름의 정확한 파악을 위해 api-users 앱을 추가하겠습니다.

nx g @nrwl/nest:app api-users
nest g resource users
npm install @nestjs/microservices

카프카를 nestjs에서 마이크로서비스로 이용하기 위해 패키지를 설치 해주도록 합니다.

workdir는 apps/api-users/src/users/ 입니다.

import { Module } from '@nestjs/common';
import { UsersService } from './users.service';
import { UsersController } from './users.controller';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'USERS_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'users',
            brokers: ['localhost:9093', 'localhost:9094', 'localhost:9095'],
          },
          consumer: {
            groupId: 'users-consumer',
          },
        },
      },
    ]),
  ],
  controllers: [UsersController],
  providers: [UsersService],
})
export class UsersModule {}

users.module에 카프카 클라이언트를 등록합니다.

import { Inject, Injectable } from '@nestjs/common';
import { CreateUserDto } from './dto/create-user.dto';
import { ClientKafka } from '@nestjs/microservices';
import { UserCreatedEvent } from './event/user-created.event';

@Injectable()
export class UsersService {
  constructor(
    @Inject('USERS_SERVICE') private readonly usersClient: ClientKafka
  ) {}

  create({ userId, name }: CreateUserDto) {
    this.usersClient.emit('user_created', new UserCreatedEvent(userId, name));
    return 'This action adds a new user';
  }
}

emit을 이용해 이벤트를 생성합니다. 첫번째 인자로 topic을 받고 두번째에 전달할 message를 받습니다.

export class UserCreatedEvent {
    constructor(
      public readonly userId: number,
      public readonly name: string,
    ) {}
  
    toString() {
      return JSON.stringify({
        userId: this.userId,
        name: this.name,
      });
    }
  }

UserCreatedEvent 인스턴스 생성을 위해 클래스를 만들고 직렬화를 위한 메소드 작성 해줍니다.

import { Controller, Post } from '@nestjs/common';
import { UsersService } from './users.service';
import { CreateUserDto } from './dto/create-user.dto';

@Controller('users')
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @Post()
  create(@Body() createUserDto: CreateUserDto) {
    return this.usersService.create(createUserDto);
  }
}

컨트롤러에 등록해주고 사용할 준비를 마쳤습니다. 우리는 이제 auth앱에서 이 메세지를 받아 원하는 작업을 수행하고자 합니다.

workdir를 apps/api-auth/ 로 이동해줍니다.

import { Logger } from '@nestjs/common';
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app/app.module';

import { KafkaOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const kafkaOptions: KafkaOptions = {
    transport: Transport.KAFKA,

    options: {
      client: {
        brokers: ['localhost:9093', 'localhost:9094', 'localhost:9095'],
      },
      consumer: {
        groupId: 'auth-consumer',
      },
    },
  };

  const globalPrefix = 'api';
  app.setGlobalPrefix(globalPrefix);
  const port = process.env.PORT || 3310;
  app.connectMicroservice(kafkaOptions);
  await app.startAllMicroservices();
  await app.listen(port);
  Logger.log(
    `🚀 Application is running on: http://localhost:${port}/${globalPrefix}`
  );
}

bootstrap();

auth 앱의 main.ts입니다. 기존에 생성한 HTTP 서버에 마이크로서비스 리스너를 결합하기 위해 하이브리드 애플리케이션을 이용합니다. 전통적인 방식인 NestFactory.createMicroservice를 이용하는 경우 여러서버를 허용하지 않아 각각 수동으로 설정해주고 실행시켜야 합니다.

@Controller('auth')
export class AuthController {
  constructor(private readonly authService: AuthService) {}

  @EventPattern('user_created')
  handleUserCreated(data: any) {
    this.authService.handleUserCreated(data);
  }
  
  // @MessagePattern('user_created')
  // readMessage(@Payload() message: any, @Ctx() context: KafkaContext) {
  //   const originalMessage = context.getMessage();
  //   const response =
  //     `Receiving a new message from topic: user_created: ` +
  //     JSON.stringify(originalMessage.value);
  //   console.log(response);
  //   return response;
  // }
  

@EventPattern()은 응답 메시지를 기대하지 않는 요청에 사용되고 모든 클래스에 사용가능합니다.
@MessagePattern()는 동기적인 메서드 전용이며, 컨트롤러로 데코레이트된 클래스 내에서만 사용할 수 있습니다.

  handleUserCreated(userCreatedEvent: UserCreatedEvent) {
  	console.log(userCreatedEvent)
  } 

handleUserCreated의 경우는 전달받은 값을 출력하는 간단한 함수입니다.

KafkaJS

Nest에서 지원하는 마이크로서비스 모듈로 카프카를 이용하는 방법을 이용할 때 서버를 실행하고 카프카 모듈에 연결하는 시간이 약간 소요되는 점에 문제를 느껴 KafkaJS를 사용해 보았습니다.

npm install kafkajs 

먼저 libs에 kafka-client를 추가해주었습니다.

nx g @nx/nest:lib kafka-client
prisma-schema-one/
|-- src/ 			   
|   |-- lib/
|   	|-- consumer.service.ts
|   	|-- kafka-client.module.ts
|   	|-- producer.service.ts
|-- .eslintrc.json
.
.
.

위와 같이 워크스페이스를 구성해줍니다.

// producer.service.ts
import {
  Injectable,
  OnApplicationShutdown,
  OnModuleInit,
} from '@nestjs/common';
import { Kafka, Producer, ProducerRecord, Partitioners } from 'kafkajs';

@Injectable()
export class ProducerService implements OnModuleInit, OnApplicationShutdown {
  private readonly kafka = new Kafka({
    brokers: ['localhost:9093', 'localhost:9094', 'localhost:9095'],
  });
  private readonly producer: Producer = this.kafka.producer({
    createPartitioner: Partitioners.LegacyPartitioner,
  });

  async onModuleInit() {
    await this.producer.connect();
  }

  async produce(record: ProducerRecord) {   // Record (레코드):
    await this.producer.send(record);	    // 레코드는 Kafka 메시지의 단위로, Producer가 생성하고 Consumer가 소비하는 메시지를 나타냅니다.
  }
  
  async onApplicationShutdown() {
    await this.producer.disconnect();
  }
}

Producer는 Kafka 토픽으로 메시지를 생성하고 보내는 역할을 합니다.
kafkajs를 사용하여 Producer를 생성하고, .send() 메서드를 사용하여 메시지를 토픽으로 보낼 수 있습니다.

// consumer.serivce.ts
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { Consumer, ConsumerRunConfig, ConsumerSubscribeTopics, Kafka } from 'kafkajs';

@Injectable()
export class ConsumerService implements OnApplicationShutdown {
  private readonly kafka = new Kafka({
    brokers: ['localhost:9093', 'localhost:9094', 'localhost:9095'],
  });
  private readonly consumers: Consumer[] = [];

  async consume(topic: ConsumerSubscribeTopics, config: ConsumerRunConfig) {
    const consumer = this.kafka.consumer({ groupId: 'nestjs-kafka' });
    await consumer.connect();
    await consumer.subscribe({ topic });
    await consumer.run(config);
    this.consumers.push(consumer);
  }

  async onApplicationShutdown() {
    for (const consumer of this.consumers) {
      await consumer.disconnect();
    }
  }
}

Consumer는 Kafka 토픽에서 메시지를 구독하고 처리하는 역할을 합니다.
kafkajs를 사용하여 Consumer를 생성하고, .subscribe() 메서드로 특정 토픽을 구독할 수 있습니다. 그 후 .run()을 호출하여 메시지를 처리하는 함수를 등록할 수 있습니다.

// kafka-client.module.ts
import { Module } from '@nestjs/common';
import { ConsumerService } from './consumer.service';
import { ProducerService } from './producer.service';

@Module({
  providers: [ProducerService, ConsumerService],
  exports: [ProducerService, ConsumerService],
})
export class KafkaModule {}

마찬가지로 이벤트를 생산하고 소비할 앱에서 작성을 이어나갑니다.

// users.service.ts
@Injectable()
export class UsersService {
  constructor(private readonly producerService: ProducerService) {}

  create({ userId, name }: CreateUserDto) {
    const newUser = new UserCreatedEvent(userId, name);
    this.producerService.produce({
      topic: ['test'],
      messages: [
        {
          value: JSON.stringify(newUser),
        },
      ],
    });
    return 'This action adds a new user';
  }
//auth.service.ts
@Injectable()
export class AuthService implements OnModuleInit {
  constructor(private readonly consumerService: ConsumerService) {}

  async onModuleInit() {
    await this.consumerService.consume(
      { topic: 'test' },
      {
        eachMessage: async ({ topic, partition, message }) => {
          console.log({
            value: message.value.toString(),
            topic: topic.toString(),
            partition: partition.toString(), 
          });
        },
      }
    );
  }
}




참고
https://docs.nestjs.com/microservices/basics#event-based
https://kafka.apache.org/intro(1)^{(1)}
https://betterprogramming.pub/build-scalable-event-driven-applications-with-nest-js-28676cb093d0
https://medium.com/@andhikayusup/kafka-on-the-microservice-architecture-dc52d73837f2
https://kafka.js.org/docs/producer-example

profile
Action!

0개의 댓글