📖 학습 주제
- Kafka CLI Tools
- Kafka-Topic
- Kafka-Consumer
- ksqlDB
✏️ 주요 메모 사항 소개
Kafka CLI 기능들을 이용하기 위해서는 docker ps를 통해 Kafka Broker의 Container ID 혹은 이름을 파악해야 한다. 그 후에 해당 컨테이너로 로그인 (docker exec -it Broker_Container_ID sh) 하면 다양한 kafka 관련 클라이언트 툴을 사용할 수 있다.
- kafka-topics
- Topic 리스트 출력 :
kafka-topics --bootstrap-server kafka1:9092 --list
- Topic 삭제 :
kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test
- kafka-console-producer
- Topic 만들고 메세지 생성 :
kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
-
kafka-console-consumer
- Topic에서 Message 읽기 :
kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning
-
kafka-configs


Kafka-Topic
kafka-console-producer를 사용하여 바로 토픽을 만들 수 있었지만, partition이나 replica의 수를 설정할 순 없었다. 이를 해결하기 위해서는 Topic을 미리 만들면서 파라미터 값을 설정하면 된다.
- 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic을 추가
- create_topics의 인자로는 NewTopic 클래스의 오브젝트를 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])

Kafka Producer

Kafka Producer 동작방식

Kafka Consumer

- Consumer가 하나이고 다수의 Partitions들로 구성된 Topic으로부터 읽어야한다면?
- Consumer는 각 Partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨
- 이 경우 병렬성이 떨어지고 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
- 이를 해결하기 위한 것이 뒤에 이야기할 Consumer Group
- 한 프로세스에서 다수의 Topic을 읽는 것 가능
- Topic 수만큼 KafkaConsumer 인스턴스 생성하고 별도의 Group ID와 Client ID를 지정해야함
Consumer Group
- Consumer가 Topic을 읽기 시작하면 해당 Topic내 일부 Partition들이 자동으로 할당됨
- Consumer의 수보다 Partition의 수가 더 많은 경우, Partition은 라운드 로빈 방식으로 Consumer들에게 할당됨 (한 Partition은 한 Consumer에게만 할당됨)
- 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure 경감
- 그리고 Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
- Consumer Group Rebalancing
- 기존 Consumer가 무슨 이유로 사라지거나 새로운 Consumer가 Group에 참여하는 경우 Partition들이 다시 지정이 되어야함. 이를 Consumer Group Rebalancing이라고 부르면 이는 Kafka에서 알아서 수행해줌
Consumer/Producer 패턴
- 많은 경우 Consumer는 한 Topic의 메세지를 소비해서 새로운 Topic을 만들기도함
- 즉 Consumer이면서 Producer로 동작하는 것이 아주 흔한 패턴임
- 데이터 Transformation, Filtering, Enrichment
- 동일한 프로세스 내에서 Kafka Consumer를 사용하여 한 Topic에서 메시지를 읽고 필요한 데이터 변환 또는 Enrichment을 수행한 다음, Producer를 사용하여 수정된 데이터를 다른 Topic으로 푸시 가능
Message Processing Guarantee
실시간 메시지 처리 및 전송 관점에서 보낸 메세지가 정말로 수신되었는지 보장하는 것이 중요. 이를 보장하는 방식은 크게 3가지가 존재한다.
- Exactly Once
- Exactly Once('정확히 한 번')는 각 Message가 Consumer에게 정확히 한번만 전달된다는 것을 보장. 네트워크 문제, 장애 또는 재시도 가능성으로 아주 어려운 문제.
- 1> Producer 단에서는 enable_idempotence를 True로 설정
- 2> Producer에서 메세지를 쓸 때와 Consumer에서 읽을 때 Transaction API를 사용
- At Least Once
- At Least Once ('적어도 한 번 이상')는 모든 메시지가 Consumer에게 적어도 한 번 이상 전달되도록 보장하지만, 메시지 중복 가능성 존재.
- 이 경우 Consumer는 중복 메시지를 처리하기 위해 중복 제거 메커니즘을 구현해야함 (멱등성).
- 이는 보통 Consumer가 직접 오프셋을 커밋을 할때 발생함.
- At Most Once (Kafka 기본 방식)
- At Most Once ('최대 한 번만')는 메시지 손실 가능성에 중점을 둠.
- 이는 메시지가 손실될 수는 있지만 중복이 없음을 의미.
- 가장 흔한 메시지 전송 보장 방식 (default)

ksqlDB

REST API나 ksql 클라이언트 툴을 사용해서 Topic을 테이블처럼 SQL로 조작하는 프로그램
- <간단하게 실습하는 방법>
- docker ps 후 confluentinc/cp-ksqldb-server의 Container ID 복사
docker exec -it ContainerID sh 실행
- ksql 실행후 아래 두 개의 명령 실행
CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
SELECT * FROM my_stream;