01. FastAPI ORM(feat. SQLAlchemy)

1. 필수 Library 설치

1) SQLAlchemy 설치

> pip install sqlalchemy

2) pymysql & aiomysql 설치

  1. pymysql
> pip install pymysql

pymysql은 순수 파이썬으로 작성된 라이브러리이며, mysqlclient와 API 호환성을 유지한다.mysqlclient에 비해 설치가 간단하고, 특정 시스템에 대한 이진 의존성이 없다. 성능은 mysqlclient보다 약간 떨어질 수 있지만, 대부분의 일반적인 사용 사례에서 충분히 좋은 성능을 제공한다.

  1. aiomysql
> pip install aiomysql

aiomysqlpymysql을 기반으로 하는 비동기 프로그래밍(특히 asyncio를 사용하는 경우)을 위한 라이브러리다. aiomysql을 사용하면 비동기 I/O를 통해 데이터베이스 연산을 수행할 수 있어 애플리케이션의 동시성을 향상시킬 수 있다.

mysqlclient를 이미 설치하셨다면, SQLAlchemy와 함께 사용할 수 있다. 하지만 비동기 코드를 작성하고자 한다면 aiomysql를 설치해야 한다. pymysqlaiomysql를 동시에 설치하는 것은 필요하지 않지만, aiomysql는 내부적으로 pymysql의 일부 코드를 사용하기 때문에 pymysql이 필요할 수 있다.

2. 파일 구조

  • 사용할 파일 구조
├── main.py
├── dependencies.py
├── routers
│   ├── users.py
│   └── items.py
├── crud.py
├── models.py
├── schemas.py	# 구조 검증때 사용
└── database.py

이 구조에서 routers 디렉터리에는 각 기능별 라우트를 정의하는 파일이 포함된다.

아래 파일 순서는 만드는 과정에 따라 정리했다.

3. models.py

데이터베이스 테이블에 매핑될 모델 정의

# models.py - DB table column 정의
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from database import Base

# User(table)
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True)
    hashed_password = Column()

    items = relationship("Item", back_populates="owner")

# Item(table)
class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String)
    description = Column(String)
    owner_id = Column(Integer, ForeignKey("users.id"))

    owner = relationship("User", back_populates="items")

4. database.py

데이터베이스 연결과 세션 관리

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# (1) 비동기 방식 - Starlette
# (2) 데이터 검증 - pydantic

# 동기용 데이터 베이스 설정 (pymysql)
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:비밀번호@localhost/oz_fastapi"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)

# 비동기용 데이터 베이스 설정 (aiomysql)
# - 무거운 I/O 요청(5초)이 먼저 와도, 뒤에 가벼운 I/O 작업 요청(1초)이 들어오면 더 빨리 끝나는 것이 응답된다.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

ASYNC_SQLALCHEMY_DATABASE_URL = "mysql+aiomysql://root:비밀번호@localhost/oz_fastapi"
async_engine = create_async_engine(ASYNC_SQLALCHEMY_DATABASE_URL)
AsyncSessionLocal = sessionmaker(bind=async_engine, class_=AsyncSession)

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

5. crud.py

from sqlalchemy.orm import Session
from models import User, Item
from schemas import UserCreate, UserUpdate, ItemCreate, ItemUpdate
import bcrypt

# User - CRUD
def create_user(db: Session, user: UserCreate):
    hashed_password = bcrypt.hashpw(user.password.encode("utf-8"), bcrypt.gensalt())  # bite 형태라 encode 필요!

    db_user = User(email=user.email, hashed_password=hashed_password)
    db.add(db_user)
    db.commit()

    return db_user

def get_user_id(db: Session, user_id: int):
    return db.query(User).filter(User.id == user_id).first()

def get_user_email(db: Session, user_email: str):
    return db.query(User).filter(User.email == user_email).first()

def get_users(db: Session, skip: int = 0, limit: int = 10):
    return db.query(User).offset(skip).limit(limit).all()

def update_user(db: Session, user_id: int, user_update: UserUpdate):
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        return None

    user_data = user_update.dict()

    for key, value in user_data.items():
        setattr(db_user, key, value)

    db.commit()
    db.refresh(db_user)

    return db_user

def delete_user(db: Session, user_id: int):
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        return None

    db.delete(db_user)
    db.commit()

    return db_user

