[공부정리] Flask에서 eureka, kafka producer, logging 사용하기

jeyong·2024년 3월 9일
0

공부 / 생각 정리  

목록 보기
42/121

이번에 작성할 게시글은 [일단 박죠] O2O Object Detection 서버 Spring Cloud 적용을 위한 분석 게시글에서 진행한 내용이다. Flask 환경에서 eureka, kafka, logging 기술들을 사용하였는데, 이러한 내용을 다루는 자료는 많지 않아서 해당 경험을 공유하고자 한다. 이 기술들에 대해 더 깊이 이해하고, 또한 같은 기술 스택을 사용하려는 다른 개발자들에게 도움이 되고자 하는 목적으로 글을 작성하겠다.

1. 전체 코드

from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
from kafka import KafkaProducer
import json
import pytz
from datetime import datetime
from py_eureka_client import eureka_client
import logging

app = Flask(__name__)

logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    filename='demo.log',
                    filemode='a')

logger = logging.getLogger(__name__)

eureka_client.init(eureka_server="127.0.0.1:8761/eureka/",
                    app_name="object-detection",
                    instance_port=5000,
                    instance_ip='127.0.0.1',)

kafka_producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092", 
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

KAFKA_TOPIC = 'object-detection-results'

def load_demo_results():
    with open('demo_results.json') as f:
        return json.load(f)
    
@app.route('/info', methods=['GET'])
def info(): 
    kst = pytz.timezone('Asia/Seoul')
    build_time = datetime.now(kst).isoformat()
    
    info_data = {
        "app": {
            "name": "object-detection",
            "company": "iia"
        },
        "build": {
            "artifact": "object-detection",
            "name": "object-detection",
            "time": build_time, 
            "version": "0.0.1-SNAPSHOT",
            "group": "com.iia"
        }
    }
    return jsonify(info_data), 200

@app.route('/health', methods=['GET'])
def health():  
    return jsonify({"status": "UP"}), 200 

@app.route('/detect', methods=['POST'])
def detect():
    if 'images' not in request.files:
        return 'No images part in the request', 400
    
    store_id = request.headers.get('Store-ID')
    if not store_id:
        return 'Store-ID header is missing', 400
    
    images = request.files.getlist('images')
    demo_results = load_demo_results()
    item_summary = {}
    for image_file in images:
        filename = secure_filename(image_file.filename)
        for demo_result in demo_results:
            item_name = demo_result['object_name']
            if item_name in item_summary:
                item_summary[item_name]['count'] += 1
            else:
                item_summary[item_name] = {'count': 1}
        
        kafka_message = {
                'store-id': store_id,
                'items': item_summary
                }
        try:
            future = kafka_producer.send(topic=KAFKA_TOPIC, value=kafka_message)
            future.add_callback(lambda metadata: logger.info(f"Message sent to {metadata.topic} partition {metadata.partition} file {filename}"))
            future.add_errback(lambda e: logger.error(str(e)))
        except Exception as e:
            logger.error(f"Failed to send message to Kafka: {str(e)}")

    return jsonify({"status": "OK"}), 200

if __name__ == '__main__':
    app.run(debug=True, host='127.0.0.1', port=5000)

O2O 프로젝트를 진행하며 만든 객체 검출 서버의 Fake 객체이다. 사진과 함께 요청이 들어오면 사진에서 YOLO기반으로 물품을 검출하고 결과를 보내주는 Flask 서버 이다. YOLO에 대한 환경 구축이 되어있지 않은 환경에서 프로젝트를 테스트하기 위해서 구현하였다. 아래에 프로젝트 링크를 첨부하겠다.
O2O-Automatic-Store_Object-Detection_Demo/MessageQueue/object-detection/

이번 게시글에서는 주제에 맞게 eureka, kafka, logging에 대해서만 작성하도록 하겠다.

2. eureka, kafka, logging

2-1. eureka

from py_eureka_client import eureka_client

eureka_client.init(eureka_server="127.0.0.1:8761/eureka/",
                    app_name="object-detection",
                    instance_port=5000,
                    instance_ip='127.0.0.1',)

Eureka는 Netflix에 의해 개발된 서비스 디스커버리 툴로, 마이크로서비스 아키텍처에서 서비스 인스턴스들의 위치를 식별하고 관리하는 역할을 한다.

  • py_eureka_client 라이브러리를 통해 Flask 애플리케이션을 Eureka 서버에 등록함으로써, 다른 서비스가 이 애플리케이션을 쉽게 찾을 수 있게 한다.
  • 등록 과정에서 애플리케이션 이름, 인스턴스 포트, 인스턴스 IP 주소를 지정하여, Eureka 대시보드에서 이 애플리케이션의 정보를 확인할 수 있다.

2-2. kafka

