[FastAPI + Celery]

귤티·2024년 3월 20일
0

Capstone

목록 보기
15/17

Celery

tast scheduling을 지원하면서 real-time processing에 집중한 task queue이다.

What is Celery

  • Task queues는 threads 또는 machines 위에서 작업을 구별하기 위해 mechanism으로 사용된다. task queue의 input은 task라 불리는 작업의 unit이다. 헌신적인 worker processes는 지속적으로 새 work를 수행하기 위해 task queues를 monitor 한다.
  • Celery는 대개 clients와 workers를 중재하기 위해 borker를 사용하여 messages를 통해 communicate 한다. task를 초기화하기 위해 client는 queue에 message를 추가한다, 그럼 broker는 message를 worker에게 전달한다.
  • Celery system은 고가용성과 수평적 확장을 제공하면서 다수의 workers와 brokers로 구성될 수 있다. 이것은 Python으로 쓰여졌으나 protocol은 어떤 언어로든 구현될 수 있다.

다른 말로, the entities involved in Celery are:

  • producers: 또는 client라고 불린다, 작업을 요청하고 결과로 무언가를 하는 역할이다.
  • broker: producer와 workers 사이에 message를 보내고 받는 데에 사용되는 message transport이다. 다른 말로, 이것들은 task queue에 저장된다. Celery는 수많은 Message brokers를 지원하나, 현재는 오직 두 완전한 기능만이다: Redis and RabbitMQ.
  • workers: workers는 지속적으로 test quere를 보고 task를 실행하는 processes이다.
  • result backend: backend는 우리가 tast의 상태를 추적 또는 task로부터의 결과를 검색하고자 할 때 오직 필요하다. result backend는 선택적이나 기본적으로 켜져 있다.

Understanding Celery's architecture

Celery has an architecture based on pluggable components and a mechanism of message exchange that use a protocol according to a selected message transport(broker). This is illustrated in the following diagram:

Working with tasks

이전 diagram에 나타난 듯이, client component들은 task를 생성하고 brokers에게 전달하는 function을 가지고 있다.

이제 @app.task decorator를 사용한 task의 정의를 입증하는 코드 예시를 분석할 것이다, 이것은 Celery apllication의 instance를 통해 접근 가능하다, 이제는 app 불릴 것이다.

The following code example demonstrates a simple Hello World app:

@app.task
def hello_world():
	return "Hello I'm a celery task"

Getting Started

Launch a broker/backend

First, 우리는 broker와 backend가 필요하다. We will use Redis, as it both full-featured and easy to use:

poetry add 'celery[redis]'

We can run Redis locally using:

docker run --rm --name some-redis -p 6379:6379 redis:latest

Create a task

task를 생성해보자. 먼저, Celery의 entrypoint인 Celery instance를 생성해야 한다:
task 제출(client), workers 관리, result 얻기 등이 될 수 있다.
We usually call it the Celery application, or app for short.

from celery.app import Celery

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")

celery_app = Celery(__name__, broker=redis_url, backend=redis_url)

이제, dummy task를 정의하자, 이것은 timestamp와 같이 file을 생성할 것이다:

# in file task.py
from celery.app import Celery
from datetime import datetime
import os

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")

app = Celery(__name__, broker=redis_url, backend=redis_url)


@app.task
def dummy_task():
    folder = "/tmp/celery"
    os.makedirs(folder, exist_ok=True)
    now = datetime.now().strftime("%Y-%m-%dT%H:%M:%s")
    with open(f"{folder}/task-{now}.txt", "w") as f:
        f.write("hello!")

이것이 작동하는 지 확인하기 위해, 직접적으로 Python REPL을 사용해 호출해보자:

>>> from fastapi_celery import task
>>> task.dummy_task()

이것은 file을 만들어야만 한다 - 이것을 직접적으로 호출해서, Celery는 포함되지 않는다. Celery를 사용해 이 task를 실행하기 위해, 우리는 decrator에 의해 추가된 Methods 중의 하나를 사용해야만 한다. 가장 흔한 것은 apply_async()의 지름길인 delay()이다. 이러한 methods는 AsyncResult를 반환한다, 이것은 상태를 query하는 데에 추가로 사용될 수 있다.

>>> t = task.dummy_task.delay()
>>> t.status
PENDING

왜 pending인가?, 어떤 worker도 파견하지 않았기 때문이다.

Launch a worker

다른 terminal에서 실행:

celery --app=fastapi_celery.task.app worker --concurrency=1 --loglevel=DEBUG

이제 다시 해보자ㅣ

>>> t.status
SUCCESS

이 task를 보장하기 위해, task에 delay를 추가해보자: time.sleep(10). worker를 재실행하는 것을 잊지 말자, method definition이 변경되었으니까! 더 좋은 점은 watchdog을 사용해 worker를 자동적으로 재실행하는 것이다:

poetry add watchdog --group=dev
watchmedo auto-restart --directory=./fastapi_celery --pattern=task.py -- celery --app=fastapi_celery.task.app worker --concurrency=1 --loglevel=DEBUG

Parameters and return values

이제, dummy task를 조금 바꾸어보자, 그래서 argument를 받고 result를 반환하도록:

def dummy_task(name='Bob') -> str:
    sleep(5)
    return f'Hello {name}!'
>>> import importlib
>>> importlib.reload(task)
>>> t = task.dummy_task.delay('Lucy')
>>> t.result # empty until sucess
>>> t.result
'Hello Lucy!'

