
# schema
from pydantic import BaseModel, ConfigDict
class UserResponse(BaseModel):
id: int
username: str
email: str
model_config = ConfigDict(from_attributes=True)
or
# pydantic v1
class Config:
from_attributes = True
from app.models.user import User # Tortoise ORM 모델
from app.schemas.user import UserResponse # Pydantic 모델
user = await User.get(id=1)
return UserResponse.model_validate(user)
# app/models/user
from tortoise import fields
from tortoise.models import Model
class User(Model):
id = fields.IntField(pk=True)
username = fields.CharField(50, unique=True)
email = fields.CharField(100, unique=True)
password = fields.CharField(255)
created_at = fields.DatetimeField(auto_now_add=True)
def __str__(self):
return self.username
# app/repositories/user_repo.py
from app.models.user import User
class UserRepository:
@staticmethod
async def create_user(username: str, email: str, password: str):
return await User.create(username=username, email=email, password=password)
@staticmethod
async def get_by_username(username: str):
return await User.get_or_none(username=username)
User.create() : Tortoise-ORM의 비동기 DB 삽입 함수User.get_or_noneawait User.get_or_none(username=username)
await User.create(**data)
await User.filter(email=email).update(...)
get_or_none() : 존재하지 않으면 None 반환 (try/except 필요 X)filter().update() : SQL UPDATE 직접 수행 (성능 좋음)Prefetch 나 select_related() : 관계 데이터 미리 불러오기 (JOIN)| 개념 | ORM 표현 | SQL 표현 |
|---|---|---|
| SELECT | User.all() | SELECT * FROM user; |
| WHERE | User.filter(username="kihoon") | WHERE username='kihoon' |
| JOIN | select_related("profile") | JOIN profile ON ... |
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.now() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")CryptContextschemes=["bcrypt"]: deprecated="auto": def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
expire = datetime.now() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
to_encode = data.copy() : 입력 데이터 복사expire = datetime.now() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))to_encode.update({"exp": expire}) : payload에 만료시간 추가jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)from fastapi import Depends, HTTPException, status
from jose import jwt, JWTError
from app.core.config import settings
from app.repositories.user_repo import UserRepository
async def get_current_user(token: str):
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
user = await UserRepository.get_by_username(username)
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
async def get_current_user(token: str):payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])username: str = payload.get("sub")except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
user = await UserRepository.get_by_username(username)
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
from fastapi import APIRouter, Depends, HTTPException
from app.core.dependencies import get_current_user
from app.models.diary import Diary
router = APIRouter(prefix="/diary", tags=["Diary"])
@router.put("/{diary_id}")
async def update_diary(diary_id: int, content: str, current_user=Depends(get_current_user)):
diary = await Diary.get_or_none(id=diary_id)
if not diary:
raise HTTPException(status_code=404, detail="Diary not found")
if diary.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized")
diary.content = content
await diary.save()
return {"message": "Updated successfully"}
@router.put("/{diary_id}")
async def update_diary(diary_id: int, content: str, current_user=Depends(get_current_user)):
diary = await Diary.get_or_none(id=diary_id)
if not diary:
raise HTTPException(status_code=404, detail="Diary not found")
if diary.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Not authorized")
diary.content = content
await diary.save()
Authorization: Bearer <토큰>from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer
security = HTTPBearer()
@app.get("/secure")
async def secure_endpoint(credentials=Depends(security)):
token = credentials.credentials # 단순히 Authorization 헤더에서 토큰 꺼냄
if token != "mysecrettoken":
raise HTTPException(status_code=401, detail="Invalid token")
return {"msg": "Access granted"}
Authorization: Bearer <token> 을 읽음.oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6...# 1
@router.post("/logout")
async def logout(token: str = Depends(oauth2_scheme)):
return await AuthService.logout(token)
# 요청 헤더의 Bearer ... 토큰이 token 변수로 들어오게 됨
### 요약
GET /diary
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6...
이렇게 클라이언트가 요청을 보내면 Depends(oauth2_scheme) 덕분에
"eyJhbGciOi..." 부분만 token 인자로 자동으로 들어감
# 2
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
@app.get("/users/me")
async def read_users_me(token: str = Depends(oauth2_scheme)):
# 여기서 토큰 검증 (JWT decode)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
return {"username": username}
# auth_service.py
@staticmethod
async def login(username: str, password: str):
user = await UserRepository.get_by_username(username)
if not user or not verify_password(password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid credentials")
access_token = create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}
# dependencies.py
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise HTTPException(status_code=401, detail="Invalid token")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")

from fastapi import Depends
from app.models.user import User
from app.core.dependencies import get_current_user
@router.get("/myinfo")
async def get_my_info(current_user: User = Depends(get_current_user)):
return {"username": current_user.username}
GET /users HTTP/1.1
Host: example.com
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6Ikp...
Content-Type: application/json
User-Agent: Mozilla/5.0