이번 포스팅의 목표는 아래와 같습니다.
Name | Main Info | 기타 |
---|---|---|
Local OS | macOS Big Sur | - |
JDK | Ver: 1.8.0_282 | AWS EC2 인스턴스 A |
Kafka | IP: 54.180.81.28 PORT: 9092 | AWS EC2 인스턴스 A |
Zookeeper | IP: 54.180.81.28 PORT: 2181 | AWS EC2 인스턴스 A |
AWS EC2 인스턴스 생성하고 접근하기
이전에 작성한 글이 있으니 인스턴스 생성은 위 링크를 참고해주세요.
이번에는 Kafka
작업이니 pem key만 kafka-key로 생성해서 인스턴스 접근하시면 됩니다.
기존에 사용하던 pem key를 쓰고 싶으시면 그걸 쓰셔도 무방합니다.
로컬 환경에서 명령을 실행할 수 있게 아래와 같이 Kafka는 9092 포트와 Zookeeper는 2181 포트를 열어줍니다.
yum install -y java-1.8.0-openjdk-devel.x86_64
명령으로 Java 8을 설치합니다.
java -version
명령으로 설치된 버전을 확인할 수 있습니다.
Kafka 2.5.0
버전을 wget 명령을 사용하여 ec2-user 경로에 다운로드 받습니다.
tar xvf kafka_2.12-2.5.0.tgz
명령을 사용해서 패키지를 풀어줍니다.
Kafka
패키지의 기본 힙 메모리 설정이 Kafka
= 1G, Zookeeper
= 512M으로 설정되어 있어서 프리티어의 메모리 1G에서는 Kafka
와 Zookeeper
를 동시에 실행하면 "Cannot allocate memory"와 같은 문제가 발생합니다.
echo $KAFKA_HEAP_OPTS
export KAFKA_HEAP_OPTS="-Xms400m -Xmx400m"
echo $KAFKA_HEAP_OPTS
이렇게 설정한 환경변수는 터미널을 종료하고 나면 초기화가 된다고 합니다.
아래와 같이 .bashrc
파일을 수정해주면 터미널을 재실행했을 때도, 설정이 유지된다고 합니다.
vi /home/ec2-user/kafka_2.12-2.5.0/config/server.properties
명령으로 server.properties
파일의 내용 중 아래 2개 항목을 수정해줍니다.
advertised.listeners=PLAINTEXT://54.180.81.28:9092
zookeeper.connect=localhost:2181
advertised.listeners는 통신을 위해 열어두는 PROTOCOL, IP, PORT를 지정하고
zookeeper.connect는 Kafka
와 연동할 Zookeeper
의 IP, PORT를 지정합니다.
(여기서는 로컬환경에서 Zookeeper
까지 동작시키므로 localhost로 세팅합니다.)
/home/ec2-user/kafka_2.12-2.5.0/bin/zookeeper-server-start.sh -daemon /home/ec2-user/kafka_2.12-2.5.0/config/zookeeper.properties
명령으로 Zookeeper
를 실행하고,
jps -vm
명령으로 JVM 프로세스를 확인할 수 있습니다.
/home/ec2-user/kafka_2.12-2.5.0/bin/kafka-server-start.sh -daemon /home/ec2-user/kafka_2.12-2.5.0/config/server.properties
명령으로 Kafka
를 실행하고,
jps -m
명령으로 JVM 프로세스를 확인할 수 있습니다.
2-1) Java 8 버전 이상 설치부터
2-2-1) Kafka 2.5.0 패키지 다운로드 및 압축해제까지 Local에서 동일하게 세팅해주도록 합니다.
아래와 같은 명령으로 topic을 생성할 수 있습니다. 저는 쇼핑몰의 '주문'에 대한 데이터를 저장하기 위해 order라는 topic name을 지정하였습니다.
bin/kafka-topics.sh \
--create \
--bootstrap-server 54.180.81.28:9092 \
--topic order
위와 같이 topic 생성할 때, 설정해주지 않은 옵션들에 대해서는 카프카 브로커 설정(/config/server.properties
)에 있는 기본값이 적용됩니다.
bin/kafka-topics.sh --bootstrap-server 54.180.81.28:9092 --list
명령으로 현재 생성된 토픽 목록을 확인할 수 있습니다.
bin/kafka-topics.sh --bootstrap-server 54.180.81.28:9092 \
--topic order \
--describe
과 같은 명령으로 order topic의 내부 정보를 열람할 수도 있습니다.
order topic의 경우 topic 생성시 옵션 지정을 해주지 않아서 파티션이 1개만 생성된 것을 볼 수 있습니다.
bin/kafka-console-producer.sh --bootstrap-server 54.180.81.28:9092 \
--topic order
위와 같은 명령을 실행하면, 유저의 입력을 받을 수 있는 상태가 됩니다.
이때, 아래의 두 문장들을 입력해보겠습니다.
big shirt
blue shoes
이제, 방금전에 입력한 record가 정상적으로 전달되었는지 consume 해보겠습니다.
bin/kafka-console-consumer.sh --bootstrap-server 54.180.81.28:9092 \
--topic order \
--from-beginning
vi delete-topic.json
명령으로 삭제할 topic이 있는 partition을 지정하고, 가장 오래된 record부터 지우고 싶은 record가 있는 offset까지 지정할 수 있습니다.
삭제를 실행하기 전에, 현재 order topic에 몇 개의 record가 있는지 조회해보았더니, 아래 그림처럼 5개의 record가 있는 상태였습니다.
bin/kafka-delete-records.sh --bootstrap-server 54.180.81.28:9092 \
--offset-json-file delete-topic.json
위와 같은 명령으로 토픽 삭제를 명령해보았더니,
Oops... 오류가 발생했습니다.
"The requested offset is not within the range of offsets maintained by the server"
제가 삭제를 요청한 offset의 범위가 현재 서버에 있는 offset의 범위보다 크다는 것 같네요.
delete-topic.json
에서 지정한 offset을 10에서 5로 바꾸어보겠습니다.
그리고 나서 다시 삭제 명령을 실행해보았습니다.
"Records delete operation completed" 삭제가 정상적으로 진행되었음을 확인할 수 있습니다.
마지막으로, 현재 topic에 몇 개의 record가 있는지 조회해보겠습니다.
현재 order topic에 남아있는 record가 없음을 확인 할 수 있었습니다.
이번 포스팅은 최근에 출판된 아파치 카프카 애플리케이션 프로그래밍 with 자바라는 책의 베타리더로 활동하며 카프카에 대한 내용을 공부하면서 작성한 포스팅입니다.
해당 책에는 이 포스팅보다 다양한 옵션에 대해 더욱 상세하고, 실제 운영 노하우 및 카프카의 역할을 보다 잘 이해할 수 있는 프로젝트를 진행해볼 기회를 제공하고 있으니, 책으로 보시는 것도 추천드립니다.
좋은 글 고맙습니다. :D