> pip install sqlalchemy
> pip install pymysql
pymysql
은 순수 파이썬으로 작성된 라이브러리이며, mysqlclient
와 API 호환성을 유지한다.mysqlclient
에 비해 설치가 간단하고, 특정 시스템에 대한 이진 의존성이 없다. 성능은 mysqlclient
보다 약간 떨어질 수 있지만, 대부분의 일반적인 사용 사례에서 충분히 좋은 성능을 제공한다.
> pip install aiomysql
aiomysql
은 pymysql
을 기반으로 하는 비동기 프로그래밍(특히 asyncio
를 사용하는 경우)을 위한 라이브러리다. aiomysql
을 사용하면 비동기 I/O를 통해 데이터베이스 연산을 수행할 수 있어 애플리케이션의 동시성을 향상시킬 수 있다.
mysqlclient
를 이미 설치하셨다면, SQLAlchemy와 함께 사용할 수 있다. 하지만 비동기 코드를 작성하고자 한다면 aiomysql
를 설치해야 한다. pymysql
과 aiomysql
를 동시에 설치하는 것은 필요하지 않지만, aiomysql
는 내부적으로 pymysql
의 일부 코드를 사용하기 때문에 pymysql
이 필요할 수 있다.
├── main.py
├── dependencies.py
├── routers
│ ├── users.py
│ └── items.py
├── crud.py
├── models.py
├── schemas.py # 구조 검증때 사용
└── database.py
이 구조에서 routers
디렉터리에는 각 기능별 라우트를 정의하는 파일이 포함된다.
아래 파일 순서는 만드는 과정에 따라 정리했다.
데이터베이스 테이블에 매핑될 모델 정의
# models.py - DB table column 정의
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from database import Base
# User(table)
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True)
hashed_password = Column()
items = relationship("Item", back_populates="owner")
# Item(table)
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True, index=True)
title = Column(String)
description = Column(String)
owner_id = Column(Integer, ForeignKey("users.id"))
owner = relationship("User", back_populates="items")
데이터베이스 연결과 세션 관리
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# (1) 비동기 방식 - Starlette
# (2) 데이터 검증 - pydantic
# 동기용 데이터 베이스 설정 (pymysql)
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:비밀번호@localhost/oz_fastapi"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(bind=engine)
# 비동기용 데이터 베이스 설정 (aiomysql)
# - 무거운 I/O 요청(5초)이 먼저 와도, 뒤에 가벼운 I/O 작업 요청(1초)이 들어오면 더 빨리 끝나는 것이 응답된다.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
ASYNC_SQLALCHEMY_DATABASE_URL = "mysql+aiomysql://root:비밀번호@localhost/oz_fastapi"
async_engine = create_async_engine(ASYNC_SQLALCHEMY_DATABASE_URL)
AsyncSessionLocal = sessionmaker(bind=async_engine, class_=AsyncSession)
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
from sqlalchemy.orm import Session
from models import User, Item
from schemas import UserCreate, UserUpdate, ItemCreate, ItemUpdate
import bcrypt
# User - CRUD
def create_user(db: Session, user: UserCreate):
hashed_password = bcrypt.hashpw(user.password.encode("utf-8"), bcrypt.gensalt()) # bite 형태라 encode 필요!
db_user = User(email=user.email, hashed_password=hashed_password)
db.add(db_user)
db.commit()
return db_user
def get_user_id(db: Session, user_id: int):
return db.query(User).filter(User.id == user_id).first()
def get_user_email(db: Session, user_email: str):
return db.query(User).filter(User.email == user_email).first()
def get_users(db: Session, skip: int = 0, limit: int = 10):
return db.query(User).offset(skip).limit(limit).all()
def update_user(db: Session, user_id: int, user_update: UserUpdate):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
return None
user_data = user_update.dict()
for key, value in user_data.items():
setattr(db_user, key, value)
db.commit()
db.refresh(db_user)
return db_user
def delete_user(db: Session, user_id: int):
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
return None
db.delete(db_user)
db.commit()
return db_user
# Item - CRUD
def create_item(db: Session, item: ItemCreate, owner_id: int):
db_item = Item(**item.dict(), owner_id=owner_id)
db.add(db_item)
db.commit()
db.refresh(db_item)
return db_item
def get_item(db: Session, item_id: int):
return db.query(Item).filter(Item.id == item_id).first()
def get_items(db: Session, skip: int = 0, limit: int = 10):
return db.query(Item).offset(skip).limit(limit).all()
def update_item(db: Session, item_id: int, item_update: ItemUpdate):
db_item = db.query(Item).filter(Item.id == item_id).first()
if not db_item:
return None
for key, value in item_update.dict().items():
setattr(db_item, key, value)
db.commit()
db.refresh(db_item)
return db_item
def delete_item(db: Session, item_id: int):
db_item = db.query(Item).filter(Item.id == item_id).first()
if not db_item:
return None
db.delete(db_item)
db.commit()
return db_item
공통적으로 사용될 의존성을 정의
from database import SessionLocal, AsyncSessionLocal
# 동기용 의존성
def get_db():
db = SessionLocal()
try:
# Generator
yield db
finally:
db.close()
# 비동기용 의존성
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session
API의 요청 및 응답 데이터 Validation
from pydantic import BaseModel
from typing import List, Optional
# schemas/item.py
# schemas/user.py
# pydantic -> 데이터 유효성 검증
# Item
class ItemBase(BaseModel):
title: str
description: str
class ItemCreate(ItemBase):
pass
class ItemUpdate(ItemBase):
title: Optional[str] = None
description: Optional[str] = None
class Item(BaseModel):
id: int
owner_id: int
class Config:
orm_mode = True # orm 방식으로 데이터 필드 읽기가 가능
# User
class UserBase(BaseModel):
email: str
class User(UserBase):
id: int
email: str
items: List[Item] = []
class Config:
orm_mode = True
class UserCreate(UserBase):
password: str
class UserUpdate(UserBase):
email: str | None = None
password: str | None = None
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from dependencies import get_db
from schemas import UserCreate, UserUpdate
from typing import Union
import crud
router = APIRouter()
# CRUD
@router.post("/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = crud.create_user(db, user)
return db_user
# api/v1/users/{user_id}
@router.get("/{user_data}") # user_id 문자와 숫자 어떤거든 입력하여 검색 가능하도록
def get_user(user_data: Union[int, str], db: Session = Depends(get_db)):
try: # user_data type이 int인 경우
user_data = int(user_data)
db_user = crud.get_user_id(db, user_data)
except: # user_data type이 str인 경우
db_user = crud.get_user_email(db, user_data)
if db_user is None:
raise HTTPException(status_code=404, detail="User Not Found")
return db_user
# api/v1/users/
@router.get("/")
def get_users(skip: int, limit: int, db: Session = Depends(get_db)):
return crud.get_users(db, skip, limit)
# api/v1/users/{user_id}
@router.put("/{user_id}")
def update_user(user_id: int, user: UserUpdate, db: Session = Depends(get_db)):
updated_user = crud.update_user(db, user_id, user)
if updated_user is None:
raise HTTPException(status_code=404, detail="User Not Found")
return updated_user
@router.delete("/{user_id}")
def delete_user(user_id: int, db: Session = Depends(get_db)):
deleted_user = crud.delete_user(db, user_id)
if delete_user is None:
raise HTTPException(status_code=404, detail="User Not Found")
return deleted_user
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from dependencies import get_db
from schemas import ItemCreate, ItemUpdate
from typing import Union
import crud
router = APIRouter()
# CRUD
@router.post("/")
def create_item(item: ItemCreate, owner_id: int, db: Session = Depends(get_db)):
db_item = crud.create_item(db, item, owner_id)
return db_item
# api/v1/users/{item_id}
@router.get("/{item_id}")
def get_item(item_id: int, db: Session = Depends(get_db)):
db_item = crud.get_item(db, item_id)
if db_item is None:
raise HTTPException(status_code=404, detail="Item Not Found")
return db_item
@router.get("/")
def get_items(skip: int=0, limit: int=10, db: Session = Depends(get_db)):
return crud.get_items(db, skip, limit)
@router.put("/{item_id}")
def update_item(item_id: int, item: ItemUpdate, db: Session = Depends(get_db)):
updated_item = crud.update_item(db, item_id, item)
if updated_item is None:
raise HTTPException(status_code=404, detail="Item Not Found")
return updated_item
@router.delete("/{item_id}")
def delete_item(item_id: int, db: Session = Depends(get_db)):
is_success = crud.delete_item(db, item_id)
if not is_success:
raise HTTPException(status_code=404, detail="Item Not Found")
return {'msg': 'Item deleted successfully'}
from fastapi import FastAPI
from routers.users import router as user_router
from routers.items import router as item_router
app = FastAPI()
app.include_router(user_router, prefix="/api/v1/users", tags=["User"])
app.include_router(item_router, prefix="/api/v1/items", tags=["Item"])
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", reload=True)
MySQL Workbench에 접속하여 database.py
파일에서 적었던 schema를 만들어주고, (나는 oz_fastapi
로 만들었다.) 아래 코드로 users
와 items
table을 만들어준다.
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
hashed_password VARCHAR(255) NOT NULL
);
CREATE TABLE items (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255),
description VARCHAR(255),
owner_id INT,
FOREIGN KEY (owner_id) REFERENCES users(id)
);
동기 버전의 함수는 기존과 동일하게 Session
객체를 사용한다.
from sqlalchemy.orm import Session
from . import models
def get_user(db: Session, user_id: int):
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
비동기 버전의 함수는 AsyncSession
객체를 사용하고 await
키워드로 비동기 쿼리를 수행한다. SQLAlchemy 1.4 이상에서 select()
함수를 사용하여 ORM 스타일의 쿼리를 생성할 수 있다.
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from . import models
async def get_user_async(db: AsyncSession, user_id: int):
result = await db.execute(select(models.User).where(models.User.id == user_id))
return result.scalars().first()
async def get_user_by_email_async(db: AsyncSession, email: str):
result = await db.execute(select(models.User).where(models.User.email == email))
return result.scalars().first()
비동기 버전에서는 db.execute
에 await
를 사용해 쿼리 실행을 비동기적으로 처리하고, result.scalars().first()
로 결과를 추출한다.
전체 코드는 ORM과 동일하며, crud.py 파일만 수정하면 된다.
from models import User, Item
from schemas import UserCreate, UserUpdate, ItemCreate, ItemUpdate
from sqlalchemy.orm import Session
import bcrypt # pip insall bcrypt
from sqlalchemy import text # 추가
def create_user(db: Session, user: UserCreate):
hashed_password = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
sql = text("INSERT INTO users (email, hashed_password) VALUES (:email, :hashed_password)")
db.execute(sql, {"email": user.email, "hashed_password": hashed_password})
db.commit()
last_id_query = text("SELECT LAST_INSERT_ID()")
last_id_result = db.execute(last_id_query)
last_id = last_id_result.scalar()
user_query = text("SELECT * FROM users WHERE id = :last_id")
user_result = db.execute(user_query, {"last_id": last_id}).fetchone()
return user_result._asdict()
# user의 id값을 기반으로 데이터를 찾는다.
def get_user_id(db: Session, user_id: int):
sql = text("SELECT * FROM users WHERE id = :user_id")
result = db.execute(sql, {"user_id": user_id}).fetchone()
return result._asdict()
# user의 email값을 기반으로 데이터를 찾는다.
def get_user_email(db: Session, user_email: str):
sql = text("SELECT * FROM users WHERE email = :user_email")
result = db.execute(sql, {"user_email": user_email}).fetchone()
return result._asdict()
# 전체 유저를 불러와 봅시다. (페이지 네이션)
def get_users(db: Session, skip: int = 0, limit: int = 10):
user_sql = text("SELECT * FROM users LIMIT :limit OFFSET :skip")
user_results = db.execute(user_sql, {"limit": limit, "skip": skip}).fetchall()
users = [row._asdict() for row in user_results]
for user in users:
item_sql = text("SELECT * FROM items WHERE owner_id = :owner_id")
item_results = db.execute(item_sql, {"owner_id": user['id']}).fetchall()
user['items'] = [item._asdict() for item in item_results]
return users
def update_user(db: Session, user_id: int, user_update: UserUpdate):
user_data = user_update.dict()
set_clause = ", ".join([f"{key} = :{key}" for key in user_data])
sql = text(f"UPDATE users SET {set_clause} WHERE id = :user_id")
user_data["user_id"] = user_id
db.execute(sql, user_data)
db.commit()
# 업데이트된 사용자 정보 다시 가져오기
return get_user_id(db, user_id) # 이전에 정의한 get_user_id 함수를 사용
def delete_user(db: Session, user_id: int):
sql = text("DELETE FROM users WHERE id = :user_id")
db.execute(sql, {"user_id": user_id})
db.commit()
return {"id": user_id}
def get_item(db: Session, item_id: int):
sql = text("SELECT * FROM items WHERE id = :item_id")
result = db.execute(sql, {"item_id": item_id}).fetchone()
return result._asdict()
def get_items(db: Session, skip: int = 0, limit: int = 10):
sql = text("SELECT * FROM items ORDER BY id DESC LIMIT :limit OFFSET :skip")
result = db.execute(sql, {"limit": limit, "skip": skip}).fetchall()
return [item._asdict() for item in result]
def create_item(db: Session, item: ItemCreate, owner_id: int):
# owner_id가 유효한지 확인
owner_exists = db.execute(
text("SELECT id FROM users WHERE id = :owner_id"),
{"owner_id": owner_id}
).fetchone()
if not owner_exists:
raise ValueError("유효하지 않은 owner_id입니다.")
item_data = item.dict()
item_data["owner_id"] = owner_id
# 아래의 'column1', 'column2', ... 은 실제 열 이름으로 바꾸세요.
sql = text("INSERT INTO items (title, description, owner_id) VALUES (:title, :description, :owner_id)")
db.execute(sql, item_data)
db.commit()
last_id_query = text("SELECT LAST_INSERT_ID()")
last_id = db.execute(last_id_query).scalar()
item_query = text("SELECT * FROM items WHERE id = :last_id")
item_result = db.execute(item_query, {"last_id": last_id}).fetchone()
return item_result._asdict()
def update_item(db: Session, item_id: int, item_update: ItemUpdate):
item_data = item_update.dict()
set_clause = ", ".join([f"{key} = :{key}" for key in item_data])
sql = text(f"UPDATE items SET {set_clause} WHERE id = :item_id RETURNING *")
item_data["item_id"] = item_id
result = db.execute(sql, item_data)
db.commit()
return result.fetchone()
def delete_item(db: Session, item_id: int):
sql = text("DELETE FROM items WHERE id = :item_id")
result = db.execute(sql, {"item_id": item_id})
db.commit()
return {"status": "success", "id": item_id}
update_user 함수 실행시 MysSQL 테이블에 CASCADE 관련 옵션 적용이 필요하다.
ALTER TABLE items
ADD CONSTRAINT fk_items_user_id
FOREIGN KEY (owner_id)
REFERENCES users(id)
ON DELETE CASCADE;
Flask부터 Django, FastAPI에서 API 만들고 model 만들고 하는 과정을 계속 반복하다보니 익숙해진거 같기도 하다.
각각의 기능이나 함수들에 대해서는 좀 더 사용해봐야지 제대로 알 수 있을거 같다.
Django에 비해 migrate가 없어서 바로 바로 서버에서 확인 가능하다는 점은 좋지만 MySQL Workbench에서 확인해야한다는 점은 살짝의 불편함이 있을 수 있다.
[오즈스쿨 스타트업 웹 개발 초격차캠프 백엔드 Fast API 실시간 강의]