[데이터 엔지니어링 데브코스 2기] TIL-15주차 Kafka와 Spark Streaming 기반 스트리밍 처리 (4)

이재호·2024년 1월 25일
0

1. Kafka 기본 프로그래밍


Kafka CLI tool.

  • docker ps를 통해 broker의 컨테이너 아이디로 접근.
  • docker exec -it Broker_Container_ID sh
  • kafka-{topics, configs, console-conumer, console-producer} 등의 명령어 존재.
  • kafka-topics --bootstrap-server kafka1:9092 --list: kafka1:9092라는 브로커의 토픽 리스트 조회.
  • kafka-topics --bootstrap-server kafka1:9092 --delete --topic topic_test: kafka1:9092라는 브로커의 topic_test라는 토픽 삭제.
  • kafka-console-producer --bootstrap-server kafka1:9092 --topic test_console: kafka1:9092라는 브로커에 test_console이라는 토픽 생성 후 메시지 입력 가능.
  • kafka-console-consumer --bootstrap-server kafka1:9092 --topic test_console --from-beginning: kafka1:9092라는 브로커의 test_console이라는 토픽에서 처음부터(earliest) 메시지를 읽음. Ctrl+C로 중간에 직접 정지해야 함.

2. Topic 파라미터 생성.


  • 먼저 KafkaAdminClient 오브젝트를 생성하고 create_topics 함수로 Topic 추가.
  • create_topics의 인자로 NewTopic 클래스의 오브젝트를 지정.
client = KafkaAdminClient(boostrap_servers=boostrap_servers) # 메시지를 보낼 때 사용할 브로커 리스트, 기본 값 localhost:9092
topic = NewTopic)
	name=name, # 토픽 이름
    num_partitions=partitions, # 파티션 수, 디폴트 값 1
    replication_facotr=replica) # 복제본 수, 디폴트 값 1
client.create_topics([topic])
  • pip3 install faker

  • person.py

"""
Pydantic is a Python library for data parsing and validation. 
It uses the type hinting mechanism of the newer versions of Python (version 3.6 onwards)
and validates the types during the runtime. Pydantic defines BaseModel class.
It acts as the base class for creating user defined models.
"""
from pydantic import BaseModel


class Person(BaseModel):
    id: str
    name: str
    title: str
  • fake_person_producer.py
import uuid
import json
from typing import List
from person import Person

from faker import Faker
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from kafka import KafkaAdminClient
from kafka.producer import KafkaProducer


def create_topic(bootstrap_servers, name, partitions, replica=1):
    client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        topic = NewTopic(
            name=name,
            num_partitions=partitions,
            replication_factor=replica)
        client.create_topics([topic])
    except TopicAlreadyExistsError as e:
        print(e)
        pass
    finally:
        client.close()


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]

    # create a topic first
    create_topic(bootstrap_servers, topic_name, 4)

    # ingest some random people events
    people: List[Person] = []
    faker = Faker()
    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        client_id="Fake_Person_Producer",
        
    )

    for _ in range(100):
        person = Person(id=str(uuid.uuid4()), name=faker.name(), title=faker.job().title())
        people.append(person)
        producer.send(
            topic=topic_name,
            key=person.title.lower().replace(r's+', '-').encode('utf-8'),
            value=person.json().encode('utf-8'))

    producer.flush()

if __name__ == '__main__':
    main()
  • python3 fake_person_producer.py 실행.

3. Consumer 옵션


KafkaConsumer 파라미터.

  • boostrap_servers : 메시지를 보낼 때 사용할 브로커 리스트.
  • client_id: Kafka Consumer 이름.
  • group_id: Kafka Consumer 그룹 이름. (같은 그룹에서는 파티션을 나눠 줌.)
  • key_deserializer, value_deserilaizer: 메시지의 키와 값의 deserializer 방법 지정(함수).
  • auto_offset_reset: earliest, latest
  • enable_auto_commit: True이면 컨슈머의 오프셋이 백그라운드에서 주기적으로 커밋. False이면 명시적으로 커밋을 해 주어야 함. 오프셋은 별도로 리셋이 가능하며 Conduktor Web UI에서도 가능.

Consumer은 다수의 파티션들을 어떻게 읽나,

  • 라운드 로빈 형태로 하나씩 읽음.
  • 하지만 라운드-로빈 방식은 병렬성이 떨어지는 문제점이 존재함.
  • 따라서 컨슈머 그룹 개념을 사용함.
  • 다수의 컨슈머가 하나의 토픽을 처리하기 위한 그룹 지정.

Consumer Group이란,

  • 다수의 컨슈머가 하나의 토픽을 읽게끔하기 위한 용도.
  • 컨슈머가 파티션 수보다 많은면 파티션을 라운드 로빈 방식으로 컨슈머에게 할당.
  • 컨슈머가 그룹에서 사라질 경우, 리밸런싱을 통해서 해당 컨슈머의 파티션을 다른 컨슈머에게 알아서 재 할당함.

예제 프로그램(1).

  • auto_offset_reset이 True인 경우.
  • autocommit_consumer.py
import json

from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=True)

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)


if __name__ == '__main__':
    main()

예제 프로그램(2).

  • auto_offset_reset이 False인 경우.
  • manualcommit_consumer.py
import json

from kafka import TopicPartition, OffsetAndMetadata
from kafka.consumer import KafkaConsumer


def key_deserializer(key):
    return key.decode('utf-8')


def value_deserializer(value):
    return json.loads(value.decode('utf-8'))


def main():
    topic_name = "fake_people"
    bootstrap_servers = ["localhost:9092"]
    consumer_group_id = "manual_fake_people_group"

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=consumer_group_id,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        auto_offset_reset='earliest',
        enable_auto_commit=False)

    consumer.subscribe([topic_name])
    for record in consumer:
        print(f"""
            Consumed person {record.value} with key '{record.key}'
            from partition {record.partition} at offset {record.offset}
        """)

        topic_partition = TopicPartition(record.topic, record.partition)
        offset = OffsetAndMetadata(record.offset + 1, record.timestamp)
        consumer.commit({
            topic_partition: offset
        })

if __name__ == '__main__':
    main()
  • python3 명령어로 위 파일들 실행.

Consumer/Producer 패턴.

  • 컨슈머는 한 토픽의 메시지를 소비해서 새로운 토픽을 만들기도 함.
  • 즉 컨슈머이면서 프로듀서로 동작하는 것이 아직 흔한 패턴임.

4. ksqlDB


  • REST API나 ksql 클라이언트 툴을 사용해서 토픽을 테이블처럼 SQL로 조작.
  • ksqldb 컨테이너 아이디 확인 후, 다커 명령어로 sh 접속.
  • ksql
  • CREATE STREAM my_stream (id STRING, name STRING, title STRING) with (kafka_topic='fake_people', value_format='JSON');
  • SELECT * FROM my_stream;
  • SELECT *, ROWTIME FROM my_stream EMIT CHANGES; 이전 SELECT에서 바뀐 내용들에 한해서, ROWTIME(epoch)을 추가해서 읽기.
profile
천천히, 그리고 꾸준히.

0개의 댓글