'project'라는 새로운 폴더를 만들고 하위에 __init__.py 파일을 생성 하겠습니다.
from fastapi import FastAPI
def create_app() -> FastAPI:
    app = FastAPI()
    @app.get("/")
    async def root():
        return {"message": "Hello World"}
    return app
위에서 정의한 creaet_app 함수는 팩토리 함수인데요. 여러번 호출됨은 물론이거니와 FastAPI app을 보다시피 반환하게 되요. 
main.py파일을 아래와 같이 변경할게요.
from project import create_app
app = create_app()
test
(env)$ uvicorn main:app --reload
INFO:     Uvicorn running on http://localhost:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [96439] using watchgod
INFO:     Started server process [96482]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
서버가 구동되는지 브라우저를 통해서 확인해 볼게요.
reqirements.txt파일에 아래 2가지를 추가 정의해줍니다.
...
...
alembic==1.6.5
SQLAlchemy==1.4.20
정의해줬다면 아래 명령어로 설치해 줄게요.
pip install -r requirements.txt
참고
- 현재로서는 Celery가 asyncio 를 잘 지원하지 않기 때문에(asyncio 이전에 개발되었기 때문에) FastAPI 및 Celery와 함께 사용할 수 있기 때문에 SQLAlchemy를 ORM으로 사용하고 있습니다.
 - Alembic은 SQLAlchemy용 데이터베이스 마이그레이션 도구입니다.
 
project 디렉토리 아래 config.py파일을 생성 할 게요.
import os
import pathlib
from functools import lru_cache
class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}
class DevelopmentConfig(BaseConfig):
    pass
class ProductionConfig(BaseConfig):
    pass
class TestingConfig(BaseConfig):
    pass
@lru_cache()
def get_settings():
    config_cls_dict = {
        "development": DevelopmentConfig,
        "production": ProductionConfig,
        "testing": TestingConfig
    }
    config_name = os.environ.get("FASTAPI_CONFIG", "development")
    config_cls = config_cls_dict[config_name]
    return config_cls()
settings = get_settings()
get_settings함수의 인스턴스를 이용하여 개발, 테스트, 배포 환경에 맞는 환경 설정을 구성할 수 있습니다. 그 첫 엔트리포인트에 해당하는 것이FASTAPI_CONFIG환경변수를 통해서 어떤 설정을 할지 결정 하는것이에요.- Pydantic의
 BaseSettings를 상속받아 사용하는 것을 Celery와 사용하는 경우 특히 Flower를 사용할 때 권장하지 않아요.
ERROR/MainProcess] pidbox command error: KeyError('__signature__')오류가 발생할 수 있습니다.
현재 프로젝트 구조는 아래와 같습니다.
├── main.py
├── project
│   ├── __init__.py
│   └── config.py
└── requirements.txt
project/database.py 파일을 생성할게요.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from project.config import settings
engine = create_engine(
    settings.DATABASE_URL, connect_args=settings.DATABASE_CONNECT_DICT
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
Alembic 초기화
$ alembic init alembic
프로젝트 구성
├── alembic               # new
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
├── alembic.ini           # new
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   └── database.py
└── requirements.txt
alembic/env.py를 업데이트 합니다.
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from project import create_app                 # new
from project.config import settings            # new
from project.database import Base              # new
# Alembic Config 객체인데요. .ini파일을 통해서 명세된 값들에 대한 접근을 할 수 있게 해요.
config = context.config
# 파이썬 로깅 구성을 하기 위해서 config 파일을 인터프린팅하며
# 기본적으로 로거를 구성하게 합니다.
fileConfig(config.config_file_name)
# add your model's MetaData object here
# model의 MetaData 객체를 더해줍니다.
# 'autogenerate' support 기능을 통해서 생성되는데요.
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))        # new
fastapi_app = create_app()    # new
target_metadata = Base.metadata       # new
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
Notes
- DB 연결을 위한 문자열을
 config.set_main_option("sqlalchemy.url", str(settings.DATABASE_URL))코드를 통해 할 수 있습니다.- 그리고 나서, 관련된 모델을 로드하기 위해서 새로운 fastapi_app 인스턴스 생성을 위해서 create_app() 호출을 하게 됩니다.는데요.
 - 마지막으로 Alembic에서 새로운 모델을 찾을 수 있게 하기 위해서 target_metadata = Base.metadata를 추가해줍니다.
 
