[Kafka] Kafka producer in Nestjs -2-

영슈·2023년 10월 3일
0

Nest

목록 보기
6/6
post-thumbnail

그전과 동일하게 아래 영상을 참고 하였다.
https://www.youtube.com/watch?v=geMtm17ofPY
해당 영상은 Java 단위에서 수행 하였고 , 기본 적인 개념 설명 후 nest 에서 사용하는 법을 작성

프로듀서의 기본 흐름

  • Sender 와 전반 Serializer & Partitioner 은 별도 Thread 로 동작
    => Batch 찼는지 여부와 상관없이 Sender 는 Broker 에 전송
  • batch.size : Batch 크기 ( 다 차면 바로 전송 )
  • linger.ms : 전송 대기 시간 ( 기본값 0 )
    - 대기 시간 X 일 시 , 배치 덜 차도 바로 전송
    - 대기 시간 O 일 시 , 시간 만큼 배치에 추가하여 한번에 전송 - 헌번에 처리 가능
    => 기본적 send 는 전송 결과 확인 X ( ack : 0

ack

ack : 0

  • 서버 응답 대기 X
  • 전송 보장 X

ack : 1

  • Partition 리더 에 저장되면 응답 받음
  • 리더가 장애 발생시 Message 유실 가능성 존재

ack : all ( -1 )

  • 모든 리프리카에 저장되면 응답 받음 ( min.insync.replicas 따라 설정 가능 )

min.insync.replicas ( Broker Option )

  • ack 옵션이 all 일 떄 , 저장에 성공했다고 응답 해야하는 동기화 된 리플리카 최소 개수
Case1. 리플리카 개수 : 3 , ack = all , min.insync.replicas = 2

=> 리더에 저장 후 , 팔로워 중 한개 저장시 성공 응답 ( 리더도 min 에 포함 )

Case2. 리플리카 개수 : 3 , ack = all , min.insync.replicas = 1

=> 리더에 저장 되면 성공 응답 ( ack = 1 이라 차이 X - Message 유실 가능! )

Case3. 리플리카 개수 : 3 , ack = all , min.insync.replicas = 3
  • 리더 1개 + 팔로워 2개 저장시 성공 응답
  • 팔로워 중 한개 응답시 리플리카 부족 인한 저장 실패! ( 매우 치명적 ) - 리더도 리플리카에 포함

에러 유형

전송 과정 단위 실패

  • 전송 타임 아웃
  • 리더 다운 의한 새 리더 선출 진행 상태
  • 브로커 설정 메시지 크기 한도 초과
  • ...

전송 전 실패

  • 직렬화 실패 , 프로듀서 자체 요청 크기 제한 초과
  • Producer Buffer 가 찬 후 기다린 시간이 최대 대기 시간 초과
  • ...

실패 대응

1. 재시도

  • 재시도 가능 에러 는 재시도 처리 ( 브로커 응답 타임 아웃 , 일시적 리더 없음 등 )
재시도 위치 단
  • 프로듀서는 자체적 브로커 전송 과정 중 에러 발생 시 재시도 가능 에러 대해 재전송 시도
  • send 메서드에서 익셉션 발생시 익셉션 타입 따라 재호출
  • 무한 재시도는 매우 지양 해야함! ( 서버 부하 )

2. 기록

  • 추후 처리 위한 기록
  • 별도 파일 , DB 등 이용해 실패 Message 기록
  • 추후 수동 or 자동 통해 보정 작업 진행
기록 위치
  • send Method 에 익셉션 발생시
  • send Method 에 전달 콜백에서 익셉션 받는 경우
  • send Method 가 리턴한 Future 의 get() Method 에서 익셉션 발생시
    

메시지 중복 전송 가능성

  • Broker 응답이 늦게 와서 재시도 할 경우 중복 발송 가능
    => enable.idempotence 속성 참고

재시도와 순서

  • max.in.flight.requests.per.connection
  • Blocking 없이 한 Connection 에서 전송할 수 있는 최대 전송중 인 요청 개수
  • 이 값이 1보다 크면 재시도 시점 따라 메시지 순서 바뀔수 있음. ( 여러개를 보내므로 )
  • 전송 순서 중요시 1로 지정

in Nest

  • nest 에서 제공하는 microservices 를 사용하려고 해도 , kafkajs 는 무조건 install 해야한다.
npm install kafkajs

npm install @nestjs/microservices

Producer

Producer Config

  • 생성할때 Config
export interface ProducerConfig {
	createPartitioner?: ICustomPartitioner;
	retry?: RetryOptions;
	metadataMaxAge?: number;
	allowAutoTopicCreation?: boolean;
	idempotent?: boolean;
	transactionalId?: string;
	transactionTimeout?: number;
	maxInFlightRequests?: number;
}
  • createPartitioner : Producer 가 Message 를 특정 Partition 라우팅 할 때 사용하는 파티셔너 지정
  • metadataMaxAge : Cluster 의 Metadata 정보 얼마나 오래 Cache 할지 지정
    ( 캐시된 메타데이터 사용해 파티션 리더 정보 더 빠르게 검색 가능 )
  • allowAutoTopicCreation : Kafka Topic 이 자동 생성 되게 할 지 여부
    ( true 시 , topic 존재 하지 않을 시 자동 생성)
  • idempotent : 메시지 전송 멱등성 활성화 ( 메시지 중복 전송 방지 ) - enable.idempotence 와 동일
  • transactionalId : 트랜잭션 아이디 설정 , 트랜잭션 사용 시 해당 아이디 지정해 트랜잭션 관리
  • transactionTimeout : 트랜잭션의 Timeout 시간 설정 - 지정 시간 내 완료되야함
  • maxInFlightRequest : 동시 처리 최대 요청 전송 수 지정 - 동시 처리 요청 수 제한
  • retry : 메시지 전송 시 재시도 동작 구성 하는데 사용

RetryOptions

export interface RetryOptions {
	maxRetryTime?: number;
	initialRetryTime?: number;
	factor?: number;
	multiplier?: number;
	retries?: number;
	restartOnFailure?: (e: Error) => Promise<boolean>;
}
  • maxRetryTime : 재시도 허용 되는 최대 시간 - 지정 시간 이내 재시도 완료 못할 시 중단
  • initialRetryTime : 첫 재시도 시작되는 대기 시간 - 지정 시간 지난 후 재시도 시작
  • factor : 각 재시도 간 대시 시간이 이전 재시도 대기 시간 대해 곱해지는 계수 - 2.0 이 시 , 두 배씩 증가
  • multiplier : 대기 시간 계산에 사용 되는 기본 시간 단위 - 1000 이면 , 밀리초(ms)단위 대기 시간 계산
  • retries : 최대 재시도 횟수 - 초과 시 실패 처리
  • restartOnFailure : Error 처리하고 , 재시도 여부 결정 위한 Custom 정의 함수

Usage

  • 여기서 주의해야할 점은 kafkajs 를 사용하는 것 과 @nestjs/microservices를 사용할 때 차이점이 있다는 것이다.

@nestjs/microservices

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'PRODUCER',
        transport: Transport.KAFKA,
        options: {
          
          client: {
            clientId: 'RooTripClient',
            brokers: ['localhost:9094'],
          },
          producer: {retry:{retries:3,initialRetryTime:1000,multiplier:1.5}},
        },
      },
    ]),
  ],
  • ClientsModule 을 통해서 생성가능
  • name을 지정하여 , 여러가지 options 으로 생성하여 주입 가능
    ( 사담으로 , useFactory 를 이용하여 , client 가 아닌 , producer 단위 추출할 방법을 찾았으나 ,
    microservices 의 client 는 client.producer.send 가 아닌 , client.send 처럼 client 가 전부 다 가지고 있으므로 불가능 )

