[Kafka] Kafka - Django

이정훈·2023년 3월 20일
0

kafka

목록 보기
2/5
post-thumbnail
  • kafka의 브로커 역할은 리눅스로 계속 구현하고 장고로 producer와 consumer가 통신하도록 만들 수 있다. 원래 그런 역할을 하는 친구니깐
pip install kafka-python

  • producer 와 consumer 둘다 kafka를 설치해 준다.
  • 브로커의 IP로 설정해 줘야 한다!!!

topic의 내용만

producer

from kafka import KafkaProducer
# 라이브러리를 가져오는것
import time
# 시간체크하는 친구 없어도 된다

producer = KafkaProducer(
    bootstrap_servers=['200.200.200.5:9092']
)
# 객체를 생성할때 부트스트랩 서버에 브로커에 리스트 형식으로 입력되게 만들어줬다.
# 클러스터링 군집할수있으니깐
start = time.time()
# 시작!
for i in range(100):
    # 한번 실행하면 100번보내고 끝
    producer.send('test', value="test".encode("utf-8"))
    producer.flush()
# 반복으로 실행 topic=test 내가 발행하는 메세지의 주제를 뜻한다. 테스트 주제로 테스트란 메세지를 보낸것이다.
# 만약 테스트를 받아보고 싶다? 그럼 consumer 객체를 생성할때 토픽이 만들어지고 test란 토픽으로 구독하면 프로듀서는 토픽을 보내준다(팔로우와 같다 보자 팔로우한사람것만 보이니깐)
# 보통 json형식으로 데이터를 주고 받는다.

print("elapsed :", time.time() - start)

consumer

from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['200.200.200.5:9092']
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % ( message.topic, message.partition, message.offset, message.key, message.value.decode('utf-8') ))
# 계속 받아온다.
print('[end] get consumer list')

json 형식으로

producer

from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(
    acks=0,
    compression_type='gzip',
    bootstrap_servers=['200.200.200.5:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

start = time.time()

for i in range(100):
    data = {'str' : '숫자'+str(i)}
    producer.send('new', value=data)
    producer.flush()

print("elapsed :", time.time() - start)

consumer

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'new',
    bootstrap_servers=['200.200.200.5:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    consumer_timeout_ms=10000
)

print('[begin] get consumer list')

for message in consumer:
    print("Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s" % (
    message.topic, message.partition, message.offset, message.key, message.value))

print('[end] get consumer list')

profile
싱숭생숭늉

0개의 댓글