[6/27] TIL - Kafka 기본 프로그래밍

Sangwon Jwa·2024년 6월 28일

데브코스 TIL

목록 보기
53/54
post-thumbnail

📖 학습 주제


  1. Kafka CLI Tools
  2. Kafka-Topic
  3. Kafka-Consumer
  4. ksqlDB

✏️ 주요 메모 사항 소개


Kafka CLI Tools

Kafka CLI 기능들을 이용하기 위해서는 docker ps를 통해 Kafka Broker의 Container ID 혹은 이름을 파악해야 한다. 그 후에 해당 컨테이너로 로그인 (docker exec -it Broker_Container_ID sh) 하면 다양한 kafka 관련 클라이언트 툴을 사용할 수 있다.

  • kafka-topics
    • Topic 리스트 출력 : kafka-topics --bootstrap-server kafka1:9092 --list
    • Topic 삭제 : kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test
  • kafka-console-producer
    • Topic 만들고 메세지 생성 : kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
  • kafka-console-consumer

    • Topic에서 Message 읽기 : kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning
  • kafka-configs


Kafka-Topic

kafka-console-producer를 사용하여 바로 토픽을 만들 수 있었지만, partition이나 replica의 수를 설정할 순 없었다. 이를 해결하기 위해서는 Topic을 미리 만들면서 파라미터 값을 설정하면 된다.

  • 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic을 추가
  • create_topics의 인자로는 NewTopic 클래스의 오브젝트를 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
	name=name,
	num_partitions=partitions,
	replication_factor=replica)
client.create_topics([topic])


Kafka Producer

Kafka Producer 동작방식


Kafka Consumer

  • Consumer가 하나이고 다수의 Partitions들로 구성된 Topic으로부터 읽어야한다면?
    • Consumer는 각 Partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨
    • 이 경우 병렬성이 떨어지고 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
    • 이를 해결하기 위한 것이 뒤에 이야기할 Consumer Group
  • 한 프로세스에서 다수의 Topic을 읽는 것 가능
    • Topic 수만큼 KafkaConsumer 인스턴스 생성하고 별도의 Group ID와 Client ID를 지정해야함

 

Consumer Group

  • Consumer가 Topic을 읽기 시작하면 해당 Topic내 일부 Partition들이 자동으로 할당됨
  • Consumer의 수보다 Partition의 수가 더 많은 경우, Partition은 라운드 로빈 방식으로 Consumer들에게 할당됨 (한 Partition은 한 Consumer에게만 할당됨)
    • 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure 경감
    • 그리고 Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
  • Consumer Group Rebalancing
    • 기존 Consumer가 무슨 이유로 사라지거나 새로운 Consumer가 Group에 참여하는 경우 Partition들이 다시 지정이 되어야함. 이를 Consumer Group Rebalancing이라고 부르면 이는 Kafka에서 알아서 수행해줌

 

Consumer/Producer 패턴

  • 많은 경우 Consumer는 한 Topic의 메세지를 소비해서 새로운 Topic을 만들기도함
  • 즉 Consumer이면서 Producer로 동작하는 것이 아주 흔한 패턴임
  • 데이터 Transformation, Filtering, Enrichment
    • 동일한 프로세스 내에서 Kafka Consumer를 사용하여 한 Topic에서 메시지를 읽고 필요한 데이터 변환 또는 Enrichment을 수행한 다음, Producer를 사용하여 수정된 데이터를 다른 Topic으로 푸시 가능

Message Processing Guarantee

실시간 메시지 처리 및 전송 관점에서 보낸 메세지가 정말로 수신되었는지 보장하는 것이 중요. 이를 보장하는 방식은 크게 3가지가 존재한다.

  • Exactly Once
    • Exactly Once('정확히 한 번')는 각 Message가 Consumer에게 정확히 한번만 전달된다는 것을 보장. 네트워크 문제, 장애 또는 재시도 가능성으로 아주 어려운 문제.
    • 1> Producer 단에서는 enable_idempotence를 True로 설정
    • 2> Producer에서 메세지를 쓸 때와 Consumer에서 읽을 때 Transaction API를 사용
  • At Least Once
    • At Least Once ('적어도 한 번 이상')는 모든 메시지가 Consumer에게 적어도 한 번 이상 전달되도록 보장하지만, 메시지 중복 가능성 존재.
    • 이 경우 Consumer는 중복 메시지를 처리하기 위해 중복 제거 메커니즘을 구현해야함 (멱등성).
    • 이는 보통 Consumer가 직접 오프셋을 커밋을 할때 발생함.
  • At Most Once (Kafka 기본 방식)
    • At Most Once ('최대 한 번만')는 메시지 손실 가능성에 중점을 둠.
    • 이는 메시지가 손실될 수는 있지만 중복이 없음을 의미.
    • 가장 흔한 메시지 전송 보장 방식 (default)


ksqlDB

REST API나 ksql 클라이언트 툴을 사용해서 Topic을 테이블처럼 SQL로 조작하는 프로그램

  • <간단하게 실습하는 방법>
    • docker ps 후 confluentinc/cp-ksqldb-server의 Container ID 복사
    • docker exec -it ContainerID sh 실행
    • ksql 실행후 아래 두 개의 명령 실행
      • CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
      • SELECT * FROM my_stream;

0개의 댓글