Kafka 실행

bin/kafka-server-start.sh -daemon config/server.properties
  • server.properties

    broker.id=0
    
    #listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://ec2-3-36-77-126.ap-northeast-2.compute.amazonaws.com:9092
    
    num.network.threads=3
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs
    
    num.partitions=1
    
    num.recovery.threads.per.data.dir=1
    
    offsets.topic.replication.factor=1
    
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    
    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824
    
    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    zookeeper.connect=localhost:2181
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000
    
    group.initial.rebalance.delay.ms=0

ZooKeeper 실행

bin/zookeeper-server-start.sh -daemon config/zookeper.properties
  • Kafka를 실행하는데 필요한 필수 Application
  • 분산 코디네이션 서비스를 제공
  • Kafka의 Cluster Leader 정보, Controller 정보를 담고있다.
  • zookeeper.properties
    # the directory where the snapshot is stored.
    dataDir=/tmp/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections since this is a non-production config
    maxClientCnxns=0
    # Disable the adminserver by default to avoid port conflicts.
    # Set the port to something non-conflicting if choosing to enable this
    admin.enableServer=false
    # admin.serverPort=8080

kafka-topic.sh

Topic 생성

bin/kafka-topics.sh \
--create \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--partitions 3 \
--replication-factor 1 \
--config retention.ms=172800000

Topic 목록 조회

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--list

Topic 상세 조회

bin/kafka-topics.sh \
--bootstrap-server my-kafka:9092 \
--describe \
--topic hello.kafka.2
Topic: hello.kafka.2	PartitionCount: 4	ReplicationFactor: 1	Configs: segment.bytes=1073741824,retention.ms=86400000
	Topic: hello.kafka.2	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka.2	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka.2	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka.2	Partition: 3	Leader: 0	Replicas: 0	Isr: 0

Topic 옵션 수정

  • kafka-topic.sh
    • 파티션 개수 변경

      bin/kafka-topics.sh \
      --bootstrap-server my-kafka:9092 \
      --topic hello.kafka.2 \
      --alter \
      --partitions 4
  • kafa-configs.sh
    • 토픽 삭제 정책 (리텐션 기간) 변경

      bin/kafka-configs.sh \
      --bootstrap-server my-kafka:9092 \
      --entity-type topics \
      --entity-name hello.kafka.2 \
      --alter \
      --add-config retention.ms=86400000

Topic Dynamic Configs 조회

bin/kafka-configs.sh \
--bootstrap-server my-kafka:9092 \
--entity-type topics \
--entity-name hello.kafka.2 \
--describe
Dynamic configs for topic hello.kafka.2 are:
  retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}

kafka-console-producer.sh

  • Topic에 넣는 테이터를 레코드(record)라고 부르며, 메시지 키(key)메시지 값(value)으로 이루어져 있다.

메시지 키 없이 메시지 값만 가진 레코드 전송

bin/kafka-console-producer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2

메시지키를 가지는 레코드 전송

  • parse.key 를 true로 두면, 레코드를 전송할 때 메시지 키를 추가할 수 있다.
  • key.separator 기본 설정은 \t
    • 만약 구분자를 넣지않고 엔터를 누르면 KafkaException과 함께 종료된다.
  • 만약 키가 null일 경우
    • Producer 파티션으로 전송할 때 레코드 배치 단위로 Round Robin으로 전송
    • 레코드 배치 단위 = 레코드 전송 묶음
  • 메시지 키와 파티션 할당은 Producer에서 설정된 Partitioner에 의해 결정된다.
bin/kafka-console-producer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--property "parse.key=true" \
--property "key.separator=:"

kafka-console-consumer.sh

  • --from-beginning
    • 토픽에 저장된 가장 처음 데이터부터 출력
bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--from-beginning
  • --property
    • print.key=true
    • key.seperator="-"
  • --group
    • Consumer Group 설정
    • Consumer Group을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 Commit을 한다.
    • Commit 정보는 __consumer_offsets 내부 토픽에 저장된다.
bin/kafka-console-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic hello.kafka.2 \
--property print.key=true \
--property key.seperator="-" \
--group hello-group \
--from-beginning
  • kafka-console-producer.sh 로 전송했던 데이터의 순서가 kafka-console-consumer.sh 로 데이터를 가져오는 순서가 다르다
    • Partition 때문에 생기는 현상
    • Partition이 1개인 토픽은 데이터 순서를 보장한다.

kafka-consumer-groups.sh

Consumer Group 목록 조회

bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--list

Consumer Group 상세 조회

bin/kafka-consumer-groups.sh \
--bootstrap-server my-kafka:9092 \
--group hello-group \
--describe
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
hello-group     hello.kafka.2   1          1               1               0               -               -               -
hello-group     hello.kafka.2   0          5               5               0               -               -               -
hello-group     hello.kafka.2   3          2               2               0               -               -               -
hello-group     hello.kafka.2   2          3               3               0               -               -               -

kafka-verifiable-producer.sh

  • Kafka Cluster 설치 완료 이후, 데이터 전송에 대한 네트워크 통신 테스트에 유용하다.
  • --max-messages
    • -1로 설정하면 종료될 때까지 계속 데이터를 토픽으로 보낸다.
bin/kafka-verifiable-producer.sh \
--bootstrap-server my-kafka:9092 \
--topic verify-test \
--max-messages 10
{"timestamp":1683468586120,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":0}
{"timestamp":1683468586122,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":0}
{"timestamp":1683468586122,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":0}
{"timestamp":1683468586123,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":0}
{"timestamp":1683468586123,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":0}
{"timestamp":1683468586129,"name":"shutdown_complete"}
{"timestamp":1683468586130,"name":"tool_data","sent":5,"acked":5,"target_throughput":-1,"avg_throughput":13.66120218579235}

kafka-verifiable-consumer.sh

  • Kafka Cluster 설치 완료 이후, 데이터 수신에 대한 네트워크 통신 테스트에 유용하다.
  • --group-id
    • Consumer Group 설정
bin/kafka-verifiable-consumer.sh \
--bootstrap-server my-kafka:9092 \
--topic verify-test \
--group-id test-group
{"timestamp":1683468622131,"name":"startup_complete"}
{"timestamp":1683468622390,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1683468622473,"name":"records_consumed","count":5,"partitions":[{"topic":"verify-test","partition":0,"count":5,"minOffset":0,"maxOffset":4}]}
{"timestamp":1683468622487,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":5}],"success":true}

kafka-delete-records.sh

  • 이미 적재된 토픽의 데이터를 지운다.
  • 가장 오래된 데이터부터 특정 시점의 오프셋까지 삭제
    • 가장 낮은 숫자의 오프셋부터

delete-topic.json

{
	"partitions":  [
		{
			"topic": "test",
			"partition": 0,
			"offset": 50
		}
	],
	"version": 1
}
bin/kafka-delete-records.sh \
--bootstrap-server my-kafka:9092 \
--offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: test-0	low_watermark: 50
profile
Hello velog!

0개의 댓글