[Kafka][Consumer] Kafka Consumer 구축 옵션

Joney의 SW 공부 블로그·2024년 5월 19일
0

Kafka study

목록 보기
5/7

Consumer의 기능

  • Producer가 메시지를 Kafka의 topic으로 보내면, 그 메시지를 가져와서 소비하는 역할을 하는 애플리케이션, 서버 등을 Consumer라고 함
  • Consumer의 주요 기능은 파티션 리더에게 메시지 가져오기 요청을 하는 것
  • 각 요청은 로그의 오프셋을 명시하고 그 위치로부터 로그 메시지를 수신
  • Consumer는 가져올 메시지의 위치를 조정할 수 있고 필요하면 이미 가져온 데이터도 다시 가져올 수 있음
    • Consumer가 버그가 발생했는데 이미 메시지를 가져온 이후라면, 버그를 수정한 후 가져왔던 메시지들을 다시 가져올 수 있음
  • Consumer는 Old Consumer와 New Consumer로 나뉨
    • 주키퍼에 오프셋을 저장하면 Old Consumer, Topic에 저장하면 New Consumer

Consumer 주요 옵션

  • bootstrap.servers
    • kafka 클러스터에 연결하기 위한 호스트와 포트 정보
  • fetch.min.bytes
    • 한번에 가져올 수 있는 최소 데이터 사이즈
    • 지정한 사이즈보다 작은 경우, 요청에 응답하지 않고 데이터 누적 대기
  • group.id
    • Consumer가 속한 그룹을 식별하는 식별자
  • enable.auto.commit
    • 백그라운드로 주기적으로 offset을 commit
  • auto.offset.reset
    • Kafka에서 초기 offset이 없거나 현재 offset이 더 이상 존재하지 않는 경우(데이터 삭제) 설정한 값으로 reset합
    • ealiest: 가장 초기의 offset값으로 설정
    • latest: 가장 마지막의 offset값으로 설정
    • none: 이전 offset값으로 설정
  • fetch.max.bytes
    • 한번에 가져올 수 있는 최대 데이터 사이즈
  • request.timeout.ms
    • 요청에 대해 응답을 기다리는 최대 시간
  • session.timeout.ms
    • Consumer와 Broker사이의 세션 타임 아웃 시간(default 10s)
    • Broker가 Consumer가 살아있다고 판단하는 시간
      • 정상 동작에서는 Consumer는 그룹 코디네이터에게 heartbeat를 보냄
    • session.timeout.ms이 지나면 Consumer 그룹은 rebalanse를 시도
    • heartbeat.interval.ms와 밀접한 관련이 있음
  • heartbeat.interval.ms
    • Consumer가 그룹 코디네이터에게 얼마나 자주 poll() 메소드로 heartbeat를 보낼지 조정
    • session.timeout.ms보다 낮아야 함 (일반적으로 1/3으로 설정)
  • max.poll.records
    • 단일 호출 poll()에 대한 최대 record 수
    • 해당 옵션으로 polling 루프에서 데이터 양을 조정 가능
  • max.poll.interval.ms
    • Consumer가 heartbeat는 보내지만 메시지를 소비하지 않는 경우 장애라고 판단하기 위한 시간
      • Consumer가 무한정으로 파티션을 점유하지 않도록 주기적으로 poll을 호출해야함
      • 장애라고 판단하면 Consumer 그룹에서 제외후 다른 Consumer가 해당 파티션에서 메시지를 가져가도록 함
  • auto.commit.interval.ms
    • 주기적으로 offset을 commit하는 시간
  • fetch.max.wait.ms
    • fetch.max.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간

Kafka CLI로 메시지 가져오기

Kafka 설치 경로의 하위에 bin 디렉토리에 kafka-console-consumer.sh로 Consumer를 실행시킬 수 있음(종료는 ctrl + c).

$ kafka/bin/kafka-console-consumer.sh --bootstrap-server ${ip1:port1,ip2:port2,ip3:port3} --topic ${topic name} --group ${group name} --from-beginning

--bootstrap-server는 Kafka 호스트 정보를 ',' 구분으로 전부 입력.
--from-beginning 옵션을 넣으면 메시지를 처음부터 가져올 수 있음.
-group을 넣으면 Consumer 그룹을 지정할 수 있지만 지정하지 않으면 자동으로 console-consumer-xxxxxx로 생성됨.

아래와 같이 kafka-consumer-groups.sh를 실행하면 Consuemr 그룹 리스트를 조회 가능.

$ kafka/bin/kafka-consumer-groups.sh --bootstrap-server ${ip1:port1,ip2:port2,ip3:port3} --list




[reference]
카프카, 데이터 플랫폼의 최강자 - 실시간 비동기 스트리밍 솔루션 Kafka의 기본부터 확장 응용까지

profile
SW 지식 노트 블로그

0개의 댓글