최근 카프카 컨슈머를 통해 consume 을 진행하다가 어느순간 message가 넘어오지 않은 적이 있다. 카프카의 내부 동작을 모른채로 카프카 서버에 접속해 컨슈머 그룹 정보와 오프셋을 를 읽는 명령어를 입력하니 아래와 같은 경고 메시지가 출력됐다.
Warning: Consumer group 'ooooooooooooooo' is rebalancing.
카프카 리밸런싱
- 컨슈머 그룹 내의 컨슈머들은 자신들이 읽는 파티션의 소유권을 공유
- 즉 하나의 컨슈머 그룹에서 컨슈머 A가 담당하던 파티션 읽기 작업을 컨슈머 B가 이관받아 작업을 처리할 수 있음
- 이와 같은 컨슈머 그룹 내의 소유권 이관 작업을 리밸런싱(Rebalance, Rebalancing)이라고 함
- 리밸런싱은 컨슈머의 파티션 소유권을 조정할 수 있기 때문에 컨슈머 그룹의 확장성과 가용성을 높여줌
컨슈머 그룹 코디네이터
- consumer grooup coordinator
- 컨슈머 그룹 코디네이터는 인스턴스를 백그라운드 프로세스로 실행하며 특정 컨슈머 그룹을 관리하는 브로커
- 컨슈머 그룹 별로 관리하는 브로커가 지정되는데 이 브로커가 해당 컨슈머 그룹의 코디네이터
- 컨슈머 그룹 코디네이터는 아래와 같은 정보를 추적하고 관리
- 컨슈머 그룹의 맴버십 변화: 컨슈머 그룹 내의 컨슈머가 제외 되거나 추가됐을 떄
- 컨슈머 그룹의 컨슈머는 폴링(polling)하거나 커밋(commit)할 때 하트비트 메시지를 그룹 코디네이터에게 전달
- 그룹 코디네이터는 하트 비트를 성공적으로 전달한 컨슈머는 정상 작동중이라고 판단
- 하지만 그룹 코디네이터가 일정 기간 동안 컨슈머의 하트비트를 받지 못하면 해당 컨슈머는 어떠한 이유로 작업이 불가능 한 것으로 판단
- 해당 컨슈머의 파티션 소유권을 다른 컨슈머로 이관. 즉 리밸런싱을 실시
- 새로운 파티션의 추가 혹은 변경: 컨슈머 그룹이 구독하고 있는 토픽의 파티션이 추가 혹은 변경 된 경우
- 특정 토픽의 파티션이 증가하거나 변경이 발생하면 해당 파티션에 대한 소유권을 재조정해야함
- 그렇기 때문에 리밸런싱 을 통해 컨슈머 그룹 내의 컨슈머가 추가된 파티션을 구독할 수 있음
컨슈머 그룹 리밸런싱 과정
- 그룹 코디네이터가 변경 사항을 감지하고 리밸런싱을 발생시키면 다음과 같이 소유권이 재조정
- 그룹 코디네이터가 컨슈머 그룹 내의 모든 컨슈머들의 파티션 소유권을 박탈. 컨슈머들의 JoinGroup요청을 일정시간 기다림
- 그룹 코디네이터는 제일먼저 JoinGroup을 요청한 컨슈머를 그룹리더로 지정. 그룹 리더에게 파티션 정도와 컨슈머 목록을 전달
- 그룹 리더는 전달받은 정보를 바탕으로 파티션 소유권을 재조정. 이를 그룹 코디네이터에게 다시 전달
- 그룹 코디네이터는 재조정된 파티션 소유권을 각 컨슈머에게 알리고 리밸런싱을 종료
리밸런싱의 위험
- 리밸런싱은 컨슈머의 소유권을 재조정하기 때문에 리밸런싱이 발생한 컨슈머 그룹 내의 모든 컨슈머의 읽기 작업은 중단됨
- 컨슈머의 일시적인 서비스 중단이 발생할 수 있음
- 리밸런싱은 컨슈머 쪽의 서비스 상태에 일시적인 영향을 줄 수 있으므로 컨슈머, 파티션의 추가가 필요하다면 충분한 고려 후에 추가해야함
- 추가적으로 하트비트 전송주기를 결정하는
hearbeat.interval.ms
설정과 session.timeout.ms
설정은 원치 않는 리밸런싱 발생에 영향을 미치므로 적절하게 설정할 필요가 있음. 대체로 1 : 3 비율로 설정하는 것을 권장
원인 찾아보기
- 리밸런싱 이슈가 발생하는 여러 이유들을 찾아보니 낮은 처리량으로 인한 문제, 메시지 중복 소비 등의 이슈가 있었다.
- 먼저 낮은 처리량 이슈 부터 살펴보면, 실제 운영 서버에서도 동일한 환경에서 카프카 메시지를 소비하고 있고, 운영 서버의 kafka lag 또한 매우 적은 수 인 것을 보았을 때 처리량의 문제는 아닌 것 같았다.
- 그렇다면 다른 이유를 생각해 봐야하는데.. 인터넷에서 찾은 부분들 중 먼저 의심 되는 부분은
- session timeout 시간이 지난 경우
- 이는 컨슈머와 브로커 사이의 session timeout을 나타내는 시간으로, 컨슈머가 살아 있는 것으로 판단하는 시간
- 이 시간이 지나면 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머 그룹은 리밸런스를 시도
- 현재 사용하고 있는 카프카 컨슈머 옵션을 보니 session.timeout.ms이 따로 설정되어 있지 않은 것을 보아하니 default 인 1000ms인 것 같다
- 두 번째 poll을 호출 하는 시간 간격 문제
- 컨슈머는 메시지를 가져오기 위해 poll() 요청을 보냄
- 컨슈머는 가져온 메시지를 처리한 후 해당 파티션의 offset을 커밋
- 하지만 poll 요청을 보내고 다음 poll 요청을 보내는데 까지의 시간 max.poll.interval.ms 보다 늦으면 컨슈머에 문제가 있다고 판단하여 리밸런싱 진행
- 파이썬 기본 Kafka패키지는 기본 500개의 메시지를 한 꺼번에 가져옴
- max.poll.interval.ms 또한 따로 설정되어 있지 않은 것을 보니 default인 300000ms(5분)이다.
- 컨슈머가 정상적으로 종료되지 않았을 때
- 정상적으로 종료되지 않은 컨슈머는 session timeout이 발생할 때까지 컨슈머 그룹에 남아있음
- 이로 인해 실제로는 종료되었지만 더는 동작을 하지 않는 컨슈머가 존재하기 때문에 파티션의 데이터는 소모되지 못하고 컨슈머 랙이 늘어나게됨
- 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생
- !!!!!!!!
- 위의 리밸런싱 이슈가 발생하기 전, 여러 테스트를 하기 위해 반복적으로 카프카 컨슈머 연결 ↔ 종료를 빈번하게 하곤했었다. 그러다 결국 중간에 timeout error가 발생했었고 별 생각 없이 넘어갔었는데 이후로 부터 정상적으로 메시지 소비가 되지 않았다.
- 지금까지 가장 유력한 원인은 아마 컨슈머의 비정상적인 종료로 인해 session timeout 에러가 발생했고 그로 인해 리밸런싱이 진행된 것 같다
해결책 및 마무리
- 그렇다면 컨슈머를 안전하게 종료하기 위해서는 어떻게 해야할까 ?
- 컨슈머를 안전하게 종료하기 위해 kafka consumer 클래스는
wakeup()
이라는 메소드를 지원함
- 이것이 나의 문제에 대한 해결책인지는 잘 모르겠음 ..
- 에러 이후 천천히 탐색하다보니 카프카의 고효용 및 고가용성을 담당하는 리밸런싱이라는 개념과 새로운 이슈를 만나게 되었다. 좀 더 연구해봐야겠다.
- 언제쯤.. 리벨런싱이 마무리 될지 .. 오늘 카프카에 대한 지식 + 1
참고자료