Kafka Consumer

개발개 보스독·2025년 3월 23일

Kafka

목록 보기
4/5

내부구조

  • Fetcher : 리더 파티션으로부터 레코드를 미리 가져와서 대기
  • poll() : Fetcher에 있는 레코드 들을 리턴하는 레코드
  • ConsumerRecords : 처리하고자 하는 레코드들의 모음. 오프셋이 포함되어 있다.

Consumer Rebalance

Partition Assignment

  • 하나의 Consumer Group에서의 Partition 할당
  • 하나의 Partition은 지정된 Consumer Group내의 하나의 Consumer만 사용한다.
  • 동일한 Key를 가진 메시지는 동일한 Consumer가 사용한다. (Partition 수 불변가정)
  • Consumer의 설정 파라미터 중에서 partition.assignment.strategy로 할당 방식 조정
  • Consumer Group은 Group Coordinator라는 프로세스에 의해 관리된다.

Group Coordinator & Group Leader

  1. Consumer 등록 및 Group Coordinator 선택
    • hash(group.id) % offsets.topic.num.partitions 수식을 이용하여 group.id가 저장될 __consumer_offsets의 Partition을 결정한다.
    • 위 알고리즘으로 결정된 Partition이 포함된 브로커가 Group Coordinator가 된다.
  2. JoinGroup 요청 순서에 따라 Consumer 나열
    • Group Coordinator는 Group의 Consumers 카탈로그를 생성하기 전에 Consumers의 JoinGroup 요청에 대해 group.initial.rebalance.delay.ms(3초)를 대기한다.
    • Consumer들이 Consume할 최대 Partition 수까지 JoinGroup 요청을 수신하는 순서대로 Consumer를 나열
  3. Group Leader 결정 및 Partition 할당
    • JoinGroup 요청을 보내는 최초 Consumer는 Group Leader로 지정되며 Group Coordinator로 부터 Consumer 목록을 받는다.
    • Group Leader는 구성된 Partition.assignment.strategy를 사용하여 각 Consumer에게 Partition을 할당한다.
  4. Consumer → Partition 맵핑 정보를 Group Coorinator에게 다시 전송
    • Group Leader는 “Consumer → Partition” 맵핑 정보를 Group Coordinator에게 다시 보냄
    • Group Coodinator는 맵핑 정보를 메모리에 캐시하고 Zookeeper에 유지
  5. 각 Consumer에게 할당된 Partition 정보를 보냄
    • Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보냄
    • 각 Consumer는 할당된 Partition 에서 Consume을 시작한다.

💡 Group Coordinator가 직접 Partition을 맵핑하지 않는가?

  • kafka의 한가지 원칙은 가능한 한 많은 계산을 클라이언트에서 수행하도록 하여, Broker의 부담을 줄이는 것이다.
  • 많은 Consumer Group과 Consumer들이 있고 Broker 혼자서 Rebalance를 위한 계산을 한다면 Broker에 엄청난 부담이고 이러한 계산을 Broker가 아닌 클라이언트에게 오프로드 하는 것이 바람직하다.

Consumer Rebalancing Trigger

  • 불필요한 Rebalancing은 피하는 것이 좋다.
  • Rebalancing 도중에는 메시지를 consume하지 못한다.
  • Rebalancing Trigger
    • Consumer가 Consumer Group에서 탈퇴하거나 신규로 합류할 경우
    • Consumer가 Topic 구독을 변경할 경우
    • Consumer Group은 Topic 메타데이터의 변경 사항을 인지 (partition 증가 등)
  • Rebalancing Process
    1. Group Coordinator는 Heartbeats의 플래그를 사용하여 Consumer에게 Rebalance 신호를 보냄
    2. Consumer가 일시중지하고 Offset를 Commit
    3. Consumer는 Consumer Group의 새로운 ‘generation’에 다시 합류
    4. Partition 재할당
    5. Consumer는 새 Partition에서 다시 Consume을 시작

Consumer Heartbeats

  • consumer 장애를 인지하기 위함
  • consumer는 poll()과 별도로 백그라운드에서 Hearbeats를 보낸다. (interval : default 3초)
  • 아래 시간동안 Heartbeats가 수신되지 않으면 Consumer는 Consumer Group에서 삭제한다. (timeout : default 10초)
  • poll()은 heartbeats와 관계없이 주기적으로 호출되어야 한다. (max.poll.interval : default 5분)

과도한 Rebalancing 피하는 방법

  1. Consumer Group 멤버 고정
    • Group의 각 Consumer에게 고유한 group.instance.id를 할당합니다.
    • consumer는 LeaveGroupRequest를 사용하지 않아야한다.
    • rejoin(재가입)은 알려진 group.instance.id에 대한 Rebalance를 trigger 하지 않음
  2. session.timeout.ms 튜닝
    • heartbeat.interval.ms를 session.timeout.ms의 1/3로 설정
    • group.min.session.timeout.ms(6초)와 group.max.session.timeout.ms(5분) 사이의 값을 사용
    • 장점 : Consumer가 Rejoin할 수 있는 더 많은 시간을 제공
    • 단점 : Consumer 장애를 감지하는 데 시간이 더 오래 걸림
  3. max.poll.interval.ms 튜닝
    • Consumer에게 poll()한 데이터를 처리할 수 있는 충분한 시간을 제공한다.
    • 너무 크게 하면 안됨

Partition Assignment Strategy

Range

  • Topic 별로 작동하는 Default Assigner
  • 동일한 Key를 가지고 있는 메시지들에 대한 Topic들 간에 “co-partitioning”하기 좋다.

Round Robin

  • Round Robin 방식으로 Consumer에게 Partition을 할당
  • Reassign(재할당) 후 Consumer가 동일한 Partition을 유지한다고 보장할 수 없음
  • 할당 불균형이 일어날 수 있다.

Sticky

  • 최대한 많은 기존 Partition 할당을 유지하면서 최대 균형을 이루는 할당을 보장
  • 가능한한 균형적으로 할당을 보장한다.
    • Consumer들에게 할당된 Partition의 수는 최대 1만큼 다름
  • 재할당이 발생했을 때, 기존 할당을 최대한 많이 보존하여 유지 (연결이 빠진 partition만 재할당)

CooperativeSticky

  • 동일한 Sticky Assigner 논리를 따르지만 협력적인 Rebalance를 허용

Custom

  • 인터페이스를 구현하면 사용자 지정 할당 전략을 사용할 수 있음

0개의 댓글