Kafka CLI tool.
- docker ps를 통해 broker의 컨테이너 아이디로 접근.
docker exec -it Broker_Container_ID sh
kafka-{topics, configs, console-conumer, console-producer}
등의 명령어 존재.kafka-topics --bootstrap-server kafka1:9092 --list
: kafka1:9092라는 브로커의 토픽 리스트 조회.kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test
: kafka1:9092라는 브로커의 topic_test라는 토픽 삭제.kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console
: kafka1:9092라는 브로커에 test_console이라는 토픽 생성 후 메시지 입력 가능.kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning
: kafka1:9092라는 브로커의 test_console이라는 토픽에서 처음부터(earliest) 메시지를 읽음. Ctrl+C로 중간에 직접 정지해야 함.
client = KafkaAdminClient(boostrap_servers=boostrap_servers) # 메시지를 보낼 때 사용할 브로커 리스트, 기본 값 localhost:9092
topic = NewTopic)
name=name, # 토픽 이름
num_partitions=partitions, # 파티션 수, 디폴트 값 1
replication_facotr=replica) # 복제본 수, 디폴트 값 1
client.create_topics([topic])
pip3 install faker
person.py
"""
Pydantic is a Python library for data parsing and validation.
It uses the type hinting mechanism of the newer versions of Python (version 3.6 onwards)
and validates the types during the runtime. Pydantic defines BaseModel class.
It acts as the base class for creating user defined models.
"""
from pydantic import BaseModel
class Person(BaseModel):
id: str
name: str
title: str
import uuid
import json
from typing import List
from person import Person
from faker import Faker
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer
def create_topic(bootstrap_servers, name, partitions, replica=1):
client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
try:
topic = NewTopic(
name=name,
num_partitions=partitions,
replication_factor=replica)
client.create_topics([topic])
except TopicAlreadyExistsError as e:
print(e)
pass
finally:
client.close()
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
# create a topic first
create_topic(bootstrap_servers, topic_name, 4)
# ingest some random people events
people: List[Person] = []
faker = Faker()
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
client_id="Fake_Person_Producer",
)
for _ in range(100):
person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
people.append(person)
producer.send(
topic=topic_name,
key=person.title.lower().replace(r's+', '-').encode('utf-8'),
value=person.json().encode('utf-8'))
producer.flush()
if __name__ == '__main__':
main()
python3 fake_person_producer.py
실행.KafkaConsumer 파라미터.
- boostrap_servers : 메시지를 보낼 때 사용할 브로커 리스트.
- client_id: Kafka Consumer 이름.
- group_id: Kafka Consumer 그룹 이름. (같은 그룹에서는 파티션을 나눠 줌.)
- key_deserializer, value_deserilaizer: 메시지의 키와 값의 deserializer 방법 지정(함수).
- auto_offset_reset: earliest, latest
- enable_auto_commit: True이면 컨슈머의 오프셋이 백그라운드에서 주기적으로 커밋. False이면 명시적으로 커밋을 해 주어야 함. 오프셋은 별도로 리셋이 가능하며 Conduktor Web UI에서도 가능.
Consumer은 다수의 파티션들을 어떻게 읽나,
- 라운드 로빈 형태로 하나씩 읽음.
- 하지만 라운드-로빈 방식은 병렬성이 떨어지는 문제점이 존재함.
- 따라서 컨슈머 그룹 개념을 사용함.
- 다수의 컨슈머가 하나의 토픽을 처리하기 위한 그룹 지정.
Consumer Group이란,
- 다수의 컨슈머가 하나의 토픽을 읽게끔하기 위한 용도.
- 컨슈머가 파티션 수보다 많은면 파티션을 라운드 로빈 방식으로 컨슈머에게 할당.
- 컨슈머가 그룹에서 사라질 경우, 리밸런싱을 통해서 해당 컨슈머의 파티션을 다른 컨슈머에게 알아서 재 할당함.
예제 프로그램(1).
- auto_offset_reset이 True인 경우.
import json
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "fake_people_group"
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest',
enable_auto_commit=True)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
if __name__ == '__main__':
main()
예제 프로그램(2).
- auto_offset_reset이 False인 경우.
import json
from kafka import TopicPartition, OffsetAndMetadata
from kafka.consumer import KafkaConsumer
def key_deserializer(key):
return key.decode('utf-8')
def value_deserializer(value):
return json.loads(value.decode('utf-8'))
def main():
topic_name = "fake_people"
bootstrap_servers = ["localhost:9092"]
consumer_group_id = "manual_fake_people_group"
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=consumer_group_id,
key_deserializer=key_deserializer,
value_deserializer=value_deserializer,
auto_offset_reset='earliest',
enable_auto_commit=False)
consumer.subscribe([topic_name])
for record in consumer:
print(f"""
Consumed person {record.value} with key '{record.key}'
from partition {record.partition} at offset {record.offset}
""")
topic_partition = TopicPartition(record.topic, record.partition)
offset = OffsetAndMetadata(record.offset + 1, record.timestamp)
consumer.commit({
topic_partition: offset
})
if __name__ == '__main__':
main()
Consumer/Producer 패턴.
- 컨슈머는 한 토픽의 메시지를 소비해서 새로운 토픽을 만들기도 함.
- 즉 컨슈머이면서 프로듀서로 동작하는 것이 아직 흔한 패턴임.
ksql
CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
SELECT * FROM my_stream;
SELECT *, ROWTIME FROM my_stream EMIT CHANGES;
이전 SELECT에서 바뀐 내용들에 한해서, ROWTIME(epoch)을 추가해서 읽기.