Application Factory

Hyeseong·2022년 5월 15일
0

fastapi

목록 보기
2/2

개요

  1. FastAPI app 초기화를 위한 application factory 패턴 생성하기
  2. application factory패턴을 이용하여 Celery 설정 작업하기
  3. SQLAlchemy, Alembic을 이용하여 DB 관리

App Factory

'project'라는 새로운 폴더를 만들고 하위에 __init__.py 파일을 생성 하겠습니다.


from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

위에서 정의한 creaet_app 함수는 팩토리 함수인데요. 여러번 호출됨은 물론이거니와 FastAPI app을 보다시피 반환하게 되요.

main.py파일을 아래와 같이 변경할게요.

from project import create_app

app = create_app()

test

(env)$ uvicorn main:app --reload

INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [96439] using watchgod
INFO:     Started server process [96482]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

서버가 구동되는지 브라우저를 통해서 확인해 볼게요.

Alembic & SQLAlchemy

reqirements.txt파일에 아래 2가지를 추가 정의해줍니다.

...
...
alembic==1.6.5
SQLAlchemy==1.4.20

정의해줬다면 아래 명령어로 설치해 줄게요.

pip install -r requirements.txt

참고

  1. 현재로서는 Celery가 asyncio 를 잘 지원하지 않기 때문에(asyncio 이전에 개발되었기 때문에) FastAPI 및 Celery와 함께 사용할 수 있기 때문에 SQLAlchemy를 ORM으로 사용하고 있습니다.
  2. Alembic은 SQLAlchemy용 데이터베이스 마이그레이션 도구입니다.

Config

project 디렉토리 아래 config.py파일을 생성 할 게요.

import os
import pathlib
from functools import lru_cache


class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}


class DevelopmentConfig(BaseConfig):
    pass


class ProductionConfig(BaseConfig):
    pass


class TestingConfig(BaseConfig):
    pass


@lru_cache()
def get_settings():
    config_cls_dict = {
        "development": DevelopmentConfig,
        "production": ProductionConfig,
        "testing": TestingConfig
    }

    config_name = os.environ.get("FASTAPI_CONFIG", "development")
    config_cls = config_cls_dict[config_name]
    return config_cls()


settings = get_settings()
  1. get_settings함수의 인스턴스를 이용하여 개발, 테스트, 배포 환경에 맞는 환경 설정을 구성할 수 있습니다. 그 첫 엔트리포인트에 해당하는 것이 FASTAPI_CONFIG 환경변수를 통해서 어떤 설정을 할지 결정 하는것이에요.
  2. Pydantic의 BaseSettings를 상속받아 사용하는 것을 Celery와 사용하는 경우 특히 Flower를 사용할 때 권장하지 않아요.
    ERROR/MainProcess] pidbox command error: KeyError('__signature__') 오류가 발생할 수 있습니다.

현재 프로젝트 구조는 아래와 같습니다.

├── main.py
├── project
│   ├── __init__.py
│   └── config.py
└── requirements.txt

SQLAlchemy 임포팅

project/database.py 파일을 생성할게요.

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from project.config import settings

engine = create_engine(
    settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()

Alembic 설정

Alembic 초기화

$ alembic init alembic

프로젝트 구성

├── alembic               # new
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
├── alembic.ini           # new
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   └── database.py
└── requirements.txt

alembic/env.py를 업데이트 합니다.

from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

from project import create_app                 # new
from project.config import settings            # new
from project.database import Base              # new


# Alembic Config 객체인데요. .ini파일을 통해서 명세된 값들에 대한 접근을 할 수 있게 해요.
config = context.config

# 파이썬 로깅 구성을 하기 위해서 config 파일을 인터프린팅하며
# 기본적으로 로거를 구성하게 합니다.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# model의 MetaData 객체를 더해줍니다.
# 'autogenerate' support 기능을 통해서 생성되는데요.
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))        # new

fastapi_app = create_app()    # new

target_metadata = Base.metadata       # new


# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

Notes

  1. DB 연결을 위한 문자열을 config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL)) 코드를 통해 할 수 있습니다.
  2. 그리고 나서, 관련된 모델을 로드하기 위해서 새로운 fastapi_app 인스턴스 생성을 위해서 create_app() 호출을 하게 됩니다.는데요.
  3. 마지막으로 Alembic에서 새로운 모델을 찾을 수 있게 하기 위해서 target_metadata = Base.metadata를 추가해줍니다.

db.dqlite3를 생성해보겠습니다.


(env)$ python
>>> from main import app
>>> from project.database import Base, engine
>>> Base.metadata.create_all(bind=engine)
>>> exit()

(env) ls db.sqlite3
db.dqlite3

현재 아무런 모델이 없더라도 dabase migrate를 하겠습니다.

(env)$ alembic revision --autogenerate

INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
  Generating /fastapi-celery-project/alembic/versions/4ea12e629032_.py ...  done

(env)$ alembic upgrade head
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 4ea12e629032, empty message

앱 구조

Django 또는 Flask에 대한 경험이 있는 경우 "Django Apps" 또는 "Flask Blueprints"를 사용하여 공통 기능을 재사용 가능한 구성 요소로 그룹화하여 더 큰 애플리케이션을 분할했을 것입니다.

FastAPI도 마찬가지로 진행해보겠습니다.

user폴더를 project폴더 하위에 만들고 그리고 __init__.py파일을 user폴더에 생성하겠습니다.