send vs emit

  • client 는 Kafka 에 Message 를 Produce 하기 위한 Method 로 send method 와 emit method 를 가진다.
send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>;
emit<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>;
  • send 와 emit 은 이렇게 모든게 동일한데 , 동작 방법이 다르다.
  • 아래 예제는 다른 서버에서 , 이메일을 보내기 위한 kafka topic 을 발생시키는 예제이다.
send - message pattern
  • send 는 producer 가 send 를 하기 전 , onMoudleInit Function 을 통해서 ,
    producer 가 subscribeResponseof 를 통해서 , reply topic 을 구독해야 한다.
    ( Kafka microservice message pattern 이 두개 topic 활성 - request , reply channel )
@Injectable()
export class SendVertificationEmailEventListener {
  constructor(@Inject('PRODUCER') private readonly producer: ClientKafka) {

  }
  onModuleInit(){
    this.producer.subscribeToResponseOf('send.vertification.email');
  }


	@OnEvent(SendVertificationEmailDomainEvent.name)
	handleSendVertificationEmailEvent(event: SendVertificationEmailDomainEvent) {
	const result$ = this.producer.send(SEND_VERTIFICATION_EMAIL, {email:event.email,url:event.redirectUrl});

	result$.subscribe({
		next:(data)=>{
			return;
		},
		error:(error)=>{
			this.producer.emit(SEND_VERTIFICATION_EMAIL,{email:event.email,url:event.redirectUrl});
			return;
		}
	})
}
  • result 뒤에 $ 를 붙인 것은 observable 변수이므로 $ 를 붙여 구분했다.
  • 여기서 주의해야할 점은 무조건 subscribe 를 해야 한다는 것이다.
    => 사실 해당 부분에 대해서는 issue 가 있다.
    https://github.com/nestjs/nest/issues/2718

