카프카 2장 - 커맨드라인 툴

600g (Kim Dong Geun)·2021년 5월 30일
2

아파치 카프카 어플리케이션 프로그래밍 (저자 최원영님) 책을 보고 공부한 내용입니다 :) 😀

카프카 커맨드 라인 툴

카프카를 설치하면 다양한 명령어가 담긴 커맨트라인 툴을 이용할 수 있는데, 그 종류가 매우 많다.

실제로 카프카의 커맨드 라인툴을 이용해서 데이터를 producer 하는 경우는 많지는 않지만, 실무에서 클러스터를 운영할 때 자주 쓰인다고 한다.

또한, 커맨드 라인 툴을 통해 토픽 관련 명령을 실행할 때 필수옵션선택옵션이 있다.

선택 옵션은 지정하지 않을시 브로커에 설정된 기본값 또는 커맨드라인 툴의 기본값으로 대체되어 설정된다.

어떤 커맨드라인 툴을 지원하는지 그럼 알아보자.

kafka-topics.sh

이 커맨드 라인 툴을 통해 토픽과 관련된 명령을 실행할 수 있다. 토픽이란 카프카에서 데이터를 구분하는 가장 기본적인 개념이다. 마치 RDBMS의 테이블과 유사하다고 할 수 있다. 토픽에는 여러 파티션이 존재하는데, 파티션의 개수는 최소 1개부터 시작한다.

  • 파티션은 카프카에서 토픽을 구성하는데 아주 중요한 요소이다.
  • 파티션을 통해 한 번에 처리할 수 있는 데이터양을 늘릴 수 있고, 토픽 내부에서도 파티션을 통해 데이터의 종류를 나누어 처리할 수 있기 때문이다.

아직 파티션에 대한 자세한 설명은 하지 않은걸로 알고있는데, 뒤에 읽다보면 더 나오지 않을까?

토픽을 생성하는 방법

토픽을 생성하는 방법은 크게 2가지가 있다.

  1. 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할때
  2. 커맨드 라인툴로 명시적으로 생성하는 것/

토픽을 효과적으로 유지보수하기 위해서는 토픽을 명시적으로 생성하는 것을 추천한다고 한다. 왜냐하면 토픽마다 처리되어야 하는 데이터의 특성이 다르기 때문이다.

토픽생성

kafka-topcis.sh --create --bootstrap-server localhost:9092 --topic hello.kakfa
  • --create : 토픽을 생성하는 명령어라는 것을 명시한다.
  • --bootstrap-server : 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port를 적는다.
  • --topic : 토픽이름을 작성한다. 토픽 이름은 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것을 추천한다.
kafka-topics.sh --create --bootstrap-server localhost:9092 \
	--partition 3 \
	--replication-factor 1 \
	--config retention.ms=172800000
	--topic hello.kafka.2
  • --partitions는 파티션 개수를 지정할 수 있다. 파티션 최소개수는 1개이다. 만약 이 옵션을 사용하지 않는다면 config/server.properties에 있는 num.partitions 옵션값에 따라 생성된다.
  • --replication-factor에는 토픽의 파티션을 복제할 복제 개수를 적는다. 1은 복제를 하지 않고 사용한다는 의미다. 2이면 1개의 복제본을 사용하겠다는 의미이다. 한 개의 브로커에 장애가 발생하더라도 나머지 한 개 브로커에 저장된 데이터를 사용하여 안전하게 데이터 처리를 지속적으로 할 수 있다. 복제 개수의 최소설정은 1이고 최대설정은 통신하는 카프카 클러스터의브로커 개수이다. 실제 업무 환경에서는 3개 이상의 카프카 브로커로 운영하는 것이 일반적이다. 만약 이 설정을 명시적으로 하지 않으면 카프카 브로커 설정에 있는 default.replication.factor 옵션값을 따른다고한다
  • --config를 통해 kafka.topics.sh 명령에 포함되지 않은 추가적인 설정을 할 수 있다. retension.ms는 토픽의 데이터를 유지하는 기간을 뜻한다. 위의 172800000ms는 2일이라는 뜻이고, 2일이 지난 토픽의 데이터는 삭제된다.

토픽 생성시 --zookeeper가 아니라 --bootstrap-server를 사용하는 이유.

과거 2.1버전 이하에서는 주키퍼와 직접통신하기 때문에 --zookeeper를 이용하여 카프카 서버와 통신했다. 그러나 주키퍼와 직접통신하여 명령을 처리하는 것은 아키텍쳐의 복잡성을 높일수 있는 것이고, 아키텍쳐의 복잡도를 높였기 때문에 이후 --bootstrap-server로 대체 되었다.

토픽리스트 조회

kafka-topics.sh --bootstrap-server localhost:9092 --list
  • --list : 해당 bootstrap server의 토픽 리스트를 조회한다.

