Kafka CLI 사용하기

Yunhong Min·2021년 11월 10일
0

Kafka CLI에한 설명과 예제를 정리해나가는 곳이다. 앞으로도 새로운 kafka cli에 대한 새로운 지식을 알게 될 경우 이 곳에 추가해 나갈 예정이다.

Topic

topic 목록 보기

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list 

topic 생성

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create \
  --topic my-topic --partitions 6 --replication-factor 1

min.insync.replicas 와 같은 topic config를 기본값이나 broker의 값을 override 하여 지정하고 싶다면 --config 옵션을 사용한다.

kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create \
  --topic my-topic --partitions 6 --replication-factor 1 --config min.insync.replicas=1

topic 상세 조회

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic --describe

결과는 다음과 같다. partition의 갯수, partition의 leader, replica, in-sync replica broker id 정보를 얻을 수 있다.

Topic:my-topic	PartitionCount:6	ReplicationFactor:1	Configs:
	Topic: my-topic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 4	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 5	Leader: 1	Replicas: 1	Isr: 1

topic 설정 변경

파티션 및 리플리카 수 변경, config 변경등을 할 수 있다. 파티션 수는 현재 수에서 증가만 할 수 있다.

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic my-topic --partitions 9

결과를 보면 정상적으로 변경된 것을 볼 수 있다.

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic my-topic
Topic: my-topic	TopicId: gfcRYfJ5QW-pubajGhAelw	PartitionCount: 9	ReplicationFactor: 1	Configs:
	Topic: my-topic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 2	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 3	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 4	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 5	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 6	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 7	Leader: 1	Replicas: 1	Isr: 1
	Topic: my-topic	Partition: 8	Leader: 1	Replicas: 1	Isr: 1

파티션수 변경은 키를 사용하는 경우 키별로 저장되는 파티션의 위치가 바뀌면서 ordering 등에 문제가 생길 수 있어 주의해야 한다.

topic 삭제

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic my-topic

topic 삭제는 broker 설정에서 delete.topic.enable=true 로 되어 있어야 가능하다.

Producer & Consumer console

kafka producer, consumer를 cli로 간단히 사용할 수 있게 해주는 cli 이다.
kakfa-console.producer.sh 를 사용하여 메세지를 kafka로 보낼 수 있다.

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-topic
>hi
>this is kafka

--producer-property를 사용하면 acks 의 레벨 설정도 가능하다.

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-topic acks=all

kafka-console-consumer.sh 를 사용하면 kafka에 저장되어 있는 로그를 받을 수 있다.

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic
hi
this is kafka

--from-beginning 을 사용하면 로그를 처음부터 가져올 수 있다.

key 사용하기

key를 포함하여 로그를 보내고 싶다면 다음과 같이 실행하면 된다.

./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-topic --property parse.key=true --property key.separator=,
>1,jason
>2,mike

consumer 쪽에서도 key를 보기 위해서는 --property print.key=true 를 사용하면 확인할 수 있다.

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic --property print.key=true --property key.separator=,
1,jason
2,mike

consumer group의 사용

--group 옵션으로 consumer group을 지정할 수 있다. consumer group이 존재하지 않는다면 consumer-group을 새로 생성한다. 이 경우 --from-beginning 옵션을 사용하지 않는다면 새로 들어오는 데이터부터 조회된다.

만약 --group 옵션으로 지정되는 consumer group이 이미 존재한다면, 이전에 커밋한 offset 이후의 메세지만 가져오게 된다. 이 경우 --from-beginning 옵션은 무시된다.

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic --group my-first-group

Consumer Group 관리

kafka-consumer-console.sh 에서는 consumer group 생성을 해주고, 메세지를 받는 역할을 하지만 consumer group을 관리하지는 못한다. consumer group에 대한 미세한 조회 및 관리는 kafka-consumer-groups.sh 로 할 수 있다.

consumer group 리스트 조회

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
my-first-group

consumer group의 상세 조회

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-group


GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                    HOST            CLIENT-ID
my-first-group  my-topic        2          3               3               0               consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1     consumer-my-first-group-1
my-first-group  my-topic        1          2               2               0               consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1     consumer-my-first-group-1
my-first-group  my-topic        5          0               0               0               consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1     consumer-my-first-group-1
my-first-group  my-topic        0          4               4               0               consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1     consumer-my-first-group-1
my-first-group  my-topic        4          1               1               0               consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1     consumer-my-first-group-1
my-first-group  my-topic        3          7               7               0               consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1     consumer-my-first-group-1

consumer group의 파티션별 offset, lagging 정도, 접속된 consumer id, consumer host 정보 등 매우 상세한 정보를 확인할 수 있다.

consumer group의 consumer 정보 조회

현재 어떤 consumer가 conumser group에 포함되어있고, 얼마만큼의 파티션을 담당하고 있는지 간단한 정보를 확인할 수 있다.

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-group --members

GROUP           CONSUMER-ID                                                    HOST            CLIENT-ID                 #PARTITIONS
my-first-group  consumer-my-first-group-1-f3b03fb1-c65c-4741-a52d-30b701a0787e /172.18.0.1     consumer-my-first-group-1 3
my-first-group  consumer-my-first-group-1-3ad52cf3-4e00-4a29-a549-40506bb5f1ed /172.18.0.1     consumer-my-first-group-1 3

consumer group 설정 변경

만약 consumer-group의 topic에 대한 offset을 초기화 시키고 싶다면 다음 명령어를 사용하면 된다. consumer는 모두 inactive 상태여야 실행이 가능하다.

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-first-group --topic my-topic --to-earliest --reset-offsets --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
my-first-group                 my-topic                       2          0
my-first-group                 my-topic                       1          0
my-first-group                 my-topic                       5          0
my-first-group                 my-topic                       0          0
my-first-group                 my-topic                       4          0
my-first-group                 my-topic                       3          0

consumer-group의 current-offset을 확인해보면 모두 0으로 바뀐것을 알 수 있다.

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-group

Consumer group 'my-first-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-first-group  my-topic        2          0               3               3               -               -               -
my-first-group  my-topic        1          0               2               2               -               -               -
my-first-group  my-topic        5          0               0               0               -               -               -
my-first-group  my-topic        0          0               4               4               -               -               -
my-first-group  my-topic        4          0               1               1               -               -               -
my-first-group  my-topic        3          0               7               7               -               -

dry-run 옵션도 제공한다. 이 경우 --execute 는 사용하지 않는다.

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-first-group --topic my-topic --to-earliest --reset-offsets --dry-run

consumer group의 삭제

consumer 연결이 안된 상태에서 다음 명령어를 실행하면 consumer group이 삭제된다.

./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group my-first-group
Deletion of requested consumer groups ('my-first-group') was successful.
profile
좋아서 시작한 개발 지금은...

0개의 댓글