Kafka tutorial을 위해 책에서는 EC2를 이용하였으나, Docker를 이용해서 진행하고자 한다.
# amazon linux install
docker pull amazonlinux
# port 9092, 2181 열어서 실행
docker run -it -p 9092:9092 -p 2181:2181 --name kafka_tutorial amazonlinux /bin/bash
# openjdk 1.8
yum install -y java-1.8.0-openjdk-devel.x86_64
# install 확인
java -version
아래와 같이 나오면 성공
openjdk version "1.8.0_342"
OpenJDK Runtime Environment (build 1.8.0_342-b07)
OpenJDK 64-Bit Server VM (build 25.342-b07, mixed mode)
최근 버전을 좋아하므로... 최근 버전으로 진행해보고자 한다.(물론 production에서는 stable 버전을 사용해야함)
# wget, tar install
yum install -y wget tar
# kafka 3.3.1, scalar 2.12 version
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
# unzip
tar xvf kafka_2.12-3.3.1.tgz
kafka 브로커는 레코드의 내용은 페이지 캐시로 시스템 메모리를 사용하고, 나머지 객체들은 힙 메모리에 저장하여 사용한다는 특징이 있다.
그래서 kafka 브로커의 경우 힙 메모리를 6GB 이상 설정하지 않는 것을 권장한다.
책에서는 설정 값을 400m로 설정 하던데, 도커이니 한번 2GB를 줘서 실행해보고자 한다.
# Xmx : Java 힙의 최대 크기를 지정하는 것
# Xms : Java 힙의 최초 크기를 지정하는 것
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"
config/server.properties
에서 카프카 브로커가 클러스터 운영에 필요한 옵션들을 지정할 수 있다. 이번 튜토리얼에서 advertised.listeners=PLAINTEXT://172.19.0.2:9092
로 수정(docker container ip)
config/server.properties
에 있는 대표적인 옵션은 아래와 같다.
broker.id
listeners
advertised.listners
listener.security.protocol.map
num.network.threads
num.io.threads
log.dirs
num.partitions
log.retention.hours
log.retention.minutes
또는 log.retention.ms
로도 사용할 수 있다. log.retention.ms
값을 설정하여 운영하는 것을 추천하며, log.retention.ms=-1
이면 영원히 삭제되지 않는다.log.segment.bytes
log.retention.check.interval.ms
zookeeper.connect
zookeeper.connection.timeout.ms
카프카를 실행하기 위해서는 주키퍼가 반드시 필요하다. 주키퍼는 클러스터 설정 리더 정보, 컨트롤러 정보를 담고 있으며, 상용 환경에서는 안전하게 3대 이상의 서버로 구성하여 사용하여야 한다.
여기서는 1대만 실행할 것이며, 이를 Quick-and-dirty single-node
라고 부른다고 한다.
# start zookeeper (daemon to run background, if you want to run foreground remove that argument)
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# check zookeeper run normally
jps -vm
# start kafka broker (daemon to run background, if you want to run foreground
bin/kafka-server-start.sh -daemon config/server.properties
# check kafka run normally
jps -m
# show broker log
tail -f logs/server.log
윈도우에서는 wsl로 실행
# kafka download
curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz
# unzip
tar -xvf kafka.tgz
cd kafka_2.12-3.3.1/
docker과 wsl 네트워크 통신이 안 되는 것 같아서 아래와 같이 진행(여기서 삽질 엄청 함...ㅎㅎㅎ)
# network 생성 docker network create --driver bridge test-network # container network 연결 docker network connect test-network kafka_tutorial # network 연결된 docker container 생성 docker run -itd --name kafka_connect_test_ubuntu --net=test-network ubuntu /bin/bash # exec docker exec -it kafka_connect_test_ubuntu /bin/bash # kafka download curl https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz --output kafka.tgz # unzip tar -xvf kafka.tgz cd kafka_2.12-3.3.1/ # run bin/kafka-broker-api-versions.sh --bootstrap-server kafka_tutorial:9092
결과 적으로 아래와 같이 connection 성공
172.19.0.2:9092 (id: 0 rack: null) -> ( Produce(0): 0 to 9 [usable: 9], Fetch(1): 0 to 13 [usable: 13], ListOffsets(2): 0 to 7 [usable: 7], ... )
카프카 커맨드 라인 툴들은 카프카를 운영할 때 가장 많이 접하는 도구로, 커맨드 라인 툴을 이용하여, 브로커 운영에 필요한 다양한 명령을 내릴 수 있다.
--bootstrap-server
vs--zookeeper
kafka 2.1버전 포함 이전 버전에서는 kafka command-line tool이 주키퍼와 직접 통신하여 명령을 실행하였으나, 2.2 버전 이후부터는 카프카를 통해 토픽과 관련된 명령을 실행할 수 있게 되었다.
--create
로 topic을 생성하라는 명령어를 날림--bootstrap-server
로 토픽을 생성할 카프카 클러스터를 구성하는 브로커들의 IP와 port를 적는다.--topic
에서 토픽 이름을 적으며, 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는 것을 추천한다.--partitions
로 파티션의 개수를 지정할 수 있으며, default는 server.properties의 num.partitions
이다.--replication-factor
로 토픽의 파티션을 복제할 복제 개수를 적는다. 1의 경우는 복제를 하지 않는 것이며, 2이면 1개의 복제본을 사용하겠다는 의미이다. max값은 브로커의 개수며, 명시하지 않으면 default.replication.factor
옵션의 값을 따른다.--config
를 통해 추가적인 설정을 할 수 있다.bin/kafka-topics.sh \
--create \
--bootstrap-server kafka_tutorial:9092 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=86400000 \
--topic topic_kafka_test
Created topic topic_kafka_test.
토픽 생성
토픽을 생성하는 방법은 크게 2가지로 아래와 같다.
토픽 생성 시 토픽에 들어오는 데이터 양이나, 병렬로 처리되어야 하는 용량, 보관 기간 등 잘 파악하여 생성하는 것이 중요하다.
- 카프카 컨슈머 또는 프로듀서가 카프카 브로커에 생성되지 않은 토픽에 대해 요청할 때
- 커맨드 라인 툴로 명시적으로 토픽을 생성(이것을 추천)
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --list
파티션이 몇개인지, 복제된 파티션이 위치한 브로커의 번호, 구성하는 설정 등을 출력한다. 또한, 토픽이 가진 파티션의 리더가 현재 어느 브로커에 있는지 확인 가능하다.
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 --describe --topic topi
c_kafka_test
Topic: topic_kafka_test TopicId: TI_ehzIXQzS0bg6CL2d2-w PartitionCount: 3 ReplicationFactor: 1 Configs: retention.ms=864000
Topic: topic_kafka_test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: topic_kafka_test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: topic_kafka_test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
파티션의 개수 수정은 kafka-topic.sh
, 토픽 리텐션 기간 변경은 kafka-configs.sh
를 사용하여야 한다.
# kafka partition count change
bin/kafka-topics.sh --bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--alter \
--partitions 5
# kafka topic configure change
# if configure is already exists then change configure, else add configure
bin/kafka-configs.sh --bootstrap-server kafka_tutorial:9092 \
--entity-type topics \
--entity-name topic_kafka_test \
--alter --add-config retention.ms=86400000
토픽에 데이터를 넣을 수 있는 명령어로, 토픽에 넣는 데이터는 record라고 부르며 key,value 로 이루어져 있다.
키 값 없이 전송
이 때, 메세지 키는 null로 기본 설정되어 브로커로 전송된다.
bin/kafka-console-producer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test
>hello kafka
>test
>1
>2
>3
>4
>5
>;;
메시지 키를 가지는 레코드 전송
만약 key.separator에 해당하는 값을 전송하지 않으면 org.apache.kafka.common.KafkaException: No key separator found on line number 1: 'k-v'
에러 발생
# default of key.separator = \t
bin/kafka-console-producer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--property "parse.key=true" \
--property "key.seperator=:"
토픽으로 전송한 데이터를 가져올 수 있는 명령어로 --from-beginning
옵션을 통해 토픽에 저장된 가장 처음 데이터부터 출력할 수 있다.
--group
을 이용하여 컨슈머 그룹을 생성할 수 있다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어졌으며, 해당 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져갔다고 commit을 한다. commit은 내가 여기까지 가져갔어라고 브로커에 오프셋 번호를 저장하는 것으로, __consumer_offsets
이름의 내부 토픽에 저장된다.
bin/kafka-console-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--from-beginning
# if want to see message key, value
bin/kafka-console-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--property print.key=true \
--property key.separator="-" \
--from-beginning
# consumer group
bin/kafka-console-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--topic topic_kafka_test \
--property print.key=true \
--group consumer-group-test \
--from-beginning
output of below command-line
null hello kafka
null test
null 1
null 2
null 3
null 4
null 5
null ;;
k2 v2
k1 v1
k3 v3
partition 수가 2개 이상일 경우, 토픽에 넣은 데이터의 순서를 보장하지 못한다. 만약 데이터의 순서를 보장하고 싶다면 파티션 1개로 구성된 토픽을 만드는 것이다.
Consumer group list
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka_tutorial:9092 \
--list
특정 컨슈머 그룹 세부 정보
# describe consumer group which name is consumer-group-test
bin/kafka-consumer-groups.sh \
--bootstrap-server kafka_tutorial:9092 \
--group consumer-group-test \
--describe
output
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-group-test topic_kafka_test 1 1 1 0 - - -
consumer-group-test topic_kafka_test 4 8 8 0 - - -
consumer-group-test topic_kafka_test 0 0 0 0 - - -
consumer-group-test topic_kafka_test 3 1 1 0 - - -
consumer-group-test topic_kafka_test 2 1 1 0 - -
kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 string 타입 메시지 값을 코드 없이 주고 받을 수 있다. 간단한 네트워크 통신 테스트를 할 때 유용하다.
kafka-verifiable-consumer
bin/kafka-verifiable-producer.sh \
--bootstrap-server kafka_tutorial:9092 \
--max-messages 10 \
--topic topic_kafka_test
kafka-verifiable-producer
bin/kafka-verifiable-consumer.sh \
--bootstrap-server kafka_tutorial:9092 \
--group-id consumer-group-test \
--topic topic_kafka_test
이미 적재된 토픽의 데이터를 지우는 방법으로, 이미 적재된 토픽의 데이터 중 가장 오래된 데이터 부터 특정 시점의 오프셋까지 삭제가 가능하다.
파티션에 저장된 특정 데이터만 삭제할 수 없다는 점에서 유의가 필요하다.
~/test-delete-record.json
{
"partitions" : [
{
"topic" : "topic_kafka_test",
"partition": 4,
"offset": 5
}
],
"version": 1
}
bash
bin/kafka-delete-records.sh \
--bootstrap-server kafka_tutorial:9092 \
--offset-json-file ~/test-delete-record.json
성공 시
Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-4 low_watermark: 5
현재 record보다 더 많은 offset 요구시
Executing records delete operation
Records delete operation completed:
partition: topic_kafka_test-0 error: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.