Kafka란,
- 실시간 데이터 처리를 위한 오픈소스 분산 스트리밍 플랫폼.
- 다수의 서버에 로그가 분산되어 저장됨. (Distributed Commit Log)
- 다수의 서버에 저장하기 때문에 Scalability가 있으며, 서버를 Broker라고 부름.
- Scale Out: 서버(Broker) 증설.
- High Throughput과 Low Latency 실시간 데이터 처리에 맞게 구현됨.
- Topic에 데이터를 저장할 때, 저장 기한을 지정할 수 있음.
- Topic = # of partitions.로 이벤트 스트림을 의미함.
Eventual Consistency vs. Strong Consistency.
- 시스템이 다수의 서버로 구성된 경우, 한 서버의 이벤트 write를 어떻게 처리할 지는 Topic 사용자 입장에서 생각해 보고 정하면 됨.
Eventual Consistency.
- 이벤트의 write가 Topic(모든 서버)에 반영이 안 될 수도 있음.
- 데이터의 생산 속도가 빠르지만, 불완전한 리턴.
- 보통의 일반적인 데이터에 적용.
Strong Consistency.
- 이벤트의 write가 Topic(모든 서버)에 반영이 됨.
- 데이터의 생산(쓰기) 속도가 느리지만, 완전한 리턴.
- 계좌나 결제와 같은 중요 데이터에 적용.
kafka의 장점.
- 스트림 처리 가능. (실시간 데이터 처리.)
- High Throughput. (초당 수 백만 개의 메시지 처리 가능.)
- Fault Tolerance. (데이터 복제 및 분산 커밋 로그를 활용할 수 있음.)
- Scalability. (Scale out이 쉬움.)
- 풍부한 생태계가 구성됨. (많은 유저 존재.)
Topic.
- 이벤트 스트림(ts를 기준으로 이벤트가 정렬됨.)
- Proucer(생산자) / Consumer(소비자)
- 이름, 파티션 수, 복제본 수, Consistency level, 데이터 보존 기한, 메시지 압축 방식 등의 파라미터가 존재함.
Message(=Event).
- key + value + ts (+ header(meta-data))로 구성됨.
- key를 기준으로 partition이 구성됨.
- 최대 1MB의 크기.
Partition.
- Topic의 구성 요소.
- 한 서버에 할당됨.
- 메시지의 Partition 소속은 key 값의 해싱으로 정해지거나 라운드-로빈 방식으로 결정됨. 보토은 전자의 방식 선호.
Partition 복제본.
- 각 Partition은 복제본이 존재함.
- 복제본 덕분에 fail-over와 병렬 처리가 가능함.
- Leader(쓰기, 읽기)와 Follwer(읽기)로 파티션의 역할을 지정함.
Broker(=Server=Node).
- 실제 데이터를 저장하는 서버.
- Kafka 클러스터의 구성 요소.
- 최대 4000개의 파티션 관리(처리) 가능.
- 물리 서버 혹은 VM 위에서 동작함.
메타 정보 관리자(Kraft 프로토콜 혹은 Zookeeper).
- Broker 리스트 관리. (대장인 Controller 지정)
- Topic 리스트 관리.
- Topic별 ACL(Access Control List) 관리.
- Quota 관리.
Segment.
- Partition의 구성 요소.
- 변경되지 않는 로그 파일(Commit Log).
- 디스크 상에 존재하는 하나의 파일.
- 한 세그먼트의 용량이 최대에 도달하면 새로운 세그먼트 파일을 생성함.
Kafka Connect.
- Kafka 위에 만들어진 중앙 집중 데이터 허브.
- 일종의 데이터 버스.
- 별도의 서버가 필요하며, Kafka와는 별개의 오픈 소스.
- 두 가지 모드가 존재함. (standalone, distributed)
- 데이터 시스템 간의 데이터를 주고 받는 용도로 사용됨.
- Data Source (RDB) -> Kafka Connect -> Kafka -> Kafka Connect -> Data Sink (AWS S3).
- 코딩 없이 환경 설정만으로 위 구조를 구현할 수 있음.
Kafka Schema Registry.
- Topic 메시지 데이터에 대한 스키마를 관리 및 검증 가능.
- Data를 Serialize(압축)할 때 AVRO를 기본 데이터 포맷으로 사용함.
- 메시지의 포맷만 지정하면 나머지는 Kafka 라이브러리가 해 줌.
Kafka Streams.
- Kafka Topic을 좀더 real-time으로 처리할 수 있는 실시간 스트림 처리 라이브러리.
- Spark Streaming은 micro-batch에 가까움.
ksqlDB.
- Kafka Streams 위에 만들어진 스트림 처리 데이터베이스.
- Kafka Topic을 Storage로 사용하여 이에 sql 쿼리를 할 수 있음.
cd kafka-stack-docker-compose
docker compose -f full-stack.yml up
으로 Kfaka의 컴포넌트 설치.pip3 install kafka-python
producer 생성. (producer.py)
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'], # 어느 브로커에 토픽을 생성할 지 결정. 브로커이름:포트#
value_serializer=lambda x: dumps(x).encode('utf-8') # value에 대한 시리얼라이징 방식 설정.
)
for j in range(99):
print("Iteration", j)
data = {'counter': j}
producer.send('topic_test', value=data) # topic_test라는 토픽에 value인 data를 전달. 키는 빈 값.
sleep(0.5)
python3 producer.py
web UI에서 topics 메뉴 선택 후 topic 확인.
consumer 생성 (consumer.py)
from kafka import KafkaConsumer
from json import loads
from time import sleep
consumer = KafkaConsumer(
'topic_test', # 소비할 토픽 이름.
bootstrap_servers=['localhost:9092'], # 브로커 지정.
auto_offset_reset='earliest', # earliest(생성된 지 오래된 것부터) vs. latest(생성된 된 지 얼마 안 된 것부터)
enable_auto_commit=True, # 카프카Consumer가 알아서 커밋을 기록하라. 실제 현업에서는 보통 False로 지정함.
group_id='my-group-id', # 그룹 지정.
value_deserializer=lambda x: loads(x.decode('utf-8'))
) # Producer의 serializer된 내용을 deserializer함.
for event in consumer:
event_data = event.value # 이벤트의 벨류값 읽기.
# Do whatever you want
print(event_data)
sleep(2)
python3 consumer.py