Kafka 서버 구성하기

steve·2023년 10월 10일
0

Backend

목록 보기
4/17

Kafka hands-on quick start

Docker Image로 Kafka 서버 구동하기

  • 내가 만든 Docker Image (Ubuntu + JDK 8 + Kafka(KRaft))

https://hub.docker.com/repository/docker/sylvius/kraft/general

  1. Docker Image 실행 후 아래 명령어로 서버 구동
// 도커 컨테이너 실행
docker run -it --name kraft -p 9092:9092 sylvius/kraft:1.0 /bin/bash

// bash 진입 후 서버 구동
cd /home/user/kafka_2.13-3.5.0
bin/kafka-server-start.sh config/kraft/server.properties

// 원하는 Topic 생성 (quickstart-events 자리에 topic 이름 입력)
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Nestjs에서 Kafka 사용하기

  1. Producer 코드
  • 코드 내용
    // books.modules.ts
    import { Module } from '@nestjs/common';
    import { BooksService } from './books.service';
    import { BooksController } from './books.controller';
    import { ClientsModule, Transport } from '@nestjs/microservices';
    
    @Module({
      imports:[
        ClientsModule.register([
          {
            name: 'BOOK_SERVICE',
            transport: Transport.KAFKA,
            options: {
              client: {
                clientId: 'book-client',
                brokers: ['localhost:9092'],
              },
              consumer: {
                groupId: 'book-consumer'
              }
            }
          },
        ]),
      ],
      controllers: [BooksController],
      providers: [BooksService]
    })
    export class BooksModule {}
    
    // books.controller.ts
    import { Controller, Get, Inject } from '@nestjs/common';
    import { ClientKafka } from '@nestjs/microservices';
    
    @Controller('books')
    export class BooksController {
      constructor(@Inject('BOOK_SERVICE') private readonly client: ClientKafka) {}
      async onModuleInit() {
    		// Kafka 서버에서 생성한 Topic들을 구독
        const requestPatterns = ['test-event']; 
    
        requestPatterns.forEach((pattern) => {
          this.client.subscribeToResponseOf(pattern);
        });
        console.log('connecting to kafka');
        await this.client.connect();
        console.log('connected to kafka');
      }
    
      async onModuleDestroy() {
        await this.client.close();
      }
    
      @Get('find')
      find() {
        const data = {
          bookName: 'book1',
          author: 'author1',
        };
    		// Kafka Topic에 메시지를 전송
        return this.client.emit('test-event', data);
      }
    }
  1. Consumer 코드
  • 코드 내용
    // main.ts
    import { NestFactory } from '@nestjs/core';
    import { MicroserviceOptions, Transport } from '@nestjs/microservices';
    import { MsaFindbookModule } from './msa-findbook.module';
    
    async function bootstrap() {
      const app = await NestFactory.createMicroservice<MicroserviceOptions>(
        MsaFindbookModule,
        {
          transport: Transport.KAFKA,
          options: {
            client: {
              brokers: ['localhost:9092'],
            },
            consumer: {
              groupId: 'book-consumer',
            },
          },
        }
      );
      app.listen();
    }
    bootstrap();
    
    // msa-findbook.controller.ts
    import { Controller } from '@nestjs/common';
    import {
      Ctx,
      KafkaContext,
      MessagePattern,
      Payload,
    } from '@nestjs/microservices';
    import { MsaFindbookService } from './msa-findbook.service';
    
    @Controller()
    export class MsaFindbookController {
      constructor() {}
    
    	// Kafka Topic 메시지 패턴 지정
      @MessagePattern('test-event')
      findBook(
        // @Payload() message: any,
        @Ctx() ctx: KafkaContext
      ) {
        // message payload = context.getMessage().value
        const data = ctx.getMessage().value;
        // console.log(ctx.getTopic());
        // console.log(ctx.getArgs());
        // console.log(ctx.getPartition());
        console.log(data)
    
        return data;
      }
    }
  • 결과
    Producer Nestjs Instance log

    Consumer Nestjs Microservice Instance log

0개의 댓글

관련 채용 정보