Kafka (feat. Django)

GisangLee·2023년 5월 14일
0

django

목록 보기
33/35

서비스를 MSA와 시키는 decoupling 작업을 하면서
SQS와 Lambda로 빅데이터를 처리 중이었다.

  • 하지만 이 구조를 만들면서 굉장히 맘에 들지 않았다. 그 이유는 바로
    Django의 ORM이나 Celery를 사용할 수 없었다는 점.

  • 그래서 대용량 실시간 처리에 조금더 유리하다고 생각한 Kafka를 사용해보기로 했고, 이렇게 했을 때, 메세지 Consumer를 서비스 레벨에서 핸들링 할 수 있었다. 그리고 Topic별로 나누어 메세지를 처리하고 replica를 활용하여 고가용성을 유지할 수 있었기에 사용해보기로 했다.

기술적으로 해결해야했던 문제

Kafka Consumer

  • 메세지가 kafak에 담기면 django 앱이 실시간 성으로 트리거를 받아야했는데, 그 조건이 앱 실행 시 최초 1회 consumer가 생성되고 그 이후부터 지속적으로 kafka의 토픽을 Listen하면서 메세지를 소비해야했다.
  • 카프카 메세지 처리나 consumer 생성 등을 비동기적으로, 그리고 멀티쓰레드 환경에서 실행해야 했다.

1. Kafka 로컬 실행 ( 테스트 )

prod 환경에서는 AWS의 MSK를 사용할 예정이었기 때문에, 로컬에서 테스트할 카프카 환경을 만들어야 했다.

  • docker-compose.yml
version: '2'

services:
  zookeeper:
    image: zookeeper
    container_name: local-zookeeper
    ports:
      - "22181:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: local-kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: "analysis-request"
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
docker-compose up -d

2. Django에서 메세지 Consume

멀티쓰레드, 비동기 및 기술적 요구사항을 처리하기 위한 방법이다.

import json
import threading
import time
from threading import Thread

from django.apps import AppConfig
from kafka import KafkaConsumer
from celery import shared_task


class KafkaConsumerThread(threading.Thread):
    def __init__(self, topic_name):
        super(KafkaConsumerThread, self).__init__(daemon=True)
        self.topic_name = topic_name
        self.broker = "localhost:9092"
        self.group_id = "my_group"

    def run(self):
        consumer = KafkaConsumer(
            self.topic_name,
            bootstrap_servers=[self.broker],
            group_id=self.group_id,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=True,
        )
        
        for message in consumer:
        	# 메세지 처리 로직
            ....
            ....
            ..
            .

@shared_task
def start_kafka_consumer_thread(topic_name):
    thread = KafkaConsumerThread(topic_name)
    thread.start()

class KafkaConfig(AppConfig):
    name = "app.kafka"

    def ready(self):
        kafka_consumer_thread.delay("my_topic")
  1. Django 앱 최초 1회 실행 시 컨수머를 생성하고
    그 이후부터 지속적으로 kafka의 메세지를 Listen 하기 위해
    AppConfig의 ready 메서드내에 컨수머를 생성

  2. Consumer를 비동기로 처리하기 위해 Celery를 사용

  3. 해당 비동기 프로세스를 멀티쓰레드로 작동시키기 위해 Threading 활용

3. Conclusion

아주 흡족했다.

profile
포폴 및 이력서 : https://gisanglee.github.io/web-porfolio/

0개의 댓글