[KAFKA] kafka-consumer-groups.sh

.·2024년 6월 23일

KAFKA

목록 보기
10/21

지난 포스트에서 kafka-console-consumer.sh 쉘 스크립트와 group 옵션을 사용하여 토픽의 데이터를 읽었다.
지난 포스트처럼 컨슈머 그룹은 따로 생성하지 않고 consumer를 실행 시 그룹 옵션으로 지정하면 새로 생성된다.
이렇게 생성된 컨슈머 그룹은 kafka-consumer-groups.sh 명령어로 확인할 수 있다.

bin 디렉토리 하위 kafka-consumer-groups.sh 라는 이름의 쉘 스크립트가 있다.
kafka-consumer-groups.sh는 컨슈머 그룹과 관련하여 사용하는 쉘 스크립트 커맨드 라인 툴이다.

1. 컨슈머 그룹 리스트 조회

bin/kafka-consumer-groups.sh \
--bootstrap-server {브로커} \
--list
bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--list

kafka-console-consumer.sh 명령어를 사용 시 카프카 클러스터 정보와 함께 list 옵션을 사용하면 컨슈머 그룹 리스트를 조회할 수 있다.

2. 컨슈머 그룹 정보 조회

bin/kafka-consumer-groups.sh \
--bootstrap-server {브로커} \
--group {컨슈머 그룹명} \
--describe
bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--group test-topic-group \
--describe

특정 컨슈머 그룹에 대한 정보 또한 조회할 수 있다.
이 때는 카프카 클러스터 정보컨슈머 그룹명을 필수값으로 주고 describe 옵션을 주면 된다.

describe 옵션을 사용하면 해당 컨슈머 그룹이 어떤 토픽을 대상으로 어떤 레코드 옵션까지 가져갔는지에 대한 상태를 확인할 수 있다.

GROUP : 컨슈머 그룹명
TOPIC : 토픽명
PARTITION : 파티션 번호
CURRENT-OFFSET : 컨슈머 그룹이 가져간 현재 오프셋
LOG-END-OFFSET : 해당 파티션의 마지막 레코드 오프셋
LAG : 컨슈머 랙 (LOG-END-OFFSET - CURRENT-OFFSET)
CONSUMER-ID : 컨슈머 아이디
HOST: 호스트
CLIENT-ID : 클라이언트 아이디

컨슈머 랙은 LOG-END-OFFSET에서 CURRENT-OFFSET을 뺀 값이다. 마지막 레코드의 오프셋에서 현재까지 읽어낸 오프셋을 빼는 것이기 때문에, 아직 읽지 않은 레코드의 개수를 의미한다.
컨슈머 그룹을 운영할 때는 컨슈머 랙에 대한 모니터링이 중요하다.
컨슈머 랙의 개수가 높은 것을 확인하면 프로듀서에 비해 컨슈머의 처리량이 낮음을 의미하므로 컨슈머 개수 또는 파티션 개수 증설 등의 조치를 취해야 한다.

토픽에 7개의 데이터를 추가한 후 정보를 다시 확인해보자.

describe 옵션으로 컨슈머 그룹에 대한 정보를 확인해보면, 추가해준 데이터 개수만큼 3개의 파티션에 분배되어 LOG-END-OFFSET이 증가했다.
데이터를 추가해준 이후로 데이터를 읽어오지 않았으므로, LAG도 총합이 7개로 증가한 것을 확인할 수 있다.
프로듀서에 비해 7개 만큼의 컨슈머 지연이 발생하고 있는 중이다.

이제 컨슈머로 데이터를 소비해보자. 이전에 읽은 데이터 이후 새로 추가된 데이터만 읽어올 수 있음을 확인한다.

컨슈머 그룹 정보를 다시 조회해보면 이제 LOG-END-OFFSET에 맞게 CURRENT-OFFSET이 증가했고, LAG도 0으로 바뀌었음을 확인한다.
프로듀서에 의해 생성된 파티션의 데이터를 모두 읽었다는 의미이다.
컨슈머를 아직 실행중이라면, 컨슈머 아이디와 호스트, 컨슈머 아이디 정보도 함께 확인할 수 있다.

3. 오프셋 리셋

카프카 토픽의 데이터는 retention 기간에 따라 자동으로 삭제 처리 되지 않는 한 그 데이터는 유지된다.

운영 상황에서는 토픽에 있는 데이터를 다시 읽어야 할 상황이 있고, 오프셋 리셋을 통해 데이터를 얼마든지 재처리할 수 있다.

오프셋 리셋은 특정 토픽에 대해서 데이터를 재처리하고 싶을 때 주로 사용한다.

bin/kafka-consumer-groups.sh \
--bootstrap-server {브로커} \
--group {컨슈머 그룹명} \
--topic {토픽명} \
--reset-offsets \
--to-earliest \
--execute
bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--group test-topic-group \
--topic test-topic \
--reset-offsets \
--to-earliest \
--execute

실제로 오프셋을 리셋해보자.

reset-offsets 옵션을 통해 컨슈머 그룹이 어느 오프셋부터 다시 읽을지 지정할 수 있다.
group 옵션의 컨슈머 그룹에 대해서 topic 옵션의 토픽에 대해 추가 설정에 따라 오프셋을 리셋시킨다.

오프셋 리셋 유형은 다음과 같다.

to-earliest : 가장 처음 오프셋(작은 번호)으로 리셋
to-latest : 가장 마지막 오프셋(큰 번호)로 리셋
to-current : 현 시점 기준 오프셋으로 리셋
to-datetime {YYYY-MM-DDHH:mmSS:sss} : 특정 일시로 오프셋 리셋 (레코드 타임스탬프 기준)
to-offset {long} : 특정 오프셋으로 리셋
shift-by {+/- long} : 현재 컨슈머 오프셋에서 앞뒤로 옮겨서 리셋

실행이 완료되면 각각의 파티션에 대해 리셋된 후의 오프셋 번호가 로그로 출력된다.

이 때, 컨슈머가 실행 중일 때 오프셋을 리셋하면 에러가 발생하므로 실행 중인 컨슈머가 있다면 컨슈머를 정지하고 리셋하도록 주의해야 한다.

다시 컨슈머 그룹 정보를 조회해보면 CURRENT-OFFSET이 모두 0으로 돌아오고, LAG의 합도 14인 것을 확인한다.

토픽을 다시 읽어보면 이전에 읽었던 데이터를 포함해서 모든 데이터를 다시 소비해오는 것을 확인할 수 있다.

0개의 댓글