FastAPI를 배워보자 12일차 - Security

0

fastapi

목록 보기
12/13

Security - First steps

FastAPIOAuth2.0로 security를 지원한다.

먼저 다음의 코드를 확인하도록 하자.

from typing import Annotated

from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.get("/item/")
async def read_items(token: Annotated[str, Depends(oauth2_scheme)]):
    return {"token": token}

FastAPIOAuth2.0form data형식으로 usernamepassword를 전달하기 때문에 python-mulipart 모듈이 필요하다.

pip3 install python-mulipart

read_items handler를 보면 dependency로 oauth2_scheme가 붙어있는 것을 확인할 수 있다. 이는 request가 read_items로 오기전에 oauth2_scheme를 지나고 token값이 할당된다는 것인데, oauth2_scheme의 구현을 살펴보면 다음과 같다.

class OAuth2PasswordBearer(OAuth2):
    def __init__(
...
    ):
        if not scopes:
            scopes = {}
        flows = OAuthFlowsModel(
            password=cast(Any, {"tokenUrl": tokenUrl, "scopes": scopes})
        )
        super().__init__(
            flows=flows,
            scheme_name=scheme_name,
            description=description,
            auto_error=auto_error,
        )

    async def __call__(self, request: Request) -> Optional[str]:
        authorization = request.headers.get("Authorization")
        scheme, param = get_authorization_scheme_param(authorization)
        if not authorization or scheme.lower() != "bearer":
            if self.auto_error:
                raise HTTPException(
                    status_code=HTTP_401_UNAUTHORIZED,
                    detail="Not authenticated",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            else:
                return None
        return param

__call__부분을 보면 Authorization header값을 얻어와서 해당 값의 scheme가 bearer가 아니거나, 값이 없다면 401 Unauthorized를 반환하는 것을 볼 수 있다. 따라서, OAuth token값이 없다면 read_items 메서드에 접근할 수 없게된다.

추가적으로 __init__부분에 tokenUrlscopes를 설정하고 있는데, 이 부분들은 따로 parameter로 넘겨주는 값으로, 현재는 없지만 tokenUrl을 통해서 username, password form을 인증받고 scope에 해당하는 user인지 판별 후에 인증 결과가 oauth2_scheme dependency를 거친 후에 read_items으로 넘어가는 것이다. 단, 여기서의 tokenUrl은 사실 openAPI 명세서를 만들기위해 설명서를 붙이는 부분이며, 실제 token`발급 url과 달라도 상관없다.

password flow를 정리하면 다음과 같다.
1. form형식으로 usernamepassword가 frontend로 부터 전달된다.
2. usernamepasswordtokenUrl="token" URL handler로 전달된다.
3. API가 usernamepassword를 확인하여 token을 제공한다.
4. frontend는 token을 일시적으로 저장하고, API를 호출할 때 해당 token을 사용하여 요청한다.

OAuth2PasswordBearer instance를 만들 때 tokenUrl을 넘기는 것을 알 수 있다. 해당 parameter는 client가 token을 얻기위해 usernamepassword를 전달하는 API url이 된다.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

만약 https://example.com/에서 해당 API를 호출하려고 한다면 https://example.com/token에 요청하면 되는 것이다.

그럼 실제로 /item/에 요청을 보내보도록 하여 read_items가 oauth로 보호받고 있는 지 확인해보도록 하자.

curl localhost:8888/item/

{"detail":"Not authenticated"}

막혀있는 것을 볼 수 있다. 그러나 사실 이는 Authorization header에 token이 없어서 인증이 실패했다고만 나오는 것이며, 실제로는 token의 유효성 검사를 하지 않기 때문에 header만 추가하면 문제없이 구동된다.

curl localhost:8888/item/ -H "Authorization:bearer hello"

{"token":"hello"}

이제 필요한 것들은 token의 유효성을 검사하는 부분과 token을 발급해주는 API를 만드는 것이다.

OAuth2 with password, bearer with JWT token

JWT token은 json web tokens로 JSON object를 하나의 긴 string으로 만든 것이다.

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

JWT는 암호화된 값이 아니라 encoded된 값이다. 따라서 복원이 가능하고 content에 대해 접근이 가능하다. 그러나, 이는 sign이 되어있기 때문에, 내가 만든 JWT에 대해서 나만이 내 sign으로 유효성 검사를 할 수 있다.

JWT에는 또한 token의 유효기간을 설정할 수 있기 때문에 유효기간 내에 다시 site에 접근하면 로그인하지 않아도 접근이 가능할 수 있다.

JWT token을 python에서 사용하기 위해서는 python-jose가 필요하다.

pip install "python-jose[cryptography]"

다음으로 user의 plain password를 hashing하기위해서 passlib을 설치하도록 하자. passlib은 여러 secure 알고리즘을 제공하는데, 그 중에서 Bcrypt를 사용해보도록 하자.

pip install "passlib[bcrypt]"

이제 JWT token과 user의 password를 해쉬화하여 저장하는 method들을 구현해보도록 하자.

from datetime import datetime, timedelta, timezone
from typing import Annotated, Union

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# plain password = dojacat
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$BatXQoCbxQYPmEaaPLtX5u6fgjpIIMTSq3J1ckPrLRYhCxZlWjWRO",
        "disabled": False,
    }
}

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Union[str, None] = None

class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

class UserInDB(User):
    hashed_password: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user

@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return [{"item_id": "Foo", "owner": current_user.username}]

코드가 길어서 어려워보이지만 하나하나 보면 어렵지 않다.

아래는 JWT token발급을 위해 필요한 config들을 정리한 것이다.

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

SECRET_KEY가 바로 JWT token을 sign하기 위한 하나의 서명이다. 이 값은 랜덤하고 어려운 값들로 만들어지는 것이 좋으므로 openssl을 사용하는 것이 좋다.

openssl rand -hex 32

09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7

해당 값이 바로 SECRET_KEY로 쓰이는 것이다. 이 SECRET_KEY와 ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES를 사용하여 JWT` token을 만드는 것은 다음과 같다.

from jose import JWTError, jwt
...
class Token(BaseModel):
    access_token: str
    token_type: str
...
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

Token은 client에게 제공할 token model이다. create_access_tokendata를 받아서 expires를 설정하고 jwt 라이브러리로 인코딩된 jwt token을 만들어준다.

다음으로 create_access_token을 이용하여 JWT token을 만들고 user에게 전달해보도록 하자.

...

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

...

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

...

@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")
...

login_for_access_token/token url을 가지고 있는데, 이는 OAuth2PasswordBearer(tokenUrl="token")에서 설정한 url과 일치한다. 따라서 usernamepassword에 대한 form data가 login_for_access_token 파라미터로 들어간다. form_data가 바로 usernamepassword를 담는 것이다. 이는 OAuth2PasswordRequestForm dependency를 통해서 주입되는 것이다.

authenticate_user 함수를 통해서 form_data로 전달된 username, password를 검증한다. 만약 verify가 실패하면 HTTPExceptiopn을 발생시킨다.

create_access_token에 user에 관한 정보와 expires_delta를 넘겨주면 JWT token을 만들어준다. access_tokenToken model에 넣어주고, token_type까지만 알려주면 끝이다.

마지막으로 client에서 JWT token을 가지고 요청을 보냈을 때, JWT token을 검증하는 부분을 보도록 하자.

...
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user
...

oauth2_scheme 덕분에 header의 Authorization값을 얻어온다. 이 값이 유효한지 검사를 하기위해서 jwt.decode함수를 사용하는데 JWT token에 SECRET_KEY으로 sign했기 때문에 verify에서도 SECRET_KEY로 verify하는 것이다.

username: str = payload.get("sub")을 통해서 username을 얻어내고 이 data를 바탕으로 user data를 가져온다. 만약 user가 등록되지 않았다면 에러 응답을 반환한다.

JWT token의 sub는 subject로 token의 주인을 말한다. optional한 값이지만 user에 대한 인증정보를 담는 공간이므로 데이터를 담았다. 이렇게 user를 식별하는 이유는 user에 따라서 API에 접근하는 권한을 달리하고 싶기 때문이다. 확실한 것은 sub를 사용할 것이라면 unique한 identity를 가져야한다는 것이다. 그러나 이 값은 외부에 노출될 수 있기 때문에 외부에 노출되어도 위험하지 않은 data여야 한다. 대표적으로 email이 있다.

추가적으로 scopes를 사용할 수 있는데, 이를 통해서 JWT token이 어디까지 범위가 있는 지 인식할 수 있다.

이제 요청을 보내보도록 하자.

curl -X POST localhost:8888/token -H "Content-Type: application/x-www-form-urlencoded" -d "username=johndoe&password=dojacat"

{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzA2MDAwMDgyfQ.SVSjPqU-qV4THzfgqaFxO1pwtgdzjXzAXfzx7HBqM-Q","token_type":"bearer"}

성공한 것을 알 수 있다. 이제 이 access_token(JWT token)으로 /users/me/items/에 요청을 보내보도록 하자.

curl localhost:8888/users/me/items/ -H "Authorization:bearer eyJhbGci
OiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzA2MDAwMDgyfQ.SVSjPqU-qV4THzfgqaFxO1pwtgdzjXzAXfzx7HBqM-Q"

[{"item_id":"Foo","owner":"johndoe"}]

성공적으로 인증이 완료되고 응답이 온 것을 볼 수 있다. 마지막으로 Authorization을 설정하지 않고 요청을 보내보도록 하자.

curl localhost:8888/users/me/items/ 

{"detail":"Not authenticated"}

인증에 실패한 것을 볼 수 있다.

0개의 댓글

관련 채용 정보