- Consumer는 메시지를 가져오기 위해서 Partition에 연속적으로 Poll한다.
- 가져온 위치를 나타내는 offset 정보를 __consumer_offsets Topic에 저장하여 관리한다.
- 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성한다.
- Consumer Group의 Consumer들은 작업량을 어느정도 균등하게 분할한다.
- 동일한 Topic에 consume하는 여러 Consumer Group이 있을 수 있다.
- 하나의 Partition은 지정된 Consumer Group내의 하나의 Consumer만 할당되어 사용된다.
- 동일한 Key를 가진 메시지는 동일한 Consumer가 사용한다.(Partition 수를 변경하지 않는 한)
- Consumer 설정 파라미터 중에서 partition.assignment.strategy로 할당 방식을 조정할 수 있다.
- Consumer Group은 Group Coordinator라는 프로세스에 의해 관리된다.
- Group Coordinator(하나의 Broker) 와 Group Leader(하나의 Consumer)가 상호작용한다.
- Kafka Cluster 내에 6개의 Partition을 가지고 있는 Topic A가 있다.
- Consumer Group에는 0부터6까지 총 7개의 Consumer가 있다.
- 동일한 group.id를 가진 Consumer들이 Broker에 접속하면 Consumer Group이 만들어진다.
- Consumer들의 모든 Offset은 __consumer_offsets Topic의 하나의 Partition에 저장된다.
- 이 Partition의 Leader Broker는 Consumer Group의 Group Coordinator로 선택된다.
3-1. hash(group.id) % offsets.topic.num.partitions(default 50) 수식을 사용하여 group.id가 저장될 __consumer_offsets의 Partition을 결정한다.
- Group Coordinator가 된 Broker 102은 Group의 Consumers 카탈로그를 생성하기 전에 Consumer들로부터 JoinGroup 요청을 받는다.
- group.initial.rebalance.delay.ms(default 3ms) 만큼 대기한다.
- Group Coordinator는 JoinGroup을 요청한 Consumer들을 순서대로 나열해서 제일 처음 요청을 보낸 Consumer에게 전달한다.
- Group Leader는 Consumer -> Partition 매핑정보를 Group Coordinator에게 다시 보낸다.
- Group Coordinator는 맵핑정보를 메모리에 캐시하고 Zookeeper에 알려준다.
- Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보낸다.
- 각 Consumer들은 할당된 Partition에서 Consume을 시작한다.
- Consumer가 Consumer Group에서 탈퇴
- 신규 Consumer가 Consumer Group에 합류
- Consumer가 Topic 구독을 변경
- Consumer Group가 Topic Metadata의 변경 사항을 인지 (ex: Partition 증가)
- Group Coordinator는 heartbeats의 Flag를 사용하여 Consumer에게 Rebalance 신호를 보낸다.
- Consumer가 일시 중지하고 Offset을 Commit한다.
- Consumer는 Consumer Group의 새로운 Generation에 다시 합류한다.
- Partition을 재할당한다.
- Consumer는 새 Partition에서 다시 Consumer을 시작한다.
Consumer Rebalancing시 Consumer들은 메시지를 Consumer하지 못한다.
따라서 불필요한 Rebalancing은 반드시 피해야 한다.
- Consumer는 poll()과 별도로 백그라운드 Thread에서 Heartbeats를 보낸다.
- heartbeat.interval.ms (default 3초)
- 아래 시간 동안 Heartbeats가 수신되지 않으면 Consumer는 Consumer Group에서 삭제된다.
- session.timeout.ms (default 10초)
- poll()은 Heartbeats와 상관없이 주기적으로 호출되어야 한다.
- max.poll.interval.ms (default 5m)
- Group의 각 Consumer에게 고유한 group.instance.id를 할당한다.
- Consumer는 LevelGroupRequest를 사용하지 않아야 한다.
- Rejoin(재가입)은 알려진 group.instance.id에 대한 Rebalance를 Trigger하지 않는다.
- heartbat.interval.ms를 session.timeout.ms의 1/3으로 설정
- group.min.session.timeout.ms (default 6초) 와 group.max.session.timeout.ms (default 5분) 의 사이 값
- 장점 : Consumer가 Rejoin(재가입)할 수 있는 더 많은 시간을 제공해준다.
- 단점 : Consumer 장애를 감지하는데 시간이 더 오래걸린다.
- Consumer에게 poll()한 데이터를 처리할 수 있는 충분한 시간을 제공해준다.
- 단 너무 크게 하면 안된다.