NestJS기반 백엔드 서버에 카프카를 도입하고자 할 때,
kafkajs + @nestjs/microservices 를 조합해서 손쉽게 카프카를 사용할 수 있었습니다.
우리는 Producer가 메시지의 처리 결과를 알게 하고 싶었습니다.
공식문서에 있는 응답 패턴을 구현했고, 이 글은 응답패턴의 명확한 한계점을 공유합니다.
Producing을 맡을 클래스는 nestjs의 OnModuleInit 를 구현해야합니다.
export class Producer implements OnModuleInit {
constructor(
@Inject(KAFKA_CLIENT)
private readonly clientKafka: ClientKafka
) {}
async onModuleInit(): Promise<void> {
this.clientKafka.subscribeToResponseOf(
`foo.get`
)
}
}
기존에 foo.get 라는 토픽이 있었을 것입니다.
응답을 구독하려면 foo.get.reply 라는 토픽을 미리 생성해두어야 합니다.
준비는 끝입니다. 이제 메시지를 발행할 때 ClientKafka의 두 가지 메소드(emit, send) 중 send 를 사용하면 됩니다.
send<TResult = any, TInput = any>(pattern: any, data: TInput): Observable<TResult>;
send는 Observable객체를 return 하기 때문에 lastValueFrom으로 Consumer의 응답결과를 받으면 됩니다.
이 때, 내부적으로는 rpc 통신의 헤더에 응답토픽에 메시지가 추가되기 위한 정보를 싣습니다.
const result = await lastValueFrom(
this.clientKafka
.send<
KafkaReplyDto,
KafkaProduceDto<FooGetDto>
>(`foo.get`, {})
.pipe(timeout(3000))
)
rxjs pipe를 이용해 타임아웃까지 추가해줍니다.
끝입니다.
이제 consumer에서 return 한 값이 result에 담기게 됩니다.
consumer example
@MessagePattern(`foo.get`)
async consumer(
@Payload() data: KafkaProduceDto<FooGetDto>,
@Ctx() context: KafkaContext
) {
return 'producer result에 들어갈 값'
}
카프카와의 rpc 통신 헤더에 포함된 정보를 통해 응답토픽에 return한 값이 프로듀싱됩니다.
이 때 헤더에는 응답토픽명과 파티션까지 특정되어있습니다.
즉, producer - consumer(producer) - consumer 일련의 관계가 파티션 단위까지 보장되어 있어야 한다는 것입니다.
이 때문에 한계점이 발생합니다.
동기적으로 통신해야할 부분을 카프카를 통해 구현하면서
예약시도에 대한 응답이 오기까지 서버가 Block 되지 않아서 효율적으로 다른 작업을 수행하게 되는 장점이 있습니다. (nonblock-sync)
BullMQ에는 Consumer(processor)의 결과를 기다리는 기능이 있습니다.
이 기능을 써봤던 터라 그 생각이 갇혀 응답패턴을 도입하게 된 것 같습니다.
잘못된 방법은 아닙니다. 다음의 경우에 해당한다면 말입니다.
응답패턴은 기본적으로 Producer가 topic.reply 토픽의 구독자(consumer)가 되는 개념입니다.
서버가 부팅할 때 특정 토픽의 구독자로 등록이 됩니다. (consumer-group으로 묶여서)
서버 인스턴스를 늘릴 때, 토픽의 파티션이 부족하다면 (인스턴스 보다 적다면) 구독자로 등록이 되지 못해서 응답패턴이 제대로 동작하지 않습니다.
일반적으로 파티션과 인스턴스(Consume)가 1:1관계가 됩니다.
ECS 등의 블루그린 배포는 기본적으로 서버(task) 인스턴스가 두 배로 늘어났다가 원래 수로 돌아오는 방식이라는 것을 알 겁니다.
이 때 파티션 개수가 부족하다면 추가로 뜨는 인스턴스가 구독자로 등록 되지 못해서 응답패턴이 제대로 동작하지 못합니다.
blue/green 배포가 아닐지언정 인스턴스의 개수가 조정되며 재구성되는 배포방식일 경우 검토가 필요할 것입니다
기본적으로 위의 케이스를 조합해보면 reply 토픽의 파티션 개수를
서버인스턴스 수 x 2 이상으로 항상 유지해야 합니다.
Scale-Out을 해야할 때, 그 서버가 구독하는 카프카 토픽의 파티션 수 까지 미리 확인하고 세팅해야 합니다.