아웃박스 메시지를 삭제할때 유의할점

Bonjugi·2023년 5월 29일
0

아웃박스패턴은 메세지브로커로 send가 되었을때 아웃박스를 삭제하는 과정이 필요하다.
그런데 send메소드는 비동기적으로 처리된다.
(KafkaProducer는 물론, spring-kafka의 KafkaTemplate도, spring-cloud의 MessageChannel도 모두 비동기 이다)

send() 메소드는 논블로킹이라 호출하고 바로 메세지를 지워도 되는걸까?
send는 로컬에 버퍼를 가지고 있고, 일정 시간이나 용량이 경과되면 예외가 발생한다.
당연히 반환값을 확인 후 지우는게 안전하다.

send에 대한 반환값을 동기 처리 - 성능이슈 발생

성능이슈가 있다.
대체로 Kafka로 메세지를 발행할때 굳이 반환값이 궁금할 일은 없다.
하지만 때론, "정확히 브로커까지 전달이 되었는지" 판별이 필요할 때가 있다.
이때 동기적으로 처리해 버리면 성능이슈가 있으니 주의하자.

이전에 동기, 비동기를 비교한적이 있다.
https://velog.io/@bonjugi/Kafkaproducer-%EB%8F%99%EA%B8%B0-%EB%B9%84%EB%8F%99%EA%B8%B0-%EC%86%8D%EB%8F%84-%EB%B9%84%EA%B5%90

send에 대한 반환값을 비동기 처리

2가지 방법이 있다.

  • 반환값을 무시하고 그냥 믿고 처리
  • CompletableFuture를 이용하여 송신 확인하고 처리

방법1. 반환값을 무시하고 그냥 믿고 처리

MSK 를 쓴다면 사실상 send 메소드가 폴트되는 경우는 흔치 않다.
아주 적은 다운타임을 보장하고 (.9였나 .99였나) 만약 문제가 발생하더라도 리더 선출 등 카프카 자체적인 내결함성이 견고하기 때문이다.
게다가 카프카 클라이언트는 브로커가 다운되어도 문제없도록, 로컬에서 프로듀서가 버퍼링을 한다.(성능 개선목적으로도)
버퍼링은 일정의 용량과 일정의 시간을 갖고 있고 프로퍼티로 제어할수 있다.
이런 특성들을 믿고 그냥 송신결과를 알 필요 없이 비동기 처리하면 당연히 성능이슈가 발생하지 않는다.

하지만 영원한것은 없음을 유의하자.
해저에 묻힌 케이블을 물고기가 물어 뜯을수도 있고, DNS 이슈로 일시적으로 전체가 다운될수도 있다.
브로커의 매니지드 스케줄링이 잘못되어 강제 업그레이드가 발생할수도 있다.
또한 백엔드서비스 로컬 버퍼에 메세지를 머금은 상태에서 패닉과 함께 다운되어 버릴수도 있다. 이런건 Graceful Shutdown 으로도 해결할수 없다.

정말 말도 안되는 이유료 유실을 겪게될수 있다.
결국, 메시지 유실을 허용할수 있는 시스템이 아니라면 추천하지 않는다.

방법2. CompletableFuture를 이용하여 송신 확인하고 처리

사실 send 결과로는 CompletableFuture를 반환 하기 때문에 쉽게 구현할수 있다.
CompletableFuture가 아니더라도 그냥 패러럴스트림을 돌리면 되지만, 논블로킹 처리할수 있다는 제작자의 의도를 파악할수 있다.

블로킹 코드

다음은 블로킹이 발생하는 코드로 100n 으로 처리된다.
for문 내에서 id를 반환받고, 모아서 OutBox의 메시지를 제거하는 코드 이다.

// Bad
List<Id> ids = new ArrayList();
for (int i = 0; i < 100; i++) {
    Dto value = kafkaTemplate.send("topicName", "value")
    .get()  // 블로킹 발생
    .getProducerRecord()
    .value();
    
    list.add(value.getId());
}
deleteOutBoxMessageBy(ids);

하지만 위 코드는 .get() 메소드에서 블로킹이 발생하여 100n 의 소요시간이 발생 한다.
그냥 동기식 코드와 동일하다고 보면 된다.

논블로킹 코드

다음은 논블로킹을 적용한 코드로 1n 으로 처리된다.
마찬가지로 ids를 모아야 하지만 CompletableFuture 먼저 집계후, 이후 stream으로 처리하는 차이가 있다.

// Good
List<CompletableFuture<SendResult<String, Object>>> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send("topicName", "value");
    list.add(future);
}

List<Id> ids = list.stream()
    .map(CompletableFuture::join)  // join 에서 블로킹이 발생하지만 stream으로 동작
    .map(SendResult::getProducerRecord)
    .map(ProducerRecord::value)
    .map(Dto::getId)
    .toList();
deleteOutBoxMessageBy(ids);    

future리스트를 반환하고, 스트림에서 join을 사용한다.
join 자체는 블로킹을 하기 때문에 뭔가 블로킹스럽게 동작할거 같다.
하지만 stream 에 있는 CompletableFuture는 ForkJoin 을 이용해 개별 처리후 조인 하여 처리한다.
따라서 parallelStream을 쓰지 않아도 병렬적으로 처리된다.

0개의 댓글