오늘은 Kafka의 간단한 개념과 Kafka 사용 시에 제가 자주 사용했던 명령어를 정리해보겠습니다.
프로듀서(Producer)
메시지를 생산하여 토픽으로 메시지를 전달
컨슈머(Consumer)
브로커의 토픽으로부터 저장된 메시지를 전달 받음
브로커(Broker)
Kafka에 설치되어 있는 서버 (홀수로 생성할 것!)
주키퍼(Zookeeper)
토픽과 파티션을 관리하고 브로커 모니터링
아파치 카프카는 아파치 공식 홈페이지 kafka 설치에서 원하는 버전을 다운로드 하면 설치가 가능합니다. 간혹 Windows 환경에서 Kafka를 설치하면 Windows의 파일 시스템과 잘 연동이 되지 않음으로 버전을 다운그레이드하는 것을 추천드립니다.
kafka 실행 전에는 반드시 broker를 관리해주는 Zookeeper를 먼저 실행 후 Kafka를 실행해야 합니다.
D:\kafka_2.12-2.8.1>
bin\windows\zookeeper-server-start.bat \config\zookeeper.properties
D:\kafka_2.12-2.8.1>
bin\windows\kafka-server-start.bat config\server.properties
Zookeeper의 기본 포트는 2181이고 Kafka의 기본 포트는 9092 입니다. 실행을 확인하고 싶다면 터미널을 통해 해당 port가 LISTENING 상태인지 확인해줍니다.
# 실행중인 port 찾기
netstat -a
# 실행중인 port 표시, 프로세스id(pid) 표시
netstat -a -o
# kill port
taskkill /f /pid [PID]
다음으로는 자주 사용하는 Kafka 명령어를 정리해보겠습니다. 각 명령어마다 더 세분화된 옵션이 있지만 지금은 일단 자주 사용하는 명령어만 정리해보겠습니다.
(Linux에서 kafka를 설치한 경우 .sh 파일을 실행하고, Windows에서 Kafka를 설치했다면 .bat 파일을 실행하면 됩니다)
#topic 생성
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic [토픽 이름]
#topic 상세보기 (토픽 이름, 파티션 수, 리플리케이션 팩터 수)
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic [토픽 이름]
#topic list 확인
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
#topic groups 확인
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
#producer (토픽에 데이터 넣기)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic [토픽 이름]
#consumer (토픽 내 데이터 확인)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름]
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic [토픽 이름] --from-beginning
#offset 확인
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [group 이름] --offsets --describe
#offset 상태 변경(latest, earliest 옵션)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [group 이름] --reset-offsets --to-latest --topic [토픽 이름] --execute
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [group 이름] --reset-offsets --to-earliest--topic [토픽 이름] --execute
#kakfa lag 확인
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group [group 이름] --describe
#kafka topic 삭제
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic [토픽 이름]
#kafka retention 시간 변경 (retention.ms=3600000)
bin/kafka-configs.sh —zookeeper localhost:2181 —entity-type topics —entity-name [토픽 이름] —alter —add-config retention.ms=3600000
#kafka retention bytes 지정 (bytes=104857600)
kafka-topics --zookeeper localhost:2181 --alter --topic [토픽 이름] --config retention.bytes=104857600