> pip install sqlalchemy
> pip install pymysql
pymysql은 순수 파이썬으로 작성된 라이브러리이며, mysqlclient와 API 호환성을 유지한다.mysqlclient에 비해 설치가 간단하고, 특정 시스템에 대한 이진 의존성이 없다. 성능은 mysqlclient보다 약간 떨어질 수 있지만, 대부분의 일반적인 사용 사례에서 충분히 좋은 성능을 제공한다.
> pip install aiomysql
aiomysql은 pymysql을 기반으로 하는 비동기 프로그래밍(특히 asyncio를 사용하는 경우)을 위한 라이브러리다. aiomysql을 사용하면 비동기 I/O를 통해 데이터베이스 연산을 수행할 수 있어 애플리케이션의 동시성을 향상시킬 수 있다.
mysqlclient를 이미 설치하셨다면, SQLAlchemy와 함께 사용할 수 있다. 하지만 비동기 코드를 작성하고자 한다면 aiomysql를 설치해야 한다. pymysql과 aiomysql를 동시에 설치하는 것은 필요하지 않지만, aiomysql는 내부적으로 pymysql의 일부 코드를 사용하기 때문에 pymysql이 필요할 수 있다.

├── main.py
├── dependencies.py
├── routers
│ ├── users.py
│ └── items.py
├── crud.py
├── models.py
├── schemas.py # 구조 검증때 사용
└── database.py
이 구조에서 routers 디렉터리에는 각 기능별 라우트를 정의하는 파일이 포함된다.
아래 파일 순서는 만드는 과정에 따라 정리했다.
데이터베이스 테이블에 매핑될 모델 정의
# models.py - DB table column 정의
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from database import Base
# User(table)
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True)
hashed_password = Column()
items = relationship("Item", back_populates="owner")
# Item(table)
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
description = Column(String)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
데이터베이스 연결과 세션 관리
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# (1) 비동기 방식 - Starlette
# (2) 데이터 검증 - pydantic
# 동기용 데이터 베이스 설정 (pymysql)
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:비밀번호@localhost/oz_fastapi"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
# 비동기용 데이터 베이스 설정 (aiomysql)
# - 무거운 I/O 요청(5초)이 먼저 와도, 뒤에 가벼운 I/O 작업 요청(1초)이 들어오면 더 빨리 끝나는 것이 응답된다.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
ASYNC_SQLALCHEMY_DATABASE_URL = "mysql+aiomysql://root:비밀번호@localhost/oz_fastapi"
async_engine = create_async_engine(ASYNC_SQLALCHEMY_DATABASE_URL)
AsyncSessionLocal = sessionmaker(bind=async_engine, class_=AsyncSession)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from sqlalchemy.orm import Session
from models import User, Item
from schemas import UserCreate, UserUpdate, ItemCreate, ItemUpdate
import bcrypt
# User - CRUD
def create_user(db: Session, user: UserCreate):
hashed_password = bcrypt.hashpw(user.password.encode("utf-8"), bcrypt.gensalt()) # bite 형태라 encode 필요!
db_user = User(email=user.email, hashed_password=hashed_password)
db.add(db_user)
db.commit()
return db_user
def get_user_id(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def get_user_email(db: Session, user_email: str):
return db.query(User).filter(User.email == user_email).first()
def get_users(db: Session, skip: int = 0, limit: int = 10):
return db.query(User).offset(skip).limit(limit).all()
def update_user(db: Session, user_id: int, user_update: UserUpdate):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
return None
user_data = user_update.dict()
for key, value in user_data.items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user
def delete_user(db: Session, user_id: int):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
return None
db.delete(db_user)
db.commit()
return db_user
# Item - CRUD
def create_item(db: Session, item: ItemCreate, owner_id: int):
db_item = Item(**item.dict(), owner_id=owner_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
def get_item(db: Session, item_id: int):
return db.query(Item).filter(Item.id == item_id).first()
def get_items(db: Session, skip: int = 0, limit: int = 10):
return db.query(Item).offset(skip).limit(limit).all()
def update_item(db: Session, item_id: int, item_update: ItemUpdate):
db_item = db.query(Item).filter(Item.id == item_id).first()
if not db_item:
return None
for key, value in item_update.dict().items():
setattr(db_item, key, value)
db.commit()
db.refresh(db_item)
return db_item
def delete_item(db: Session, item_id: int):
db_item = db.query(Item).filter(Item.id == item_id).first()
if not db_item:
return None
db.delete(db_item)
db.commit()
return db_item
공통적으로 사용될 의존성을 정의
from database import SessionLocal, AsyncSessionLocal
# 동기용 의존성
def get_db():
db = SessionLocal()
try:
# Generator
yield db
finally:
db.close()
# 비동기용 의존성
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
API의 요청 및 응답 데이터 Validation
from pydantic import BaseModel
from typing import List, Optional
# schemas/item.py
# schemas/user.py
# pydantic -> 데이터 유효성 검증
# Item
class ItemBase(BaseModel):
title: str
description: str
class ItemCreate(ItemBase):
pass
class ItemUpdate(ItemBase):
title: Optional[str] = None
description: Optional[str] = None
class Item(BaseModel):
id: int
owner_id: int
class Config:
orm_mode = True # orm 방식으로 데이터 필드 읽기가 가능
# User
class UserBase(BaseModel):
email: str
class User(UserBase):
id: int
email: str
items: List[Item] = []
class Config:
orm_mode = True
class UserCreate(UserBase):
password: str
class UserUpdate(UserBase):
email: str | None = None
password: str | None = None
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from dependencies import get_db
from schemas import UserCreate, UserUpdate
from typing import Union
import crud
router = APIRouter()
# CRUD
@router.post("/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = crud.create_user(db, user)
return db_user
# api/v1/users/{user_id}
@router.get("/{user_data}") # user_id 문자와 숫자 어떤거든 입력하여 검색 가능하도록
def get_user(user_data: Union[int, str], db: Session = Depends(get_db)):
try: # user_data type이 int인 경우
user_data = int(user_data)
db_user = crud.get_user_id(db, user_data)
except: # user_data type이 str인 경우
db_user = crud.get_user_email(db, user_data)
if db_user is None:
raise HTTPException(status_code=404, detail="User Not Found")
return db_user
# api/v1/users/
@router.get("/")
def get_users(skip: int, limit: int, db: Session = Depends(get_db)):
return crud.get_users(db, skip, limit)
# api/v1/users/{user_id}
@router.put("/{user_id}")
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
updated_user = crud.update_user(db, user_id, user)
if updated_user is None:
raise HTTPException(status_code=404, detail="User Not Found")
return updated_user
@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
deleted_user = crud.delete_user(db, user_id)
if delete_user is None:
raise HTTPException(status_code=404, detail="User Not Found")
return deleted_user
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from dependencies import get_db
from schemas import ItemCreate, ItemUpdate
from typing import Union
import crud
router = APIRouter()
# CRUD
@router.post("/")
def create_item(item: ItemCreate, owner_id: int, db: Session = Depends(get_db)):
db_item = crud.create_item(db, item, owner_id)
return db_item
# api/v1/users/{item_id}
@router.get("/{item_id}")
def get_item(item_id: int, db: Session = Depends(get_db)):
db_item = crud.get_item(db, item_id)
if db_item is None:
raise HTTPException(status_code=404, detail="Item Not Found")
return db_item
@router.get("/")
def get_items(skip: int=0, limit: int=10, db: Session = Depends(get_db)):
return crud.get_items(db, skip, limit)
@router.put("/{item_id}")
def update_item(item_id: int, item: ItemUpdate, db: Session = Depends(get_db)):
updated_item = crud.update_item(db, item_id, item)
if updated_item is None:
raise HTTPException(status_code=404, detail="Item Not Found")
return updated_item
@router.delete("/{item_id}")
def delete_item(item_id: int, db: Session = Depends(get_db)):
is_success = crud.delete_item(db, item_id)
if not is_success:
raise HTTPException(status_code=404, detail="Item Not Found")
return {'msg': 'Item deleted successfully'}
from fastapi import FastAPI
from routers.users import router as user_router
from routers.items import router as item_router
app = FastAPI()
app.include_router(user_router, prefix="/api/v1/users", tags=["User"])
app.include_router(item_router, prefix="/api/v1/items", tags=["Item"])
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", reload=True)
MySQL Workbench에 접속하여 database.py파일에서 적었던 schema를 만들어주고, (나는 oz_fastapi로 만들었다.) 아래 코드로 users와 items table을 만들어준다.
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
hashed_password VARCHAR(255) NOT NULL
);
CREATE TABLE items (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255),
description VARCHAR(255),
owner_id INT,
FOREIGN KEY (owner_id) REFERENCES users(id)
);
동기 버전의 함수는 기존과 동일하게 Session 객체를 사용한다.
from sqlalchemy.orm import Session
from . import models
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
비동기 버전의 함수는 AsyncSession 객체를 사용하고 await 키워드로 비동기 쿼리를 수행한다. SQLAlchemy 1.4 이상에서 select() 함수를 사용하여 ORM 스타일의 쿼리를 생성할 수 있다.
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from . import models
async def get_user_async(db: AsyncSession, user_id: int):
result = await db.execute(select(models.User).where(models.User.id == user_id))
return result.scalars().first()
async def get_user_by_email_async(db: AsyncSession, email: str):
result = await db.execute(select(models.User).where(models.User.email == email))
return result.scalars().first()
비동기 버전에서는 db.execute에 await를 사용해 쿼리 실행을 비동기적으로 처리하고, result.scalars().first()로 결과를 추출한다.
전체 코드는 ORM과 동일하며, crud.py 파일만 수정하면 된다.
from models import User, Item
from schemas import UserCreate, UserUpdate, ItemCreate, ItemUpdate
from sqlalchemy.orm import Session
import bcrypt # pip insall bcrypt
from sqlalchemy import text # 추가
def create_user(db: Session, user: UserCreate):
hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
sql = text("INSERT INTO users (email, hashed_password) VALUES (:email, :hashed_password)")
db.execute(sql, {"email": user.email, "hashed_password": hashed_password})
db.commit()
last_id_query = text("SELECT LAST_INSERT_ID()")
last_id_result = db.execute(last_id_query)
last_id = last_id_result.scalar()
user_query = text("SELECT * FROM users WHERE id = :last_id")
user_result = db.execute(user_query, {"last_id": last_id}).fetchone()
return user_result._asdict()
# user의 id값을 기반으로 데이터를 찾는다.
def get_user_id(db: Session, user_id: int):
sql = text("SELECT * FROM users WHERE id = :user_id")
result = db.execute(sql, {"user_id": user_id}).fetchone()
return result._asdict()
# user의 email값을 기반으로 데이터를 찾는다.
def get_user_email(db: Session, user_email: str):
sql = text("SELECT * FROM users WHERE email = :user_email")
result = db.execute(sql, {"user_email": user_email}).fetchone()
return result._asdict()
# 전체 유저를 불러와 봅시다. (페이지 네이션)
def get_users(db: Session, skip: int = 0, limit: int = 10):
user_sql = text("SELECT * FROM users LIMIT :limit OFFSET :skip")
user_results = db.execute(user_sql, {"limit": limit, "skip": skip}).fetchall()
users = [row._asdict() for row in user_results]
for user in users:
item_sql = text("SELECT * FROM items WHERE owner_id = :owner_id")
item_results = db.execute(item_sql, {"owner_id": user['id']}).fetchall()
user['items'] = [item._asdict() for item in item_results]
return users
def update_user(db: Session, user_id: int, user_update: UserUpdate):
user_data = user_update.dict()
set_clause = ", ".join([f"{key} = :{key}" for key in user_data])
sql = text(f"UPDATE users SET {set_clause} WHERE id = :user_id")
user_data["user_id"] = user_id
db.execute(sql, user_data)
db.commit()
# 업데이트된 사용자 정보 다시 가져오기
return get_user_id(db, user_id) # 이전에 정의한 get_user_id 함수를 사용
def delete_user(db: Session, user_id: int):
sql = text("DELETE FROM users WHERE id = :user_id")
db.execute(sql, {"user_id": user_id})
db.commit()
return {"id": user_id}
def get_item(db: Session, item_id: int):
sql = text("SELECT * FROM items WHERE id = :item_id")
result = db.execute(sql, {"item_id": item_id}).fetchone()
return result._asdict()
def get_items(db: Session, skip: int = 0, limit: int = 10):
sql = text("SELECT * FROM items ORDER BY id DESC LIMIT :limit OFFSET :skip")
result = db.execute(sql, {"limit": limit, "skip": skip}).fetchall()
return [item._asdict() for item in result]
def create_item(db: Session, item: ItemCreate, owner_id: int):
# owner_id가 유효한지 확인
owner_exists = db.execute(
text("SELECT id FROM users WHERE id = :owner_id"),
{"owner_id": owner_id}
).fetchone()
if not owner_exists:
raise ValueError("유효하지 않은 owner_id입니다.")
item_data = item.dict()
item_data["owner_id"] = owner_id
# 아래의 'column1', 'column2', ... 은 실제 열 이름으로 바꾸세요.
sql = text("INSERT INTO items (title, description, owner_id) VALUES (:title, :description, :owner_id)")
db.execute(sql, item_data)
db.commit()
last_id_query = text("SELECT LAST_INSERT_ID()")
last_id = db.execute(last_id_query).scalar()
item_query = text("SELECT * FROM items WHERE id = :last_id")
item_result = db.execute(item_query, {"last_id": last_id}).fetchone()
return item_result._asdict()
def update_item(db: Session, item_id: int, item_update: ItemUpdate):
item_data = item_update.dict()
set_clause = ", ".join([f"{key} = :{key}" for key in item_data])
sql = text(f"UPDATE items SET {set_clause} WHERE id = :item_id RETURNING *")
item_data["item_id"] = item_id
result = db.execute(sql, item_data)
db.commit()
return result.fetchone()
def delete_item(db: Session, item_id: int):
sql = text("DELETE FROM items WHERE id = :item_id")
result = db.execute(sql, {"item_id": item_id})
db.commit()
return {"status": "success", "id": item_id}
update_user 함수 실행시 MysSQL 테이블에 CASCADE 관련 옵션 적용이 필요하다.
ALTER TABLE items
ADD CONSTRAINT fk_items_user_id
FOREIGN KEY (owner_id)
REFERENCES users(id)
ON DELETE CASCADE;
Flask부터 Django, FastAPI에서 API 만들고 model 만들고 하는 과정을 계속 반복하다보니 익숙해진거 같기도 하다.
각각의 기능이나 함수들에 대해서는 좀 더 사용해봐야지 제대로 알 수 있을거 같다.
Django에 비해 migrate가 없어서 바로 바로 서버에서 확인 가능하다는 점은 좋지만 MySQL Workbench에서 확인해야한다는 점은 살짝의 불편함이 있을 수 있다.
[오즈스쿨 스타트업 웹 개발 초격차캠프 백엔드 Fast API 실시간 강의]