db.dqlite3를 생성해보겠습니다. 
(env)$ python
>>> from main import app
>>> from project.database import Base, engine
>>> Base.metadata.create_all(bind=engine)
>>> exit()
(env) ls db.sqlite3
db.dqlite3
현재 아무런 모델이 없더라도 dabase migrate를 하겠습니다.
(env)$ alembic revision --autogenerate
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
  Generating /fastapi-celery-project/alembic/versions/4ea12e629032_.py ...  done
(env)$ alembic upgrade head
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running upgrade  -> 4ea12e629032, empty message
Django 또는 Flask에 대한 경험이 있는 경우 "Django Apps" 또는 "Flask Blueprints"를 사용하여 공통 기능을 재사용 가능한 구성 요소로 그룹화하여 더 큰 애플리케이션을 분할했을 것입니다.
FastAPI도 마찬가지로 진행해보겠습니다.
user폴더를 project폴더 하위에 만들고 그리고 __init__.py파일을 user폴더에 생성하겠습니다.
from fastapi import APIRouter
users_router = APIRouter(
    prefix="/users",
)
from . import models # noqa
models.py 파일을 users 디렉토리 아래 생성하고 User 클래스를 정의합니다. 
from sqlalchemy import Column, Integer, String
from project.database import Base
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(128), unique=True, nullable=False)
    email = Column(String(128), unique=True, nullable=False)
    def __init__(self, username, email, *args, **kwargs):
        self.username = username
        self.email = email
project/__init__.py 파일을 업데이트 합니다.
from fastapi import FastAPI
def create_app() -> FastAPI:
    app = FastAPI()
    from project.users import users_router                # new
    app.include_router(users_router)                      # new
    @app.get("/")
    async def root():
        return {"message": "Hello World"}
    return app
결국 from project.users import users_router가 호출되면, project/users/__init__.py 파일이 실행되어 코드가 실행되고 결국 models.py도 임포트 되는 겁니다.  
project 구조는 아래와 같습니다.
├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│   ├── __init__.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       └── models.py
└── requirements.txt
- main.py -
 create_app새로운 FastAPI 앱을 만드는 데 사용- project/__init__.py - Factory function
 - project/config.py - FastAPI config
 - "project/users" - 관련 모델 및 Users에 대한 라우트
 
새로운 DB 마이그레이션을 해보고 위에서 만든 User 모델의 테이블도 생성해 보겠습니다. 
(env)$ alembic revision --autogenerate
# INFO  [alembic.autogenerate.compare] Detected added table 'users'
(env)$ alembic upgrade head
# Create user table
파이썬 쉘에서 DB 데이터를 생성해보도록 할 게요.
$ python
>>>from main import app
>>>from project.database import SessionLocal
>>>from project.users.models import User
>>> user = User(username='test1', email='test@example.com')
>>>session = SessionLocal()
>>>session.add(user)
>>>session.commit()
>>>
>>>new_session = SessionLocal()
>>>new_session.query(user).first().username
'test1'
>>> exit()
project/config.py파일에서 BaseConfig 를 CELERY_BROKER_URL, CELERY_RESULT_BACKEND 변수를 추가하여 업데이트 하겠습니다. 
class BaseConfig:
    BASE_DIR: pathlib.Path = pathlib.Path(__file__).parent.parent
    DATABASE_URL: str = os.environ.get("DATABASE_URL", f"sqlite:///{BASE_DIR}/db.sqlite3")
    DATABASE_CONNECT_DICT: dict = {}
    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")            # NEW
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")    # NEW
project/celery_utils.py 파일을 새로 만들게요. 
from celery import current_app as current_celery_app
from project.config import settings
def create_celery():
    celery_app = current_celery_app
    celery_app.config_from_object(settings, namespace="CELERY")
    return celery_app
