[Kafka] 카프카 커맨드 라인 툴

popolarburr·2023년 8월 11일
0
post-thumbnail

카프카에서 제공하는 카프카 커맨드 라인 툴들은 카프카를 운영할 때 가장 많이 접하는 도구다.
커맨드 라인 툴을 통해 카프카 브로커 운영데 필요한 다양한 명령을 내릴 수 있다.
카프카 클라이언트 애플리케이션을 운영할 때는 크프카 클러스터와 연동하여 데이터를 주고받는 것도 중요하지만 토픽이나 파티션 개수 변경과 같은 명령을 실행해야 하는 경우도 자주 발생한다.

how to command?

  1. 카프카 브로커가 설치된 인스턴스에 ssh로 원격 접속하여 명령을 직접 실행
  2. 브로커에 9092(카프카 기본 설정 포트)로 접근 가능한 컴퓨터에서 명령어 실행

나는 로컬 컴퓨터에서 카프카 커맨드 라인 툴 명령을 AWS EC2 인스턴스에 설치된 카프카 브로커로 요청하는 실습을 진행할 예정
또한 실습용 카프카를 설치하고 관련 명령어를 실행하여 토픽을 생성/수정하고 데이터를 전송(프로듀서)하고 받는(컨슈머) 실습을 진행할 예정.


kafka-topics.sh

이 커맨드 라인 툴을 통해 토픽(topic)과 관련된 명령을 실행할 수 있다.

토픽이란 카프카에서 데이터를 구분하는 가장 기본적인 개념이다. 마치 RDBMS에서 사용하는 테이블과 유사하다고 볼 수 있다.

카프카 클러스터에 토픽은 여러개 존재할 수 있다. 토픽에는 파티션이 존재하는데, 파티션의 개수는 최소 1개부터 시작한다.
파티션을 통해 한 번에 처리할 수 있는 데이터양을 늘릴 수 있고 토픽 내부에서도 파티션을 통해 데이터의 종류를 나누어 처리할 수 있기 때문.

토픽 생성

kafka-topics.sh를 통해 토픽 관련 명령을 실행할 수 있다. --create 옵션을 사용하여 hello.kafka라는 이름을 가진 토픽을 생성하 수 있다.

로컬
$ bin/kafka-topics.sh |
--create | -- 1)
-- bootstrap-server my-kafka:9092 | -- 2)
--topic hello.kafka | -- 3)

1) --create 옵션으로 토픽을 생성하는 명령어라는 것을 명시.
2) --bootstrap-server에는 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port번호를 적는다. 여기서는 1개의 카프카 브로커와 통신하므로 my-kafka:9092만 입력
3) --topic에서 토픽 이름을 작성한다. 토픽이름은 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것을 추천


앞서 작성한 방식처럼 간단하게 토픽을 만들 수도 있지만 조금더 자세하게 커스터마이징하여 토픽을 만들 수 있다.

로컬
$ bin/kafka-topics.sh |
--create |
--bootstrap-server my-kafka:9092 |
--partitions 3 | -- 1)
--replication-factor 1 | -- 2)
--config retention.ms=172800000 | -- 3)
--topic hello.kafka.2

1) --partitions는 파티션 개수를 지정. 파티션 최소 1개. 옵션 사용을 안할 시 , 카프카 브로커 설정파일(config/server.properties)에 있는 num.partitions 옵션값 따라 생성
2) --replication-factor에는 토픽의 파티션을 복제할 복제 개수를 적는다. 1은 복제를 하지 않고 사용한다는 의미. 2이면 1개의 복제본을 사용하겠다는 의미. 파티션의 데이터는 각 브로커마다 저장. 한 개의 브로커에 장애가 발생하더라도 나머지 한 개 브로커에 저장된 데이터를 사용하여 안전하게 데이터 처리를 지속 가능. 복제 개수의 최소 설정은 1이고, 최대 설정은 통신하는 카프카 클러스터의 브로커 개수이다. 실제 현업에서는 3개 이상의 카프카 브로커로 운영하는 것이 일반적.
3) --config를 통해 kafka-topics.sh 명령에 포함되지 않은 추가적인 설정을 할 수 있다. retention.ms 는 토픽의 데이터를 유지하는 기간을 뜻. 172800000ms 는 2일을 ms단위로 나타낸 것.


❗ 토픽 생성시 --zookeeper가 아닌 --bootstrap-server 옵션을 사용한 이유
카프카 버전 2.1을 포함한 이전 버전은 주키퍼와 직접 통신함. 그러나 이 방식은 명령을 처리하는 아키텍처의 복잡도를 높였고, 2.2버전 이후로는 주키퍼와 통신하는 대신 카프카를 통해 토픽과 고나련된 명령을 실행할 수 있게 되어 bootstrap-server 옵션을 사용.


