[Kafka] ConsumerRebalanceListener의 올바른 구현

유알·2023년 7월 30일
0

[Kafka]

목록 보기
2/5

개요

이 글에서는 공식 Java Doc을 기반으로 ConsumerRebalanceListener에 대해 자세히 알아보고 어떻게 구현해야하는지 공부한다.

책에는 간단하게 커밋을 하는 것으로 구현하였지만, 많은 의문점이 생겼다.(커밋만 하고, 아직 남은 레코드는 처리해도 되는건지, 별도의 쓰레드에서 콜백이 호출되는건지 아니면 동기적으로 처리되는 건지 등등) 그래서 공식문서를 공부했다.

역시 좀 어렵더라도(영어라) 공식문서를 보는게 가장 정확하고 빠른거 같다.

핵심 내용 요약

개요

ConsumerRebalanceListener는 컨슈머에게 할당된 파티션 셋이 변경될 때 트리거될 액션을 정의하는 콜백이다. (리밸런싱)

.assign() 의 경우 리밸런싱은 발생하지 않고 , .supscribe()를 통해 Kafka auto-managed group membership을 사용할 때만 리밸런싱이 발생한다. 따라서 직접적으로 파티션이 할당된 경우 그 파티션들은 이 콜백을 절대 호출하지 않으며 유용하지 않을 수 있다.

리밸런싱은 언제 발생하는가

리밸런싱(re-assignment)은 다음과 같은 상황에 발생한다.

  • 프로세스가 죽을 때
  • 새로운 프로세스가 추가될 때
  • 오래된 인스턴스가 실패한 이후 다시 살아날 때(back to life)
  • 구독된 토픽에 영향을 미치는 변경(e.g. 파티션의 수가 관리상 조정되는 경우)

이 콜백의 활용 예시

문서에서는 이 콜백의 대표적인 구현/활용 예시를 다음과 같이 들고 있다

  • 리밸런싱 직전 커밋을 저장하므로써, 배정받은 파티션이 오프셋이 저장된 상태라는 것을 보장한다.
  • 컨슈머가 가지고 있을 수 있는 중간 단계의 어떤 캐시(any kind of cache)들을 flush 하도록 한다.

    예를 들어, 사용자 페이지 보기가 포함된 토픽에 가입되어 있고 목표가 각 5분 윈도우의 사용자당 페이지 보기 수를 세는 것인 경우를 생각해 보자. 기존에 컨슈머가 메모리에 집계를 유지하면서 캐시가 너무 커질 때만 원격 데이터 저장소로 flush하도록 했을 수 있다. 그러나 파티션이 재할당되면 새 소유자가 소비를 인계받기 전에 아직 flush하지 않은 데이터를 flush하도록 트리거 할 수 있다. (의역 후 간추림)

콜백은 언제 어디에서 어떻게 실행되는가?

이 공부의 동기이기도 하다.

This callback will only execute in the user thread as part of the poll(long) call whenever partition assignment changes.

중요한 말이다.(책에는 이런 내용을 언급하지 않았다. 왜,,,) 이 콜백은 유저 쓰레드에서 실행되며, poll() 메서드의 일부로 호출된다고 한다.

Under normal conditions, if a partition is reassigned from one consumer to another, then the old consumer will always invoke onPartitionsRevoked for that partition prior to the new consumer invoking onPartitionsAssigned for the same partition.

일반적인 경우라면, 그 리밸런싱된 파티션에 대해, 파티션의 원래 컨슈머가 onPartitionsRevoked를 호출받고, 새로운 컨슈머가 onPartitionsAssigned 을 호출받는다.

따라서 onPartitionsRevoked가 호출되었을 때 오프셋이나 다른 상태가 저장되었을 경우, 그 이후로 항상 새로운 컨슈머가 배정받고 onPartitionsAssigned을 호출해서 상태를 로드하는 것이 가능해진다.(accessible)

하지만 항상 이러한 정상적인 과정이 가능한 것이 아니다. (ex. session timeout)
따라서 세번째 콜백인 onPartitionsLost(Collection)이 존재한다. 이 콜백이 onPartitionsRevoked과 다른 점은 전자의 경우 이미 그 파티션이 다른 컨슈머에게 배정되었을 수 있고, 그러므로 이미 소비된 오프셋에 대해 커밋을 하지 못할 수 있다.
따라서 두개의 메서드를 다르게 구현해야한다.(could라고 되어 있지만, 다르게 구현해야한다고 생각한다.)

    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }

기본적으로는 default 메서드로서 onPartitionsRevoked를 그냥 호출한다. 그래서 구현이 강제되지 않아 그냥 넘어가는 경우가 많이 생길 수 있는데, 내 생각에 이 구현은 필히 신경써서 해야한다고 생각한다.
예상치 못한(중복이 방지되었다고 생각했지만, 실제로는 중복해서 처리되는 등) 결과가 나타날 수 있다고 생각한다.

예를 들어 onPartitionsLost에서는 이미 소유권이 넘어갔을 수 있으므로, 오프셋을 저장하지 않는다던지, 이미 처리한 데이터에 대한 보상을 해준다던지

리밸런싱이 완료되면, onPartitionsAssigned 이 정확히 한번 트리거 된다. 이 말은 만약 새롭게 할당된 파티션이 없더라도 리밸런싱 완료 후 빈 컬렉션과 함께 한번 '트리거' 된다는 것이다.

  • eager rebalancing을 사용할 경우, onPartitionsAssigned은 항상 호출된다. 반면 onPartitionsLost는 lost 된 파티션이 있는 경우만 호출된다.
  • cooperative rebalancing를 사용할 경우, 두개의 콜백은 lost하거나 revoked 된 파티션이 있을 때만 호출된다.

콜백 내부에서 발생할 수 있는 예외

내부 호출에서 WakeupException이나 InterruptException이 발생할 수 있으며, 이는 KafkaConsumer.poll(java.time.Duration) 메서드로 전파된다.(외부로 throw)

참고로 WakeupException의 경우 외부에서 consumer.wakeUp()을 호출하면 발생하며, 보통 while(true)문을 빠져나와 안전하게 consumer을 종료하고자 할때 많이 사용한다. 보통 자바 런타임에 shutdown hook에 등록을 많이 해서 사용한다.

또한 이 콜백은 상황의 알림을 표시하는 용도이므로, 안에서 예외를 발생시킨다고 해서 수락을 하지 않는다던가 하는식으로 동작하지 않는다.

제공된 구현 예시

여기서는 offset을 저장하는 목적으로 구현된 예시코드를 제공했다.

   public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
       private Consumer<?,?> consumer;

       public SaveOffsetsOnRebalance(Consumer<?,?> consumer) {
           this.consumer = consumer;
       }

       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
           // save the offsets in an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              saveOffsetInExternalStore(consumer.position(partition));
       }

       public void onPartitionsLost(Collection<TopicPartition> partitions) {
           // do not need to save the offsets since these partitions are probably owned by other consumers already
       }

       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
           // read the offsets from an external store using some custom code not described here
           for(TopicPartition partition: partitions)
              consumer.seek(partition, readOffsetFromExternalStore(partition));
       }
   }

다음 글에서는 리밸런싱을 고려해서 내가 설계한 예시 컨슈머를 한번 살펴보겠다.

profile
더 좋은 구조를 고민하는 개발자 입니다

0개의 댓글

관련 채용 정보