- kafka의 브로커 역할은 리눅스로 계속 구현하고 장고로 producer와 consumer가 통신하도록 만들 수 있다. 원래 그런 역할을 하는 친구니깐
pip install kafka-python
- producer 와 consumer 둘다 kafka를 설치해 준다.
- 브로커의 IP로 설정해 줘야 한다!!!
topic의 내용만
producer
from kafka import KafkaProducer
# 라이브러리를 가져오는것
import time
# 시간체크하는 친구 없어도 된다
producer = KafkaProducer(
bootstrap_servers=['200.200.200.5:9092']
)
# 객체를 생성할때 부트스트랩 서버에 브로커에 리스트 형식으로 입력되게 만들어줬다.
# 클러스터링 군집할수있으니깐
start = time.time()
# 시작!
for i in range(100):
# 한번 실행하면 100번보내고 끝
producer.send('test', value="test".encode("utf-8"))
producer.flush()
# 반복으로 실행 topic=test 내가 발행하는 메세지의 주제를 뜻한다. 테스트 주제로 테스트란 메세지를 보낸것이다.
# 만약 테스트를 받아보고 싶다? 그럼 consumer 객체를 생성할때 토픽이 만들어지고 test란 토픽으로 구독하면 프로듀서는 토픽을 보내준다(팔로우와 같다 보자 팔로우한사람것만 보이니깐)
# 보통 json형식으로 데이터를 주고 받는다.
print("elapsed :", time.time() - start)
consumer
from kafka import KafkaConsumer
from json import loads
consumer = KafkaConsumer(
'test',
bootstrap_servers=['200.200.200.5:9092']
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value.decode('utf-8') ))
# 계속 받아온다.
print('[end] get consumer list')
json 형식으로
producer
from kafka import KafkaProducer
from json import dumps
import time
producer = KafkaProducer(
acks=0,
compression_type='gzip',
bootstrap_servers=['200.200.200.5:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8')
)
start = time.time()
for i in range(100):
data = {'str' : '숫자'+str(i)}
producer.send('new', value=data)
producer.flush()
print("elapsed :", time.time() - start)
consumer
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'new',
bootstrap_servers=['200.200.200.5:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')),
consumer_timeout_ms=10000
)
print('[begin] get consumer list')
for message in consumer:
print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
message.topic, message.partition, message.offset, message.key, message.value))
print('[end] get consumer list')