아파치 카프카 어플리케이션 프로그래밍 (저자 최원영님) 책을 보고 공부한 내용입니다 :) 😀
카프카를 설치하면 다양한 명령어가 담긴 커맨트라인 툴을 이용할 수 있는데, 그 종류가 매우 많다.
실제로 카프카의 커맨드 라인툴을 이용해서 데이터를 producer 하는 경우는 많지는 않지만, 실무에서 클러스터
를 운영할 때 자주 쓰인다고 한다.
또한, 커맨드 라인 툴을 통해 토픽 관련 명령을 실행할 때 필수옵션과 선택옵션이 있다.
선택 옵션은 지정하지 않을시 브로커에 설정된 기본값 또는 커맨드라인 툴의 기본값으로 대체되어 설정된다.
어떤 커맨드라인 툴을 지원하는지 그럼 알아보자.
이 커맨드 라인 툴을 통해 토픽과 관련된 명령을 실행할 수 있다. 토픽이란 카프카에서 데이터를 구분하는 가장 기본적인 개념이다. 마치 RDBMS의 테이블
과 유사하다고 할 수 있다. 토픽에는 여러 파티션이 존재하는데, 파티션의 개수는 최소 1개부터 시작한다.
아직 파티션에 대한 자세한 설명은 하지 않은걸로 알고있는데, 뒤에 읽다보면 더 나오지 않을까?
토픽을 생성하는 방법
토픽을 생성하는 방법은 크게 2가지가 있다.
- 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 데이터를 요청할때
- 커맨드 라인툴로 명시적으로 생성하는 것/
토픽을 효과적으로 유지보수하기 위해서는 토픽을 명시적으로 생성하는 것을 추천한다고 한다. 왜냐하면 토픽마다 처리되어야 하는 데이터의 특성이 다르기 때문이다.
kafka-topcis.sh --create --bootstrap-server localhost:9092 --topic hello.kakfa
kafka-topics.sh --create --bootstrap-server localhost:9092 \
--partition 3 \
--replication-factor 1 \
--config retention.ms=172800000
--topic hello.kafka.2
config/server.properties
에 있는 num.partitions
옵션값에 따라 생성된다.실제 업무 환경에서는 3개 이상의 카프카 브로커로 운영하는 것이 일반적이다.
만약 이 설정을 명시적으로 하지 않으면 카프카 브로커 설정에 있는 default.replication.factor
옵션값을 따른다고한다retension.ms
는 토픽의 데이터를 유지하는 기간을 뜻한다. 위의 172800000ms는 2일이라는 뜻이고, 2일이 지난 토픽의 데이터는 삭제된다.토픽 생성시 --zookeeper가 아니라 --bootstrap-server를 사용하는 이유.
과거 2.1버전 이하에서는 주키퍼와 직접통신하기 때문에 --zookeeper를 이용하여 카프카 서버와 통신했다. 그러나 주키퍼와 직접통신하여 명령을 처리하는 것은 아키텍쳐의 복잡성을 높일수 있는 것이고, 아키텍쳐의 복잡도를 높였기 때문에 이후 --bootstrap-server로 대체 되었다.
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka.2
--describe : describe 옵션을 통해서 파티션 개수가 몇개인지, 해당 토픽의 구성은 어떠한지 상세한 정보를 확인할 수 있다.
이 --describe
옵션을 통해서 여러대의 브로커로 카프카 클러스터를 운영할때 토픽의 리버 파티션이 일부 브로커에 몰려있지는 않은지 확인할 수 있다.
리더 파티션이 일부 브로커에 몰리면, 카프카 클러스터 부하가 특정 브로커로 몰릴 수 있고 이는 데이터 통신 쏠림현상으로 네트워크 대역 이슈가 발생할 수 있다.
따라서 카프카 클러스터의 성능이 생각보다 좋지 못하다면 토픽 상세 조회 명령을 통해 토픽의 리더 파티션 쏠림현상을 확인하는 것도 괜찮은 방법이다.
토픽에 설정된 옵션을 변경하기 위해서는 kafka-topics.sh
또는 kafka-configs.sh
옵션 2개를 사용해야한다.
예를들어, 파티션 개수 변경을 하려면 kafka-topics.sh를 사용해야 하고 토픽 삭제 정책인 리텐션 기간을 변경하려면 kakfa-configs.sh
를 사용해야한다.
이렇게 토픽 설정 옵션이 파편화된 이유는 토픽에 대한 정보를 관리하는 일부 로직이 다른 명령어로 넘어갔기 때문이다. 카프카 2.5까지는 kafka-topics.sh 와 --alter 옵션을 사용하여 리텐션 기간을 변경할 수 있지만 추후 삭제될 예정이라고 하며 삭제할 예정이라 하니 kafka-configs.sh를 사용하는 것이 바람직하다.
kafka-topics.sh --bootstrap-server localhost:9092 \
--topic hello.kafka
--alter \
--partitions 4
# 카프카의 파티션이 4개로 늘어났는지 확인해보자!
kafka--topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka
# 카프카의 retentions.ms 수정
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics \
--eitity-name hello.kafka
--alter --add-config retention.ms=86400000
# 카프카 토픽 리텐션이 변경됐는지 확인하여 보자~
kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name hello.kafka
--describe
그러므로 파티션 개수를 늘릴때는 반드시 늘려야하는 상황인지 판단을 하는 것이 중요하다.
--add-option
을 사용하면 이미 존재하는 설정값은 변경하고 존재하지 않는 설정값은 신규로 추가된다.kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka
여기서 주의할점은
kafka-console-producer.sh
로 전송되는 레코드 값은 UTF-8 기반으로Byte
로 변환되고ByteArraySerializer
로만 직렬화된다는 점이다. 즉, String이 아닌 타입으로 직렬화하여 사용할 수 없다. 그러므로 텍스트 목적으로만 문자열을 전송할 수 있고, 다른 타입으로 직렬화하여 데이터를 브로커로 전송하고 싶다면 카프카 프로듀서 어플리케이션을 직접 개발해야한다.
이제 메시지 키값을 가지는 레코드를 전송해보자. 메시지 키를 가지는 레코드를 전송하기 위해서는 몇가지 추가 옵션을 작성해야 한다.
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic hello.kafka
--property "parse.key=true" \
--property "key.separator=:"
parse.key를 true로 두면 레코드를 전송할 때 메시지 키를 추가할 수 있다.
메시지 키와 메시지 값을 구분하는 구분자를 선언한다. key.separator
를 선언하지 않으면 기본설정은 Tab delimiter(\t)이다. 그러므로 key.separtor를 선언하지 않고 메시지를 보내려면 메시지키를 작성하고 탭 키를 누른 뒤 메시지 값을 작성하고 엔터를 누른다.
여기서는 명시적으로 확인하기 위해 콜론을 선언했다.
메시지 키와 메시지 키 값을 함께 전송한 레코드는 토픽의 파티션에 저장된다. 메시지 키가 null인 경우에는 프로듀서가 파티션으로 전송할 때 배치 단위로 라운드 로빈
으로 전송한다. 메시지키가 존재할 경우 키의 해시값을 작성하여 존재하는 파티션 중 한개로 할당된다. 이로 인해 메시지 키가 동일한 경우에는 동일한 파티션으로 전송된다 다만 이런 메시지 키와 파티션 할당은 프로듀서에서 설정된 파티셔너에 의해 결정되는데 기본 파티셔너의 경우 이와 같은 동작을 보장한다. 커스텀 파티셔너를 사용할 경우에는 메시지 키에 따른 파티션할당이 다르데 동작할수도 있으니 참고
hello.kafka 토픽으로 전송한 데이터는 kafka-console-consumer.sh 명령어로 확인할 수 있다. 이때 필수 옵션으로 --bootstrap-server에 카프카 클러스터 정보, --topic에 토픽이름이 필요하다. 추가로 --from-beginning
옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력한다.
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --from-beginning
# 메시지 키와 메시지 값을 확인하고 싶다면 --property 옵션
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka \
--property print.key=true \
--property key.separator="-"
--group hello-group
--from-beginning
__consumer_offset
이름의 내부토픽에 저장된다.데이터의 순서를 보장하고 싶다면 가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는 것이다.
hello-group 이름의 컨슈머 그룹으로 생성된 컨슈머로 hello.kafka토픽의 데이터를 가져갔다.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group hello-group \
--describe
kafka-verifiable
로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다. 카프카 클러스터 설치가 완료된 이후에 토픽에 데이터를 전송하여 간단한 네트워크 통신 테스트를 할 대 유용하다.
kafka-verifiable-producer.sh --bootstrap-server localhost:9092 \
--max-messages 10 \
--topic verify-test
--max-messages
는 Kafka-verifiable-producer.sh
로 보내는 데이터 개수를 지정한다. 만약 -1을 입력하면 종료될때까지 계속 데이터로 토픽을 보낸다.전송한 데이터는 kafka-verifiable-consumer.sh
로 확인할 수 있다.
kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 \
--topic verify-test \
--group-id test-group
이미 적재된 토픽의 데이터를 지우는 방법으로 kafka-delete-records.sh
를 사용할 수 있다. 이미 적재된 토픽의 데이터 중 가장 오래된 데이터부터 특정시점의 오프셋까지 삭제할 수 있다. 예를들어, test토픽의 0번 파티션에 0부터 100까지 데이터가 들어있다고 가정하자. 0번 파티션에 저장된 데이터 중 0-부터 30까지의 오프셋을 지우고 싶다면 다음과 같이 입력할 수 있다.
vi delete-topic.json
{"partitions" : [{"topic" : "test", "partition" : 0, "offset": 50}],"version" : 1}
:wq
kafka-delete-records.sh --bootstrap-server localhost:9092 \
--offset-json-file delete-topic.json