OS에 사실 레디스를 바로 설치 하거나 혹은 도커를 이용해서 레디스 서버를 구성할 수도 있습니다.
하지만 기본적으로 도커를 사용하는게 여러모로 편리합니다. 도커를 강추합니다.
도커설치를 안했다면 일단 도커부터 설치하길 당부드립니다.
$ docker run -p 6379:6379 --name some-redis -d redis
도커허브에서 레디스 공식 이미지를 다운로드 할텐데요. PORT는 6379로 포트포워딩되고 백그라운드로 실행될 거에요.
정상적으로 컨테이너가 기동되었는지 아래 명령어를 날려주세요.
$ docker exec -it some-redis redis-cli ping
PONG
Redis 공식 웹사이트에서 다운받았든, package manager(ex. APT, YUM, Homebrew or Chocolatey)로 설치했든 아래 명령어를 통해 서버를 기동합니다.
$ redis-server
다른 터미널을 열어서 ping 명령어를 날려서 PONG 응답을 받습니다.
$ redis-cli ping
PONG
$ mkdir fastapi-celery-project && cd fastapi-celery-project
가상환경을 만들고 활성화 시킬게요.
$ python -m venv env
$ source env/bin/activate
(env)$
Poetry or Pipenv를 이용해서 기존 virtualenv나 Pip 가상환경을 좀더 모던한 파이썬 환경으로 변경시킬수 있어요.
poetry - https://python-poetry.org/
Pipenv - https://github.com/pypa/pipenv
Modern Python Environment - https://testdriven.io/blog/python-environments/
requirements.txt 파일에 아래 내용을 작성할게요.
fastapi==0.66.0
uvicorn[standard]==0.14.0
설치
(env)$ pip install -r requirements.txt
main.py라는 새로운 파일을 생성 할게요.
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
app 기동을 해 볼게요.
(env)$ uvicorn main:app --reload
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO: Started reloader process [96439] using watchgod
INFO: Started server process [96482]
INFO: Waiting for application startup.
INFO: Application startup complete.
브라우저에 localhost 8000포트로 가봅니다. 그럼 "{'message':'Hello World'}" 데이터를 확인 할 수 있어요. 서버를 끌게요.
일단 프로젝트 구조는 아래와 같아요.
├── main.py
└── requirements.txt
celery 설치와 구성을 해볼게요.
requirements.txt 파일을 업데이트 할게요.
celery==5.1.2
redis==3.5.3
설치하기
(env)$ pip install -r requirements.txt
main.py 파일을 업데이트 할게요.
from celery import Celery
from fastapi import FastAPI
app = FastAPI()
celery = Celery(
__name__,
broker="redis://127.0.0.1:6379/0",
backend="redis://127.0.0.1:6379/0"
)
@app.get("/")
async def root():
return {"message": "Hello World"}
@celery.task
def divide(x, y):
import time
time.sleep(5)
return x / y
1. FastAPI 인스턴스를 만든 다음, 새로운 Celery 인스턴스 하나를 만듭니다.
2. Celery에게 Redis를 `broker`, `backend`로 사용할 것을 지정해줍니다. 사실 위 방법 처럼 하드코딩하기 보다는 컨피그 파일을 별도로 두거나 혹은 환경 변수로 추후 변경 할 수 있습니다.
3. 임시로 long-running task를 만들기 위해서 divide 태스크를 구현했습니다.
설정이 완료 되었으면, Celery에게 task를 보내서 어떻게 작동하는지 확인해 보겠습니다.
(env)$ celery -A main.celery worker --loglevel=info
[config]
.> app: main:0x10ad0d5f8
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. main.divide
이전 터미널을 열어서 python shell 모드로 진입하겠습니다.
(env)$ python
Celery worker에 task 몇 개를 보내보겠습니다.
>>> from main import app, divide
>>> task = divide.delay(1, 2)
내부 작동
1. 메시지 브로커에게 새로운 메시지를delay
method를 이용하여 보냈습니다. 그리고 큐로부터 워커가 픽업한 다음 해당 태스크를 실행하게 됩니다.
2. 엔터키를 탁! 누르고 난 다음 백그라운드로 해당 태스크 작업이 돌아가고 마친다고 보면 됩니다.
아래와 같은 로그 메시지를 셀러리 워커 터미널에서 보게 될 것입니다.
[2022-05-05 20:40:17,901: INFO/MainProcess] Task main.divide[ec8f8ee0-4e47-4a25-851e-acd69bc3e3b1] received
[2022-05-05 20:40:22,913: INFO/ForkPoolWorker-16] Task main.divide[ec8f8ee0-4e47-4a25-851e-acd69bc3e3b1] succeeded in 5.011416195999999s: 0.5
20:40:17,901
에 task를 받아 워커에서 처리했습니다. 시작하고 마치는데 약 5초가 소요되었습니다.
result backend
에 저장됩니다.새로운 task를 추가하겠습니다.
>>> task = divide.delay(1, 2)
>>> type(task)
<class 'celery.result.AsyncResult'>
delay
메서드를 호출한 후 AsyncResult 반환 값 또는 exception detail과 같은 정보등과 함께 작업 상태를 확인하는 데 사용할 수 있는 인스턴스를 얻습니다.
새 task을 추가한 다음 task.state, task.result를 print로 찍어내보겠습니다.
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
PENDING None
>>> print(task.state, task.result)
SUCCESS 0.5
>>> print(task.state, task.result)
SUCCESS 0.5
오류 발생시 어떻게 되는지 확인해보겠습니다.
>>> task = divide.delay(1, 0)
>>> task.state
'FAILURE'
>>> task.result
ZeroDivisionError('division by zero')
Flower는 Celery용 실시간 웹 애플리케이션 모니터링 및 관리 도구입니다.
dependency를 requirements.txt파일에 추가하고 설치 후 실행 해 보겠습니다.
flower==1.0.0
$ pip install -r requirements.txt
$ celery -A main.celery flower --port=5555
대시보드를 보려면 http://localhost:5555로 이동하면 됩니다. 완료된 작은 상당 Tasks를 클릭하면 볼 수 있습니다.
UUID
컬럼이 보일 텐데요 task에 고유하게 부여되는 id입니다. 보이는 UUID중 한가지를 선택하여 카피한다음 shell에서 작업을 해보겠습니다.
>>> from celery.result import AsyncResult
>>> task = AsyncResult('9ad203e0-d59d-48e4-8805-9ac81708b649') # UUID를 붙여 넣습니다.
>>>
>>> task.state
'FAILURE'
>>>
>>> task.result
ZeroDivisionError('division by zero')
해당 대시보드에서 작업을 통해서 편리하게 처리된 태스크의 내역들을 일목요연하며 직관적으로 여러 정보를 파악할 수 있어서 장점이 있습니다.