
pip install sqlalchemy psycopg2 asyncpg pydantic alembic
| 라이브러리 | 하는 일 | 키워드 |
|---|---|---|
| SQLAlchemy | DB 테이블을 Python으로 다룸 (ORM) | 테이블, 모델, 세션 |
| psycopg2 | PostgreSQL과 실제 연결 | 드라이버, PostgreSQL |
| asyncpg | PostgreSQL과 실제 연결 (비동기) | 드라이버, PostgreSQL |
| pydantic | 데이터 검증(Data Validation)과 직렬화(Serialization)를 자동으로 해주는 Python 라이브러리 | Validation, 직렬화(Serialization) |
| Alembic | 테이블 구조 변경을 추적하고 반영 | 마이그레이션, 자동화 |
SAMPLE CODE
database.py
------------------------------------------------
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
# SQLite
#=======================================================================================
# SQLAlchemy 엔진 생성 (동기 방식)
engine = create_engine("sqlite:///./{DatabaseName}.db", echo=True) #echo=True > DB 작업 시 로그 출력
#=======================================================================================
# PostgreSQL
# 접속 URL
DATABASE_URL = "postgresql+psycopg2://{Username}:{Password}@{Host}:{Port}/{DatabaseName}"
# SQLAlchemy 엔진 생성
engine = create_engine(
DATABASE_URL,
echo=True # SQL 로그 출력
)
#=======================================================================================
# sessionmaker는 세션을 생성해주는 팩토리 함수
SessionLocal = sessionmaker(
bind=engine, # DB 엔진에 연결
class_=Session, # 사용할 세션 클래스 (기본값이므로 생략 가능)
expire_on_commit=False # 커밋 후 객체의 속성을 만료시키지 않음 (재조회 없이 사용 가능)
)
# 모든 모델이 상속할 Base 클래스 정의
Base = declarative_base()
# FastAPI 의존성 주입용 함수
def get_db():
db: Session = SessionLocal() # 세션 인스턴스 생성
try:
yield db # 세션을 외부에 제공
finally:
db.close() # 요청 종료 시 세션 닫기
# 테이블 자동 생성 함수 (동기)
def create_tables():
try:
Base.metadata.create_all(bind=engine)
print("테이블 생성 완료")
except Exception as e:
print(f"테이블 생성 중 오류 발생: {e}")
database.py
------------------------------------------------
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
# SQLite
#=======================================================================================
# SQLAlchemy 비동기 엔진 생성
engine = create_async_engine("sqlite+aiosqlite:///./{DatabaseName}.db", echo=True) #echo=True > DB 작업 시 로그 출력
#=======================================================================================
# PostgreSQL
# PostgreSQL 접속 URL (asyncpg 드라이버 사용)
DATABASE_URL = "postgresql+asyncpg://{Username}:{Password}@{Host}:{Port}/{DatabaseName}"
# Async 엔진 생성
engine = create_async_engine(
DATABASE_URL,
echo=True # SQL 로그 출력
)
#=======================================================================================
# 비동기 세션 팩토리 생성
AsyncSessionLocal = sessionmaker(
bind=engine, # 비동기 DB 엔진에 연결
class_=AsyncSession, # 사용할 세션 클래스 (비동기용)
expire_on_commit=False # 커밋 후에도 객체 속성 유지
)
# 모든 모델이 상속할 Base 클래스 정의
Base = declarative_base()
# FastAPI 의존성 주입용 비동기 함수
@asynccontextmanager
async def get_async_db():
async with AsyncSessionLocal() as session:
yield session # 세션 제공 (자동으로 닫힘)
# 테이블 자동 생성
async def create_tables():
try:
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
except Exception as e:
print(f'Create Table 시도 중 오류 발생 : {e}')
| 항목 | 동기(Sync) 방식 | 비동기(Async) 방식 |
|---|---|---|
| 사용 드라이버 | psycopg2 | asyncpg |
| 엔진 생성 | create_engine() | create_async_engine() |
| 세션 클래스 | Session | AsyncSession |
| 세션 팩토리 | SessionLocal = sessionmaker(...) | AsyncSessionLocal = sessionmaker(...) |
| 커밋 설정 | autocommit=False | expire_on_commit=False |
| 주 용도 | 일반적인 Python / FastAPI 동기 API | FastAPI 비동기 API (async def) 환경에 적합 |
| 사용 예시 | yield db | async with ...: yield session |
| 장점 | 설정이 간단하고 익숙함 | 비동기 처리에 적합, 빠른 응답성 |
| 단점 | 동시성 처리에 한계 | 드라이버/모듈 호환성 신경 써야 함 |
models.py는 데이터베이스 테이블과 매핑되는 ORM 모델 클래스들을 정의하는 파일.
SQLAlchemy를 이용하여 Python 클래스로 DB 테이블 구조를 선언하고, FastAPI에서 CRUD 등의 데이터 조작에 활용.
SAMPLE CODE
models.py
------------------------------------------------
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, Text
from sqlalchemy.orm import relationship
from database import Base
from datetime import datetime
# 사용자 테이블 정의
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# 관계 설정
boards = relationship("Board", back_populates="owner")
# 게시판 테이블 정의
class Board(Base):
__tablename__ = 'boards'
id = Column(Integer, primary_key=True, index=True)
title = Column(String(200), nullable=False)
content = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
# 외래 키 (User 테이블과 연결)
user_id = Column(Integer, ForeignKey("users.id"))
# 관계 설정
owner = relationship("User", back_populates="boards")
| 항목 | 설명 |
|---|---|
Base | SQLAlchemy ORM의 모든 모델이 상속받는 베이스 클래스 |
| 클래스 이름 | 테이블 이름에 대응. 보통 대문자 카멜표기 사용 (User, Board 등) |
__tablename__ | DB 테이블 이름 (소문자 + 복수형으로 하는 경우가 많음) |
| 컬럼 정의 | Column() 객체를 통해 필드 정의, 자료형, 제약조건 등 설정 가능 |
| 관계 설정 | relationship(), ForeignKey() 등을 사용해 테이블 간 관계 표현 |
schemas.py는 FastAPI에서 데이터 검증 및 직렬화(Serialization) 를 위해 Pydantic 모델을 정의하는 파일.
클라이언트와 서버 간의 데이터 구조를 정의하고, API 요청 및 응답 데이터를 검증하는 역할을 함.
| 항목 | 설명 |
|---|---|
BaseModel | 모든 Pydantic 스키마가 상속받는 기본 클래스 |
| 필드 정의 | str, int, datetime 등의 타입을 사용해 필드 정의 |
| 유효성 검사 | Field(), validator 등을 사용해 입력 값 검증 가능 |
| ORM 모드 | Config 내부에 orm_mode = True를 설정하여 SQLAlchemy 모델과 호환 |
| 요청/응답 | FastAPI의 요청(Request) 및 응답(Response) 스키마로 활용 |
SAMPLE CODE
schemas.py
------------------------------------------------
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Optional
# 요청(Request) 스키마
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: str
password: str = Field(..., min_length=6)
# 응답(Response) 스키마
class UserResponse(BaseModel):
id: int
username: str
email: str
created_at: datetime
class Config:
from_attributes = True # ORM 모드 활성화 (SQLAlchemy 모델과 호환)
# 업데이트(Update) 스키마 (Optional 필드 포함)
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[str] = None
crud.py는 FastAPI 프로젝트에서 DB와의 상호작용 로직을 모아놓은 파일로, 주로 다음 작업을 수행:
crud.py
------------------------------------------------
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from models import User
from schemas import UserCreate, UserUpdate
# 사용자 생성
async def create_user(db: AsyncSession, user_create: UserCreate):
new_user = User(**user_create.dict())
db.add(new_user)
await db.commit()
await db.refresh(new_user)
return new_user
# 사용자 조회 by ID
async def get_user(db: AsyncSession, user_id: int):
result = await db.execute(select(User).where(User.id == user_id))
return result.scalars().first()
# 전체 사용자 조회
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 10):
result = await db.execute(select(User).offset(skip).limit(limit))
return result.scalars().all()
# 사용자 수정
async def update_user(db: AsyncSession, user_id: int, user_update: UserUpdate):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
if user:
for field, value in user_update.dict(exclude_unset=True).items():
setattr(user, field, value)
await db.commit()
await db.refresh(user)
return user
# 사용자 삭제
async def delete_user(db: AsyncSession, user_id: int):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalars().first()
if user:
await db.delete(user)
await db.commit()
return user
# main.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
import models, schemas, crud
from database import engine, get_db
asynccontextmanager로 lifespan 정의
@asynccontextmanager
async def lifespan(app: FastAPI):
print("앱 시작 중... 테이블 생성 시도")
#==============================================
# 동기
create_tables() # 시작 시 테이블 자동 생성
#==============================================
# 비동기
await create_tables() # 시작 시 테이블 자동 생성
#==============================================
yield
print("앱 종료 중...")
app = FastAPI(lifespan=lifespan)
# POST: 사용자 생성
@app.post("/users/", response_model=schemas.UserResponse)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
return crud.create_user(db=db, user=user)
# GET: 사용자 리스트 조회
@app.get("/users/", response_model=list[schemas.UserResponse])
def read_users(skip: int = 0, limit: int = 10, db: Session = Depends(get_db)):
users = crud.get_users(db, skip=skip, limit=limit)
return users