from fastapi import APIRouter

users_router = APIRouter(
    prefix="/users",
)

from . import models # noqa

models.py 파일을 users 디렉토리 아래 생성하고 User 클래스를 정의합니다.

from sqlalchemy import Column, Integer, String

from project.database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(128), unique=True, nullable=False)
    email = Column(String(128), unique=True, nullable=False)

    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email

project/__init__.py 파일을 업데이트 합니다.


from fastapi import FastAPI


def create_app() -> FastAPI:
    app = FastAPI()

    from project.users import users_router                # new
    app.include_router(users_router)                      # new

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

결국 from project.users import users_router가 호출되면, project/users/__init__.py 파일이 실행되어 코드가 실행되고 결국 models.py도 임포트 되는 겁니다.

project 구조는 아래와 같습니다.

├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       └── models.py
└── requirements.txt
  1. main.py - create_app 새로운 FastAPI 앱을 만드는 데 사용
  2. project/__init__.py - Factory function
  3. project/config.py - FastAPI config
  4. "project/users" - 관련 모델 및 Users에 대한 라우트

Database 운영

새로운 DB 마이그레이션을 해보고 위에서 만든 User 모델의 테이블도 생성해 보겠습니다.

(env)$ alembic revision --autogenerate
# INFO  [alembic.autogenerate.compare] Detected added table 'users'

(env)$ alembic upgrade head
# Create user table

파이썬 쉘에서 DB 데이터를 생성해보도록 할 게요.

$ python

>>>from main import app
>>>from project.database import SessionLocal
>>>from project.users.models import User

>>> user = User(username='test1', email='test@example.com')
>>>session = SessionLocal()
>>>session.add(user)
>>>session.commit()
>>>
>>>new_session = SessionLocal()
>>>new_session.query(user).first().username
'test1'
>>> exit()

Celery 추가하기

project/config.py파일에서 BaseConfigCELERY_BROKER_URL, CELERY_RESULT_BACKEND 변수를 추가하여 업데이트 하겠습니다.

class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent

    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}

    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")            # NEW
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")    # NEW

project/celery_utils.py 파일을 새로 만들게요.

from celery import current_app as current_celery_app

from project.config import settings


def create_celery():
    celery_app = current_celery_app
    celery_app.config_from_object(settings, namespace="CELERY")

    return celery_app
  1. create_celery 팩토리 함수는 Celery 설정과 app 인스턴스를 반환하는 역할을 합니다.
  2. 새로운 Celery 인스턴스를 만들기 보다는 current_app을 통해서 shared tasks가 예상되는 로직에서 작동되도록 구성했습니다.
  3. celeryapp.config_from_object(settings, namespace="CELERY") 모든 셀러리 관련 구성 키에 접두사(예. CELERY)를 붙이도록 했습니다.다음 broker_url을 사용해야 합니다. broker_url 구성을 위해서 CELERY_BROKER_URL 환경 변수를 사용하였습니다 .

project/__init__.py 파일을 아래와 같이 업데이트 하겠습니다.

from fastapi import FastAPI

from project.celery_utils import create_celery


def create_app() -> FastAPI:
    app = FastAPI()

    # do this before loading routes
    app.celery_app = create_celery()

    from project.users import users_router
    app.include_router(users_router)

    @app.get("/")
    async def root():
        return {"message": "Hello World"}

    return app

project/users/tasks.py 파일을 생성합니다.

from celery import shared_task


@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y

참고
1. 여러 웹사이트에서 celery.task사용을 추천하지만 Celery instance를 임포트해서 사용할 경우 순환 circular imports를 발생시킬수 있다는 점에 유의해야 합니다.
2. 새로운 Celery instance를 만들지 않고 create_celery(project/celery_utils.py) 팩토리 함수에서 celery_app 인스턴스를 가져오기 때문에 재사용 가능한 코드로 만들어 쓸수 있게 합니다.
해당 앱에서 어디든지 함수를 호출하여 사용 가능한 상태로 만들게 되었습니다.

project/users/__init__.py 파일을 아래와 같이 변경 할 게요.

from fastapi import APIRouter

users_router = APIRouter(
	prefix='/users',
)

from . import models, tasks 

워커를 돌리게 되면 이제 Celery task를 발견 할 수 있게 됩니다.

main.py 파일을 업데이트 할게요.

from project import create_app

app = create_app()
celery = app.celery_app

현재까지 프로젝트 구조는 아래와 같습니다.

├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 31d362f0573c_.py
│       └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│   ├── __init__.py
│   ├── celery_utils.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       ├── models.py
│       └── tasks.py
└── requirements.txt

테스트

터미널 하나를 열어서 워커를 돌려볼게요.

(env)$ celery -A main.celery worker --loglevel=info

[config]
.> app:         default:0x10f681940 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . project.users.tasks.divide

새로운 터미널에서 Python shell을 입력 할게요.

>>> from main import app
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)

첫 터미널로 돌아가면 워커를 통해 실행된 태스크 로그 기록을 볼 수 있습니다.

[2022-05-15 10:11:40,244: INFO/MainProcess] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] received
[2022-05-15 10:11:45,253: INFO/ForkPoolWorker-16] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] succeeded in 5.007629750999996s: 0.5
profile
어제보다 오늘 그리고 오늘 보다 내일...

0개의 댓글