CLI에서 Kafka 사용하기 (by. 스크립트)

Kai·2024년 5월 27일
0

Kafka

목록 보기
4/6

☕ 개요


지난 글에 이어서, 이번 글에서는 Kafka 운영에 필요한 스크립트 사용법에 대해서 알아보도록 하겠다.

Kafka & Zookeeper가 설치 및 세팅이 완료되어 있다는 가정하에 진행한다. 혹시 아직 세팅하지 못했다면, 이전 글을 참고하자.

이 글은 Dev원영님의 강의를 듣고 정리한 내용이다. 🙏



1. kafka-topics.sh


kafka-topics.sh은 토픽을 생성하고 관리하는 역할을 하는 스크립트이다. 그 사용법에 대해서 알아보자.

1) 토픽 생성 (--create)

bash bin/kafka-topics.sh \
  --create \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --topic {토픽 이름}
 
# Example
bash bin/kafka-topics.sh \
  --create \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka

위 명령어를 통해서 토픽을 새롭게 생성할 수 있다.

2) 토픽 정보 출력 (--describe)

bash bin/kafka-topics.sh --bootstrap-server {카프카 브로커 명}:{카프카 포트} --topic {토픽 이름} --describe

# Example
bash bin/kafka-topics.sh --bootstrap-server kafka-0:9092 --topic hello.kafka --describe

--decribe옵션을 통해서 존재하는 토픽의 정보를 출력할 수 있다.

출력해보면 토픽의 이름, 파티션 개수, 리더 파티션의 번호등등을 확인할 수 있다.

3) 토픽 생성 with 옵션 (--create)

파티션의 개수와 같은 다양한 설정들을 매개변수로 넘겨주면, 커스터마이징이 가능하다.

bash bin/kafka-topics.sh \
  --create \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --topic {토픽 이름} \
  --partitions {파티션 개수} \
  --replication-factor {Replication factor} \
  --config retention.ms={데이터 유지 기간}

# Example
bash bin/kafka-topics.sh \
  --create \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka2 \
  --partitions 10 \
  --replication-factor 1 \
  --config retention.ms=172800000

위 명령어를 수행한 후, 토픽의 정보를 출력해보면 아래와 같이 10개의 파티션이 출력되는 것을 볼 수 있다.

4) 토픽 설정 수정 (--alter)

bash bin/kafka-topics.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --topic {토픽 이름} \
  --alter \
  {변경할 옵션} {변경할 옵션 값}

# Example
bash bin/kafka-topics.sh \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka \
  --alter \
  --partitions 10

처음에 파티션 개수를 기본값(브로커에 설정된 파티션 개수 옵션 = 1)으로 설정했던 토픽의 개수를 10으로 수정하는 명령어이다. 이를 실행한 후 토픽의 정보를 출력해보면 아래와 같이 10개의 파티션이 출력되는 것을 확인할 수 있다.



2. kafka-configs.sh


1) 토픽 옵션 변경

몇몇 디테일한 옵션들은 kafka-configs.sh을 통해서 수정한다. 그 외에도 몇가지 기능들이 존재하는데, kafka-configs.sh의 사용법을 알아보자.

bash bin/kafka-configs.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --alter \
  -{변경할 옵션} {변경할 옵션 값} \
  --topic {토픽 이름}

# Example
bash bin/kafka-configs.sh \
  --bootstrap-server kafka-0:9092 \
  --alter \
  --add-config min.insync.replicas=2 \
  --topic hello.kafka

위 명령어는 min.insync.replicas를 2로 수정하는 명령어이다.
명령어를 실행한 후, 토픽 정보를 출력해보면 min.insync.replicas가 2로 설정되어 있는 것을 확인할 수 있다.

2) 브로커 옵션 출력

bash bin/kafka-configs.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --broker {브로커 번호} \
  --all \
  --describe

# Example
bash bin/kafka-configs.sh \
  --bootstrap-server kafka-0:9092 \
  --broker 1001 \
  --all \
  --describe

위 명령어를 실행하면, 브로커에 설정된 옵션들을 확인할 수 있는데, 실행해보면 많은 내용이 와라락 출력이 되는 것을 확인할 수 있다.



3. kafka-console-producer.sh


이 스크립트는 개발 및 테스트 목적으로 사용할 수 있는 스크립트이다. 별도의 프로듀서를 구축하지 않고, 카프카 애플리케이션에 메세지를 송신할 수 있는 기능을 제공한다.

1) 문자열 전송

bash bin/kafka-console-producer.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --topic {토픽 이름}
  
# Example
bash bin/kafka-console-producer.sh \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka

아무 옵션 없이 스크립트를 실행하면 그냥 문자열을 메세지 값으로 전송할 수 있게 된다. 명령어를 실행하면 문자열을 입력할 수 있는 인터페이스로 변한다. 아무 문자열을 입력해보자.

2) Key:Value 전송

bash bin/kafka-console-producer.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --topic {토픽 이름}
  --property "parse.key=true" \
  --property "key.separator={구분자}"
  
# Example
bash bin/kafka-console-producer.sh \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka \
  --property "parse.key=true" \
  --property "key.separator=:"

--property옵션을 통해서 Key:Value형태의 메세지 값을 보낼 수 있도록 설정하였다.
원하는 값들을 입력해보자.

3) 결과 확인

위에서 입력한 메세지들을 Kafka-UI를 통해서 확인해보니 잘 적재되어 있는 것을 확인할 수 있다.



