Kafka CLI에한 설명과 예제를 정리해나가는 곳이다. 앞으로도 새로운 kafka cli에 대한 새로운 지식을 알게 될 경우 이 곳에 추가해 나갈 예정이다.
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create \
--topic my-topic --partitions 6 --replication-factor 1
min.insync.replicas
와 같은 topic config를 기본값이나 broker의 값을 override 하여 지정하고 싶다면 --config
옵션을 사용한다.
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create \
--topic my-topic --partitions 6 --replication-factor 1 --config min.insync.replicas=1
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic --describe
결과는 다음과 같다. partition의 갯수, partition의 leader, replica, in-sync replica broker id 정보를 얻을 수 있다.
Topic:my-topic PartitionCount:6 ReplicationFactor:1 Configs:
Topic: my-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 5 Leader: 1 Replicas: 1 Isr: 1
파티션 및 리플리카 수 변경, config 변경등을 할 수 있다. 파티션 수는 현재 수에서 증가만 할 수 있다.
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --alter --topic my-topic --partitions 9
결과를 보면 정상적으로 변경된 것을 볼 수 있다.
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic my-topic
Topic: my-topic TopicId: gfcRYfJ5QW-pubajGhAelw PartitionCount: 9 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 4 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 6 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: my-topic Partition: 8 Leader: 1 Replicas: 1 Isr: 1
파티션수 변경은 키를 사용하는 경우 키별로 저장되는 파티션의 위치가 바뀌면서 ordering 등에 문제가 생길 수 있어 주의해야 한다.
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic my-topic
topic 삭제는 broker 설정에서 delete.topic.enable=true
로 되어 있어야 가능하다.
kafka producer, consumer를 cli로 간단히 사용할 수 있게 해주는 cli 이다.
kakfa-console.producer.sh
를 사용하여 메세지를 kafka로 보낼 수 있다.
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-topic
>hi
>this is kafka
--producer-property
를 사용하면 acks
의 레벨 설정도 가능하다.
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-topic acks=all
kafka-console-consumer.sh
를 사용하면 kafka에 저장되어 있는 로그를 받을 수 있다.
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic
hi
this is kafka
--from-beginning
을 사용하면 로그를 처음부터 가져올 수 있다.
key를 포함하여 로그를 보내고 싶다면 다음과 같이 실행하면 된다.
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic my-topic --property parse.key=true --property key.separator=,
>1,jason
>2,mike
consumer 쪽에서도 key를 보기 위해서는 --property print.key=true
를 사용하면 확인할 수 있다.
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic --property print.key=true --property key.separator=,
1,jason
2,mike
--group
옵션으로 consumer group을 지정할 수 있다. consumer group이 존재하지 않는다면 consumer-group을 새로 생성한다. 이 경우 --from-beginning
옵션을 사용하지 않는다면 새로 들어오는 데이터부터 조회된다.
만약 --group
옵션으로 지정되는 consumer group이 이미 존재한다면, 이전에 커밋한 offset 이후의 메세지만 가져오게 된다. 이 경우 --from-beginning
옵션은 무시된다.
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic my-topic --group my-first-group
kafka-consumer-console.sh
에서는 consumer group 생성을 해주고, 메세지를 받는 역할을 하지만 consumer group을 관리하지는 못한다. consumer group에 대한 미세한 조회 및 관리는 kafka-consumer-groups.sh
로 할 수 있다.
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
my-first-group
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-group my-topic 2 3 3 0 consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1 consumer-my-first-group-1
my-first-group my-topic 1 2 2 0 consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1 consumer-my-first-group-1
my-first-group my-topic 5 0 0 0 consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1 consumer-my-first-group-1
my-first-group my-topic 0 4 4 0 consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1 consumer-my-first-group-1
my-first-group my-topic 4 1 1 0 consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1 consumer-my-first-group-1
my-first-group my-topic 3 7 7 0 consumer-my-first-group-1-d581edd4-1fd0-4d53-9c8a-eef0af577cd8 /172.18.0.1 consumer-my-first-group-1
consumer group의 파티션별 offset, lagging 정도, 접속된 consumer id, consumer host 정보 등 매우 상세한 정보를 확인할 수 있다.
현재 어떤 consumer가 conumser group에 포함되어있고, 얼마만큼의 파티션을 담당하고 있는지 간단한 정보를 확인할 수 있다.
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-group --members
GROUP CONSUMER-ID HOST CLIENT-ID #PARTITIONS
my-first-group consumer-my-first-group-1-f3b03fb1-c65c-4741-a52d-30b701a0787e /172.18.0.1 consumer-my-first-group-1 3
my-first-group consumer-my-first-group-1-3ad52cf3-4e00-4a29-a549-40506bb5f1ed /172.18.0.1 consumer-my-first-group-1 3
만약 consumer-group의 topic에 대한 offset을 초기화 시키고 싶다면 다음 명령어를 사용하면 된다. consumer는 모두 inactive 상태여야 실행이 가능하다.
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-first-group --topic my-topic --to-earliest --reset-offsets --execute
GROUP TOPIC PARTITION NEW-OFFSET
my-first-group my-topic 2 0
my-first-group my-topic 1 0
my-first-group my-topic 5 0
my-first-group my-topic 0 0
my-first-group my-topic 4 0
my-first-group my-topic 3 0
consumer-group의 current-offset을 확인해보면 모두 0으로 바뀐것을 알 수 있다.
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-group
Consumer group 'my-first-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-first-group my-topic 2 0 3 3 - - -
my-first-group my-topic 1 0 2 2 - - -
my-first-group my-topic 5 0 0 0 - - -
my-first-group my-topic 0 0 4 4 - - -
my-first-group my-topic 4 0 1 1 - - -
my-first-group my-topic 3 0 7 7 - -
dry-run
옵션도 제공한다. 이 경우 --execute
는 사용하지 않는다.
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-first-group --topic my-topic --to-earliest --reset-offsets --dry-run
consumer 연결이 안된 상태에서 다음 명령어를 실행하면 consumer group이 삭제된다.
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group my-first-group
Deletion of requested consumer groups ('my-first-group') was successful.