Kafkaproducer 동기, 비동기 속도 비교

Bonjugi·2022년 9월 12일
0
post-custom-banner

로컬에 카프카를 설치하고 동기와 비동기 코드 속도차이를 비교 하려고 한다.
트랜잭셔널 메시징을 구현하기 위해 건별로 처리하면 좋을거같은데 성능이 어떨지 궁금하다.

결과부터

결과부터 얘기 하자면
1만개 처리시 속도차이는 100배 넘게 차이가 났다. (9987519583 / 074682958 = 113.73)

---------------------------------------------
ns         %     Task name
---------------------------------------------
074682958  001%  synchronous
9987519583  099%  asynchronous

상대적인 속도로는 async가 매우 빨라지면 최대 300배도 차이가 났었기 때문에 큰 의미가 없을수 있다.
절대적인 속도만 합격해도 좋을텐데, 보면 거의 10초에 육박한다.
1만개 기준이니까 tps 1000 밖에 되지 않는다.
메세지 브로커가 이정도 처리속도라면 큰 에로사항이 아닐까 싶다.

카프카 클라이언트들은 어떻게 되어있지?

기본적으로 KafkaProducer, KafkaTemplate, MessageChannel 등은 모두 비동기로 되어있다.
즉 반환값은 항상 성공한것처럼 내려오고 있으니 개발시 참고하자.

테스트 환경

  • acks 옵션은 1로 설정
  • 파티션의 갯수가 1개 로 설정
  • 압축 이라던지 고급 설정은 하지 않음
  • 기본설정으로 사용했기 때문에, 좀 더 최적화 여지가 있을수 있다.

스탠다드 성능이 좀더 궁금하기도 했고,
로컬에 설치된 카프카를 사용 한 점과 머신 성능 역시 좋은편에 속한점 (m1프로 mbp 16인치 32G) 을 감안하면 이 이상 드라마틱하게 좋아지긴 힘들것이라고 생각된다.

구현 코드

비교에 사용된 코드는 다음과 같다.
.get() 이 있냐 없냐의 차이 이다.

  @Test
  public void hello() {
    StopWatch stopWatch = new StopWatch();
    
    stopWatch.start("synchronous");
    for (int i = 0; i < 10000; i++) {
      String resourceId = String.valueOf(i);
      publisher.fire(resourceId, resourceId);
    }
    stopWatch.stop();   // synchronous 끝

    stopWatch.start("asynchronous");
    for (int i = 0; i < 10000; i++) {
      String resourceId = String.valueOf(i);
      publisher.fireSynchronous(resourceId, resourceId);
    }
    stopWatch.stop();  // asynchronous 끝

    System.out.println(stopWatch.prettyPrint());
  }

단순히 publisher.fire 냐 publisher.fireSynchronous 냐 의 차이 이다.
아래는 publisher 의 구현체이다.
fire 와 fireSynchronous 메소드를 구현했고, 차이는 send(..); 이냐, send(..).get(); 이냐 의 차이다.

public class MessagePublisher {

  private final KafkaProducer<String, byte[]> producer;

  public Future<RecordMetadata> fire(String resourceId, String message) {
    ProducerRecord<String, byte[]> producerRecord = getStringProducerRecord(resourceId, message);
    return producer.send(producerRecord);
  }

  public void fireSynchronous(String resourceId, String message) {
    ProducerRecord<String, byte[]> producerRecord = getStringProducerRecord(resourceId, message);
    try {
      producer.send(producerRecord).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
  // .. getStringProducerRecord 메소드 생략
}
post-custom-banner

0개의 댓글