[Kafka] Django와 Kafka를 활용한 메시지 전송 및 수신

angie·2024년 10월 21일

1. Docker로 Kafka 설치하기

1)docker-compose.yml 파일을 생성하고 작성

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  
  kafka:
    image: wurstmeister/kafka:2.12-2.5.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  • 카프카는 설치를 할 때 2개의 이미지를 이용
  • 카프카 와 주키퍼(카프카 코디네이터)를 같이 설치

2)터미널에서 명령을 수행 - 클러스터 1개를 가진 카프카 서버를 실행

docker-compose up -d

3)외부에서 사용할 수 있도록 설정 변경

  • 터미널에서 도커 컨테이너 안으로 접속

    docker exec -it kafka /bin/bash
  • 설정 파일을 수정 - 내용을 추가

    cd /opt/kafka/config
     vi server.properties
    
    
     listeners=PLAINTEXT://:9092                 
     delete.topic.enable=true                     
     auto.create.topics.enable=true  

4)토픽 생성과 조회 및 삭제

  • 명령어를 사용하기 위해서 프롬프트 이동:
bash# cd /opt/kafka/bin
  • 첫번째 카프카 서버의 첫번째 영역에 토픽(exam-topic) 생성:
bash# kafka-topics.sh --bootstrap-server localhost:9092 --create --topic exam-topic --partitions 1 --replication-factor 1
  • 생성된 토픽 목록 확인
bash# kafka-topics.sh --bootstrap-server localhost:9092 --list
  • 토픽 삭제:
bash# kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic

5)메세지 전송 및 받기

  • 메시지 전송
    터미널에서 docker exec -it kafka /bin/bash
bash# cd /opt/kafka/bin
bash# kafka-console-producer.sh --topic exam-topic --broker-list localhost:9092
>메세지를 작성
  • 메시지 받기
    새로운 터미널에서 docker exec -it kafka /bin/bash
bash# cd /opt/kafka/bin
bash# ./kafka-console-consumer.sh --topic exam-topic --from-beginning --bootstrap-server localhost:9092

docker로 kafka를 만들었을 때 메시지 전송과 수신이 잘 수행되는 것을 알 수 있다.

2. Django에서 카프카 메세지 전송 및 수신 (Window)

  • 가상 환경을 생성(Window)
python -m venv kafka_env
  • 가상환경 활성화
kafka_env\Scripts\activate
  • 패키지 설치
pip install kafka-python
pip install six==1.6.0
  • 메시지 전송하는 코드를 작성하고 실행 한 후 터미널을 확인

producer.py

import sys
import six
if sys.version_info >= (3, 12, 0):
   sys.modules['kafka.vendor.six.moves'] = six.moves

from kafka import KafkaProducer
import json


class MessageProducer:
   def __init__(self, broker, topic):
       self.broker = broker
       self.topic = topic
       #key_serializer=str.encode 를 추가하면 key 와 함께 전송
       #그렇지 않으면 value 만 전송
       self.producer = KafkaProducer(
           bootstrap_servers=self.broker,
           value_serializer=lambda x: json.dumps(x).encode("utf-8"),
           acks=0,
           api_version=(2, 5, 0),
           key_serializer=str.encode,
           retries=3,
       )
   def send_message(self, msg, auto_close=True):
       try:
           print(self.producer)
           future = self.producer.send(self.topic, value=msg, key="key")
           self.producer.flush()  # 비우는 작업
           if auto_close:
               self.producer.close()
           future.get(timeout=2)
           return {"status_code": 200, "error": None}
       except Exception as exc:
           raise exc

# 브로커와 토픽명을 지정
broker = ["localhost:9092"]
topic = "exam-topic"
pd = MessageProducer(broker, topic)
#전송할 메시지 생성
msg = {"name": "John", "age": 30}
res = pd.send_message(msg)
print(res)
python producer.py
  • 파이썬 카프카 컨슈머를 작성

consumer.py

import sys
import six
if sys.version_info >= (3, 12, 0):
   sys.modules['kafka.vendor.six.moves'] = six.moves

from kafka import KafkaConsumer
import json
class MessageConsumer:
   def __init__(self, broker, topic):
       self.broker = broker
       self.consumer = KafkaConsumer(
           topic,  # Topic to consume
           bootstrap_servers=self.broker,
           value_deserializer=lambda x: x.decode(
               "utf-8"
           ),  # Decode message value as utf-8
           group_id="my-group",  # Consumer group ID
           auto_offset_reset="earliest",  # Start consuming from earliest available message
           enable_auto_commit=True,  # Commit offsets automatically
       )
   def receive_message(self):
       try:
           for message in self.consumer:
               #print(message.value)
               result = json.loads(message.value)
               for k, v in result.items():
                   print(k, ":", result[k])
               print(result["name"])
               print(result["age"])
       except Exception as exc:
           raise exc
      
# 브로커와 토픽명을 지정한다.
broker = ["localhost:9092"]
topic = "exam-topic"
cs = MessageConsumer(broker, topic)
cs.receive_message()
python consumer.py

  • 작성한 파일을 실행하면 에러가 발생: 이전에 JSON 형식이 아닌 데이터를 전송했는데 그 데이터를 파싱할려고 해서 에러가 발생

  • 토픽을 삭제하고 다시 전송한 후 실행

#  docker exec -it kafka /bin/bash
bash-5.1# cd /opt/kafka/bin
bash-5.1# kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic exam-topic 
profile
열심히 달리는 개발자

0개의 댓글