Topic
1. 토픽 생성
kafka-topics.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --create --partitions [파티션개수] --replication-factor [브로커 개수보다 작거나 같은 개수]
- 주의할 점은 replication-factor는 현재 브로커의 개수보다 크면 안된다
2. 토픽 리스트 확인
kafka-topics.sh --bootstrap-server localhost:9092 --list
3. 토픽 정보 확인
kafka-topics.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --describe
- 이때 Leader와 ISR의 숫자는 브로커 ID 를 의미함
4. 토픽 삭제
kafka-topics.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --delete
- 주의할 점, delete.topic.enable 이란 설정이 true가 default 이기에 삭제가 되지만 이를 false로 설정해두면 삭제를 방지할 수 있다.
Producer
1. 토픽 지정하기
kafka-console.producer.sh --broker-list localhost:9092 --topic [토픽 이름]
- 명령어 맨 뒤에 --producer-property 를 통해 acks 정보를 기입 가능
kafka-console.producer.sh --broker-list localhost:9092 --topic [토픽 이름] --producer-property acks=all
- 프로듀서에서 토픽을 지정할 때 주의할 점은 기존에 없던 토픽을 지정하면 해당 명령을 통해 토픽 자체가 만들어지게 된다
- 기본적으로 partition=1 replicationfactor=1 인 상태 , config/server.properties 에서 num.partitions의 값을 바꿔주면 된다
2. Key, Value
kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --property parse.key=true --property key.separator=,
> key,value
> another key,another value
Consumer
1.토픽으로부터 메세지 읽어오기
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름]
- 지정한 토픽으로부터 메세지를 읽어온다
- 주의할 점은 이 명령어는 실행된 시점에서부터 새롭게 토픽에 생성된 메세지만 읽어온다
- 토픽에 존재하는 모든 메세지를 가져오기 위해선 --from-beginning 옵션 필수
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --from-beginning
2. Key
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --property print.key=true --property key.separator=,
Consumer Group
1. 그룹 생성
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --group [그룹 이름]
- 그룹에 속하는 컨슈머가 여러개이면 로드밸런싱을 통해 자동으로 메세지를 분배
2. 그룹 정보 확인
kafka-console-groups.sh --bootstrap-server localhost:9092 --describe --group [그룹 이름]
- 위에 대한 명령어로 아래의 정보들을 얻을 수 있다.
-
- CURRENT-OFFSET : Consumer Group이 Kafka에서 읽은 offset
- LOG-END-OFFSET : 해당 topic, partition의 마지막 offset
- LAG : LOG-END-OFFSET 과 CURRENT-OFFSET의 차이
- LAG의 경우 topic의 partition단위로 읽어야 할 남은 데이터 수를 의미한다
Offsets
오프셋 리셋
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [그룹 이름] --reset-offsets --to-earlist --execute --topic [토픽 이름]
- 오프셋을 리셋하여 데이터를 다시 읽어드릴 때 사용
- reset-offsets의 옵션으로 --to-earlist , --to-datetime, --from-file, --to-current, --shift-by 등이 있다
- --shift-by -2 를 하면 오프셋이 2칸 전으로 이동한다
- execute의 옵션으로 --all-topics 와 --topic [그룹 이름] 이 있다