create_celery팩토리 함수는 Celery 설정과 app 인스턴스를 반환하는 역할을 합니다.- 새로운 Celery 인스턴스를 만들기 보다는 current_app을 통해서 shared tasks가 예상되는 로직에서 작동되도록 구성했습니다.
 - celeryapp.config_from_object(settings, namespace="CELERY") 모든 셀러리 관련 구성 키에 접두사(예. CELERY)를 붙이도록 했습니다.다음 broker_url을 사용해야 합니다. broker_url 구성을 위해서 CELERY_BROKER_URL 환경 변수를 사용하였습니다 .
 
project/__init__.py 파일을 아래와 같이 업데이트 하겠습니다.
from fastapi import FastAPI
from project.celery_utils import create_celery
def create_app() -> FastAPI:
    app = FastAPI()
    # do this before loading routes
    app.celery_app = create_celery()
    from project.users import users_router
    app.include_router(users_router)
    @app.get("/")
    async def root():
        return {"message": "Hello World"}
    return app
project/users/tasks.py 파일을 생성합니다.
from celery import shared_task
@shared_task
def divide(x, y):
    import time
    time.sleep(5)
    return x / y
참고
1. 여러 웹사이트에서celery.task사용을 추천하지만 Celery instance를 임포트해서 사용할 경우 순환 circular imports를 발생시킬수 있다는 점에 유의해야 합니다.
2. 새로운 Celery instance를 만들지 않고 create_celery(project/celery_utils.py) 팩토리 함수에서 celery_app 인스턴스를 가져오기 때문에 재사용 가능한 코드로 만들어 쓸수 있게 합니다.
해당 앱에서 어디든지 함수를 호출하여 사용 가능한 상태로 만들게 되었습니다.
project/users/__init__.py 파일을 아래와 같이 변경 할 게요.
from fastapi import APIRouter
users_router = APIRouter(
	prefix='/users',
)
from . import models, tasks 
워커를 돌리게 되면 이제 Celery task를 발견 할 수 있게 됩니다.
main.py 파일을 업데이트 할게요.
from project import create_app
app = create_app()
celery = app.celery_app
현재까지 프로젝트 구조는 아래와 같습니다.
├── alembic
│   ├── README
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 31d362f0573c_.py
│       └── 4ea12e629032_.py
├── alembic.ini
├── db.sqlite3
├── main.py
├── project
│   ├── __init__.py
│   ├── celery_utils.py
│   ├── config.py
│   ├── database.py
│   └── users
│       ├── __init__.py
│       ├── models.py
│       └── tasks.py
└── requirements.txt
터미널 하나를 열어서 워커를 돌려볼게요.
(env)$ celery -A main.celery worker --loglevel=info
[config]
.> app:         default:0x10f681940 (.default.Loader)
.> transport:   redis://127.0.0.1:6379/0
.> results:     redis://127.0.0.1:6379/0
.> concurrency: 8 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery           exchange=celery(direct) key=celery
[tasks]
  . project.users.tasks.divide
새로운 터미널에서 Python shell을 입력 할게요.
>>> from main import app
>>> from project.users.tasks import divide
>>> task = divide.delay(1, 2)
첫 터미널로 돌아가면 워커를 통해 실행된 태스크 로그 기록을 볼 수 있습니다.
[2022-05-15 10:11:40,244: INFO/MainProcess] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] received
[2022-05-15 10:11:45,253: INFO/ForkPoolWorker-16] Task project.users.tasks.divide[efba162e-1fbb-4b67-a338-3a9899363ec6] succeeded in 5.007629750999996s: 0.5