이 글은 최원영 저 아파치 카프카 애플리케이션 프로그래밍 with 자바를 기반으로 하고 있습니다.
$ cd ~
$ cd kafka_2.12-2.5.0
$ bin/kafka-topics.sh \
--create \
--bootstrap-server {IP}:9092 \
--topic {topic name}
1개의 카프카 브로커({IP}:9092)만으로 카프카 클러스터를 구성한 상태로, 기본 옵션으로 토픽을 생성하였다. (파티션 개수: 1)
$ bin/kafka-topics.sh \
--create \
--bootstrap-server {IP}:9092 \
--partitions 3 \
--replication-factor 1\
--config retention.ms=172800000 \
--topic {topic name}
1개의 카프카 브로커로 구성된 카프카 클러스터로, 3개의 파티션, 복제하지 않음 그리고 2일(172800000ms)의 토픽 데이터 유지 기간을 옵션으로 설정하여 토픽을 생성하였다.
$ bin/kafka-topics.sh --bootstrap-server {IP}:9092 --list
$ bin/kafka-topics.sh --bootstrap-serveer {IP}:9092 \
--describe \
--topic {topic name}
Topic: hello.kafka.2 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=172800000
Topic: hello.kafka.2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Leader는 파티션이 위치하고 있는 브로커를 나타낸다. 위의 예시에서는 3개의 파티션이 모두 0번 브로커에 위치하고 있음을 나타낸다.
$bin/kafka-topics.sh --bootstrap.sever {IP}:9092 \
--topic {topic name} \
--alter \
--partitions 4
주의할 점: 파티션 개수는 늘리는 것만 가능. 줄이는 것은 불가능
$ bin/kafka-configs.sh --bootstrap-server {IP}:9092 \
--entity-type topics \
--entity-name {topic name} \
--alter --add-config retention.ms=86400000
주의할 점: kafka-console-producer.sh로 전송되는 레코드는 String 타입만 가능. 다른 타입을 전송하고 싶다면 카프카 프로듀서 애플리케이션을 직접 개발해야함.
$ bin/kafka-console-producer.sh --bootstrap-server {IP}:9092 \
--topic {topic name}
프로듀서가 파티션으로 전송 시, 레코드 배치 단위로 라운드 로빈으로 전송한다.
$ bin/kafka-console-producer.sh --bootstrap-server {IP}:9092 \
--topic {topic name}
--property "parse.key=true" \
--property "key.separator=:"\
>key1:no1
>key2:no2
>key3:no3
":"로 key와 value를 구분하는 옵션을 추가하였다. 기본 설정은 Tab으로 정해져 있다.
키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당한다.
즉, 키가 동일한 경우 동일한 파티션으로 전송된다.
주의할 점: 키를 가진 레코드의 경우 파티션이 추가되면 키의 일관성이 보장되지 않아 다른 파티션에 저장될 수 있음.
$ bin/kafka-console-consumer.sh --bootstrap-server {IP}:9092 \
--topic {topic name} \
--from-beginning
$ bin/kafka-console-consumer.sh --bootstrap-server {IP}:9092 \
--topic {topic name} \
--property print.key=true \
--property key.separator="-" \
--group {group name} \
--from-beginning
null-hello
null-3
key2-no2
null-kafka
....(생략)
key separator를 "-"로 설정해주었다.
--group 옵션으로 새로운 컨슈머 그룹(consumer group)을 생성했다. 컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있으며, 그룹을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 커밋을 한다.
주의할 점: kafka-console-consumer.sh 명령어를 통해 데이터를 가져가게 되면 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져간다. 따라서 파티션이 2개 이상이라면 입력한 순서와 다르게 출력된다. 파티션을 1개로 구성하면 출력 순서와 입력 순서가 항상 같아진다.
json파일을 생성하여 지우며, 토픽에 적재된 레코드의 오래된 순서대로 지워 나간다. 몇 번째 오프셋 데이터까지 지우고 싶은지 json파일을 통해 정할 수 있다.
$ vim delete-topic.json
{"partitions" :
[
{"topic" : "{topic name}",
"partition" : 0,
"offset" : 10
}
],
"version" : 1
}
0번 파티션에서 9번 오프셋까지 삭제하는 json 파일 구성
$ bin/kafka-delete-records.sh --bootstrap-server {IP}:9092 \
--offset-json-file delete-topic.json