Kafka [1] 설치, 실행, CLI

최준호·2022년 4월 1일
0

kafka

목록 보기
1/4
post-thumbnail

출처 SK T아카데미 토크ON 77차 아파치 카프카 입문 유튜브 영상

명령어 및 소스 명령어 깃 저장소

📘Kafka 간단 용어

Broker 카프카 애플리케이션 서버 단위
Topic 데이터 분리 단위, 3개 이상이 파티션 보유
Partition 레코드를 담고 있음, 컨슈머 요청시 레코드 전달
Offset 각 레코드당 파티션에 할당된 고유 번호
Consumer 레코드를 polling하는 Client 애플리케이션
Conumser Group 다수 컨슈머 그룹
Consumer Offset 특정 컨슈머가 가져간 Offset
Producer 레코드를 브로커로 전송하는 애플리케이션
Replication 파티션 복제 기능
ISR 리더 + 팔로워 파티션의 Sync(정보가 일치됨)가 된 묶음
Rack-awareness Server Rack 이슈에 대응

📘 Kafka 설치, 실행, CLI

aws에서 ec2를 생성하여 진행 ubuntu 20.04

sudo apt-get update && sudo apt-get upgrade

apt 업데이트 시켜주고

apt-get install openjdk-11-jdk -y

java를 먼저 설치
(Kafka는 java 기반 JVM으로 돌아간다!)

https://downloads.apache.org/kafka/
접근하여 버전을 확인하여 자신이 설치하고자 하는 버전으로 설치해준다.

wget https://downloads.apache.org/kafka/2.8.1/kafka_2.13-2.8.1.tgz

나의 경우 해당 버전으로 설치했다.

정상 설치되었고

tar xvf kafka_2.13-2.8.1.tgz

압축 풀어준다.

나는 폴더명을 변경하여 kafka로 이동했고 해당 폴더 안에는 다음과 같은 구성을 확인할 수 있으면 정상적인 파일을 다운받았다는 것이다.

export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"

kafka는 최소 사양이 ram 1Gb 이상 사용되어져야 켜진다. 그래서 기본 kafka 가상 메모리 사용 제한을 400mb로 변경하여 실행시키려고 환경변수를 설정해주자.

echo $KAFKA_HEAP_OPTS


명령어로 정상 등록처리 되었는지 확인해보자.

/config/server.config

listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{ec2 ip}:9092

listeners 내부 포트
advertised.listeners 외부로 접근하는 ip와 포트


주석처리되어 있는 해당 부분을 주석을 풀어주고 변경해주자.

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

명령어로 zookeaper를 먼저 실행해주자.

jps

zookeaper java main이 실행되어 있는 것을 확인할 수 있다.

bin/kafka-server-start.sh -daemon config/server.properties

kafka를 실행하자

모두 정상 실행되었고

