Producer, Consumer, Admin API로 구현해보기
mkdir python_kafka
# python 3 가상환경 생성
> python3 -m venv kafka_virtualenv
## 가상환경 실행하기
> source kafka_virtualenv/bin/activate
> python -V
# kafka library 설치
> pip install --upgrade pip
> pip install confluent-kafka
> cd ~/apps/kafka_2.12-3.6.2
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic my_topic
> bin/kafka-topics.sh --list --bootstrap-server localhost:9092
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
1) vi producer.py
from confluent_kafka import Producer
# Producer client used by the rest of this script; the broker is expected on localhost.
p = Producer(
    {"bootstrap.servers": "localhost"}
)
# Delivery callback: shows which partition each message was assigned to.
def delivery_report(err, msg):
    """Report the outcome of one produced message.

    Invoked once per message from ``poll()`` or ``flush()``. When ``err`` is
    None the message was delivered and its topic/partition are printed;
    otherwise the delivery error is printed.
    """
    if err is None:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
        return
    print('Message delivery failed: {}'.format(err))
# Queue one message. produce() is asynchronous: it hands the message to the
# client's local queue and returns immediately; delivery_report runs later.
p.produce('my_topic', "test-msg".encode('utf-8'), callback=delivery_report)
# poll(0) is non-blocking (timeout 0): it only serves delivery callbacks that
# are already ready and returns at once -- it does NOT wait for the message.
# (Calling it before produce(), as before, could never serve anything.)
p.poll(0)
# flush() waits until all queued messages have been delivered or have failed,
# which is what actually guarantees delivery_report runs before the script ends.
p.flush()
2) consumer.py
from confluent_kafka import Consumer
c = Consumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

# Subscribe to topics
c.subscribe(['my_topic'])

# Read messages from Kafka and print them to stdout until interrupted (Ctrl+C).
# poll(timeout): maximum time in seconds to wait when no message is available
# (-1 waits indefinitely); it returns None when nothing arrived in time.
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        value = msg.value()
        # A message may carry no payload (value None), and a payload is not
        # guaranteed to be valid UTF-8; neither case should crash the loop.
        text = None if value is None else value.decode('utf-8', errors='replace')
        print('Received message: {}'.format(text))
except KeyboardInterrupt:
    print('Aborted by user')
finally:
    # The loop only ends via an exception, so a c.close() placed after it was
    # never reached. Closing here lets the consumer leave its group cleanly
    # and commit its final offsets.
    c.close()
3) Admin
from confluent_kafka.admin import AdminClient, NewTopic
a = AdminClient({'bootstrap.servers': 'localhost'})

# Topics to create: each one gets 3 partitions and replication factor 1.
new_topics = [
    NewTopic(topic, num_partitions=3, replication_factor=1)
    for topic in ("my_topic_new1", "my_topic_new2")
]

# create_topics() is asynchronous and returns a dict of {topic name: future}.
fs = a.create_topics(new_topics)

# Wait on each future in turn and report the outcome per topic.
for topic, f in fs.items():
    try:
        f.result()  # resolves to None on success, raises on failure
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
    else:
        print("Topic {} created".format(topic))
1) Consumer (consumer_detail.py)
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from pprint import pformat
# Callback that prints detailed statistics for the Kafka consumer.
def stats_cb(stats_json_str):
    """Pretty-print one statistics report.

    The report arrives as a JSON string; it is decoded with json.loads and
    printed via pformat, framed by blank lines.
    """
    parsed = json.loads(stats_json_str)
    pretty = pformat(parsed)
    print('\nKAFKA Stats: {}\n'.format(pretty))
def print_usage_and_exit(program_name):
    """Write command-line usage (and the -T option) to stderr, then exit with status 1."""
    usage_line = 'Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name
    option_help = '\nOptions:\n-T <intvl> Enable client statistics at specified interval (ms)\n'
    for chunk in (usage_line, option_help):
        sys.stderr.write(chunk)
    sys.exit(1)