토픽 리스트 조회

로컬
$ bin/kafka-topics.sh --bootstrap.server my-kafka:9092 --list

hello.kafka
hello.kafka.2


토픽 상세 조회

로컬
$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --describe --topic hello.kafka.2

이미 생성된 토픽의 상태를 --describe 옵션을 사용하여 확인 가능.
파티션을 보면 0~2로 3개 존재하고, 3개의 파티션 모두 Leader가 0으로 표시되어 있는데, 0번 브로커에 위치하고 있음을 의미. 여러 대의 브로커로 카프카 클러스터를 운영할 때 토픽의 리더 파티션이 일부 브로커에 몰려있을 수 있는데, 이를 확인하기 위해 --describe 옵션을 사용할 수 있다.

리더 파티션이 일부 브로커에 몰려있는 경우가 종종 발생하는데, 이는 부하의 분산이 제대로 되지 못하고 데이터 통신 쏠림 현상으로 인해 네트워크 대역의 이슈가 생길 수가 있다.

그럴 때 이 명령으로 확인하여 해결하는 것이 중요


토픽 옵션 수정

토픽에 설정된 옵션을 번경하기 위해서는 kafka-topics.sh 또는 kafka-configs.sh 두 개를 사용해야 한다. 파티션 개수 변경을 하려면 kafka-topics.sh를 사용해야 하고 토픽 삭제 정책인 리텐션 기간을 변경하려면 kafka-configs.sh를 사용해야한다.

옵션 수정 전

로컬
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 |
--topic hello.kafka |
--alter |
--partitions 4

변경내용 : 파티션을 1(디폴트)에서 4로 늘림 , 위에서 파티션 개수를 설정안하면 디폴트값인 최솟값 1개로 설정됨.


❗옵션을 수정하기 위해서는 토픽을 먼저 정한다. --topic {토픽이름}
❗그 다음 --alter 옵션을 부여하여 수정 상태를 활성화시킴.
❗그 뒤로는 변경하고자 하는 옵션값들을 나열한다.

  • --partitions 4
  • --add config retention.ms=86400000


kafka-console-producer.sh

이전까지는 카프카의 토픽 생성 및 수정 하는 방법이였다.
이제는 생성된 hello.kafka와 같은 토픽에 데이터를 넣을 수 있는 kafka-console-producer.sh 명령어를 실행해보자. 토픽에 넣는 데이터는 레코드(record)라고 부르며 메시지 키(key)와 메시지 값(value)으로 이루어져 있다.

❗메시지는 키는 자바의 null로 기본 설정되어 전송!
❗먼저 키가 없는 레코드만 전송하는 방식 !

로컬
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
>hello
>kafka
>0
>1
>2
>3
>4
>5
>6

키보드로 문자를 작성하고 엔터키를 누르면 별다른 응닶없이 메세지 값이 전송된다.
❗ 여기서 주의할 점은 kafka-console-producer.sh로 전송되는 레코드 값은 UTF-8을 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화 된다는 점이다. 즉, String이 아닌 타입으로는 직렬화하여 전송할 수 없다. 그러므로 텍스트 목적으로 문자열만 전송할 수 있고, 다른 타입으로 직렬화하여 데이터를 브로커로 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야한다.

아래 그림에 나와있는 Partitioner, Buffer, Sender는 아직 언급 안함 !


[사진출처] : https://freedeveloper.tistory.com/397


❗그리고 키를 가지는 레코드 전송하는 방법
이를 위해서는 몇 가지 추가 옵션을 작성해야한다.

로컬
$ bin/kafka-console-producer.sh --boostrap-server my-kafka:9092 \
--topic hello.kafka \
--property "parse.key=true" \                                   --1)
--property "key.separator=:"                                    --2)
>key1:no1
>key2:no2
>key3:no3

--1) parse.key를 true로 두면 레코드를 전송할 때 메세지 키를 추가할 수 있다.
--2) 메세지 키와 메시지 값을 구분하는 구분자를 선언한다. key.separator를 선언하지 않으면 기본 설정은 Tab delimiter(\t)이다. 그러므로 key.separaotr를 선언하지 않고 메세지를 보내려면 메세지 키를 작성하고 키를 누른 뒤 메세지 값을 작성하고 엔터를 누른다.

위의 예제는 명시적으로 확인하기 위해 콜론(:)으로 구분자 선언)
❗만약 key.separaotr로 사용하는 구분자를 넣지 않고 엔터를 누르면 KafkaException과 함께 종료된다.