# Item - CRUD
def create_item(db: Session, item: ItemCreate, owner_id: int):
    db_item = Item(**item.dict(), owner_id=owner_id)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)

    return db_item

def get_item(db: Session, item_id: int):
    return db.query(Item).filter(Item.id == item_id).first()

def get_items(db: Session, skip: int = 0, limit: int = 10):
    return db.query(Item).offset(skip).limit(limit).all()

def update_item(db: Session, item_id: int, item_update: ItemUpdate):
    db_item = db.query(Item).filter(Item.id == item_id).first()

    if not db_item:
        return None

    for key, value in item_update.dict().items():
        setattr(db_item, key, value)

    db.commit()
    db.refresh(db_item)

    return db_item

def delete_item(db: Session, item_id: int):
    db_item = db.query(Item).filter(Item.id == item_id).first()
    if not db_item:
        return None

    db.delete(db_item)
    db.commit()

    return db_item

6. dependencies.py

공통적으로 사용될 의존성을 정의

from database import SessionLocal, AsyncSessionLocal

# 동기용 의존성
def get_db():
    db = SessionLocal()
    try:
        # Generator
        yield db
    finally:
        db.close()

# 비동기용 의존성
async def get_async_db():
    async with AsyncSessionLocal() as session:
        yield session

7. schemas.py

API의 요청 및 응답 데이터 Validation

from pydantic import BaseModel
from typing import List, Optional

# schemas/item.py
# schemas/user.py

# pydantic -> 데이터 유효성 검증
# Item
class ItemBase(BaseModel):
    title: str
    description: str

class ItemCreate(ItemBase):
    pass

class ItemUpdate(ItemBase):
    title: Optional[str] = None
    description: Optional[str] = None

class Item(BaseModel):
    id: int
    owner_id: int

    class Config:
        orm_mode = True  # orm 방식으로 데이터 필드 읽기가 가능

# User
class UserBase(BaseModel):
    email: str

class User(UserBase):
    id: int
    email: str

    items: List[Item] = []

    class Config:
        orm_mode = True

class UserCreate(UserBase):
    password: str

class UserUpdate(UserBase):
    email: str | None = None
    password: str | None = None

8. routers/users.py

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from dependencies import get_db
from schemas import UserCreate, UserUpdate
from typing import Union
import crud

router = APIRouter()

# CRUD
@router.post("/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = crud.create_user(db, user)
    return db_user

# api/v1/users/{user_id}
@router.get("/{user_data}")  # user_id 문자와 숫자 어떤거든 입력하여 검색 가능하도록
def get_user(user_data: Union[int, str], db: Session = Depends(get_db)):
    try:  # user_data type이 int인 경우
        user_data = int(user_data)
        db_user = crud.get_user_id(db, user_data)
    except:  # user_data type이 str인 경우
        db_user = crud.get_user_email(db, user_data)

    if db_user is None:
        raise HTTPException(status_code=404, detail="User Not Found")
    return db_user

# api/v1/users/
@router.get("/")
def get_users(skip: int, limit: int, db: Session = Depends(get_db)):
    return crud.get_users(db, skip, limit)

# api/v1/users/{user_id}
@router.put("/{user_id}")
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
    updated_user = crud.update_user(db, user_id, user)

    if updated_user is None:
        raise HTTPException(status_code=404, detail="User Not Found")
    return updated_user

@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
    deleted_user = crud.delete_user(db, user_id)

    if delete_user is None:
        raise HTTPException(status_code=404, detail="User Not Found")
    return deleted_user

9. routers/items.py

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from dependencies import get_db
from schemas import ItemCreate, ItemUpdate
from typing import Union
import crud

router = APIRouter()

# CRUD
@router.post("/")
def create_item(item: ItemCreate, owner_id: int, db: Session = Depends(get_db)):
    db_item = crud.create_item(db, item, owner_id)
    return db_item

# api/v1/users/{item_id}
@router.get("/{item_id}")
def get_item(item_id: int, db: Session = Depends(get_db)):
    db_item = crud.get_item(db, item_id)

    if db_item is None:
        raise HTTPException(status_code=404, detail="Item Not Found")
    return db_item

@router.get("/")
def get_items(skip: int=0, limit: int=10, db: Session = Depends(get_db)): 
    return crud.get_items(db, skip, limit)

@router.put("/{item_id}")
def update_item(item_id: int, item: ItemUpdate, db: Session = Depends(get_db)):
    updated_item = crud.update_item(db, item_id, item)

    if updated_item is None:
        raise HTTPException(status_code=404, detail="Item Not Found")
    return updated_item

@router.delete("/{item_id}")
def delete_item(item_id: int, db: Session = Depends(get_db)):
    is_success = crud.delete_item(db, item_id)

    if not is_success:
        raise HTTPException(status_code=404, detail="Item Not Found")
    return {'msg': 'Item deleted successfully'}

10. main.py

from fastapi import FastAPI
from routers.users import router as user_router
from routers.items import router as item_router

app = FastAPI()

app.include_router(user_router, prefix="/api/v1/users", tags=["User"])
app.include_router(item_router, prefix="/api/v1/items", tags=["Item"])

if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", reload=True)

11. MySQL Workbench Schema & Table 만들기

MySQL Workbench에 접속하여 database.py파일에서 적었던 schema를 만들어주고, (나는 oz_fastapi로 만들었다.) 아래 코드로 usersitems table을 만들어준다.

CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    hashed_password VARCHAR(255) NOT NULL
);

