Producer, Consumer 구현 - Python

yoon__0_0·2024년 7월 8일
0

이어드림 수업

목록 보기
91/103

Producer, Consumer, Admin API로 구현해보기

Python API

setting

  • python_kafka 폴더 생성
mkdir python_kafka
  • python 가상환경 생성 및 환경 세팅
# python 3 가상환경 생성
> python3 -m venv kafka_virtualenv

## 가상환경 실행하기 
> source kafka_virtualenv/bin/activate
> python -V

# kafka library 설치 
> pip install --upgrade pip
> pip install confluent-kafka
  • topic 생성
    • topic이름 : my_topic
    • partition : 2개
    • replication : 없음 (1개)
> cd ~/apps/kafka_2.12-3.6.2
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic my_topic
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
  • 기본적인 consumer
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning

API로 코드 만들어 보기 (Basic)

1) vi producer.py

from confluent_kafka import Producer

# Client configuration: the broker address the producer bootstraps from.
producer_config = {'bootstrap.servers': 'localhost'}
p = Producer(producer_config)

# Delivery callback: shows which partition the message was assigned to.
def delivery_report(err, msg):
    """Print the delivery outcome of one produced message.

    Called once for each message produced; triggered by poll() or flush().

    Args:
        err: Delivery error, or None when the message was delivered.
        msg: The produced message; its topic() and partition() are printed
            on success.
    """
    if err is None:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
        return
    print('Message delivery failed: {}'.format(err))

# poll(0) does not wait: it only serves delivery callbacks that are already
# pending from earlier produce() calls.
p.poll(0)
# produce() is asynchronous: the message is queued here and the outcome is
# reported later through delivery_report.
p.produce('my_topic', value="test-msg".encode('utf-8'), callback=delivery_report)

# Block until the queued message has been delivered (or has failed);
# delivery_report is triggered from within flush().
p.flush()
  • 위 코드를 실행하면, 앞에서 실행해 둔 콘솔 Consumer에 test-msg가 출력됨

2) consumer.py

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

# Subscribe to topics
c.subscribe(['my_topic'])

# Read messages from Kafka and print them to stdout until interrupted (Ctrl+C).
try:
    while True:
        # timeout: maximum time (seconds) to wait when no message is
        # available; -1 waits indefinitely.
        msg = c.poll(timeout=1.0)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this endless loop.
    pass
finally:
    # The loop never ends on its own, so a close() placed after it would be
    # unreachable; the finally clause guarantees the consumer is closed.
    c.close()
  • 실행하면, 콘솔 Consumer와 동일하게 메시지가 출력됨

3) Admin

  • 자동화가 필요할 때 사용함.
  • 토픽을 새롭게 생성 해주는 Admin 파일 생성해보기
  • 다른 것들도 자동적으로 관리 가능함.
  • vi admin.py
from confluent_kafka.admin import AdminClient, NewTopic

a = AdminClient({'bootstrap.servers': 'localhost'})

# Names of the topics to create; each one gets 3 partitions and a
# replication factor of 1.
topic_names = ["my_topic_new1", "my_topic_new2"]
new_topics = []
for name in topic_names:
    new_topics.append(NewTopic(name, num_partitions=3, replication_factor=1))

# create_topics() is asynchronous. A dict of <topic,future> is returned.
fs = a.create_topics(new_topics)

# Wait for each operation to finish and report its outcome.
for name, future in fs.items():
    try:
        # The result itself is None; result() raises if creation failed.
        future.result()
        print("Topic {} created".format(name))
    except Exception as exc:
        print("Failed to create topic {}: {}".format(name, exc))

API로 코드 만들어 보기 (Advanced)

1) Consumer

from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from pprint import pformat

# Callback that prints detailed statistics of the Kafka consumer.
def stats_cb(stats_json_str):
    """Pretty-print a statistics payload to stdout.

    Args:
        stats_json_str: Statistics encoded as a JSON string.
    """
    parsed = json.loads(stats_json_str)
    pretty = pformat(parsed)
    print('\nKAFKA Stats: {}\n'.format(pretty))

def print_usage_and_exit(program_name):
    """Write the command-line usage to stderr and exit with status 1.

    Args:
        program_name: Program name shown in the usage line.
    """
    usage_line = 'Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    for text in (usage_line, options):
        sys.stderr.write(text)
    sys.exit(1)