tail -f logs/*

명령어로 로그를 확인해보면

카프카가 미쳐 날뛰고 있었다. 그 이유는 포트를 안열어놔서 그렇다


포트를 열어주고 다시 확인해보자

kill -9 {프로세스 ID}

jps로 확인한 프로세스 id를 입력하여 zookeaper, kafka를 종료시켜주고 다시 실행하자

아까와는 다르게 이쁘게 실행되었다.

여기까지 kafka를 설치하여 실행까지 진행하여 kafka 서버를 구축해보았다.
또한 우리는 400mb로 진행하지만 실제 서버에서 무리 없이 kafka 서버를 구현하고자 한다면 1Gb보다 6Gb로 설정하는 링크드인에서 테스트한 최적의 옵션 값이 있다고 한다. 필요하다면 해당 설정으로 진행하면 된다. 나처럼 테스트나 실습용도면 이정도도 충분하다.

✅config/server.properties

broker.id 브로커 번호. 클러스터 내 고유번호로 지정
listeners kafka 통신에 사용되는 host:port
advertiesd.listeners kafka client가 접속할 host:port
log.dirs 메세지를 저장할 디렉토리
log.segment.bytes 메세지 저장 파일의 제한 크기
log.retention.ms 메세지 파일 보존 기간
zookeeper.connect 주키퍼 위치
auto.create.topics.enable 자동 토픽 생성 여부
num.partitions 자동생성된 토픽의 default 파티션 개수
message.max.bytes kafka broker에 쓰려는 메세지 최대 크기

📕Kafka Client 서버 만들기

위에서는 aws에 kafka server를 생성하여 실행해두었다면 이제 다른 서버에서 cli를 만들어서 테스트하려고 한다. 나는 리눅스 컴퓨터가 아닌 윈도우에서 wsl을 사용하여 ubuntu로 실행하였다. 리눅스용 컴퓨터나 맥을 사용하시는 분들은 그대로 사용해도 된다.

카프카는 linux와 window 설정이나 실행 방법이 달라서 이렇게 진행했다!

기본적으로 apt와 apt-get을 모두 업데이트 해준 뒤 java까지 설치 후 진행하면 된다!

curl https://archive.apache.org/dist/kafka/2.5.0/kafka_2.13-2.5.0.tgz --output kafka.tgz 

명령어로 카프카를 설치한다.

tar -xvf kafka.tgz

압축을 풀어주고

cd kafka/bin

폴더로 이동해서

./kafka-topics.sh --create --bootstrap-server {aws ec2 public ip}:9092 --replication-factor 1 --partitions 3 --topic test

명령어로 test라는 토픽을 만드는데 우리는 브로커가 1개의 서버만 기동하고 있기 때문에 replication 또한 1개만 지정한다. 그리고 파티션은 3개로 토픽을 생성해보자

정상적으로 실행되면

Created topic test

가 나온다.

카프카 서버에도 메세지가 나온다.

./kafka-console-producer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test

명령어를 입력하면

꺽쇠가 나오며 메세지를 입력할 수 있는 상태가 된다.

나는 다음과 같이 입력해보았고

새로운 리눅스 연결을 한 뒤 (3번째 창)

cd /kafka/bin
./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test --from-beginning

명령어를 입력해주면

다음과 같이 메세지를 최초 입력했던 데이터부터 가져오게 된다.
--from-beginning 옵션을 사용해서 최초 데이터부터 가져올 수 있는 것이다.

./kafka-console-consumer.sh --bootstrap-server {aws ec2 public ip}:9092 --topic test -group testgroup --from-beginning

해당 명령어로 컨슈머를 그룹지어서 실행할 수 있는데 여기서 중요한 점은 그룹되어 있는 컨슈머는 다른 컨슈머(1)에서 해당 메세지를 처리했다면 현재 컨슈머(2)에서는 메세지가 노출되지 않는다. 그래서 앞에서 컨슈머를 실행시켜서 모든 메세지를 입력 받았다면 메세지가 노출되지 않지만 컨슈머를 끈채로 메세지를 입력하고 컨슈머를 실행시키면 다른 컨슈머(1)에서 처리하지 않았기 때문에 현재 컨슈머(2)에 그동안 처리되지 않았던 메세지가 노출되어 진다.

그럼 먼저 컨슈머 실행을 종료 시키고
producer에서 abcd를 순서대로 입력한 뒤 컨슈머 그룹을 실행시켜보자!

모든 메세지는 위의 명령어로 노출된 메세지와 같지만
마지막 명령어를 확인해보면 abcd만 노출된 것을 확인할 수 있다.

여기서 위의 명령어에 순서가 이상한 이유는 우리가 topic을 생성할 때 파티션을 3개로 나눠놨기 때문이다. 파티션 3개의 각각 메세지가 저장되어 지는데 해당 메세지를 가져가는 순서는 입력된 순서와는 상관없이 각 파티션에서 꺼내오는 것이므로 순서는 달라질 수 있다. 순서를 동일하게 하고 싶다면 파티션을 나누지 않고 생성하면 된다.

./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --list

컨슈머 그룹을 확인할 수 있고

./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --describe

컨슈머 그룹의 상태를 확인할 수 있다.

GROUP 토픽 그룹 명
TOPIC 토픽 명
PARTITION 파티션 번호
CURRENT-OFFSET 현재 오프셋 번호 (저장된 개수라고 생각하면 편함)
LOG-END-OFFSET 메세지로 전송된 오프셋 번호
LAG 전송되지 못한 오프셋의 개수

의도치 않게 위의 사진들에 ip가 노출되어 사진을 모두 지웠다... 명령어 입력하면 모두 정상적으로 노출되니 잘 따라해보자!

./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute

또한 컨슈머의 offset 값을 reset할 수 있는 명령어가 있는데 위의 명령어를 실행하면

다음과 같이 현재 컨슈머 그룹의 가장 낮은 번호로 리셋하도록 하는 명령어이다.

./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 2 --execute

파티션 번호를 지정하여 offset 번호까지 지정할 수 있기도 하다.

👊LAG을 이해해보자!

위의 설명중 LAG의 개념이 있는데 전송되지 못한 오프셋의 개수라고 설명해놨다. (나의 개인적인 의견)

예를 들어서 컨슈머로써 실행되고 있는 서버가 어떤 이유로 서버가 종료되거나 kafka 컨슈머 서버가 죽어 있다고 했을 때 producer에서 메세지를 만들어도 전송되지 못할 것이다.

실제로 모든 컨슈머 서버를 끄고 efgh라는 메세지를 추가로 입력해보았다.

그리고

./kafka-consumer-groups.sh --bootstrap-server {aws ec2 public ip}:9092 --group testgroup --describe

명령어를 사용하여 컨슈머의 상태를 확인해보면

4개의 메세지를 전달하지 못해 4개의 LAG이 발생한 것을 확인할 수 있다.

profile
해당 주소로 이전하였습니다. 감사합니다. https://ililil9482.tistory.com

0개의 댓글