DRF JWT 인증/인가

강태원·2024년 4월 20일
1

이번에 새로운 프로젝트를 진행하고 있습니다!

FastAPI를 사용하다가 장고도 한 번 써보고 싶은 마음에 무작정 돌입하게 됐는데, 건드는게 생각보다 쉽지 않아 여기저기 공부하면서 하고 있네요.. 장고가 처음이다보니 부족한 지식으로 잘못 기술되거나 보셨을 때 개선의 여지가 있다면 코멘트 부탁드리겠습니다! (__)

이번 프로젝트에서 적용한 인증/인가 방식을 한 번 소개해볼까 합니다.
글에서는 단순히 로직만 소개하는 것이 아니라 나름대로 구현할 때 살펴봤던 점도 함께 기술해보겠습니다.

흐름도

전체적인 흐름도입니다. 위에서는 FrontEnd만 표기되어 있지만, 사실 App과 통신할 때 Authorization 헤더를 사용하고 웹에서 프론트와 통신할 때는 쿠키를 사용했습니다. 글 읽으시다가 참고하기 좋으시라고 미리 올려두겠습니다.

이번에 앱과 웹 동시에 통신하는 백엔드를 처음 개발해보는데 이게 통상적인 방법인지는 모르겠습니다만 커스텀 헤더를 추가해서 앱에서 온 요청인지 웹에서 온 요청인지 구분하고 있습니다. (api 따로 추가 X)

커스텀 미들웨어

App에서 통신할 때는 굳이 쿠키를 이용하지 않고 JWT를 주고 받는 방식,
Web에서 통신할 때는 쿠키에 JWT를 넣어 주고 받는 방식을 구현하고자 했는데요.
이를 구현하기 위해서 미들웨어를 따로 추가해줬습니다.

from django.utils.deprecation import MiddlewareMixin
from rest_framework import status


class AttachJWTFromHeaderToCookieMiddleware(MiddlewareMixin):
    def __init__(self, get_response):
        super().__init__(get_response)
        self.API = ["github/login", "callback/github"]
        self.REFRESH = "token/refresh"

    def process_response(self, request, response):
        path = request.path_info
        is_valid = any(api in path for api in self.API)
        is_refresh = True if self.REFRESH in path else False
        if (
            is_valid
            or is_refresh
            and (response.status_code == status.HTTP_200_OK or response.status_code == status.HTTP_201_CREATED)
        ):
            if request.META.get("HTTP_X_FROM", None) == "web":
                response.set_cookie("access", response.headers.get("access", None), httponly=True)
                if is_valid:
                    del response.headers["access"]

                response.content = response.render().rendered_content

        return response


class AttachJWTFromCookieToHeaderMiddleware(MiddlewareMixin):
    def __init__(self, get_response):
        super().__init__(get_response)
        self.NOT_API = ["github/login", "callback/github"]

    def process_request(self, request):
        path = request.path_info
        is_valid = any(api in path for api in self.NOT_API)

        if not is_valid:
            if request.META.get("HTTP_X_FROM", None) == "web":
                request.META["HTTP_AUTHORIZATION"] = f"Bearer {request.COOKIES.get('access', None)}"

AttachJWTFromHeaderToCookieMiddleware

말 그대로 헤더에서 쿠키로 JWT를 붙여주는 미들웨어입니다.
서버에서 클라이언트 측으로 JWT를 보낼 때는 현재 회원가입/로그인, 토큰 리프레싱 API 밖에 없기 때문에 요청 Path에 위 태스크의 API 주소가 들어있다면 실행되도록 커스텀했습니다.

웹에서 온 요청일 때만 리스폰스의 헤더에서 쿠키로 JWT를 옮겨줍니다.

AttachJWTFromCookieToHeaderMiddleware

위의 미들웨어와 정반대의 일을 하는 미들웨어입니다.
클라이언트 쪽에서 JWT가 들어오는 요청은 회원가입/로그인 요청을 제외한 모든 요청입니다.

웹에서 온 요청일 때만 리퀘스트의 쿠키에서 헤더로 JWT를 옮겨줍니다.

왜? 굳이 쿠키에서 헤더로 다시 보냈나요?

라는 생각을 하신 분들이 계실 수도 있는데요.

DRF에서 JWT를 사용하면서 사용자를 식별하기 위해

rest_framework_simplejwt.authentication.JWTAuthentication

를 사용합니다. 저도 이 인증방식을 기본으로 채택해놨구요.
이 JWTAuthentication을 조금 살펴봅시다.