CREATE TABLE items (
    id INT AUTO_INCREMENT PRIMARY KEY,
    title VARCHAR(255),
    description VARCHAR(255),
    owner_id INT,
    FOREIGN KEY (owner_id) REFERENCES users(id)
);

12. 동기 버전 함수 & 비동기 버전 함수

동기 버전의 함수는 기존과 동일하게 Session 객체를 사용한다.

1) 동기 버전 함수

from sqlalchemy.orm import Session
from . import models

def get_user(db: Session, user_id: int):
    return db.query(models.User).filter(models.User.id == user_id).first()

def get_user_by_email(db: Session, email: str):
    return db.query(models.User).filter(models.User.email == email).first()

2) 비동기 버전 함수

비동기 버전의 함수는 AsyncSession 객체를 사용하고 await 키워드로 비동기 쿼리를 수행한다. SQLAlchemy 1.4 이상에서 select() 함수를 사용하여 ORM 스타일의 쿼리를 생성할 수 있다.

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from . import models

async def get_user_async(db: AsyncSession, user_id: int):
    result = await db.execute(select(models.User).where(models.User.id == user_id))
    return result.scalars().first()

async def get_user_by_email_async(db: AsyncSession, email: str):
    result = await db.execute(select(models.User).where(models.User.email == email))
    return result.scalars().first()

비동기 버전에서는 db.executeawait를 사용해 쿼리 실행을 비동기적으로 처리하고, result.scalars().first()로 결과를 추출한다.


02. FastAPI SQL

전체 코드는 ORM과 동일하며, crud.py 파일만 수정하면 된다.

  • crud.py
from models import User, Item
from schemas import UserCreate, UserUpdate, ItemCreate, ItemUpdate
from sqlalchemy.orm import Session
import bcrypt # pip insall bcrypt

from sqlalchemy import text # 추가

def create_user(db: Session, user: UserCreate):
    hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
    sql = text("INSERT INTO users (email, hashed_password) VALUES (:email, :hashed_password)")
    db.execute(sql, {"email": user.email, "hashed_password": hashed_password})
    db.commit()

    last_id_query = text("SELECT LAST_INSERT_ID()")
    last_id_result = db.execute(last_id_query)
    last_id = last_id_result.scalar()

    user_query = text("SELECT * FROM users WHERE id = :last_id")
    user_result = db.execute(user_query, {"last_id": last_id}).fetchone()
    return user_result._asdict()

# user의 id값을 기반으로 데이터를 찾는다.
def get_user_id(db: Session, user_id: int):
    sql = text("SELECT * FROM users WHERE id = :user_id")
    result = db.execute(sql, {"user_id": user_id}).fetchone()
    return result._asdict()

# user의 email값을 기반으로 데이터를 찾는다.
def get_user_email(db: Session, user_email: str):
    sql = text("SELECT * FROM users WHERE email = :user_email")
    result = db.execute(sql, {"user_email": user_email}).fetchone()
    return result._asdict()

