'project'라는 새로운 폴더를 만들고 하위에 __init__.py 파일을 생성 하겠습니다.
from fastapi import FastAPI
def create_app() -> FastAPI:
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
위에서 정의한 creaet_app
함수는 팩토리 함수인데요. 여러번 호출됨은 물론이거니와 FastAPI app을 보다시피 반환하게 되요.
main.py파일을 아래와 같이 변경할게요.
from project import create_app
app = create_app()
test
(env)$ uvicorn main:app --reload
INFO: Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO: Started reloader process [96439] using watchgod
INFO: Started server process [96482]
INFO: Waiting for application startup.
INFO: Application startup complete.
서버가 구동되는지 브라우저를 통해서 확인해 볼게요.
reqirements.txt파일에 아래 2가지를 추가 정의해줍니다.
...
...
alembic==1.6.5
SQLAlchemy==1.4.20
정의해줬다면 아래 명령어로 설치해 줄게요.
pip install -r requirements.txt
참고
- 현재로서는 Celery가 asyncio 를 잘 지원하지 않기 때문에(asyncio 이전에 개발되었기 때문에) FastAPI 및 Celery와 함께 사용할 수 있기 때문에 SQLAlchemy를 ORM으로 사용하고 있습니다.
- Alembic은 SQLAlchemy용 데이터베이스 마이그레이션 도구입니다.
project 디렉토리 아래 config.py파일을 생성 할 게요.
import os
import pathlib
from functools import lru_cache
class BaseConfig:
BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
DATABASE_CONNECT_DICT: dict = {}
class DevelopmentConfig(BaseConfig):
pass
class ProductionConfig(BaseConfig):
pass
class TestingConfig(BaseConfig):
pass
@lru_cache()
def get_settings():
config_cls_dict = {
"development": DevelopmentConfig,
"production": ProductionConfig,
"testing": TestingConfig
}
config_name = os.environ.get("FASTAPI_CONFIG", "development")
config_cls = config_cls_dict[config_name]
return config_cls()
settings = get_settings()
get_settings
함수의 인스턴스를 이용하여 개발, 테스트, 배포 환경에 맞는 환경 설정을 구성할 수 있습니다. 그 첫 엔트리포인트에 해당하는 것이FASTAPI_CONFIG
환경변수를 통해서 어떤 설정을 할지 결정 하는것이에요.- Pydantic의
BaseSettings
를 상속받아 사용하는 것을 Celery와 사용하는 경우 특히 Flower를 사용할 때 권장하지 않아요.
ERROR/MainProcess] pidbox command error: KeyError('__signature__')
오류가 발생할 수 있습니다.
현재 프로젝트 구조는 아래와 같습니다.
├── main.py
├── project
│ ├── __init__.py
│ └── config.py
└── requirements.txt
project/database.py 파일을 생성할게요.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from project.config import settings
engine = create_engine(
settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
Alembic 초기화
$ alembic init alembic
프로젝트 구성
├── alembic # new
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
├── alembic.ini # new
├── main.py
├── project
│ ├── __init__.py
│ ├── config.py
│ └── database.py
└── requirements.txt
alembic/env.py를 업데이트 합니다.
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from project import create_app # new
from project.config import settings # new
from project.database import Base # new
# Alembic Config 객체인데요. .ini파일을 통해서 명세된 값들에 대한 접근을 할 수 있게 해요.
config = context.config
# 파이썬 로깅 구성을 하기 위해서 config 파일을 인터프린팅하며
# 기본적으로 로거를 구성하게 합니다.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# model의 MetaData 객체를 더해줍니다.
# 'autogenerate' support 기능을 통해서 생성되는데요.
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL)) # new
fastapi_app = create_app() # new
target_metadata = Base.metadata # new
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
Notes
- DB 연결을 위한 문자열을
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))
코드를 통해 할 수 있습니다.- 그리고 나서, 관련된 모델을 로드하기 위해서 새로운 fastapi_app 인스턴스 생성을 위해서 create_app() 호출을 하게 됩니다.는데요.
- 마지막으로 Alembic에서 새로운 모델을 찾을 수 있게 하기 위해서 target_metadata = Base.metadata를 추가해줍니다.
db.dqlite3
를 생성해보겠습니다.
(env)$ python
>>> from main import app
>>> from project.database import Base, engine
>>> Base.metadata.create_all(bind=engine)
>>> exit()
(env) ls db.sqlite3
db.dqlite3
현재 아무런 모델이 없더라도 dabase migrate를 하겠습니다.
(env)$ alembic revision --autogenerate
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
Generating /fastapi-celery-project/alembic/versions/4ea12e629032_.py ... done
(env)$ alembic upgrade head
INFO [alembic.runtime.migration] Context impl SQLiteImpl.
INFO [alembic.runtime.migration] Will assume non-transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> 4ea12e629032, empty message
Django 또는 Flask에 대한 경험이 있는 경우 "Django Apps" 또는 "Flask Blueprints"를 사용하여 공통 기능을 재사용 가능한 구성 요소로 그룹화하여 더 큰 애플리케이션을 분할했을 것입니다.
FastAPI도 마찬가지로 진행해보겠습니다.
user폴더를 project폴더 하위에 만들고 그리고 __init__.py파일을 user폴더에 생성하겠습니다.
from fastapi import APIRouter
users_router = APIRouter(
prefix="/users",
)
from . import models # noqa
models.py 파일을 users 디렉토리 아래 생성하고 User
클래스를 정의합니다.
from sqlalchemy import Column, Integer, String
from project.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(128), unique=True, nullable=False)
email = Column(String(128), unique=True, nullable=False)
def __init__(self, username, email, *args, **kwargs):
self.username = username
self.email = email
project/__init__.py 파일을 업데이트 합니다.
from fastapi import FastAPI
def create_app() -> FastAPI:
app = FastAPI()
from project.users import users_router # new
app.include_router(users_router) # new
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
결국 from project.users import users_router
가 호출되면, project/users/__init__.py
파일이 실행되어 코드가 실행되고 결국 models.py도 임포트 되는 겁니다.
project 구조는 아래와 같습니다.
├── alembic
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│ ├── __init__.py
│ ├── config.py
│ ├── database.py
│ └── users
│ ├── __init__.py
│ └── models.py
└── requirements.txt
- main.py -
create_app
새로운 FastAPI 앱을 만드는 데 사용- project/__init__.py - Factory function
- project/config.py - FastAPI config
- "project/users" - 관련 모델 및 Users에 대한 라우트
새로운 DB 마이그레이션을 해보고 위에서 만든 User
모델의 테이블도 생성해 보겠습니다.
(env)$ alembic revision --autogenerate
# INFO [alembic.autogenerate.compare] Detected added table 'users'
(env)$ alembic upgrade head
# Create user table
파이썬 쉘에서 DB 데이터를 생성해보도록 할 게요.
$ python
>>>from main import app
>>>from project.database import SessionLocal
>>>from project.users.models import User
>>> user = User(username='test1', email='test@example.com')
>>>session = SessionLocal()
>>>session.add(user)
>>>session.commit()
>>>
>>>new_session = SessionLocal()
>>>new_session.query(user).first().username
'test1'
>>> exit()
project/config.py파일에서 BaseConfig
를 CELERY_BROKER_URL
, CELERY_RESULT_BACKEND
변수를 추가하여 업데이트 하겠습니다.
class BaseConfig:
BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
DATABASE_CONNECT_DICT: dict = {}
CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0") # NEW
CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0") # NEW
project/celery_utils.py
파일을 새로 만들게요.
from celery import current_app as current_celery_app
from project.config import settings
def create_celery():
celery_app = current_celery_app
celery_app.config_from_object(settings, namespace="CELERY")
return celery_app
create_celery
팩토리 함수는 Celery 설정과 app 인스턴스를 반환하는 역할을 합니다.- 새로운 Celery 인스턴스를 만들기 보다는 current_app을 통해서 shared tasks가 예상되는 로직에서 작동되도록 구성했습니다.
- celeryapp.config_from_object(settings, namespace="CELERY") 모든 셀러리 관련 구성 키에 접두사(예. CELERY)를 붙이도록 했습니다.다음 broker_url을 사용해야 합니다. broker_url 구성을 위해서 CELERY_BROKER_URL 환경 변수를 사용하였습니다 .
project/__init__.py 파일을 아래와 같이 업데이트 하겠습니다.
from fastapi import FastAPI
from project.celery_utils import create_celery
def create_app() -> FastAPI:
app = FastAPI()
# do this before loading routes
app.celery_app = create_celery()
from project.users import users_router
app.include_router(users_router)
@app.get("/")
async def root():
return {"message": "Hello World"}
return app
project/users/tasks.py 파일을 생성합니다.
from celery import shared_task
@shared_task
def divide(x, y):
import time
time.sleep(5)
return x / y
참고
1. 여러 웹사이트에서celery.task
사용을 추천하지만 Celery instance를 임포트해서 사용할 경우 순환 circular imports를 발생시킬수 있다는 점에 유의해야 합니다.
2. 새로운 Celery instance를 만들지 않고 create_celery(project/celery_utils.py) 팩토리 함수에서 celery_app 인스턴스를 가져오기 때문에 재사용 가능한 코드로 만들어 쓸수 있게 합니다.
해당 앱에서 어디든지 함수를 호출하여 사용 가능한 상태로 만들게 되었습니다.
project/users/__init__.py 파일을 아래와 같이 변경 할 게요.
from fastapi import APIRouter
users_router = APIRouter(
prefix='/users',
)
from . import models, tasks
워커를 돌리게 되면 이제 Celery task를 발견 할 수 있게 됩니다.
main.py 파일을 업데이트 할게요.
from project import create_app
app = create_app()
celery = app.celery_app
현재까지 프로젝트 구조는 아래와 같습니다.
├── alembic
│ ├── README
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 31d362f0573c_.py
│ └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│ ├── __init__.py
│ ├── celery_utils.py
│ ├── config.py
│ ├── database.py
│ └── users
│ ├── __init__.py
│ ├── models.py
│ └── tasks.py
└── requirements.txt
터미널 하나를 열어서 워커를 돌려볼게요.
(env)$ celery -A main.celery worker --loglevel=info
[config]
.> app: default:0x10f681940 (.default.Loader)
.> transport: redis://127.0.0.1:6379/0
.> results: redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. project.users.tasks.divide
새로운 터미널에서 Python shell을 입력 할게요.
>>> from main import app
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
첫 터미널로 돌아가면 워커를 통해 실행된 태스크 로그 기록을 볼 수 있습니다.
[2022-05-15 10:11:40,244: INFO/MainProcess] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] received
[2022-05-15 10:11:45,253: INFO/ForkPoolWorker-16] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] succeeded in 5.007629750999996s: 0.5