    위처럼 send 는 의도적으로 cold observeable 을 반환한다.
    이 점은 nestjs 공식 문서 - microservices/overview 에서 Sending messages 부분에 보면 있어서 모를수 있으므로 주의해야 한다. ( 하단 공식문서 부분 참조 )

    따라서 , send 를 사용할때는 subscribe 를 해야만 한다.
  • 성공한 경우인 next 와 에러가 발생한 경우인 error로 추가 로직을 처리할 수 있다.
    ( next 에 있는 data 를 출력해보면 , null 이 나온다. )
  • performance.now 를 통해 측정
  • 총 32개의 실행을 한 후 , 평균을 구한 결과 대략 5.33 ms 가 나왔다.
emit - event pattern
  • emit 은 단순히 , 바로 선언해서 사용할 수 있다.
@Injectable()
export class SendVertificationEmailEventListener {
  constructor(@Inject('PRODUCER') private readonly producer: ClientKafka) {
  }

	@OnEvent(SendVertificationEmailDomainEvent.name)
	handleSendVertificationEmailEvent(event: SendVertificationEmailDomainEvent) {
	const result$ = this.producer.emit(SEND_VERTIFICATION_EMAIL, {email:event.email,url:event.redirectUrl});

	result$.subscribe({
		next:(data)=>{
			return;
		},
		error:(error)=>{
			this.producer.emit(SEND_VERTIFICATION_EMAIL,{email:event.email,url:event.redirectUrl});
			return;
		}
	})
}
  • result 는 hot observable 을 반환한다.
  • data 를 출력하면 , 아래와 같이 나온다.
[
  {
    topicName: 'send.vertification.email',
    partition: 0,
    errorCode: 0,
    baseOffset: '290',
    logAppendTime: '-1',
    logStartOffset: '0'
  }
]
  • 다른 부분은 위와 동일하다
  • 총 32개의 실행을 한 후 , 평균을 구한 결과 대략 2.7742 ms 가 나왔다.

Conclusion

  • emit 이 사용하는 법도 간단하고 , 속도도 매우 빠르다.
  • 왜 send 를 사용해야하는지에 대해서는 아직 잘 모르겠다.
By GPT

NestJS의 @nestjs/microservices 모듈을 사용할 때, sendemit 메서드는 서로 다른 메시지 패턴을 지원합니다. 각각의 메서드에는 다음과 같은 주요 특징이 있습니다:

  1. send 메서드:

    • Request-Response 메시지 패턴을 지원합니다.
    • 메시지를 보내고 그에 대한 응답을 기다릴 수 있습니다.
    • send를 사용하면 특정 주제(topic)로 메시지를 보내고 해당 주제에 대한 응답을 기다릴 수 있습니다.
    • 즉, send를 사용하면 메시지를 보내고 해당 메시지에 대한 응답을 동기적으로 처리할 수 있습니다.
  2. emit 메서드:

    • Publish-Subscribe 메시지 패턴을 지원합니다.
    • 메시지를 보내고 해당 메시지를 구독하는 모든 리스너에게 브로드캐스팅됩니다.
    • emit를 사용하면 메시지를 보내고 해당 메시지를 처리하는 리스너에 대한 응답을 기다리지 않습니다.
    • 비동기적인 이벤트 기반 통신에 적합합니다.

따라서 사용 사례에 따라 send 또는 emit를 선택합니다. 예를 들어, Request-Response 패턴이 필요한 경우 send를 사용하여 메시지를 보내고 응답을 기다릴 수 있습니다. 반면에, 이벤트를 브로드캐스팅하고 비동기적으로 처리해야 하는 경우 emit를 사용할 수 있습니다.

요약하면, sendemit는 서로 다른 메시지 패턴을 지원하며, 사용 사례와 요구 사항에 따라 선택하여 사용해야 합니다.

고찰점

