📖 학습주제
Kafka와 Spark Streaming 기반 스트리밍 처리 (4)
- docker ps를 통해 Broker의 Container ID 혹은 Container 이름 파악
- 해당 컨테이너로 로그인
- docker exec -it Broker_Container_ID sh
- 거기서 다양한 kafka 관련 클라이언트 툴을 사용 가능
- kafka-topics
- kafka-configs
- kafka-console-consumer
- kafka-console-producer
- …
Topic 파라미터 설정
Topic 생성시 다수의 Partition이나 Replica를 주려면
- 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic을 추가
- create_topics의 인자로는 NewTopic 클래스의 오브젝트를 지정
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])
NewTopic 클래스
| 파라미터 | 의미 | 기본값 |
|---|
| name | Topic의 이름 | |
| num_partitions | Partition의 수 | 1 |
| replication_factor | Replication의 수 | 1 |
KafkaProducer 파라미터
https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
Kafka Producer 동작

Consumer 옵션
KafkaConsumer 파라미터
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
Consumer가 다수의 Partitions들로부터 읽는 방법
- Consumer가 하나이고 다수의 Partitions들로 구성된 Topic으로부터 읽어야한다면?
- Consumer는 각 Partition들로부터 라운드 로빈 형태로 하나씩 읽게 됨
- 이 경우 병렬성이 떨어지고 데이터 생산 속도에 따라 Backpressure가 심해질 수 있음
- 이를 해결하기 위한 것이 뒤에 이야기할 Consumer Group
- 한 프로세스에서 다수의 Topic을 읽는 것 가능
- Topic 수만큼 KafkaConsumer 인스턴스 생성하고 별도의 Group ID와 Client ID를 지정해야 함
Consumer Group
- Consumer가 Topic을 읽기 시작하면 해당 Topic내 일부 Partition들이 자동으로 할당됨
- Consumer의 수보다 Partition의 수가 더 많은 경우, Partition은 라운드 로빈 방식으로 Consumer들에게 할당됨 (한 Partition은 한 Consumer에게만 할당됨)
- 이를 통해 데이터 소비 병렬성을 늘리고 Backpressure 경감
- 그리고 Consumer가 일부 중단되더라도 계속해서 데이터 처리 가능
- Consumer Group Rebalancing
- 기존 Consumer가 무슨 이유로 사라지거나 새로운 Consumer가 Group에 참여하는 경우 Partition들이 다시 지정이 되어야함. 이를 Consumer Group Rebalancing이라고 부르면 이는 Kafka에서 알아서 수행해줌
Message Processing Guarantee 방식

Exactly Once
각 Message가 Consumer에게 정확히 한 번만 전달된다는 것을 보장. 네트워크 문제, 장애 또는 재시도 가능성으로 아주 어려운 문제.
1> Producer 단에서는 enable_idempotence를 True로 설정
2> Producer에서 메세지를 쓸 때와 Consumer에서 읽을 때 Transaction API를 사용
At Least Once
모든 메시지가 Consumer에게 적어도 한 번 이상 전달되도록 보장하지만, 메시지 중복 가능성 존재.
이 경우 Consumer는 중복 메시지를 처리하기 위해 중복 제거 메커니즘을 구현해야함 (멱등성). 이는 보통 Consumer가 직접 오프셋을 커밋을 할때 발생
At Most Once
메시지 손실 가능성에 중점을 둠. 이는 메시지가 손실될 수는 있지 중복이 없음을 의미. 가장 흔한 메시지 전송 보장 방식 (Default)
Consumer/Producer 패턴
- 많은 경우 Consumer는 한 Topic의 메세지를 소비해서 새로운 Topic을 만들기도 함
- 즉 Consumer이면서 Producer로 동작하는 것이 아주 흔한 패턴임
- 데이터 Transformation, Filtering, Enrichment
- 동일한 프로세스 내에서 Kafka Consumer를 사용하여 한 Topic에서 메시지를 읽고 필요한 데이터 변환 또는 Enrichment을 수행한 다음, Producer를 사용하여 수정된 데이터를 다른 Topic으로 푸시 가능
ksqlDB
- REST API나 ksql 클라이언트 툴을 사용해서 Topic을 테이블처럼 SQL로 조작
- docker ps 후 confluentinc/cp-ksqldb-server의 Container ID 복사
- docker exec -it ContainerID sh
- ksql 실행후 아래 두 개의 명령 실행
◦ CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
◦ SELECT * FROM my_stream;