토픽 상세조회

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka.2
  • --describe : describe 옵션을 통해서 파티션 개수가 몇개인지, 해당 토픽의 구성은 어떠한지 상세한 정보를 확인할 수 있다.

    --describe 옵션을 통해서 여러대의 브로커로 카프카 클러스터를 운영할때 토픽의 리버 파티션이 일부 브로커에 몰려있지는 않은지 확인할 수 있다.

    리더 파티션이 일부 브로커에 몰리면, 카프카 클러스터 부하가 특정 브로커로 몰릴 수 있고 이는 데이터 통신 쏠림현상으로 네트워크 대역 이슈가 발생할 수 있다.

    따라서 카프카 클러스터의 성능이 생각보다 좋지 못하다면 토픽 상세 조회 명령을 통해 토픽의 리더 파티션 쏠림현상을 확인하는 것도 괜찮은 방법이다.

토픽 옵션 수정

토픽에 설정된 옵션을 변경하기 위해서는 kafka-topics.sh 또는 kafka-configs.sh옵션 2개를 사용해야한다.

예를들어, 파티션 개수 변경을 하려면 kafka-topics.sh를 사용해야 하고 토픽 삭제 정책인 리텐션 기간을 변경하려면 kakfa-configs.sh 를 사용해야한다.

이렇게 토픽 설정 옵션이 파편화된 이유는 토픽에 대한 정보를 관리하는 일부 로직이 다른 명령어로 넘어갔기 때문이다. 카프카 2.5까지는 kafka-topics.sh 와 --alter 옵션을 사용하여 리텐션 기간을 변경할 수 있지만 추후 삭제될 예정이라고 하며 삭제할 예정이라 하니 kafka-configs.sh를 사용하는 것이 바람직하다.

kafka-topics.sh --bootstrap-server localhost:9092 \
	--topic hello.kafka
	--alter \
	--partitions 4
	
# 카프카의 파티션이 4개로 늘어났는지 확인해보자!
kafka--topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka

# 카프카의 retentions.ms 수정
kafka-configs.sh --bootstrap-server localhost:9092 \
	--entity-type topics \
	--eitity-name hello.kafka
	--alter --add-config retention.ms=86400000
	
# 카프카 토픽 리텐션이 변경됐는지 확인하여 보자~
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name hello.kafka
  --describe
  • --alter 옵션과 --partitions 옵션을 함께 사용하여 파티션 개수를 변경할 수 있다. 토픽의 파티션을 늘릴수는 잇지만 줄일 수는 없다. 그러므로 파티션 개수를 늘릴때는 반드시 늘려야하는 상황인지 판단을 하는 것이 중요하다.
  • --alter --add-config 옵션을 사용하여 retension옵션을 변경했다. --add-option을 사용하면 이미 존재하는 설정값은 변경하고 존재하지 않는 설정값은 신규로 추가된다.
  • 다이나믹 토픽 옵션인 retension.ms가 86400000ms로 변경된 것을 kafka0configs.sh와 --describe옵션을 통해 확인할 수 있다.

kafka-console-producer.sh

  • 생성된 토픽에 데이터를 넣을 수 있는 커맨드라인툴은 kafka-console.producer.sh이다
  • 토픽에 넣는 데이터를 레코드라고 부르며 메시지 키메시지 값으로 이루어져 있다.
  • 다음 예제는 메시지 키 없이 메시지 값만 보내도록 하자 (메시지 키는 자바의 null로 기본 설정되어 브로커로 전송.)
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka

여기서 주의할점은 kafka-console-producer.sh 로 전송되는 레코드 값은 UTF-8 기반으로 Byte로 변환되고 ByteArraySerializer로만 직렬화된다는 점이다. 즉, String이 아닌 타입으로 직렬화하여 사용할 수 없다. 그러므로 텍스트 목적으로만 문자열을 전송할 수 있고, 다른 타입으로 직렬화하여 데이터를 브로커로 전송하고 싶다면 카프카 프로듀서 어플리케이션을 직접 개발해야한다.

이제 메시지 키값을 가지는 레코드를 전송해보자. 메시지 키를 가지는 레코드를 전송하기 위해서는 몇가지 추가 옵션을 작성해야 한다.

kafka-console-producer.sh --bootstrap-server localhost:9092 \
	--topic hello.kafka
	--property "parse.key=true" \
	--property "key.separator=:"

  • parse.key를 true로 두면 레코드를 전송할 때 메시지 키를 추가할 수 있다.

  • 메시지 키와 메시지 값을 구분하는 구분자를 선언한다. key.separator 를 선언하지 않으면 기본설정은 Tab delimiter(\t)이다. 그러므로 key.separtor를 선언하지 않고 메시지를 보내려면 메시지키를 작성하고 탭 키를 누른 뒤 메시지 값을 작성하고 엔터를 누른다.

    여기서는 명시적으로 확인하기 위해 콜론을 선언했다.

  • 메시지 키와 메시지 키 값을 함께 전송한 레코드는 토픽의 파티션에 저장된다. 메시지 키가 null인 경우에는 프로듀서가 파티션으로 전송할 때 배치 단위로 라운드 로빈으로 전송한다. 메시지키가 존재할 경우 키의 해시값을 작성하여 존재하는 파티션 중 한개로 할당된다. 이로 인해 메시지 키가 동일한 경우에는 동일한 파티션으로 전송된다 다만 이런 메시지 키와 파티션 할당은 프로듀서에서 설정된 파티셔너에 의해 결정되는데 기본 파티셔너의 경우 이와 같은 동작을 보장한다. 커스텀 파티셔너를 사용할 경우에는 메시지 키에 따른 파티션할당이 다르데 동작할수도 있으니 참고

