Kafka Consumer Message Pooling
- Consumer는 메시지를 가져오기 위해서 Partition에 연속적으로 Poll한다.
- 가져온 위치를 나타내는 offset 정보를 __consumer_offsets Topic에 저장하여 관리한다.
Consumer Load Balancing
- 동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성한다.
- Consumer Group의 Consumer들은 작업량을 어느정도 균등하게 분할한다.
동일한 Topic에 consume하는 여러 Consumer Group이 있을 수 있다.
Partition Assignment
하나의 Consumer Group에서의 Partition 할당
- 하나의 Partition은 지정된 Consumer Group내의 하나의 Consumer만 할당되어 사용된다.
- 동일한 Key를 가진 메시지는 동일한 Consumer가 사용한다.(Partition 수를 변경하지 않는 한)
- Consumer 설정 파라미터 중에서 partition.assignment.strategy로 할당 방식을 조정할 수 있다.
- Consumer Group은 Group Coordinator라는 프로세스에 의해 관리된다.
Consumer Group Coordination
Consumer Group을 등록
- Group Coordinator(하나의 Broker) 와 Group Leader(하나의 Consumer)가 상호작용한다.
Consumer Group 등록 및 Consumer를 Partition에 할당하는 프로세스 예시
- Kafka Cluster 내에 6개의 Partition을 가지고 있는 Topic A가 있다.
- Consumer Group에는 총 7개의 Consumer가 있다.
Consumer 등록 및 Group Coordinator 선택
1. Consumer 등록 및 Group Coordinator 선택
1.1. 동일한 group.id를 가진 Consumer들이 Broker에 접속하면 Consumer Group이 만들어진다.
1.2. Consumer들의 모든 Offset은 __consumer_offsets Topic의 하나의 Partition에 저장된다.
1.3. 이 Partition의 Leader Broker는 Consumer Group의 Group Coordinator로 선택된다.
- hash(group.id)offsets.topic.num.partitions(default 50) 수식을 사용하여 group.id가 저장될 __consumer_offsets의 Partition을 결정한다.
2. JoinGroup 요청 순서에 따라 Conumser 나열
2.1. Group Coordinator가 된 Broker 102은 Group의 Consumers 카탈로그를 생성하기 전에 Consumer들로부터 JoinGroup 요청을 받는다.
2.2. group.initial.rebalance.delay.ms(default 3ms) 만큼 대기한다.
2.3. Group Coordinator는 JoinGroup을 요청한 Consumer들을 순서대로 나열해서 제일 처음 요청을 보낸 Consumer에게 전달한다.
3. Group Leader 결정 및 Partition 할당
3.1. JoinGroup 요청을 최초로 보낸 Consumer가 Group Leader가 된다.
3.2. Group Leader는 partition.assignment.startegy를 사용하여 각 Consumer들에게 Partition을 할당한다.
3.3. 위의 그림에서는 Partition보다 더 많은 Consumer가 있으므로 각 Consumer는 Consume할 Partition이 최대 1개 있고 1개의 Consumer는 Partition을 할당 받지 못한다.
4. Consumer -> Partition 매핑정보를 Group Coordinator에게 전송
4.1. Group Leader는 Consumer -> Partition 매핑정보를 Group Coordinator에게 다시 보낸다.
4.2. Group Coordinator는 맵핑정보를 메모리에 캐시하고 Zookeeper에 알려준다.
5. 각 Consumer에게 할당된 Partition 정보를 보낸다.
5.1. Group Coordinator는 각 Consumer에게 할당된 Partition 정보를 보낸다.
5.2. 각 Consumer들은 할당된 Partition에서 Consume을 시작한다.
왜 Group Coordinator가 직접 Partition을 할당하지 않을까?
Kafka의 한 가지 원칙은 가능한 한 많은 계산을 클라이언트에 수행하도록 하여 Broker의 부담을 줄이는 것
많은 Consumer Group과 Consumer들이 있고 Broker 혼자서 Rebalance를 위한 계산을 한다고 생각해보면..
Broker에게 엄청난 부담이 된다.
이러한 계산을 Broker가 아닌 Client에게 OffLoad하는 것이 가장 바람직하다.
Consumer Rebalancing Trigger
불필요한 Rebalancing은 피해야 한다.
Rebalancing Trigger 조건
- Consumer가 Consumer Group에서 탈퇴
- 신규 Consumer가 Consumer Group에 합류
- Consumer가 Topic 구독을 변경
- Consumer Group가 Topic Metadata의 변경 사항을 인지 (ex: Partition 증가)
Rebalancing Process
- Group Coordinator는 heartbeats의 Flag를 사용하여 Consumer에게 Rebalance 신호를 보낸다.
- Consumer가 일시 중지하고 Offset을 Commit한다.
- Consumer는 Consumer Group의 새로운 Generation에 다시 합류한다.
- Partition을 재할당한다.
- Consumer는 새 Partition에서 다시 Consumer을 시작한다.
Consumer Rebalancing시 Consumer들은 메시지를 Consumer하지 못한다.
따라서 불필요한 Rebalancing은 반드시 피해야 한다.
Consumer Heartbeats
Consumer 장애를 인지하기 위함
- Consumer는 poll()과 별도로 백그라운드 Thread에서 Heartbeats를 보낸다.
heartbeat.interval.ms (default 3초)
- 아래 시간 동안 Heartbeats가 수신되지 않으면 Consumer는 Consumer Group에서 삭제된다.
session.timeout.ms (default 10초)
- poll()은 Heartbeats와 상관없이 주기적으로 호출되어야 한다.
max.poll.interval.ms (default 5m)
과도한 Rebalancing을 피하는 방법
session.timeout.ms 튜닝
- heartbat.interval.ms를 session.timeout.ms의 1/3으로 설정
- group.min.session.timeout.ms (default 6초) 와 group.max.session.timeout.ms (default 5분) 의 사이 값
장점 : Consumer가 Rejoin(재가입)할 수 있는 더 많은 시간을 제공해준다.
단점 : Consumer 장애를 감지하는데 시간이 더 오래걸린다.
max.poll.interval.ms 튜닝
- Consumer에게 poll()한 데이터를 처리할 수 있는 충분한 시간을 제공해준다.
단 너무 크게 하면 안된다.