FastApi + dependency_injector + Celery, Celery beat

CHOJUNGHO96·2024년 1월 18일
0

파이썬

목록 보기
5/5

해당프로젝트 github link : https://github.com/CHOJUNGHO96/FastApi-dependency_injector-Celery-Celery-beat.git

FastApi-dependency_injector-Celery-Celery-beat 구조

FastApi-dependency_injector-Celery-Celery-beat
├── dependencies.py
├── main.py
├── worker.py
├── pyproject.toml
├── poetry.lock
├── Dockerfile
├── docker-compose.yml
└── README.md



dependencies.py 코드

from dependency_injector import containers, providers
from celery import Celery


class Container(containers.DeclarativeContainer):
    config = providers.Configuration()

    celery_app = providers.Singleton(Celery, broker=config.celery.broker_url)

Container 클래스를 사용해 Celery 애플리케이션의 인스턴스를 싱글턴으로 생성
앱 전역에서 하나의 Celery 인스턴스가 유지되며, 이 인스턴스는 설정 정보를 환경 변수에서 가져옴



main.py 코드

from fastapi import FastAPI
from dependencies import Container
from worker import create_celery_app

app = FastAPI()
container = Container()
container.wire(modules=[__name__])
celery_app = create_celery_app()


@app.get("/")
async def read_root():
    celery_app.send_task("worker.test_task")
    return {"message": "Task dispatched"}

FastAPI 애플리케이션을 초기화하고, Dependency Injector 컨테이너를 설정하여 celery_app을 생성
루트 경로에서는 Celery를 사용하여 test_task를 비동기적으로 실행



worker.py 코드

from celery import Celery
from dependencies import Container


def create_celery_app() -> Celery:
    container: Container = Container()
    container.config.celery.broker_url.from_env("CELERY_BROKER_URL", "redis://:redis!123@127.0.0.1:6379/0")
    _celery_app = container.celery_app()
    return _celery_app


celery_app = create_celery_app()


@celery_app.task
def test_task():
    print("테스트")


# Celery Beat 설정
celery_app.conf.beat_schedule = {
    "test-task-every-10-seconds": {"task": "worker.test_task", "schedule": 10.0},
}

Celery 애플리케이션을 생성하고 설정
환경 변수를 통해 브로커 URL을 구성하고, test_task라는 간단한 작업을 정의



Dockerfile 코드

# Python 이미지 사용
FROM python:3.11

# 작업 디렉토리 설정
WORKDIR /app

# 의존성 설치
COPY pyproject.toml poetry.lock* /app/
RUN pip install poetry
RUN poetry install

# 소스 코드 복사
COPY . /app

# FastAPI 실행
CMD ["poetry", "run", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

pyproject.toml 에있는 라이브러리 사용을위해 poetry 의존성 설치후 FastApi 앱 실행



docker-compose.yml 코드

version: '3.8'
services:
  web:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

  worker:
    build: .
    command: poetry run celery -A worker worker --loglevel=info
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

  worker-beat:
    build: .
    command: poetry run celery -A worker beat --loglevel=info
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

  redis:
    image: "redis:alpine"
    ports:
      - "6379:6379"

Dockerfil 기반으로 web, worker, worker-beat 컨테이너 실행 및 redis 이미지 컨테이너 실행


## 실행TEST

celery_app.conf.beat_schedule 에 설정된 스케쥴(10초로 설정)마다 worker-beat 가 실행

WEB에서 celery 작업 api요청시 celery worker가 실행


profile
백엔드개발자

0개의 댓글