[kafka] python Publisher / Consumer 구현

songmoana·2024년 3월 12일
0

kafka

목록 보기
3/3

1. Docker kafka Up On local

Docker 컨테이너로 kafka , zookeeper 올리기

* docker-compose.yml
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    ports:
      - "22181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

2. Publisher.py 작성

* 한번에 100개의 토픽에 메시지를 전송하고 무한반복문으로 부하테스트
-> 이전에 redis로 똑같은 작업을 했을 때 2m48s 후 자동 종료되는 경우가 발생해서 고성능을 가지고 있는 카프카로 테스트
-> 테스트 데이터는 1 (rows)* 100 (columns) 의 DF 데이터
-> streaming 테스트
-> 비동기 vs 동기 테스트 시 크게 시간차가 나지 않아서 메시지 순서를 보증하는 동기방식으로 구현
import time
import pandas as pd
from kafka import KafkaProducer


def publish_to_topic():
    
    try:
        # start = time.time()
        producer = KafkaProducer(
        bootstrap_servers="localhost:29092" # broker server와 연결
    )
        df = pd.read_csv("~/Downloads/test2.csv", index_col=0)
        data = df.to_json()
        
        # count = 0
        while True:
            [producer.send(topic=f"test-topic{i}", value=data.encode('utf-8')) for i in range(100)]
            # count += 1
            # print(count)
            time.sleep(0.01)
            
        # print(f"Elapsed time : {time.time() - start}s")

    except Exception as e:
        print(e)
        raise Exception
        
        
if __name__=="__main__":
    publish_to_topic()

3. Consumer.py 작성

import json
import pandas as pd
from kafka import KafkaConsumer


class Consumer:

    def connect_kafka_consumer(self):
        try:
            self.consumer = KafkaConsumer(
                "test-topic1",  # topic name 
                bootstrap_servers="localhost:29092",  # kafka broker ip address
                auto_offset_reset="latest",  # 최신 메시지부터 받아오는 옵션
                enable_auto_commit=True,
                group_id="test_group",  # consumer group identifier
            )
            return self.consumer
        except Exception as e:
            print(e)

    def check_message(self):
        for message in self.consumer:
            # print(message.value)  # check the message
            message = message.value.decode("utf-8")
            mess_json = json.loads(message)
            df = pd.DataFrame.from_dict(mess_json)
            print(df)


if __name__ == "__main__":
    consumer = Consumer()
    consumer.connect_kafka_consumer()
    consumer.check_message()

Test Result

<부하테스트>
* 24.03.12 AM 11:32 시작

profile
옹모아나 - 개발백과

0개의 댓글