[데이터 엔지니어링 데브코스 2기] TIL-15주차 Kafka와 Spark Streaming 기반 스트리밍 처리 (3)

이재호·2024년 1월 24일
0

1. Kafka


Kafka란,

  • 실시간 데이터 처리를 위한 오픈소스 분산 스트리밍 플랫폼.
  • 다수의 서버에 로그가 분산되어 저장됨. (Distributed Commit Log)
  • 다수의 서버에 저장하기 때문에 Scalability가 있으며, 서버를 Broker라고 부름.
  • Scale Out: 서버(Broker) 증설.
  • High Throughput과 Low Latency 실시간 데이터 처리에 맞게 구현됨.
  • Topic에 데이터를 저장할 때, 저장 기한을 지정할 수 있음.
  • Topic = # of partitions.로 이벤트 스트림을 의미함.

Eventual Consistency vs. Strong Consistency.

  • 시스템이 다수의 서버로 구성된 경우, 한 서버의 이벤트 write를 어떻게 처리할 지는 Topic 사용자 입장에서 생각해 보고 정하면 됨.

Eventual Consistency.

  • 이벤트의 write가 Topic(모든 서버)에 반영이 안 될 수도 있음.
  • 데이터의 생산 속도가 빠르지만, 불완전한 리턴.
  • 보통의 일반적인 데이터에 적용.

Strong Consistency.

  • 이벤트의 write가 Topic(모든 서버)에 반영이 됨.
  • 데이터의 생산(쓰기) 속도가 느리지만, 완전한 리턴.
  • 계좌나 결제와 같은 중요 데이터에 적용.

kafka의 장점.

  • 스트림 처리 가능. (실시간 데이터 처리.)
  • High Throughput. (초당 수 백만 개의 메시지 처리 가능.)
  • Fault Tolerance. (데이터 복제 및 분산 커밋 로그를 활용할 수 있음.)
  • Scalability. (Scale out이 쉬움.)
  • 풍부한 생태계가 구성됨. (많은 유저 존재.)

2. Kafka Architecture


Topic.

  • 이벤트 스트림(ts를 기준으로 이벤트가 정렬됨.)
  • Proucer(생산자) / Consumer(소비자)
  • 이름, 파티션 수, 복제본 수, Consistency level, 데이터 보존 기한, 메시지 압축 방식 등의 파라미터가 존재함.

Message(=Event).

  • key + value + ts (+ header(meta-data))로 구성됨.
  • key를 기준으로 partition이 구성됨.
  • 최대 1MB의 크기.

Partition.

  • Topic의 구성 요소.
  • 한 서버에 할당됨.
  • 메시지의 Partition 소속은 key 값의 해싱으로 정해지거나 라운드-로빈 방식으로 결정됨. 보토은 전자의 방식 선호.

Partition 복제본.

  • 각 Partition은 복제본이 존재함.
  • 복제본 덕분에 fail-over와 병렬 처리가 가능함.
  • Leader(쓰기, 읽기)와 Follwer(읽기)로 파티션의 역할을 지정함.

Broker(=Server=Node).

  • 실제 데이터를 저장하는 서버.
  • Kafka 클러스터의 구성 요소.
  • 최대 4000개의 파티션 관리(처리) 가능.
  • 물리 서버 혹은 VM 위에서 동작함.

메타 정보 관리자(Kraft 프로토콜 혹은 Zookeeper).

  • Broker 리스트 관리. (대장인 Controller 지정)
  • Topic 리스트 관리.
  • Topic별 ACL(Access Control List) 관리.
  • Quota 관리.

Segment.

  • Partition의 구성 요소.
  • 변경되지 않는 로그 파일(Commit Log).
  • 디스크 상에 존재하는 하나의 파일.
  • 한 세그먼트의 용량이 최대에 도달하면 새로운 세그먼트 파일을 생성함.

3. Kafka 기타 개념.


Kafka Connect.

  • Kafka 위에 만들어진 중앙 집중 데이터 허브.
  • 일종의 데이터 버스.
  • 별도의 서버가 필요하며, Kafka와는 별개의 오픈 소스.
  • 두 가지 모드가 존재함. (standalone, distributed)
  • 데이터 시스템 간의 데이터를 주고 받는 용도로 사용됨.
  • Data Source (RDB) -> Kafka Connect -> Kafka -> Kafka Connect -> Data Sink (AWS S3).
  • 코딩 없이 환경 설정만으로 위 구조를 구현할 수 있음.

Kafka Schema Registry.

  • Topic 메시지 데이터에 대한 스키마를 관리 및 검증 가능.
  • Data를 Serialize(압축)할 때 AVRO를 기본 데이터 포맷으로 사용함.
  • 메시지의 포맷만 지정하면 나머지는 Kafka 라이브러리가 해 줌.

Kafka Streams.

  • Kafka Topic을 좀더 real-time으로 처리할 수 있는 실시간 스트림 처리 라이브러리.
  • Spark Streaming은 micro-batch에 가까움.

ksqlDB.

  • Kafka Streams 위에 만들어진 스트림 처리 데이터베이스.
  • Kafka Topic을 Storage로 사용하여 이에 sql 쿼리를 할 수 있음.

4. Kafka 설치


  1. https://github.com/conduktor/kafka-stack-docker-compose.git 클론
  2. cd kafka-stack-docker-compose
  3. docker compose -f full-stack.yml up으로 Kfaka의 컴포넌트 설치.
  4. localhost:8080 에서 admin@admin.io / admin 으로 로그인.

5. Kafka 실습.


  1. pip3 install kafka-python

  2. producer 생성. (producer.py)

    from time import sleep
    from json import dumps
    from kafka import KafkaProducer
    
    producer = KafkaProducer(
       bootstrap_servers=['localhost:9092'], # 어느 브로커에 토픽을 생성할 지 결정. 브로커이름:포트#
       value_serializer=lambda x: dumps(x).encode('utf-8') # value에 대한 시리얼라이징 방식 설정. 
    )
    
    for j in range(99):
       print("Iteration", j)
       data = {'counter': j}
       producer.send('topic_test', value=data) # topic_test라는 토픽에 value인 data를 전달. 키는 빈 값.
       sleep(0.5)
  3. python3 producer.py

  4. web UI에서 topics 메뉴 선택 후 topic 확인.

  5. consumer 생성 (consumer.py)

    from kafka import KafkaConsumer
    from json import loads
    from time import sleep
    
    consumer = KafkaConsumer(
       'topic_test', # 소비할 토픽 이름.
       bootstrap_servers=['localhost:9092'], # 브로커 지정.
       auto_offset_reset='earliest', # earliest(생성된 지 오래된 것부터) vs. latest(생성된 된 지 얼마 안 된 것부터)
       enable_auto_commit=True, # 카프카Consumer가 알아서 커밋을 기록하라. 실제 현업에서는 보통 False로 지정함.
       group_id='my-group-id', # 그룹 지정.
       value_deserializer=lambda x: loads(x.decode('utf-8'))
    ) # Producer의 serializer된 내용을 deserializer함.
    for event in consumer:
       event_data = event.value # 이벤트의 벨류값 읽기.
       # Do whatever you want
       print(event_data)
       sleep(2)
  6. python3 consumer.py

profile
천천히, 그리고 꾸준히.

0개의 댓글