  • 현재 코드는 , ack 및 retry 에 대한 부분이 없다.
  • 임의로 에러를 발생시켜서 에러 처리를 어떻게 하는지 찾아보려고 했으나 , 임의 에러 발생을 못해서 더 찾아봐야 할거 같다.
  • docs 에도 자세히 안 나와있어서 차후 디스코드 및 커뮤니티에 물어볼 예정

kafkajs

  • nestjs 에서 공식 제공하는게 아니다 보니 , Module 로 제공되는게 아닌 내가 직접 만들어서 주입했다.
@Module({
  imports: [],
  controllers: [],
  providers: [
  {
    provide:"PRODUCER",
    useFactory:()=>{
      const producer = new Kafka({
        clientId: 'RooTripClient',
        brokers:['localhost:9094'],
      }).producer({retry:{retries:3,initialRetryTime:1000,multiplier:1.5}});
      producer.connect();
      return producer;
    }
  } 
  ],
  exports: ["PRODUCER"],
})
  • 위와 동일하게 설정이 가능하다.
@Injectable()
export class SendVertificationEmailEventListener {
	constructor(@Inject('PRODUCER') private readonly producer: Producer) {
}

	@OnEvent(SendVertificationEmailDomainEvent.name)
	handleSendVertificationEmailEvent(event: SendVertificationEmailDomainEvent) {
		const result = this.producer.send({
		messages:[{value:JSON.stringify({email:event.email,url:event.redirectUrl})}]
		,topic:"send.vertification.email",
		acks:1})

		result.then((data)=>{
			return;
		})

		result.catch((error)=>{
			return;
		})
})
  • send 에는 ProducerRecord 가 들어간다.
export interface ProducerRecord {
	topic: string
	messages: Message[]
	acks?: number
	timeout?: number
	compression?: CompressionTypes
}
  • Message 에는 Buffer | String 만 들어가므로 , JSON 을 넣을 시 , stringfy 를 통해 변환하여야 한다.
  • result 는 위와 다르게 , Promise 를 반환한다.
  • 따라서 , then 과 catch 문을 통해 처리가 가능하다.
  • data 에는 아래와 같이 값이 나온다. ( 위 emit 함수와 동일 )
[
{
    topicName: 'send.vertification.email',
    partition: 0,
    errorCode: 0,
    baseOffset: '321',
    logAppendTime: '-1',
    logStartOffset: '0'
  }
]

![[Pasted image 20231003122646.png]]

  • performance.now 를 통해 측정
  • 총 32개의 실행을 한 후 , 평균을 구한 결과 대략 3.37 ms 가 나왔다.

고찰점

  • 위 emit 과 비교했을때 속도가 비슷하다.
  • 위와 다르게, promise 를 반환함에 따라 , 단일 처리를 할 것으로 추정
  • 다양한 옵션 제공 ( compression , ack 등등 )

Total Conclusion

  • nestjs 에서 제공해주는게 , Module 단위 및 microservices 에 맞춰 더 잘 구현 되어 있으나 , 기능이 많이 부실한거 같다.
  • 정확히는 모르겠으나 , 처음 컨슈머가 group 에 밸런싱 되는 시간도 , kafkajs 가 매우 빠름.
  • kafkajs 가 처음 Client 를 생성한 후 , producer / consumer 등 역활에 맞춰 나누는 것을 보아 더 깔끔함.
  • 다음 consumer 편에서 설명하겠지만 , @nestjs/microservices 에서 제공하는 Ctx 를 통해 현재 context 를 받아서 getConsumer 를 하면 kafkajs 의 consumer 가 나온다.
    => nestjs 에서 kafkajs 를 wrapping 하여 제공해주나 , kafkajs 를 쓰는게 더 용이해 보임
    ( 다만 , promise 와 rxjs 의 차이점이나 , 의존성 주입 할 때 싱글톤이나 부차적 문제가 발생하는지는 더 알아볼 예정 )

참고

https://github.com/nestjs/nest/issues/2718
https://so-kyte.tistory.com/199
https://docs.nestjs.com/microservices/basics#sending-messages
https://docs.nestjs.com/microservices/kafka
https://kafka.js.org/docs/introduction

Writed By Obisidan

0개의 댓글