def authenticate(self, request: Request) -> Optional[Tuple[AuthUser, Token]]:
        header = self.get_header(request)

        if header is None:
            return None

        raw_token = self.get_raw_token(header)
        if raw_token is None:
            return None

        validated_token = self.get_validated_token(raw_token)

        return self.get_user(validated_token), validated_token
        
def get_header(self, request: Request) -> bytes:
        """
        Extracts the header containing the JSON web token from the given
        request.
        """
        header = request.META.get(api_settings.AUTH_HEADER_NAME)
        if isinstance(header, str):
            # Work around django test client oddness
            header = header.encode(HTTP_HEADER_ENCODING)

JWTAuthentication의 일부 함수들인데요.
음.. 쿠키로 인증하는 내용은 없고 읽어보니 기본 인증 헤더(Authorization)로부터 JWT를 가져오네요. 물론 JWTAuthentication을 상속받아 쿠키에서도 토큰을 가져오도록 커스텀 할 수도 있겠습니다만은, 저는 미들웨어에서 그냥 헤더로 넣어버리는 방법을 선택해봤습니다.

JWTAuthentication를 통과하고나면 request.user에 유저 객체 또는 AnonymousUser가 들어갑니다. 이를 통해 APIView에서 permission_classes로 권한 설정을 쉽게 할 수 있었어요!

    def get_validated_token(self, raw_token: bytes) -> Token:
        """
        Validates an encoded JSON web token and returns a validated token
        wrapper object.
        """
        messages = []
        for AuthToken in api_settings.AUTH_TOKEN_CLASSES:
            try:
                return AuthToken(raw_token)
            except TokenError as e:
                messages.append(
                    {
                        "token_class": AuthToken.__name__,
                        "token_type": AuthToken.token_type,
                        "message": e.args[0],
                    }
                )

        raise InvalidToken(
            {
                "detail": _("Given token not valid for any token type"),
                "messages": messages,
            }
        )

authenticate 함수 내에서 실행되는 토큰 확인 함수입니다. 여기서 정상적인 토큰인지, 토큰이 만료되었는지 확인하게 됩니다. simple_jwt 라이브러리에서 정의된 AuthToken 클래스에서 확인합니다. 여기서 위 흐름도의 401을 반환하는 경우의 수를 충족하겠죠? get_user 함수를 통해 유저 객체를 반환해줍니다.

드디어 View로..

위에서 미리 설명한대로 APIView 속에서 유저의 권한을 쉽게 확인할 수 있습니다.
아래에 View를 두 개 올려둘게요.

class UserManageView(APIView):
    serializer_class = ApproveUserSerializer
    permission_classes = [IsAdminUser]

    def get(self, request):
        queryset = User.objects.filter(is_approved=False)
        if queryset:
            serializer = self.serializer_class(queryset, many=True)
            return Response(serializer.data)
        else:
            return Response({"error": "No Content"}, status=status.HTTP_404_NOT_FOUND)

    def patch(self, request, *args, **kwargs):
        user_id = kwargs.get("user_id")
        user = User.objects.get(id=user_id)

        serializer = ApproveUserSerializer(user, data={"is_approved": True}, partial=True)
        if serializer.is_valid():
            serializer.save()
            return Response({"success": True}, status=status.HTTP_202_ACCEPTED)

        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

class GithubOAuthCallBackView(APIView):
    permission_classes = [AllowAny]

    def get(self, request: Request):
        if code := request.GET.get("code"):
            response = self.send_code_to_github_login_view(code)
            if response.status_code == 200:
                return Response(response.json(), status=status.HTTP_200_OK)
            return Response(
                {"error": "Failed to process with GithubLoginView"},
                status=response.status_code,
            )

    def send_code_to_github_login_view(self, code: str):
        url = "http://localhost:8000/api/auth/github/login"
        payload = {"code": code}
        headers = {"Content-Type": "application/json"}
        response = requests.post(url, json=payload, headers=headers)

        return response

두 뷰가 하는 일에 대해서는 우선 제쳐두고, permission_classes 부분을 보도록 합시다.

각각 AllowAny, IsAdminUser가 있죠?

APIView에서 메소드들에 접근하기 전에

def initial(self, request, *args, **kwargs):
        """
        Runs anything that needs to occur prior to calling the method handler.
        """
        self.format_kwarg = self.get_format_suffix(**kwargs)

        # Perform content negotiation and store the accepted info on the request
        neg = self.perform_content_negotiation(request)
        request.accepted_renderer, request.accepted_media_type = neg

        # Determine the API version, if versioning is in use.
        version, scheme = self.determine_version(request, *args, **kwargs)
        request.version, request.versioning_scheme = version, scheme

        # Ensure that the incoming request is permitted
        self.perform_authentication(request)
        self.check_permissions(request)
        self.check_throttles(request)