if __name__ == '__main__':
    # An unknown option, or -T without a value, makes getopt raise
    # GetoptError; report it together with the usage text instead of letting
    # a traceback escape.
    try:
        optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    except getopt.GetoptError as exc:
        sys.stderr.write('%s\n' % exc)
        print_usage_and_exit(sys.argv[0])
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    # Positional arguments: <bootstrap-brokers> <group> <topic1> <topic2> ..
    broker = argv[0]
    group = argv[1]
    topics = argv[2:]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
            'auto.offset.reset': 'earliest'}

    # Check to see if the -T option exists
    for opt_name, opt_value in optlist:
        if opt_name != '-T':
            continue
        try:
            intval = int(opt_value)
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt_value)
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt_value)
            sys.exit(1)

        # Enable statistics via librdkafka's statistics callback;
        # statistics are collected at an interval of `intval` milliseconds.
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = intval

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)

    def print_assignment(consumer, partitions):
        """Print the partitions passed to the on_assign callback."""
        print('Assignment:', partitions)

    # Subscribe to topics; print_assignment runs as the on_assign callback
    # when partitions are assigned.
    c.subscribe(topics, on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                # Nothing arrived within the timeout; poll again.
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()
  • -T옵션을 빼고 실행
python consumer_detail.py localhost my_group my_topic

  • producer.py를 여러 번 실행해 보면? : 메시지가 여러 파티션에 나뉘어 들어가는 것을 확인할 수 있음

  • -T 옵션을 넣고 실행 : 지정한 간격(ms)마다 consumer 통계 정보가 대량으로 출력됨

python consumer_detail.py -T 10000 localhost my_group my_topic

2) Producer

  • producer_detail.py
from confluent_kafka import Producer
import sys

if __name__ == '__main__':
    # Exactly two arguments are expected: the broker list and the topic.
    if len(sys.argv) != 3:
        sys.stderr.write('Usage: %s <bootstrap-brokers> <topic>\n' % sys.argv[0])
        sys.exit(1)

    broker, topic = sys.argv[1:]

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {'bootstrap.servers': broker}
    p = Producer(**conf)

    def delivery_callback(err, msg):
        """Report the final outcome of one message on stderr.

        Optional per-message delivery callback (triggered by poll() or
        flush()); runs when a message was delivered or finally failed.
        """
        if not err:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))
            return
        sys.stderr.write('%% Message failed delivery: %s\n' % err)

    # Read lines from stdin, produce each line to Kafka
    for raw_line in sys.stdin:
        # rstrip() drops trailing whitespace, including the newline.
        payload = raw_line.rstrip()
        try:
            p.produce(topic, payload, callback=delivery_callback)
        except BufferError:
            pending = len(p)
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                             pending)

        # Serve delivery callback queue.
        # NOTE: Since produce() is an asynchronous API this poll() call
        #       will most likely not serve the delivery callback for the
        #       last produced message.
        p.poll(0)

    # Wait until all messages have been delivered
    outstanding = len(p)
    sys.stderr.write('%% Waiting for %d deliveries\n' % outstanding)
    p.flush()
  • 실행시

3) Producer Key/ Value

from confluent_kafka import Producer
from random import choice

# Client configuration: the broker address the producer bootstraps from.
producer_config = {'bootstrap.servers': 'localhost'}
p = Producer(producer_config)

# Delivery callback: shows which partition the message was assigned to.
def delivery_report(err, msg):
    """Print the delivery outcome of one produced message, including its key.

    Called once for each message produced; triggered by poll() or flush().

    Args:
        err: Delivery error, or None when the message was delivered.
        msg: The produced message; its topic(), partition() and key() are
            printed on success.
    """
    if err is None:
        print('Message delivered to {} [{}] key : {}'.format(msg.topic(), msg.partition(), msg.key()))
        return
    print('Message delivery failed: {}'.format(err))

# Pools from which a random message key (user id) and value (product) are drawn.
user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

# produce() parameter reference:
# topic (str)
# value (str|bytes) – Message payload
# key (str|bytes) – Message key
# partition (int) – Partition to produce to, else uses the configured built-in partitioner.
# on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
# timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
# headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#producer
for _ in range(10):
    # poll(0) does not wait: it serves delivery callbacks pending from
    # earlier produce() calls.
    p.poll(0)
    # Message key
    user_id = choice(user_ids)
    # Message value
    product = choice(products)
    # Keyword arguments make the value/key roles explicit instead of relying
    # on positional order.
    p.produce('my_topic', value=product, key=user_id, callback=delivery_report)


# Block until all queued messages are delivered and their callbacks have run.
p.flush()
profile
신윤재입니다

0개의 댓글