230713 - Kafka와 Spark Streaming

김지석·2023년 8월 1일
0

- Kafka 기본 프로그래밍

- Client Tool 사용

Kafka CLI Tools 접근 방법

  • docker ps를 통해 Broker의 Container ID 혹은 Container 이름 파악
  • 해당 컨테이너로 로그인
    • docker exec -it Broker_Container_ID sh
  • 거기서 다양한 kafka 관련 클라이언트 툴을 사용 가능
    • kafka-topics
    • kafka-configs
    • kafka-console-consumer
    • kafka-console-producer

kafka-topics

  • kafka topic과 관련된 다양한 기능을 제공해주는 커맨드라인 유틸리티
$ kafka-topics --bootstrap-server kafka1:9092 --list
$ kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test 

kafka-console-producer

  • Data를 생성하는 유틸리티
  • Command line을 통해 Topic 만들고 Message 생성 가능
$ kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console

kafka-console-consumer

  • Data를 소비하는 유틸리티
  • Command line을 통해 Topic에서 Message 읽기 가능
    • --from-beginning 옵션이 있으면 처음부터 읽음 (earliest). 아니면 latest로 동작
$ kafka-console-consumer --bootstrap-server kafka1:9092 --topic 
test_console --from-beginning

두 Console 프로세스들의 Side-by-side 실행

  • 터미널을 하나 열고 동일 Broker 로그인 후 console-producer로 메세지 발생

- Topic 파라미터 설정

Topic 생성시 다수의 Partition이나 Replica를 주려면

  • 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic을 추가
  • create_topics의 인자로는 NewTopic 클래스의 오브젝트를 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
 name=name,
 num_partitions=partitions # partition의 수,
 replication_factor=replica # 복제본의 수, 1일 경우 원본만 한개)
client.create_topics([topic])

NewTopic 클래스

KafkaProducer 파라미터

Kafka Producer 동작

KafkaProducer로 토픽 만들기

  • 랜덤하게 사람 정보를 만들어서 저장하는 Kafka Producer를 구현해보자
    • Faker라는 모듈 사용: pip3 install faker
    • pydantic의 BaseModel을 사용하여 메세지 클래스를 구현 (Person)
      • pip3 install pydantic
    • 이번에는 Topic을 먼저 만들고 진행

  • 실행한 후 Conduktor Web UI로 토픽 확인
    • 아니면 앞서 설명한 CLI TOOL로 체크 가능

profile
초짜에요...

1개의 댓글

comment-user-thumbnail
2023년 8월 1일

좋은 정보 감사합니다

답글 달기