# 전체 유저를 불러와 봅시다. (페이지 네이션)
def get_users(db: Session, skip: int = 0, limit: int = 10):
    user_sql = text("SELECT * FROM users LIMIT :limit OFFSET :skip")
    user_results = db.execute(user_sql, {"limit": limit, "skip": skip}).fetchall()
    users = [row._asdict() for row in user_results]

    for user in users:
        item_sql = text("SELECT * FROM items WHERE owner_id = :owner_id")
        item_results = db.execute(item_sql, {"owner_id": user['id']}).fetchall()
        user['items'] = [item._asdict() for item in item_results]

    return users

def update_user(db: Session, user_id: int, user_update: UserUpdate):
    user_data = user_update.dict()
    set_clause = ", ".join([f"{key} = :{key}" for key in user_data])
    sql = text(f"UPDATE users SET {set_clause} WHERE id = :user_id")
    user_data["user_id"] = user_id
    db.execute(sql, user_data)
    db.commit()

    # 업데이트된 사용자 정보 다시 가져오기
    return get_user_id(db, user_id)  # 이전에 정의한 get_user_id 함수를 사용

def delete_user(db: Session, user_id: int):
    sql = text("DELETE FROM users WHERE id = :user_id")
    db.execute(sql, {"user_id": user_id})
    db.commit()
    return {"id": user_id}

def get_item(db: Session, item_id: int):
    sql = text("SELECT * FROM items WHERE id = :item_id")
    result = db.execute(sql, {"item_id": item_id}).fetchone()
    return result._asdict()

def get_items(db: Session, skip: int = 0, limit: int = 10):
    sql = text("SELECT * FROM items ORDER BY id DESC LIMIT :limit OFFSET :skip")
    result = db.execute(sql, {"limit": limit, "skip": skip}).fetchall()
    return [item._asdict() for item in result]

def create_item(db: Session, item: ItemCreate, owner_id: int):
    # owner_id가 유효한지 확인
    owner_exists = db.execute(
        text("SELECT id FROM users WHERE id = :owner_id"),
        {"owner_id": owner_id}
    ).fetchone()

    if not owner_exists:
        raise ValueError("유효하지 않은 owner_id입니다.")

    item_data = item.dict()
    item_data["owner_id"] = owner_id
    # 아래의 'column1', 'column2', ... 은 실제 열 이름으로 바꾸세요.
    sql = text("INSERT INTO items (title, description, owner_id) VALUES (:title, :description, :owner_id)")
    db.execute(sql, item_data)
    db.commit()

    last_id_query = text("SELECT LAST_INSERT_ID()")
    last_id = db.execute(last_id_query).scalar()

    item_query = text("SELECT * FROM items WHERE id = :last_id")
    item_result = db.execute(item_query, {"last_id": last_id}).fetchone()
    return item_result._asdict()

def update_item(db: Session, item_id: int, item_update: ItemUpdate):
    item_data = item_update.dict()
    set_clause = ", ".join([f"{key} = :{key}" for key in item_data])
    sql = text(f"UPDATE items SET {set_clause} WHERE id = :item_id RETURNING *")
    item_data["item_id"] = item_id
    result = db.execute(sql, item_data)
    db.commit()
    return result.fetchone()

def delete_item(db: Session, item_id: int):
    sql = text("DELETE FROM items WHERE id = :item_id")
    result = db.execute(sql, {"item_id": item_id})
    db.commit()
    return {"status": "success", "id": item_id}

update_user 함수 실행시 MysSQL 테이블에 CASCADE 관련 옵션 적용이 필요하다.

ALTER TABLE items
ADD CONSTRAINT fk_items_user_id
FOREIGN KEY (owner_id)
REFERENCES users(id)
ON DELETE CASCADE;

[2일차 후기]

Flask부터 Django, FastAPI에서 API 만들고 model 만들고 하는 과정을 계속 반복하다보니 익숙해진거 같기도 하다.
각각의 기능이나 함수들에 대해서는 좀 더 사용해봐야지 제대로 알 수 있을거 같다.
Django에 비해 migrate가 없어서 바로 바로 서버에서 확인 가능하다는 점은 좋지만 MySQL Workbench에서 확인해야한다는 점은 살짝의 불편함이 있을 수 있다.


[참고 자료]

  • [오즈스쿨 스타트업 웹 개발 초격차캠프 백엔드 Fast API 실시간 강의]

  • 강사님 Github

profile
백엔드 코린이😁

0개의 댓글

관련 채용 정보