4. kafka-console-consumer.sh


1) 사용법

bash bin/kafka-console-consumer.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --topic {토픽 이름}
  {다양한 옵션들}

#Example
bash bin/kafka-console-consumer.sh \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka \
  --property "print.key=true" \
  --property "key.separator=@" \
  --from-beginning \
  --max-messages 10 \
  --partition 0 \
  --group hello

kafka-console-consumer는 다양한 옵션과 함께 사용되는 경우가 빈번해서, 옵션들에 대해서 설명한다.

  • from-beginning: 처음부터 메세지를 읽어들임.
  • max-messages: 최대로 출력할 메세지의 개수
  • partition: 컨슈밍할 파티션의 번호
  • group: 컨슈머 그룹 지정. 컨슈머 그룹을 지정해야 컨슈머 그룹에 대해서 Offset이 저장된다. ⭐

추가적으로 --property에 정의된 옵션은 들어오는 메세지를 Key:Value형태로 수신할지 말지를 정의하는 부분이다.

2) 결과 확인

왼쪽은 Producer의 터미널이고, 오른쪽은 Consumer의 터미널이다.
흠... 작아서 잘 보이지는 않지만, Producer에 메세지를 입력하면 Consumer에서 잘 읽어들이고 있는 것을 확인할 수 있다. 😅



5. kafka-consumer-groups.sh


스크립트의 이름처럼 컨슈머 그룹을 다루기 위해서 사용되는 스크립트이다.
다양한 사용법을 알아보자.

1) 컨슈머 그룹 목록 조회

bash bin/kafka-consumer-groups.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --list

# Example
bash bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-0:9092 \
  --list

브로커에 존재하는 컨슈머 그룹의 리스트를 출력한다. 그 결과는 아래와 같다.
현재, console-consumer-31602라는 컨슈머 그룹 1개가 존재한다.

2) 컨슈머 그룹 상세 조회

bash bin/kafka-consumer-groups.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --group {컨슈머 그룹 이름} \
  --describe

# Example
bash bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-0:9092 \
  --group console-consumer-31602 \
  --describe

조회한 컨슈머 그룹의 오프셋, 컨슈머 랙, 호스트 등등을 알 수 있는 유용한 명령어이다.
그 결과는 아래와 같다.

3) 오프셋 리셋

bash bin/kafka-consumer-groups.sh \
  --bootstrap-server {카프카 브로커 명}:{카프카 포트} \
  --group {컨슈머 그룹 이름} \
  --topic {토픽 이름} \
  --reset-offsets \
  {리셋 범위 옵션} \
  --execute
  
# Example
bash bin/kafka-consumer-groups.sh \
  --bootstrap-server kafka-0:9092 \
  --group console-consumer-31602 \
  --topic hello.kafka \
  --reset-offsets \
  --to-earliest \
  --execute

컨슈머 그룹에 설정되어 있는 오프셋을 리셋하는 명령어이다.
리셋 범위 옵션을 잘 설정해야하는데, 리셋 범위 옵션은 아래와 같은 것들이 있다.

  • --to-earliest: 가장 처음 오프셋으로 리셋
  • --to-latest: 가장 마지막 오프셋으로 리셋
  • --to-current: 현 시점 오프셋으로 리셋
  • --to-datetime {YYYY-MM-DDTHH:mmSS.sss}: 특정 시간의 오프셋으로 리셋 (레코드의 타임스탬프를 기준으로한다.)
  • --to-offset {long}: 특정 오프셋으로 리셋
  • --shift-by {(+, -) long}: 현재 오프셋에서 long개 이전 또는 이후의 오프셋으로 리셋


6. 그 외


1) kafka-producer-perf-test.sh

프로듀서의 성능을 테스트할 수 있는 스크립트이다.

bash bin/kafka-producer-perf-test.sh \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka \
  --num-records 10 \
  --throughput 1 \
  --record-size 100 \
  --print-metric

2) kafka-consumer-perf-test.sh

컨슈머의 성능을 테스트할 수 있는 스크립트이다.

bash bin/kafka-consumer-perf-test.sh \
  --bootstrap-server kafka-0:9092 \
  --topic hello.kafka \
  --messages 10 \
  --show-detailed-stats

3) kafka-reassign-partitions.sh

{
	"partitions": [
      { "topic": "hello.kafka", "partition": 0, "replicas": [0] }
    ],
  	"version": 1
}
bash bin/kafka-reassign-partitions.sh \
  --zookeeper \
  --bootstrap-server kafka-0:9092 \
  --reassignment-json-file reassign.json \
  --execute

정해진 형태로 json파일을 작성하고, 위 명령어를 실행하면 리더 파티션을 리밸런싱한다.

4) kafka-delete-record.sh

{
	"partitions": [
      { "topic": "hello.kafka", "partition": 0, "offset": 5 }
    ],
  	"version": 1
}
bash bin/kafka-delete-records.sh \
  --bootstrap-server kafka-0:9092 \
  --offset-json-file offset.json

정해진 형태로 json파일을 작성하고, 위 명령어를 실행하면 json파일에 입력된 Offset 이전의 데이터들을 삭제한다.

5) kafka-dump-log.sh

bash bin/kafka-dump-log.sh \
  --files data/hello.kafka-0000.log \
  --deep-iteration

특정 데이터 파일(.log)에 대한 상세 정보를 출력한다.

0개의 댓글