대신 dictionary를 반환하도록 해보자. 이것은 같게 동작할 것이다.
하지만 이건 어떤가?

def dummy_task() -> str:
    return open('/tmp/celery/x.txt', 'w')
>>> t = task.dummy_task.delay()
>>> t.status
'FAILURE'
>>> t.result
EncodeError("TypeError('Object of type TextIOWrapper is not JSON serializable')")
t.successful()
False

조심할 것: result는 result backend에서 저장되고 직렬화될 것이기 때문에 result는 반드시 JSON-직렬화 가능한(또는 Celery에서 구성된 직렬화와 일치)이어야 한다.

Using Celery with FastAPI

그 building blocks로, 우리는 두 개를 함께 bind 할 수 있다. 간단하게 FastAPI에서 task.py를 import하고, REST call로부터 task.delay()를 호출한다. 우리는 taskID와 그것의 상태를 user에게 반환할 수 있다.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult

from . import task

app = FastAPI()


class TaskOut(BaseModel):
    id: str
    status: str


@app.get("/start")
def start() -> TaskOut:
    r = task.dummy_task.delay()
    return _to_task_out(r)


@app.get("/status")
def status(task_id: str) -> TaskOut:
    r = task.app.AsyncResult(task_id)
    return _to_task_out(r)


def _to_task_out(r: AsyncResult) -> TaskOut:
    return TaskOut(id=r.task_id, status=r.status)

Restricting to one task at a time

Celery는 많은 동시작업 task의 수를 제한하는 분명한 방식을 제공하지 않는다. 우리의 경우, 우리는 한 time에 하나의 task만 실행되기를 바란다. 만약 user가 다른 task가 이미 실행되고 있는 동안 task를 시작하고 싶다면, error를 얻고말 것이다.

multithreading/multiprocessing과 함께, 흔한 construct는 mutual exclusion (mutex)lock이다. 문제는 여기에, 우리는 multiple processfes를 이곳에 가지고 있고, 우리는 Python process 외부에 있는 lock이 필요하다.

우리는 이미 Redis를 가지고 있기 때문에, 우리는 Redis Lock을 이용할 수 있다. 하지만 어떻게 쓸 수 있는가?

이상적으로, 우리는 task를 시작할 때(from the REST endpoint - FastAPI) lock을 얻고 싶을 것이다, 그리고 task가 끝났을 때(Celery worker로부터)이것을 release할 것이다. 하지만 lock은 같은 thread에서 얻어져야만 하고 release 되어야만 한다... 그리고 최악의 경우, 만약 우리의 worker가 lock을 release하는 것에 실패한다면, 우리는 stuck이다.

더 좋은 방법은 FastAPI에서만 lock을 사용하는 것이다. task가 끝났을 때를 우리는 알지 못한다, 하지만 우리는 주어진 ID의 state에 query를 사용할 수 있다. 이제 Redis key에 마지막 task의 ID를 가진 current_task_id를 안전하게 읽고 쓰기 위해 lockd을 사용해보자.

그래서, 구현을 위해, redis lock을 첫 번째로 만들어보자:

from redis import Redis
from redis.lock import Lock as RedisLock

redis_instance = Redis.from_url(task.redis_url)
lock = RedisLock(redis_instance, name="task_id")

REDIS_TASK_KEY = "current_task"

/start endpoint는 이제 다음과 같다:

@app.get("/start")
def start() -> TaskOut:
    try:
        if not lock.acquire(blocking_timeout=4):
            raise HTTPException(status_code=500, detail="Could not acquire lock")

        task_id = redis_instance.get(REDIS_TASK_KEY)
        if task_id is None or task.app.AsyncResult(task_id).ready():
            # no task was ever run, or the last task finished already
            r = task.dummy_task.delay()
            redis_instance.set(REDIS_TASK_KEY, r.task_id)
            return _to_task_out(r)
        else:
            # the last task is still running!
            raise HTTPException(
                status_code=400, detail="A task is already being executed"
            )
    finally:
        lock.release()

그리고 /status에 대해, 우리는 이제 task_id query parameter를 선택적으로 만들 수 있다:

@app.get("/status")
def status(task_id: str = None) -> TaskOut:
    task_id = task_id or redis_instance.get(REDIS_TASK_KEY)
    if task_id is None:
        raise HTTPException(
            status_code=400, detail=f"Could not determine task {task_id}"
        )
    r = task.app.AsyncResult(task_id)
    return _to_task_out(r)

Canceling long-running tasks

만약 현재 task를 삭제하고 싶다. 어떻게 할 것인가?
Celery app은 우리에게 control에 대한 접근을 준다, 이것은 우리에게 통계, 실행 중인 작업자 수 등을 얻게 해준다.

from . import task

# note: if id is read from redis, use:
#  task_id = redis_instance.get(...).decode('utf-8')
task.app.control.revoke(task_id, terminate=True, signal="SIGKILL")

Returning results and exceptions

간단하게 새 속성을 TaskOut에 추가하자:

class TaskOut(BaseModel):
    id: str
    status: str
    result: str | None = None

그리고 __to_task_out을 수정하자:

def _to_task_out(r: AsyncResult) -> TaskOut:
    return TaskOut(
        id=r.task_id, 
        status=r.status, 
        result=r.traceback if r.failed() else r.result,
    )

예외 또는 반환값을 throw하는 것을 만듬으로, traceback을 얻으려고 시도할 수 있으며, 호출한다:

curl http://localhost:8000/start
curl http://localhost:8000/status | jq -r '.result'
profile
취준 진입

0개의 댓글