if __name__ == '__main__':
    # Parse the command line: only "-T <intvl>" is an option; the rest are
    # positional arguments (broker, group, one or more topics).
    try:
        optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    except getopt.GetoptError as err:
        # Unknown option or -T without a value: show usage instead of
        # terminating with an uncaught traceback.
        sys.stderr.write('%s\n' % err)
        print_usage_and_exit(sys.argv[0])
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    broker = argv[0]
    group = argv[1]
    topics = argv[2:]

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
            'auto.offset.reset': 'earliest'}

    # Check whether the -T option was given (getopt yields (option, value) pairs).
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)

        # Enable statistics via librdkafka's statistics callback:
        # statistics are collected every `intval` milliseconds and passed to stats_cb.
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = intval

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)

    def print_assignment(consumer, partitions):
        """on_assign callback: print the partitions assigned to this consumer."""
        print('Assignment:', partitions)

    # Subscribe to topics; print_assignment runs whenever partitions are assigned.
    c.subscribe(topics, on_assign=print_assignment)

    # Read messages from Kafka: metadata goes to stderr, the payload to stdout.
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            # Proper message
            sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                             (msg.topic(), msg.partition(), msg.offset(),
                              str(msg.key())))
            print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()
python consumer_detail.py localhost my_group my_topic
producer.py를 여러 번 실행해 보면? : 메시지가 여러 파티션(my_topic은 파티션 2개)에 나뉘어 저장되는 것을 확인할 수 있다.
-T 옵션을 넣고 실행하면: 지정한 간격(ms)마다 클라이언트 통계 정보(JSON)가 대량으로 출력된다.
python consumer_detail.py -T 10000 localhost my_group my_topic
2) Producer
from confluent_kafka import Producer
import sys
if __name__ == '__main__':
    if len(sys.argv) != 3:
        sys.stderr.write('Usage: %s <bootstrap-brokers> <topic>\n' % sys.argv[0])
        sys.exit(1)

    broker = sys.argv[1]
    topic = sys.argv[2]

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {'bootstrap.servers': broker}

    # Create Producer instance
    p = Producer(**conf)

    # Optional per-message delivery callback (triggered by poll() or flush()).
    # Runs once the message has been delivered successfully or has finally failed.
    def delivery_callback(err, msg):
        if err:
            sys.stderr.write('%% Message failed delivery: %s\n' % err)
        else:
            sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                             (msg.topic(), msg.partition(), msg.offset()))

    # Read lines from stdin, produce each line to Kafka
    for line in sys.stdin:
        # Produce the line without trailing whitespace (rstrip() also drops the newline).
        value = line.rstrip()
        while True:
            try:
                p.produce(topic, value, callback=delivery_callback)
                break
            except BufferError:
                sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
                                 len(p))
                # Previously the line was silently dropped here. Wait up to 1s
                # for deliveries to complete and free queue space, then retry.
                p.poll(1)

        # Serve delivery callback queue.
        # NOTE: Since produce() is an asynchronous API this poll() call
        # will most likely not serve the delivery callback for the
        # last produced message.
        p.poll(0)

    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()
3) Producer Key/Value
from confluent_kafka import Producer
from random import choice
# Producer client for the key/value example; the broker is expected on localhost.
p = Producer(
    {"bootstrap.servers": "localhost"}
)
# Delivery callback: shows which partition (and key) each message was assigned to.
def delivery_report(err, msg):
    """Print the delivery result of one message, including its key.

    Triggered by poll() or flush(), once per produced message. ``err`` is
    None on success; otherwise it describes why delivery failed.
    """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    details = (msg.topic(), msg.partition(), msg.key())
    print('Message delivered to {} [{}] key : {}'.format(*details))
# Sample data: user ids are used as message keys, product names as message values.
user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

for _ in range(10):
    # Serve delivery callbacks for messages produced in earlier iterations.
    # poll(0) is non-blocking (timeout 0); it does not wait for anything.
    p.poll(0)

    # Key
    user_id = choice(user_ids)
    # Value
    product = choice(products)

    # produce() parameters:
    # topic (str)
    # value (str|bytes) – Message payload
    # key (str|bytes) – Message key
    # partition (int) – Partition to produce to, else uses the configured built-in partitioner.
    # on_delivery(err,msg) (func) – Delivery report callback to call (from poll() or flush()) on successful or failed delivery
    # timestamp (int) – Message timestamp (CreateTime) in milliseconds since epoch UTC (requires librdkafka >= v0.9.4, api.version.request=true, and broker >= 0.10.0.0). Default value is current time.
    # headers (dict|list) – Message headers to set on the message. The header key must be a string while the value must be binary, unicode or None. Accepts a list of (key,value) or a dict. (Requires librdkafka >= v0.11.4 and broker version >= 0.11.0.0)
    # https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#producer
    #
    # value/key are passed by keyword: the positional order (topic, value, key)
    # is easy to swap by accident. delivery_report prints key and partition,
    # so you can observe which partition each key lands in.
    p.produce('my_topic', value=product, key=user_id, callback=delivery_report)

# Block until all queued messages are delivered (or fail) and their
# delivery_report callbacks have run.
p.flush()