FastAPI
는 OAuth2.0
로 security를 지원한다.
먼저 다음의 코드를 확인하도록 하자.
from typing import Annotated
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.get("/item/")
async def read_items(token: Annotated[str, Depends(oauth2_scheme)]):
return {"token": token}
FastAPI
의 OAuth2.0
은 form data
형식으로 username
과 password
를 전달하기 때문에 python-mulipart
모듈이 필요하다.
pip3 install python-mulipart
read_items
handler를 보면 dependency로 oauth2_scheme
가 붙어있는 것을 확인할 수 있다. 이는 request가 read_items
로 오기전에 oauth2_scheme
를 지나고 token
값이 할당된다는 것인데, oauth2_scheme
의 구현을 살펴보면 다음과 같다.
class OAuth2PasswordBearer(OAuth2):
def __init__(
...
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(
password=cast(Any, {"tokenUrl": tokenUrl, "scopes": scopes})
)
super().__init__(
flows=flows,
scheme_name=scheme_name,
description=description,
auto_error=auto_error,
)
async def __call__(self, request: Request) -> Optional[str]:
authorization = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param
__call__
부분을 보면 Authorization
header값을 얻어와서 해당 값의 scheme가 bearer
가 아니거나, 값이 없다면 401 Unauthorized
를 반환하는 것을 볼 수 있다. 따라서, OAuth token값이 없다면 read_items
메서드에 접근할 수 없게된다.
추가적으로 __init__
부분에 tokenUrl
과 scopes
를 설정하고 있는데, 이 부분들은 따로 parameter로 넘겨주는 값으로, 현재는 없지만 tokenUrl
을 통해서 username
, password
form을 인증받고 scope
에 해당하는 user인지 판별 후에 인증 결과가 oauth2_scheme
dependency를 거친 후에 read_items으로 넘어가는 것이다. 단, 여기서의
tokenUrl은 사실 openAPI 명세서를 만들기위해 설명서를 붙이는 부분이며, 실제
token`발급 url과 달라도 상관없다.
password flow를 정리하면 다음과 같다.
1. form형식으로 username
과 password
가 frontend로 부터 전달된다.
2. username
과 password
는 tokenUrl="token"
URL handler로 전달된다.
3. API가 username
과 password
를 확인하여 token
을 제공한다.
4. frontend는 token
을 일시적으로 저장하고, API를 호출할 때 해당 token을 사용하여 요청한다.
OAuth2PasswordBearer
instance를 만들 때 tokenUrl
을 넘기는 것을 알 수 있다. 해당 parameter는 client가 token을 얻기위해 username
과 password
를 전달하는 API url이 된다.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
만약 https://example.com/
에서 해당 API를 호출하려고 한다면 https://example.com/token
에 요청하면 되는 것이다.
그럼 실제로 /item/
에 요청을 보내보도록 하여 read_items
가 oauth로 보호받고 있는 지 확인해보도록 하자.
curl localhost:8888/item/
{"detail":"Not authenticated"}
막혀있는 것을 볼 수 있다. 그러나 사실 이는 Authorization
header에 token이 없어서 인증이 실패했다고만 나오는 것이며, 실제로는 token의 유효성 검사를 하지 않기 때문에 header만 추가하면 문제없이 구동된다.
curl localhost:8888/item/ -H "Authorization:bearer hello"
{"token":"hello"}
이제 필요한 것들은 token의 유효성을 검사하는 부분과 token을 발급해주는 API를 만드는 것이다.
JWT token은 json web tokens로 JSON object를 하나의 긴 string으로 만든 것이다.
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
JWT는 암호화된 값이 아니라 encoded된 값이다. 따라서 복원이 가능하고 content에 대해 접근이 가능하다. 그러나, 이는 sign이 되어있기 때문에, 내가 만든 JWT에 대해서 나만이 내 sign으로 유효성 검사를 할 수 있다.
JWT에는 또한 token의 유효기간을 설정할 수 있기 때문에 유효기간 내에 다시 site에 접근하면 로그인하지 않아도 접근이 가능할 수 있다.
JWT token을 python에서 사용하기 위해서는 python-jose
가 필요하다.
pip install "python-jose[cryptography]"
다음으로 user의 plain password를 hashing하기위해서 passlib
을 설치하도록 하자. passlib
은 여러 secure 알고리즘을 제공하는데, 그 중에서 Bcrypt
를 사용해보도록 하자.
pip install "passlib[bcrypt]"
이제 JWT token과 user의 password를 해쉬화하여 저장하는 method들을 구현해보도록 하자.
from datetime import datetime, timedelta, timezone
from typing import Annotated, Union
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# plain password = dojacat
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$BatXQoCbxQYPmEaaPLtX5u6fgjpIIMTSq3J1ckPrLRYhCxZlWjWRO",
"disabled": False,
}
}
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Union[str, None] = None
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
app = FastAPI()
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return current_user
@app.get("/users/me/items/")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return [{"item_id": "Foo", "owner": current_user.username}]
코드가 길어서 어려워보이지만 하나하나 보면 어렵지 않다.
아래는 JWT token발급을 위해 필요한 config들을 정리한 것이다.
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
SECRET_KEY
가 바로 JWT token을 sign하기 위한 하나의 서명이다. 이 값은 랜덤하고 어려운 값들로 만들어지는 것이 좋으므로 openssl
을 사용하는 것이 좋다.
openssl rand -hex 32
09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
해당 값이 바로 SECRET_KEY
로 쓰이는 것이다. 이 SECRET_KEY와
ALGORITHM,
ACCESS_TOKEN_EXPIRE_MINUTES를 사용하여
JWT` token을 만드는 것은 다음과 같다.
from jose import JWTError, jwt
...
class Token(BaseModel):
access_token: str
token_type: str
...
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
Token
은 client에게 제공할 token model이다. create_access_token
은 data
를 받아서 expires
를 설정하고 jwt
라이브러리로 인코딩된 jwt
token을 만들어준다.
다음으로 create_access_token
을 이용하여 JWT token을 만들고 user에게 전달해보도록 하자.
...
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
...
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
...
@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
...
login_for_access_token
는 /token
url을 가지고 있는데, 이는 OAuth2PasswordBearer(tokenUrl="token")
에서 설정한 url과 일치한다. 따라서 username
과 password
에 대한 form data가 login_for_access_token
파라미터로 들어간다. form_data
가 바로 username
과 password
를 담는 것이다. 이는 OAuth2PasswordRequestForm
dependency를 통해서 주입되는 것이다.
authenticate_user
함수를 통해서 form_data
로 전달된 username
, password
를 검증한다. 만약 verify가 실패하면 HTTPExceptiopn
을 발생시킨다.
create_access_token
에 user에 관한 정보와 expires_delta를 넘겨주면 JWT
token을 만들어준다. access_token
을 Token
model에 넣어주고, token_type
까지만 알려주면 끝이다.
마지막으로 client에서 JWT
token을 가지고 요청을 보냈을 때, JWT token을 검증하는 부분을 보도록 하자.
...
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
...
oauth2_scheme
덕분에 header의 Authorization
값을 얻어온다. 이 값이 유효한지 검사를 하기위해서 jwt.decode
함수를 사용하는데 JWT token에 SECRET_KEY
으로 sign했기 때문에 verify에서도 SECRET_KEY
로 verify하는 것이다.
username: str = payload.get("sub")
을 통해서 username
을 얻어내고 이 data를 바탕으로 user
data를 가져온다. 만약 user가 등록되지 않았다면 에러 응답을 반환한다.
JWT token의 sub
는 subject로 token의 주인을 말한다. optional한 값이지만 user에 대한 인증정보를 담는 공간이므로 데이터를 담았다. 이렇게 user를 식별하는 이유는 user에 따라서 API에 접근하는 권한을 달리하고 싶기 때문이다. 확실한 것은 sub
를 사용할 것이라면 unique한 identity를 가져야한다는 것이다. 그러나 이 값은 외부에 노출될 수 있기 때문에 외부에 노출되어도 위험하지 않은 data여야 한다. 대표적으로 email이 있다.
추가적으로 scopes
를 사용할 수 있는데, 이를 통해서 JWT token이 어디까지 범위가 있는 지 인식할 수 있다.
이제 요청을 보내보도록 하자.
curl -X POST localhost:8888/token -H "Content-Type: application/x-www-form-urlencoded" -d "username=johndoe&password=dojacat"
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzA2MDAwMDgyfQ.SVSjPqU-qV4THzfgqaFxO1pwtgdzjXzAXfzx7HBqM-Q","token_type":"bearer"}
성공한 것을 알 수 있다. 이제 이 access_token(JWT token)으로 /users/me/items/
에 요청을 보내보도록 하자.
curl localhost:8888/users/me/items/ -H "Authorization:bearer eyJhbGci
OiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzA2MDAwMDgyfQ.SVSjPqU-qV4THzfgqaFxO1pwtgdzjXzAXfzx7HBqM-Q"
[{"item_id":"Foo","owner":"johndoe"}]
성공적으로 인증이 완료되고 응답이 온 것을 볼 수 있다. 마지막으로 Authorization
을 설정하지 않고 요청을 보내보도록 하자.
curl localhost:8888/users/me/items/
{"detail":"Not authenticated"}
인증에 실패한 것을 볼 수 있다.