kafka-console-producer, kafka-console-consumer을 통하여 직접 토픽에 데이터를 넣고 전달하는 것으로 테스트를 할 수 도 있지만, 단순히 명령어 만으로 테스트를 할 수 있는 것이 있다.
해당 명령어는 String 타입의 메시지 값을 코드 없이 주고 받을 수 있는 테스트 명령이다.
카프카 클러스터 설치가 완료된 이후 토픽에 데이터를 전송하여 간단 네트워크 통신 테스트를 할 때 유용하다.
bin/kafka-verifiable-producer.sh --bootstrap-server {aws IP} \
> --max-message 10 \
> --topic verify-test
{"timestamp":1662187111749,"name":"startup_complete"}
[2022-09-03 15:38:32,306] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {verify-test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"timestamp":1662187113034,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":0}
{"timestamp":1662187113040,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":0}
{"timestamp":1662187113040,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":0}
{"timestamp":1662187113040,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":0}
{"timestamp":1662187113041,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":0}
{"timestamp":1662187113041,"name":"producer_send_success","key":null,"value":"5","offset":5,"topic":"verify-test","partition":0}
{"timestamp":1662187113041,"name":"producer_send_success","key":null,"value":"6","offset":6,"topic":"verify-test","partition":0}
{"timestamp":1662187113043,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"verify-test","partition":0}
{"timestamp":1662187113043,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"verify-test","partition":0}
{"timestamp":1662187113043,"name":"producer_send_success","key":null,"value":"9","offset":9,"topic":"verify-test","partition":0}
{"timestamp":1662187113059,"name":"shutdown_complete"}
{"timestamp":1662187113061,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":7.616146230007616}
상단의 옵션중 --max-message는 해당 명령으로 보내는 데이터 개수를 의미한다.
이 옵션을 -1로 지정을 하면, 종료가 될 때 까지 계속 데이터 토픽을 전송하는 것을 의미한다.
--topic으로 대상 토픽을 지정한다.
최초로 데이터를 전달할 때는 start_complete이 출력된다.
메시지 별 보낸 시간, 상태, 메시지 키, 메시지 값, 저장된 오프셋 번호, 토픽, 저장된 파티션이 출력된다.
메시지 전송이 완료되면 통계값이 출력되며 평균 처리량을 확인할 수 있다.
전송한 데이터는 consumer을 통해 확인 할 수 있다.
컨슈머 그룹은 리스트를 뽑아 확인 할 수 있다.
bin/kafka-consumer-groups.sh --bootstrap-server 54.183.147.139:9092 --list
test-group
hello-group
새로 추가된 test-group이 해당 verifiable 명령을 통해 생성된 그룹이다.
bin/kafka-verifiable-consumer.sh --bootstrap-server 54.183.147.139:9092 \
> --topic verify-test \
> --group-id test-group
{"timestamp":1662187178995,"name":"startup_complete"}
{"timestamp":1662187180252,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1662187180939,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":0,"maxOffset":9}]}
{"timestamp":1662187181075,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":10}],"success":true}
^C{"timestamp":1662187748992,"name":"partitions_revoked","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1662187749150,"name":"shutdown_complete"}
--topic 옵션에 토픽 이름과
--group-id에 list에서 찾은 그룹 이름을 넣어주면 결과가 나온다.
컨슈머가 시작되면 startup_complete 문자열과 시작시간이 출력된다.
토픽의 데이터를 지우는 방법은 kafka-delete-recodes.sh 명령을 사용하면 된다.
해당 명령어를 사용하면, 적재된 토픽의 데이터 중 가장 오래된 데이터부터 특정 시점의 오프셋까지 삭제 할 수 있다.
해당 명령을 사용하기 위해서는 JSON 파일을 하나 생성해줘야한다.
{"partitions" : [{"topic": "verify-test", "partition": 0, "offset": 3}], "version":1}
파일 내부에는 삭제하고자 하는 토픽, 파티션 정보, 오프셋 범위가 들어가게 된다.
bin/kafka-delete-records.sh --bootstrap-server {aws IP} \
> --offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: verify-test-0 low_watermark: 3
여기서 주의할 점은 오프셋으로 가장 오래된 순서 부터 지정된 범위까지 데이터를 지울 수는 있지만,
특정 데이터를 삭제하는 것은 불가능 하다는 것이다.