Kafka 환경 구성

문주은·2024년 1월 19일

1. Kafka 구성

Kafka Replication : 각 Topic의 Partition들을 Kafka Cluster내의 다른 Broker들로 복제

bash-4.4# ./kafka-topics.sh --bootstrap-server localhost:9092 --topic health_checkup_detail --describe

Topic:health_checkup_detail     PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1073741824 
        Topic: health_checkup_detail    Partition: 0    Leader: 1003    Replicas: 1003  Isr: 1003
  • ReplicationFactor : 각 토픽이 몇 개의 복제본을 복제해야 하는지를 나타내는 값
  • Leader : 각 토픽의 파티션에서 쓰기 및 읽기 작업을 주관하는 복제본. Leader는 데이터의 쓰기와 읽기를 관리하며 클러스터 내에서 해당 파티션의 주요 역할을 담당
  • Isr : Replication Group을 형성하여 관리
  • Replicas : 각 토픽의 파티션에 대한 복제본들의 집합. ReplicationFactor로 정의된 값에 따라 토픽의 파티션은 여러 복제본으로 복제되어 클러스터 내에서 안정성과 가용성을 제공. Replicas 중에서 하나는 리더가 되고, 나머지는 Isr에 속하며 동기화된 상태를 유지

2. Kafka docker-compose 설명

아래의 예시는 현재 3개의 broker중 1개의 broker에 대한 예시입니다.

kafka1: 
    image: confluentinc/cp-kafka:7.0.0 
    hostname: kafka1 
    ports: 
      - "9091:9091" 
    environment: 
      KAFKA_BROKER_ID: 1 
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091 
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT 
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL 
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 
    volumes: 
      - ~/data/kafka1/data:/tmp/kafka-logs 
    depends_on: 
      - zookeeper   
  • KAFKA_ADVERTISED_LISTENERS : 외부에서 접속하기 위한 리스너

    • LISTENER_DOCKER_INTERNAL://kafka1:19091
      : docker 내부에서 kafka 브로커가 kafka1:19091 주소로 듣는다

    • LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      : docker 외부에서 kafka 브로커에 접근할 수 있는 주소로
      ${DOCKER_HOST_IP:-127.0.0.1} 는 환경 변수 $DOCKER_HOST_IP 의 값을 사용
      $DOCKER_HOST_IP 환경 변수가 설정되어져 있지 않으면 127.0.0.1 localhost 사용

  • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : 보안을 위한 프로토콜 매핑

  • KAFKA_INTER_BROKER_LISTENER_NAME : 도커 내부에서 사용할 리스너 이름을 지정

3. consumer offset option

  • auto.offset.reset=latest
    : 마지막으로 구독한 다음 메시지부터 구독(consume) --> 가장 최신의 메시지
  • auto.offset.reset=earliest
    : 처음부터 메시지 구독 --> 가장 오래된 메시지부터
  • auto.offset.reset=none
    : 구독하고자 하는 topic의 offset 정보가 없으면 exception 발생

4. CLI Command

4-1. 모니터링 CLI

[ kafka producer topic 목록 확인 ]

bash-4.4# cd ./opt/bin/
bash-4.4# ./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--list

[ topic 정보 확인 ]

bash-4.4# ./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--topic health_checkup_detail --describe

[ 토픽 데이터 확인 (실시간 프로듀싱 되는 데이터 확인) ]

bash-4.4# ./kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic health_checkup_list --from-beginning

[ 해당 topic의 offset 확인 ]

bash-4.4# ./kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic health_checkup_detail

4-2. CRUD CLI

[ delete topic (purge) ]

bash-4.4# ./kafka-topics.sh \
--bootstrap-server localhost:9092 \
--delete --topic health_checkup_detail

[ update topic config ]

bash-4.4# kafka-topics.sh \ 
--bootstrap-server localhost:9092 \ 
--alter --topic health_checkup_list \
--partitions 3

[ create topic ]

# topic : health_checkup_list
bash-4.4# kafka-topics.sh \
--create --bootstrap-server localhost:9092 \
--replication-factor 3 \
--partitions 3 \
--topic health_checkup_list
# topic : health_checkup_detail
bash-4.4# kafka-topics.sh \ 
--create --bootstrap-server localhost:9092 \ 
--replication-factor 3 \ 
--partitions 3 \
--topic health_checkup_detail
profile
Data Engineer

0개의 댓글