애플리케이션 보안: 허가되지 않은 개체가 애플리케이션을 해킹하거나 불법적으로 변경하는 것을 방지하기 위해 애플리케이션에 대한 접근을 제한하는 것.
인증(Authentication): 개체가 전달한 인증 정보를 검증하는 것
허가(Authorization): 개체가 특정 처리를 할 수 있도록 권한을 주는 것
인증 정보가 검증되면 권한이 주어지며 해당 권한으로 다양한 처리를 실행할 수 있다.
기본 HTTP 인증: 사용자 인증 정보(일반적으로 사용자명과 패스워드)를 Authorization HTTP 헤더를 사용해 전송하는 방식. Basic 값을 포함하는 WWW-Authenticate 헤더와 인증 요청을 처리한 리소스를 나타내는 영역 매개변수가 반환된다.
쿠키: 데이터를 클라이언트 측(웹 브라우저 등)에 저장할 때 사용된다. FastAPI 애플리케이션도 쿠키를 사용해서 사용자 정보를 저장할 수 있으며 서버는 이 정보를 추출해 인증 처리에 사용한다.
bearer 토큰 인증: bearer 토큰이라는 보안 토큰을 사용해 인증하는 방식. 이 토큰은 Bearer 키워드와 함께 요청의 Authorization 헤더에 포함돼 전송된다. 가장 많이 사용되는 토큰은 JWT이며 사용자 ID와 토큰 만료 기간으로 구성된 딕셔너리 형식이 일반적이다.
인증에 사용되는 메소드는 런타임 시 호출되는 의존 라이브러리로 FastAPI 애플리케이션에 주입된다. 즉, 정의한 인증 메소드는 실제로 사용되기 전까지 휴면 상태에 있는 것이다. 이를 의존성 주입이라 한다.
객체(함수)가 실행에 필요한 인스턴스 변수를 받는 방식을 의미.
FastAPI에서는 라우트 처리 함수의 인수로 의존 라이브러리를 주입한다.
@user_router.post("/signup")
async def sign_new_user(data: User) -> dict:
if data.email in users:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User with supplied username exists"
)
users[data.email] = data
return {
"message": "User successfully registered!"
}
이 코드에서 User 클래스가 의존 라이브러리이며 이를 sign_new_user() 함수에 주입한다. User를 사용자 함수의 인수로 주입해 User 클래스의 속성을 쉽게 추출할 수 있다.
FastAPI에서 의존 라이브러리는 함수 또는 클래스로 정의된다. 생성된 의존 라이브러리는 기본값과 메서드에 접근할 수 있으므로 함수 내에서 이러한 객체를 상속하지 않아도 된다. 의존성 주입은 반복된 코드 작성을 줄여주므로 인증과 허가처럼 반복적인 구현이 필요한 경우에 큰 도움이 된다.
async def get_user(token: str):
user = decode_token(token)
return user
이 의존 라이브러리는 token을 인수로 사용하고 외부 함수인 decode_token에서 받은 user 매개 변수를 반환하는 함수이다. 이 라이브러리를 사용하려면 Depends 매개변수를 사용하고자 하는 함수의 인수로 설정해야 한다.
from fastapi import Depends
@router.get("/user/me")
async def get_user_details(user: User = Depends(get_user)):
return User
이 라우트 함수는 get_user이라는 함수에 의존한다.
이 라우트를 사용하려면 get_user 의존 라이브러리가 존재해야 한다.
FastAPI 라이브러리에서 import 하는 Depends 클래스는 라우트가 실행될 때 인수로 받은 함수를 실행한다. 또한 함수의 반환값을 라우트에 전달한다.
사용자명과 패스워드를 폼(form) 데이터로 전달하는 OAuth2 패스워드 처리를 사용할 것이다.
폼 데이터가 클라이언트에서 서버로 전송되면 서버는 JWT로 singed된 액세스 토큰을 응답으로 반환한다. 일반적으로 인증 정보 확인은 토큰을 생성하기 전 백그라운드에서 진행된다.
인증된 사용자(사용자명과 패스워드 확인이 끝난 사용자)는 bearer이라는 JWT 토큰 정보를 서버에 전송해서 허가를 받아야 특정 작업을 처리할 수 있다.
JWT는 인코딩된 문자열로, 페이로드, 시그니처, 알고리즘으로 구성된다. JWT는 인코딩된 문자열이 제 3자에 의해 해킹되는 것을 방지하기 위해 서버와 클라이언트만 알고 있는 고유한 키(unique key)로 사인된다.
mkdir auth
auth 폴더에 신규 파일을 생성한다.
cd auth && touch {__init__,jwt_handler,authenticate,hash_password}.py
패스워드는 적절한 라이브러리를 사용해서 반드시 암호화(해싱)해야 한다.
bcrypt를 사용해 패스워드 암호화
passlib 라이브러리 설치
패스워드를 해싱하는 bcrypt 알고리즘을 제공
pip install passlib
hash_password.py 파일에 패스워드를 해싱하는 함수를 작성
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class HashPassword:
def create_hash(self, password: str):
return pwd_context.hash(password)
def verify_hash(self, plain_password: str, hashed_password: str):
return pwd_context.verify(plain_password, hashed_password)
bcrypt를 사용해 문자열을 해싱할 수 있도록 CryptContext를 import한다. context는 pwd_context 변수에 저장되며 이 변수를 사용해 해싱에 필요한 함수들을 호출한다.
패스워드를 해싱해서 데이터베이스에 저장하도록 routes/users.py의 사용자 등록 라우트 수정:
from fastapi import APIRouter, HTTPException, status, Depends
from models.users import User, UserSignIn
from auth.hash_password import HashPassword
from database.connection import get_session
from sqlmodel import select
user_router = APIRouter(
tags=["User"],
)
users = {}
hash_password = HashPassword()
@user_router.post("/signup")
async def sign_new_user(data: User, session=Depends(get_session)) -> dict:
user = session.get(User, data.email)
if user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User with supplied username exists"
)
hashed_password = hash_password.create_hash(data.password)
data.password = hashed_password
session.add(data)
session.commit()
session.refresh(data)
# users[data.email] = data
return {
"message": "User successfully registered!"
}
@user_router.post("/signin")
async def sign_user_in(data: UserSignIn, session=Depends(get_session)) -> dict:
user = session.get(User, data.email)
if not user :
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User does not exist"
)
if users[user.email].password != user.password:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Worng credentials passed"
)
return {
"message": "User signed in successfully."
}
user model 수정:
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from models.events import Event
from sqlmodel import SQLModel, JSON, Field, Column
class User(SQLModel, table=True):
email: str = Field(default=None, primary_key=True)
password: str
events: Optional[List[Event]] = Field(sa_column=Column(JSON))
class Config:
arbitrary_types_allowed = True
schema_extra={
"example": {
"email": "fastapi@packt.com",
"password": "strong!!!",
"events": [],
}
}
class UserSignIn(SQLModel):
email: EmailStr
password: str
class Config:
schema_extra={
"example":{
"email": "fastapi@packt.com",
"password": "strong!!!",
}
}
JWT를 구현하면 애플리케이션의 보안을 한 단계 더 강화할 수 있다. 토큰의 페이로드는 사용자 ID와 만료 시간으로 구성되며 하나의 긴 문자열로 인코딩된다.
JWT는 서버와 클라이언트만 아는 비밀키를 사용해 사인된다.
database/connection.py에 SECRET_KEY 변수 추가
class Settings(BaseSettings):
SECRET_KEY: Optional[str] = None
.env 파일에 SECRET_KEY 값 설정
jwt_handler.py 파일
import time
from datetime import datetime
from fastapi import HTTPException, status
from jose import jwt, JWTError
from database.connection import Settings
settings = Settings()
def create_access_token(user: str):
payload = {
"user": user,
"expires": time.time() + 3600
}
token = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")
return token
토큰 생성 함수는 문자열 하나를 받아서 payload 딕셔너리에 전달한다. payload 딕셔너리는 사용자명과 만료 시간을 포함하여 JWT가 디코딩될 때 반환된다. expires값(만료 시간)은 생성 시점에서 한 시간 후로 설정됐다.
encode() 메소드는 다음과 같이 세 개의 인수를 받으며 payload를 암호화한다.
토큰을 검증하는 함수를 jwt_handler.py에 추가
def verify_access_token(token: str):
try:
data = jwt.decode(token, settings.SECRET_KEY, algorithms=["HS256"])
expire = data.get("expires")
if expire is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No access token supplied"
)
if datetime.now() > datetime.fromtimestamp(expire):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Token expired!"
)
return data
except:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token"
)
함수가 토큰을 문자열로 받아 try 블록 내에서 확인 작업을 거친다.
먼저 확인하는 것은 토큰의 만료 시간이 존재하는지 여부다.
만료 시간이 없으면 유효한 토큰이 존재하지 않는다고 판단한다.
두 번째로 확인하는 것은 토큰이 유효한지(만료 시간이 지나지 않았는지) 여부다.
토큰이 유효하다면 디코딩된 페이로드를 반환한다.
마지막으로 except 블록에서는 JWT 요청 자체에 오류가 있는지 확인한다.
의존 함수를 구현해서 이벤트 라우트에 주입 -> 활성 세션에 존재하는 사용자 정보를 추출하는 단일 창구 역할을 한다.
auth/authenticate.py:
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from auth.jwt_handler import verify_access_token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/user/signin")
async def authenticate(token: str = Depends(oauth2_scheme)) -> str:
if not token:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Sign in for access"
)
decoded_token = verify_access_token(token)
return decoded_token["user"]
OAuth2를 위한 토큰 URL과 authenticate() 함수를 정의한다. authenticate() 함수는 토큰을 인수로 받는다. 토큰이 유효하면 토큰을 디코딩한 후 페이로드의 사용자 필드를 반환하고 유효하지 않으면 verify_access_token() 함수에 정의된 오류 메시지를 반환한다.
@user_router.post("/signin")
async def sign_user_in(data: OAuth2PasswordRequestForm = Depends(), session=Depends(get_session)) -> dict:
user = session.get(User, data.email)
if not user :
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User does not exist"
)
if users[user.email].password != user.password:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Worng credentials passed"
)
if hash_password.verify_hash(data.password, user.password):
access_token = create_access_token(user.email)
return{
"access_token": access_token,
"token_type": "Bearer"
}
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid details passed"
)
OAuth2PasswordRequestForm 클래스를 sign_user_in() 라우트 함수에 주입하여 해당 함수가 OAuth2 사양을 엄격하게 따르도록 한다.
함수 내에서 패스워드, 반환된 접속 토큰, 토큰 유형을 검증
models/users.py의 로그인용 응답 모델 수정
UserSignIn -> TokenResponse:
class TokenResponse(BaseModel):
access_token: str
token_type: str
로그인 라우트 import, 응답 모델 변경:
@user_router.post("/signin", response_model=TokenResponse)
테스트:
curl -X 'POST' 'http://127.0.0.1:8000/user/signin' -H 'accept: application/json' -H 'Content-Type: application/x-www-form-urlencoded' -d 'grant_type=&username=reader%40packt.com&password=exemplary&scope=&client_id=&client_secret='