bin/kafka-server-start.sh -daemon config/server.properties
server.properties
broker.id=0
#listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://ec2-3-36-77-126.ap-northeast-2.compute.amazonaws.com:9092
num.network.threads=3
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=172800000
bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--list
bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--describe \
--topic hello.kafka.2
Topic: hello.kafka.2 PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=86400000
Topic: hello.kafka.2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 3 Leader: 0 Replicas: 0 Isr: 0
kafka-topics.sh
파티션 개수 변경
bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--alter \
--partitions 4
kafka-configs.sh
토픽 삭제 정책 (리텐션 기간) 변경
bin/kafka-configs.sh \
--bootstrap-server my-kafka:9092 \
--entity-type topics \
--entity-name hello.kafka.2 \
--alter \
--add-config retention.ms=86400000
bin/kafka-configs.sh \
--bootstrap-server my-kafka:9092 \
--entity-type topics \
--entity-name hello.kafka.2 \
--describe
Dynamic configs for topic hello.kafka.2 are:
retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}
레코드(record)
라고 부르며, 메시지 키(key)
와 메시지 값(value)
으로 이루어져 있다.
bin/kafka-console-producer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2
parse.key
를 true로 두면, 레코드를 전송할 때 메시지 키를 추가할 수 있다.
key.separator
기본 설정은 \t
KafkaException
과 함께 종료된다.
bin/kafka-console-producer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--property "parse.key=true" \
--property "key.separator=:"
--from-beginning
bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--from-beginning
--property
print.key=true
key.separator="-"
--group
__consumer_offsets
내부 토픽에 저장된다.
bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--property print.key=true \
--property key.separator="-" \
--group hello-group \
--from-beginning
kafka-console-producer.sh
로 전송했던 데이터의 순서와 kafka-console-consumer.sh
로 데이터를 가져오는 순서가 다르다.
bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--list
bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--group hello-group \
--describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
hello-group hello.kafka.2 1 1 1 0 - - -
hello-group hello.kafka.2 0 5 5 0 - - -
hello-group hello.kafka.2 3 2 2 0 - - -
hello-group hello.kafka.2 2 3 3 0 - - -
--max-messages
bin/kafka-verifiable-producer.sh \
--bootstrap-server my-kafka:9092 \
--topic verify-test \
--max-messages 5
{"timestamp":1683468586120,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":0}
{"timestamp":1683468586122,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":0}
{"timestamp":1683468586122,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":0}
{"timestamp":1683468586123,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":0}
{"timestamp":1683468586123,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":0}
{"timestamp":1683468586129,"name":"shutdown_complete"}
{"timestamp":1683468586130,"name":"tool_data","sent":5,"acked":5,"target_throughput":-1,"avg_throughput":13.66120218579235}
--group-id
bin/kafka-verifiable-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic verify-test \
--group-id test-group
{"timestamp":1683468622131,"name":"startup_complete"}
{"timestamp":1683468622390,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1683468622473,"name":"records_consumed","count":5,"partitions":[{"topic":"verify-test","partition":0,"count":5,"minOffset":0,"maxOffset":4}]}
{"timestamp":1683468622487,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":5}],"success":true}
{
"partitions": [
{
"topic": "test",
"partition": 0,
"offset": 50
}
],
"version": 1
}
bin/kafka-delete-records.sh \
--bootstrap-server my-kafka:9092 \
--offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 50