kafka-console-consumer.sh

hello.kafka 토픽으로 전송한 데이터는 kafka-console-consumer.sh 명령어로 확인할 수 있다. 이때 필수 옵션으로 --bootstrap-server에 카프카 클러스터 정보, --topic에 토픽이름이 필요하다. 추가로 --from-beginning 옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력한다.

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --from-beginning

# 메시지 키와 메시지 값을 확인하고 싶다면 --property 옵션
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka \
	--property print.key=true \
	--property key.separator="-"
	--group hello-group
	--from-beginning

  • --group 옵션을 통해 신규 컨슈머 그룹을 생성했다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있다. 이 컨슈머 그룹을 통해 가져간 토픽의메시지는 가져간 메시지에 대해 커밋을 한다. 커밋이란 컨슈머가 특정레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것
  • 커밋 정보는 __consumer_offset이름의 내부토픽에 저장된다.
  • 여기서 특이 했던점은 kafka-console-producer.sh로 전송했던 데이터의 순서가 현재 출력되는 순서와 다를 수가 있다.
  • 이는 카프카의 핵심개념인 파티션 때문에 생기는 현상이다. kafka-console-consumer.sh 명령어를 통해서 토픽의 데이터를 가져가게 되면 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져간다. 이로인해 프로듀서가 토픽에 넣은 데이터의 순서와 컨슈머가 토픽에서 가져간 데이터의 순서가 달라지게 되는 것이다. 만약 토픽에 넣은 데이터의 순서를 보장하고 싶다면 가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는 것이다.

kafka-consumer-groups.sh

hello-group 이름의 컨슈머 그룹으로 생성된 컨슈머로 hello.kafka토픽의 데이터를 가져갔다.

  • 컨슈머 그룹은 따로 생성하는 명령을 날리지 않고 컨슈머를 동작할 때 컨슈머 그룹 이름을 지정하면 새로 생성된다.
  • 생성된 컨슈머 그룹의 리스트는 kafka-consumer-groups.sh를 통해 확인할 수 있다.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • --list는 컨슈머 그룹의 리스트를 확인하는 옵션이다.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
	--group hello-group \
	--describe
  • --descrbie 옵션을 통해서 컨슈머 그룹의 상세내용을 확인할 수있다.
  • --group 으로 어떤 컨슈머 그룹을 선택할 것인지 지정할 수 있다.

kafka-verifiable-producer, consumer.sh

kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다. 카프카 클러스터 설치가 완료된 이후에 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트를 할 대 유용하다.

kafka-verifiable-producer.sh --bootstrap-server localhost:9092 \
--max-messages 10 \
--topic verify-test

  • 통신하고자 하는 클러스터 호스트와 포트를 --bootstrap-server 옵션으로 입력한다.
  • --max-messagesKafka-verifiable-producer.sh로 보내는 데이터 개수를 지정한다. 만약 -1을 입력하면 종료될때까지 계속 데이터로 토픽을 보낸다.
  • --topic 옵션을 통해 대상 토픽을 입력한다.
  • 최초실행시점이 start_complete 메시지와 함께 출력된다.
  • 메시지별로 보낸시간과 메시지키, 메시지값, 저장된 파티션, 저장된 오프셋 번호가 출력된다.
  • 10개 데이터가 모두 전송된 이후 통계값이 출력되며, 평균처리량을 확인할 수 있다.

전송한 데이터는 kafka-verifiable-consumer.sh로 확인할 수 있다.

kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 \
	--topic verify-test \
	--group-id test-group

Kafka-delete-record.sh

이미 적재된 토픽의 데이터를 지우는 방법으로 kafka-delete-records.sh를 사용할 수 있다. 이미 적재된 토픽의 데이터 중 가장 오래된 데이터부터 특정시점의 오프셋까지 삭제할 수 있다. 예를들어, test토픽의 0번 파티션에 0부터 100까지 데이터가 들어있다고 가정하자. 0번 파티션에 저장된 데이터 중 0-부터 30까지의 오프셋을 지우고 싶다면 다음과 같이 입력할 수 있다.

vi delete-topic.json
{"partitions" : [{"topic" : "test", "partition" : 0, "offset": 50}],"version" : 1}
:wq

kafka-delete-records.sh --bootstrap-server localhost:9092 \
	--offset-json-file delete-topic.json
profile
수동적인 과신과 행운이 아닌, 능동적인 노력과 치열함

0개의 댓글