from kafka import KafkaProducer

kafka_producer = KafkaProducer(
    bootstrap_servers="127.0.0.1:9092", 
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
KAFKA_TOPIC = 'object-detection-results'

 kafka_message = {
                'store-id': store_id,
                'items': item_summary
                }
                
future = kafka_producer.send(topic=KAFKA_TOPIC, value=kafka_message)
                

Apache Kafka는 고처리량, 내구성, 확장성을 갖춘 분산 메시징 시스템으로, 대규모 데이터 스트림의 실시간 처리에 널리 사용된다.

  • KafkaProducer 인스턴스를 생성하여 Kafka 서버와의 연결을 설정하고, 메시지를 JSON 형식으로 직렬화하여 Kafka 토픽에 전송한다.
  • KAFKA_TOPIC은 'object-detection-results'로 설정되어 있으며, 객체 검출 결과를 해당 토픽으로 전송한다. 이를 통해 다른 시스템이나 서비스에서 실시간으로 객체 검출 결과를 구독하고 처리할 수 있다.
  • Kafka로 메시지를 비동기적으로 전송할 때, KafkaProducer.send() 메서드는 Future 객체를 반환한다. 이 객체를 사용하여 메시지 전송이 성공했을 때와 오류가 발생했을 때의 콜백 함수를 등록할 수 있다.

2-3. logging

import logging

logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                    filename='demo.log',
                    filemode='a')

logger = logging.getLogger(__name__)

 try:
    future = kafka_producer.send(topic=KAFKA_TOPIC, value=kafka_message)
    future.add_callback(lambda metadata: logger.info(f"Message sent to {metadata.topic} partition {metadata.partition} file {filename}"))
    future.add_errback(lambda e: logger.error(str(e)))
except Exception as e:
    logger.error(f"Failed to send message to Kafka: {str(e)}")

로깅은 애플리케이션의 실행 과정에서 발생하는 이벤트를 기록하는 과정으로, 디버깅, 모니터링, 보안 분석 등에 사용할 수 있다.

  • Logging 설정
    • level=logging.INFO: 로그 레벨을 INFO로 설정한다. 이는 DEBUG, INFO, WARNING, ERROR, CRITICAL 중 하나를 선택할 수 있는데, INFO 레벨은 애플리케이션이 정상적으로 작동하고 있는지를 알리는 메시지를 포함한다. 설정된 레벨 이상의 모든 로그가 출력된다.
    • format='%(asctime)s - %(name)s - %(levelname)s - %(message)s': 로그 메시지의 형식을 지정한다. 여기서는 각 로그 메시지가 시간, 로거의 이름, 로그 레벨, 로그 메시지 순으로 기록되도록 설정한다.
      • %(asctime)s: 로그가 기록된 시간
      • %(name)s: 로거의 이름
      • %(levelname)s: 로그 메시지의 레벨 (예: INFO, ERROR 등)
      • %(message)s: 로그 메시지의 본문
    • filename='demo.log': 로그 메시지가 기록될 파일의 이름을 지정한다.
    • filemode='a': 파일 모드를 'a'로 설정하여, 로그 파일이 이미 존재할 경우 로그를 파일 끝에 추가한다.
    • logger = logging.getLogger(name): 현재 모듈의 이름을 사용하여 로거 객체를 생성한다.
  • 비동기 전송 시작: kafka_producer.send 메소드를 호출하여 Kafka 토픽으로 메시지를 비동기적으로 전송한다. 이 메소드는 즉시 Future 객체를 반환하고, 메시지 전송 작업은 백그라운드에서 계속 진행된다.
  • 콜백 함수 추가: 반환된 Future 객체에 add_callback과 add_errback 메소드를 사용하여 콜백 함수를 추가한다.
    - add_callback 메소드는 메시지 전송이 성공적으로 완료되었을 때 호출된다. 콜백 함수는 메타데이터를 인자로 받아, 해당 메시지가 어느 토픽의 어느 파티션에 저장되었는지, 어떤 파일에 대한 처리였는지 로깅한다.
    - add_errback 메소드는 메시지 전송 중 오류가 발생했을 때 호출된다. 이 콜백은 오류 객체를 인자로 받아, 오류 내용을 로깅한다.
  • 예외 처리: 전체 send 호출은 try-except 블록 내에 위치한다. 이는 send 메소드 호출 시 발생할 수 있는 예외를 잡아내고, 해당 예외에 대한 로깅을 수행한다.

생성한 로그 파일에서 서버의 동작 상태를 확인 할 수 있다.

혹여나 나와 같이 Flask에서 해당 기술을 다룰 사람에게 도움이 되고자 간단하게 글을 작성하였다.

profile
노를 젓다 보면 언젠가는 물이 들어오겠지.

0개의 댓글