이 곳을 먼저 지나갑니다. 저희가 이번에 볼 곳은 저 check_permissions인데요.
저희가 설정해놓은 permission_classes를 검증해보면서 권한을 확인해봅니다.

각각의 permission_classes 내부에 오버라이딩 되어 있는 has_permission 메소드를 통해 True 또는 False를 반환하게 되어 있어요.

class IsAdminUser(BasePermission):
    """
    Allows access only to admin users.
    """

    def has_permission(self, request, view):
        return bool(request.user and request.user.is_staff)

위에 설정되어 있는 IsAdminUser의 모습입니다.
JWTAuthentication을 통해 식별된 유저가 request.user에 들어있을 거에요.
식별된 유저의 is_staff 컬럼이 True, False인지에 따라서 권한이 정해지는 방식입니다!

이 형식을 따라서 우리도 쉽게 권한 설정을 커스터마이징 할 수 있습니다.

# 이건 제가 사용하고 있는 커스텀 권한입니다!
from rest_framework.permissions import BasePermission


class IsApprovedUser(BasePermission):
    def has_permission(self, request, view):
        return bool(request.user and request.user.is_approved)

has_permission 함수에서 False를 반환하면, 403 상태 코드를 반환하도록 설계되어 있습니다!


토큰 리프레싱

401 상태 코드를 처음 리턴 받았다면, 클라이언트 쪽에서는 토큰 리프레싱 해주는 API로 다시 통신을 시도하도록 했습니다.

제가 사용한 simple_jwt 라이브러리에서는 제공해주는 뷰를 통해 이를 쉽게 할 수 있는데요.

class WtntTokenRefreshView(TokenRefreshView):
    def post(self, request: Request, *args, **kwargs):
        _, access_token = request.META.get("HTTP_AUTHORIZATION").split(" ")
        user_id = AccessToken(access_token, verify=False).payload.get("user_id")

        refresh_token = cache.get(user_id)
        if not refresh_token:
            return Response({"error": "Expired Refresh Token"}, status=status.HTTP_401_UNAUTHORIZED)
        serializer = self.get_serializer(data={"refresh": refresh_token})

        try:
            serializer.is_valid(raise_exception=True)
        except TokenError as e:
            raise InvalidToken(e.args[0])

        token = serializer.validated_data
        response = Response({"success": True}, status=status.HTTP_200_OK)

        response.headers["access"] = token["access"]

        return response

원래 이 뷰에서는, 기본으로 지정되어 있는 TokenRefreshSerializer에 RefreshToken을 데이터로 넣어주기만 하면 토큰을 리프레싱 해줍니다. 하지만 이번 프로젝트를 진행하면서 RefreshToken은 클라이언트 측에 전해주지 않고 서버 측에서 관리하기로 했는데요. 따라서, AccessToken의 페이로드에 들어있는 유저 정보를 통해 RefreshToken을 식별해서 리프레싱해줘야 합니다.

simple-jwt에서는 AccessToken을 디코드할 때 만료기한이 지났을 경우 바로 401 상태 코드를 반환하게 되어 있습니다.

유저가 보낸 AccessToken은 이미 만료되어 있기 때문에 이 뷰에 접근을 요청했잖아요!!

네, 그래서 verify=False 옵션을 반드시 명시해서 만료된 AccessToken에서도 유저 정보를 식별할 수 있어야합니다.

RefreshToken은 어디에서 받아 오나요?

AccessToken의 만료기한을 30분으로 정해놨는데, RefreshToken을 현재 사용중인 MySQL의 테이블에 넣어놓고 관리하면 자주 I/O가 생길 것 같았고 RefreshToken의 만료기한이 지났을 때 따로 삭제해줘야 했습니다.(스케쥴러 등을 통해)

그래서 Redis에 토큰을 저장하고, expire로 만료기한을 지정하는 방법을 사용해보기로 했습니다.

AccessToken의 payload에서 가져온 유저의 식별정보로 RefreshToken을 DB에서 받아옵니다.

RefreshToken이 존재한다면 시리얼라이저를 통해 새로운 AccessToken을 발급해주고, 존재하지 않는다면 두 번째 401 상태 코드를 반환합니다.

이렇게 인증/인가 로직을 구현해봤습니다. 제 부족한 점이나 궁금하신 점이 있으시다면 편하게 의견 남겨주세요!!

profile
가치를 창출하는 개발자! 가 목표입니다

0개의 댓글