Docker 컨테이너로 kafka, zookeeper 올리기
* docker-compose.yml
---
# Single-node Kafka + Zookeeper stack for local load testing.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "22181:2181"  # host 22181 -> container client port 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"  # host clients connect via the PLAINTEXT_HOST listener
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # kafka:9092 for containers on the compose network,
      # localhost:29092 for clients running on the host.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1  # single broker, so RF must be 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
* 한번에 100개의 토픽에 메시지를 전송하고 무한반복문으로 부하테스트
-> 이전에 redis로 똑같은 작업을 했을 때 2m48s 후 자동 종료되는 경우가 발생해서 고성능을 가지고 있는 카프카로 테스트
-> 테스트 데이터는 1 (rows) * 100 (columns)의 DF 데이터
-> streaming 테스트
-> 비동기 vs 동기 테스트 시 크게 시간차가 나지 않아서 메시지 순서를 보증하는 동기방식으로 구현
import time
import pandas as pd
from kafka import KafkaProducer
def publish_to_topic():
    """Load-test producer: fan the same DataFrame payload out to 100 topics forever.

    Loads the test CSV once, serializes it to JSON once, then loops,
    sending the payload to topics test-topic0 .. test-topic99 every 10 ms.
    Any failure is printed and re-raised to the caller.
    """
    try:
        # start = time.time()
        producer = KafkaProducer(
            bootstrap_servers="localhost:29092"  # broker's host-facing listener
        )
        df = pd.read_csv("~/Downloads/test2.csv", index_col=0)
        # Encode once outside the hot loop instead of on every send.
        payload = df.to_json().encode('utf-8')
        # count = 0
        while True:
            # NOTE(review): send() is asynchronous; true synchronous delivery
            # would need .get() or producer.flush() — confirm intent.
            for i in range(100):
                producer.send(topic=f"test-topic{i}", value=payload)
            # count += 1
            # print(count)
            time.sleep(0.01)
        # print(f"Elapsed time : {time.time() - start}s")
    except Exception as e:
        print(e)
        # Bare raise preserves the original exception type and traceback
        # (the old `raise Exception` discarded both).
        raise
# Entry point: run the load-test producer when executed as a script.
if __name__ == "__main__":
    publish_to_topic()
import json
import pandas as pd
from kafka import KafkaConsumer
class Consumer:
    """Consumes DataFrame-as-JSON messages from a single Kafka test topic."""

    def connect_kafka_consumer(self):
        """Create the KafkaConsumer for test-topic1 and store it on self.

        Returns the consumer on success. On failure the exception is
        printed and None is returned (self.consumer stays unset).
        """
        try:
            self.consumer = KafkaConsumer(
                "test-topic1",  # topic name
                bootstrap_servers="localhost:29092",  # kafka broker address
                auto_offset_reset="latest",  # start from the newest messages
                enable_auto_commit=True,
                group_id="test_group",  # consumer group identifier
            )
            return self.consumer
        except Exception as e:
            print(e)

    def check_message(self):
        """Block on the consumer, rebuilding and printing a DataFrame per message."""
        for message in self.consumer:
            # print(message.value)  # check the raw message
            # Use a distinct name instead of rebinding `message` itself.
            decoded = message.value.decode("utf-8")
            mess_json = json.loads(decoded)
            df = pd.DataFrame.from_dict(mess_json)
            print(df)
# Entry point: connect the consumer and print every incoming message.
if __name__ == "__main__":
    consumer = Consumer()
    consumer.connect_kafka_consumer()
    consumer.check_message()
<부하테스트>
* 24.03.12 AM 11:32 시작