/token 경로로 아이디와 비밀번호를 전송합니다.| 용어 | 설명 |
|---|---|
| JWT | Json Web Token. 인증 정보를 안전하게 담아 클라이언트-서버 간에 전달하기 위한 토큰 |
| Bearer Token | HTTP Authorization 헤더에 담아 전송하는 토큰 형식 (Bearer {토큰}) |
| OAuth2PasswordRequestForm | FastAPI에서 로그인 폼의 필드를 자동으로 처리해주는 폼 클래스 |
| bcrypt_context | 비밀번호 해시 생성 및 검증을 위한 도구 |
exp | JWT의 표준 클레임으로, 만료 시간(Expiration time)을 나타냄 |
[클라이언트]
|
| POST /token <- username, password
|
[FastAPI 서버]
|
|--> authenticate_user()
| |- DB에서 사용자 조회
| |- 비밀번호 검증
|
|--> create_access_token()
| |- 토큰에 사용자 정보 + 만료 시간 설정
|
|<- 토큰 반환 (access_token, token_type)
|
[클라이언트]
|
|- 저장 후 인증이 필요한 요청 시 Authorization: Bearer <token> 헤더로 사용
[토큰 생성 앤드포인트]
@router.post("/token", response_model = Token)
async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db:db_dependency):
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(status_code = status.HTTP_401_UNAUTHORIZED,
detail = 'Could not validate user.')
token = create_access_token(user.username, user.id, timedelta(minutes=20)) #imedelta(minutes=20): 20분 동안 토큰이 유지 된다. 이후에는 재로그인 해야함
return {'access_token':token, 'token_type':'bearer'}
[유저의 정보가 DB에 있는지 확인]
def authenticate_user(username: str, password: str, db):
user = db.query(Users).filter(Users.username == username).first()
if not user:
return False
if not bcrypt_context.verify(password, user.hashed_password): # verify: 이자식이 하는 일은 비밀번호가 일치하는지 확인하는 것
return False
return user
[토큰 생성]
SECRET_KEY = '1321KDFA#kj!@sadzx1952@dv3#advSADf#!Wvdd1@#'
ALGORITHM = 'HS256'
def create_access_token(username: str, user_id: int, expires_delta: timedelta):
# 1. JWT Payload 기본 구성 (사용자 정보 포함)
encode = {
'sub': username, # subject: 사용자 이름
'id': user_id # 사용자 고유 ID
}
# 2. 만료 시간 계산 (현재 시간 + expires_delta)
expires = datetime.now(timezone.utc) + expires_delta # UTC 기준으로 20분 등 더하기
# 3. payload에 만료 시간 추가
encode.update({'exp': expires}) # exp는 JWT의 표준 claim 중 하나 (expiration time)
# 4. JWT 토큰 생성 및 반환 (SECRET_KEY로 서명)
return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
[토큰 기반 인증]
async def get_current_user(token: Annotated[str,Depends(oauth2_bearer)]):
try:
# 1. 전달받은 JWT 토큰을 디코딩하여 payload(내용물)를 추출
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 2. payload에서 username과 user_id를 꺼냄
username: str = payload.get("sub") # subject (보통 사용자 이름)
user_id: int = payload.get("id") # 사용자 고유 ID
# 3. 필수 정보가 없으면 예외 발생 → 인증 실패 처리
if username is None or user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate user."
)
# 4. 유저 정보 반환 (보통 이후의 경로 함수에서 사용)
return {"username": username, "id": user_id}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate user."
)
[main.py]
from fastapi import FastAPI, status, Depends, HTTPException
import models
from database import engine, SessionLocal
from typing import Annotated
from sqlalchemy.orm import Session
import auth
from auth import get_current_user
app = FastAPI()
app.include_router(auth.router) # auth.py에서 만든 라우터를 메인 애플리케이션에 틍록하는 작업
models.Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
db_dependency = Annotated[Session,Depends(get_db)]
user_dependency = Annotated[dict,Depends(get_current_user)]
#status_code=status.HTTP_200_OK : 경로에서 성공적으로 처리될 경우 반환되는 HTTP 상태코드(명시적으로 작성)
@app.get("/",status_code=status.HTTP_200_OK)
async def user(user:user_dependency, db:db_dependency):
if user is None:
raise HTTPException(status_code=401, detail='Authentication Fails')
return {"User": user}