Fastapi SQLAlchemy Multiple databases

yslee·2022년 12월 3일
0

서비스를 만들다보면 가끔 여러 DB를 사용해야할 때가 있다.
FastAPI에서 SQLAlchemy를 사용할때 이런 경우 어떻게 할 수 있을까 고민중 flask-sqlalchemy에서 사용하는 패턴에서 영감을 받아 ORM Model에 따라서 동적으로 라우팅시키는 방법을 작성해 보았다.


bind_key style

flask-sqlalchemy에서는 ORM class에 bind_key 라는 메타데이터로 DB 라우팅을 하는 패턴을 사용하고 있다. 개인적으로 직관적이며 나쁘지 않은 방식이라 생각해 이방식을 FastAPI에 적용해 보려고 한다.

SQLALCHEMY_BINDS = {
    "meta": "sqlite:////path/to/meta.db",
    "auth": {
        "url": "mysql://localhost/users",
        "pool_recycle": 3600,
    },
}

class User(db.Model):
    __bind_key__ = "auth"
    id = db.Column(db.Integer, primary_key=True)

상황은 아래와 같다.

  • Model A는 DB-1
  • Model B는 DB-2
    하나의 서비스에서 Model A,B를 동시에 사용해야 한다.

Session.get_bind

SQLAlchemy에서는 Session.get_bind()에서 ORM 객체를 후킹할 수 있다.

ORM Model

class Name(Base):
    __tablename__ = "Name"
    __bind_key__ = "db1" # 메타데이터 변수를 만들어 라우팅 DB 인스턴스 key를 입력

    id = Column(BigInteger, primary_key=True)
    name = Column(String(30))


class Book(Base):
    __tablename__ = "Book"
    __bind_key__ = "db2" # 메타데이터 변수를 만들어 라우팅 DB 인스턴스 key를 입력

    id = Column(BigInteger, primary_key=True)
    title = Column(String(30))

ORM class에 bind_key 메타데이터 정보를 추가

RoutingSession

from sqlalchemy.orm import Session, sessionmaker, scoped_session
from sqlalchemy import select, create_engine

engines = {
    "db1": create_engine(
        f"mysql+mysqlconnector://root:admin@localhost:3307/mydatabase"
    ),
    "db2": create_engine(
        f"mysql+mysqlconnector://root:admin@localhost:3308/mydatabase"
    ),
}

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        bind_key = mapper.class_.__bind_key__
        return engines[bind_key]


session = scoped_session(sessionmaker(class_=RoutingSession))
...

SQLAlchemy Session의 get_bind를 재정의, mapper class의 bind_key 메타데이터로 db 엔진을 반환하게 하면 ORM에 정의에 따라 DB를 선택할 수 있다.

Test

DB-1의 Name table

idname
1A
2B
3C

DB-1의 Name table

idname
1D
2E
3F

이렇게 저장되어 있다.

def get_db():
    db = session()
    try:
        yield db
    finally:
        db.close()


app = FastAPI()


@app.get("/names")
async def get_data_from_db_1(
    session=Depends(get_db),
):

    print(session)
    result = session.query(Name).all()
    return result


def get_data_from_db(
    session=Depends(get_db),
):
    print(session)


@app.get("/books")
async def get_data_from_db_2(
    session=Depends(get_db),
    svc=Depends(get_data_from_db),
):
    print(session)
    result = session.query(Book).all()
    return result
curl -X GET http://localhost:8000/names
[{"id":1,"name":"A"},{"id":2,"name":"B"},{"id":3,"name":"C"}]%     
❯ curl -X GET http://localhost:8000/books
[{"id":1,"title":"D"},{"id":2,"title":"E"},{"id":3,"title":"F"}]%      

테스트 결과 각 db에서 동적으로 데이터를 잘 가져오는 모습을 확인할 수 있다.
디버깅을 해보면 session.query(Name).all() 로 qurey를 던지면 재정의한 get_bind 메서드에서 mapper를 통해 bind_key 정보를 잘 가져오는 모습을 확인 할 수 있다.

profile
지식보다 지혜를

0개의 댓글