메시지 키와 값을 함께 전송한 레코드는 토픽의 파티션에 저장된다.


참고
메세지 키가 null인 경우엔 프로듀서가 파티션으로 전송할 때 레코드 배치 단위(레코드 전송 묶음)로 라운드 로빈으로 전송한다.
메시지 키가 존재하는 경우(not null)에는 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당된다. 이로 인해 메시지 키가 동일한 경우에는 동일한 파티션으로 전송된다.

다만, 이런 메시지 키와 파티션 할당은 프로듀서에서 설정된 파티셔너에 의해 결정되는데, 기본 파티셔너의 경우 이와 같은 동작을 보장한다. 커스텀 파티셔너를 사용할 경우에는 메시지 키에 따른 파티션 할당이 다르게 동작할 수도 있으니 참고!


kafka-console-consumer.sh

hello.kafka 토픽으로 전송한 데이터는 kafka-console-consumer.sh 명령어로 확인할 수 있다. 이때 필수옵션으로 --bootstrap-server에 카프카 클러스터 정보, --topic에 토픽이름이 필요하다. 추가로 --from-beginning 옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력한다.

로컬
$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--from-beginning


kafka-console-producer.sh로 보낸 메세지 값 출력됨을 확인가능. 만약 데이터의 메시지 키와 값을 확인하고 싶다면 --property 옵션을 사용하면 된다.

로컬
$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic hello.kafka \
--property print.key=true \
--property key.separator="-" \
--group hello-group \
--from-beginning

  • property print.key=true -> 메시지키 확인용. 설정하지 않으면 디폴트 false이기에 키 값 확인 불가
  • 메시지 키 값을 구분하기 위해 key.separator를 설정. 설정하지 않으면 디폴트 tab.
  • --group 옵션을 통해 신규 컨슈머 그룹 생성. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있다. 이 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 커밋한다.

여기서 말하는 커밋이란, 컨슈머가 특정 레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것이다. 커밋 정보는 __consumer_offsets이름의 내부 토픽에 저장된다.

[사진출처] https://jhleed.tistory.com/179

kafka-consumer-groups.sh

hello-group 이름의 컨슈머 그룹으로 생성된 컨슈머로 hello.kafka 토픽의 데이터를 가져갔다. 컨슈머 그룹은 따로 생성하는 명령을 나리지 않고 컨슈머를 동작할 때 컨슈머 그룹 이름을 지정하면 새로 생성된다. 생성된 컨슈머 그룹의 리스트는 kafka-consumer-groups.sh 명령어 확인 가능.

로컬
$ bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --list

또한 컨슈머 그룹의 상세 정보를 확인하는 명령어이다. 이를 통해 중복되지 않는지 확인하거나 운영하고 있는 컨슈머 랙이 얼마인지 확인하여 컨슈머 상태를 최적화하는데 사용.

로컬
$ bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \
--group hello-group \
--describe


kafka-verifiable-producer, consumer.sh

kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드없이 주고받을 수 있다. 카프카 클러스터가 설치가 완료된 이후에 토픽에 데이터를 전송하여 간단한 네트워크통신 테스트를 할 때 유용하다.

로컬
$ bin/kafka-verifiable-producer.sh --bootstrap-server my-kafka:9092 \
--max-message 10 \
--topic verify-test

전송한 데이터는 kafka-verifiable-consumer.sh로 확인가능.


로컬
$ bin/kafka-verifiable-consumer.sh --bootstrap-server my-kafka:9092 \
--topic verify-test \
--group-id test-group


kafka-delete-records.sh

이미 적재된 토픽의 데이터를 지우는 방법으로 kafka-delete-records.sh를 사용할 수 있다. kafka-delete-records.sh는 이미 적재된 토픽의 데이터 중 가장 오래된 데이터(가장 낮은 숫자의 오프셋)부터 특정 시점의 오프셋까지 삭제가능.

로컬
$ vi delete-topic.json

  1. 우선 삭제하고자 하는 데이터에 대한 정보를 파일로 저장해서 사용해야함. 예시로 delete-topic.json이라는 파일로 생성.
    해당 파일에는 삭제하고자 하는 토픽, 파티션, 오프셋 정보가 들어가야함.

위와 같이 제공된 예시로 작성 후 저장해줌.
그 다음 아래와 같은 명령어 사용

로컬
$ bin/kafka-delete-records.sh --bootstrap-server my-kafka:9092 \
--offset-json-file delete-topic.json

profile
차곡차곡

1개의 댓글

comment-user-thumbnail
2023년 8월 11일

글 잘 봤습니다.

답글 달기