[십자말풀이] 회원가입/로그인과 소셜 로그인 JWT로 구현하기

강태원·2024년 10월 26일
0

십자말풀이 게임

목록 보기
5/6

작업 레포지토리 : https://github.com/fnzksxl/word-puzzle

이번 포스트에서는 FastAPI 서버에 회원가입/로그인과 소셜 로그인을 구현하고 JWT로 인증하는 기능을 작성해보도록 하겠습니다.

JWT와 쿠키를 사용하는 이유

참고 링크 잘 정리되어있는 블로그 글이 있어서 링크를 걸어두도록 하겠습니다.

프로젝트 구조 변경

역할 별로 폴더를 나누던 구조에서 기능 별로 폴더를 나누는 구조로 변경했습니다.

변경 전

ROOT
ㄴ app/
  ㄴ utils/
   ㄴ puzzle.py
   ㄴ ...
 ㄴ api/
  ㄴ v1/
   ㄴ puzzle.py
   ㄴ ...
 ㄴ schema/

...

변경 후

ROOT
ㄴ app/
  ㄴ api/
   ㄴ v1/
    ㄴ puzzle/
     ㄴ controller.py
     ㄴ schema.py
     ㄴ ...
    ㄴ auth/
     ㄴ controller.py
     ㄴ service.py
     ㄴ ...

...

1. 회원가입/로그인 구현

# jwt.py
# models.py
class BaseDate(BaseMin):
    created_at = Column(DATETIME, default=func.now())
    updated_at = Column(DATETIME, default=func.now(), onupdate=func.now())

class User(BaseDate, Base):
    __tablename__ = "user"

    email = Column(VARCHAR(20), unique=True, nullable=False)
    password = Column(VARCHAR(70), nullable=True)
    nickname = Column(VARCHAR(10), nullable=False)
    solved = Column(Integer, default=0)

    def as_dict(self):
        return {column.name: getattr(self, column.name) for column in self.__table__.columns}

User 테이블을 추가해주겠습니다.

JWT 암호화 및 해독을 담당할 관련 서비스와
쿠키에 토큰을 달아줄 서비스를 작성하겠습니다.

class JWTService:
    """
    JWT 암호화 및 해독에 필요한 기능을 제공하는 클래스
    """

    def __init__(self):
        self.algorithm = settings.ALGORITHM
        self.secret_key = settings.SECRET_KEY
        self.access_expire_time = settings.ACCESS_EXPIRE_TIME
        self.refresh_expire_time = settings.REFRESH_EXPIRE_TIME

    def _encode(self, data: dict, expires_delta: int) -> str:
        """
        JWT 토큰으로 암호화한다.

        Args:
            data (dict): 암호화 할 데이터
            expires_delta (int): JWT 만료기간
        Returns:
            str: JWT 토큰
        """
        to_encode = data.copy()
        expire = datetime.now(ZoneInfo("Asia/Seoul")) + timedelta(minutes=expires_delta)
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def _decode(self, token: str) -> Optional[Dict]:
        """
        JWT 토큰을 해독한다.

        Args:
            token (str): JWT 토큰
        Returns:
            Dict or None: 해독 성공 시 정보를 담은 사전, 실패 시 None
        """
        try:
            return jwt.decode(token, self.secret_key, algorithms=self.algorithm)
        except JWTError:
            return None

    def _create_token(self, data: dict, expires_delta: int) -> str:
        """
        JWT 토큰 암호화 래퍼함수
        """
        return self._encode(data, expires_delta)

    def create_access_token(self, data) -> str:
        """
        Access Token 생성 함수
        """
        return self._create_token(data, self.access_expire_time)

    def create_refresh_token(self, data) -> str:
        """
        Refresh Token 생성 함수
        """
        return self._create_token(data, self.refresh_expire_time)

    def check_is_expired(self, token: str) -> Optional[Dict]:
        """
        JWT 토큰 해독 래퍼함수
        """
        payload = self._decode(token)

        now = datetime.timestamp(datetime.now(ZoneInfo("Asia/Seoul")))
        if payload and payload["exp"] < now:
            return None

        return payload
        
# cookie.py
class Cookie:
    """
    응답 객체의 쿠키 관련 작업을 담당하는 클래스
    """

    async def attach_token_into_cookie(
        self, response: JSONResponse, access_token: str, refresh_token: str
    ) -> JSONResponse:
        """
        쿠키에 Access, Refresh Token을 부착하는 메소드

        Args:
            response (JSONResponse): 쿠키 생성해줄 response 객체
            access_token (str): Access Token JWT
            refresh_token (str): Refresh Token JWT
        Returns:
            JSONResponse: 쿠키가 부착된 JSONResponse 객체
        """
        response.set_cookie(
            key="access",
            domain="localhost",
            samesite="None",
            value=access_token,
        )
        response.set_cookie(
            key="refresh",
            domain="localhost",
            samesite="None",
            value=refresh_token,
        )
        return response

config.py와 .env 파일에 필요한 설정들을 추가되어 있는게 전제입니다.
위에 작성된 두 클래스를 Auth 서비스에 주입해 내부 메소드처럼 사용할 예정입니다.

# auth.py
class AuthBase(ABC):
    """
    AuthService의 기초가 되는 추상클래스
    """

    def __init__(self):
        self.jwt_service = JWTService()
        self.cookie_service = Cookie()

    @abstractmethod
    async def register(self):
        """
        회원가입을 담당하는 추상 메소드
        """
        pass

    @abstractmethod
    async def login(self):
        """
        로그인을 담당하는 추상 메소드
        """
        pass

    async def _get_login_response(self, user) -> JSONResponse:
        """
        로그인 성공 후 응답을 생성하는 메소드
        Args:
            user (User): 로그인한 유저의 User 객체
        Returns:
            JSONResponse: Access, Refresh Token이 쿠키에 들어간 응답
        """
        self.user_dict = user.as_dict()
        self.user_dict.pop("password")
        self.user_dict.pop("created_at")
        self.user_dict.pop("updated_at")

        response = JSONResponse(content=self.user_dict)
        return await self._attach_token(response, self.user_dict)

    async def _attach_token(self, response, data) -> JSONResponse:
        """
        응답 쿠키에 Access, Refresh Token을 부착하는 메소드
        Args:
            response (JSONResponse): content로 유저 정보가 들어간 응답 객체
            data: Access, Refresh Token
        Returns:
            JSONResponse: Access, Refresh Token이 쿠키에 들어간 응답
        """
        access_token = self.jwt_service.create_access_token(data)
        refresh_token = self.jwt_service.create_refresh_token(data)
        return await self.cookie_service.attach_token_into_cookie(
            response, access_token, refresh_token
        )

Auth 서비스의 추상 클래스를 먼저 선언해줬습니다.

JWTService, Cookie 클래스가 init 함수에서 주입되었고,
로그인 후 응답을 생성하는 메소드와 응답 쿠키에 토큰을 달아주는 메소드는
정의되어 있는 상태입니다.

# exception.py
class LoginNotValidIDPWException(HTTPException):
    def __init__(self, detail: str = "입력한 ID 또는 비밀번호가 일치하지 않습니다."):
        super().__init__(status_code=400, detail=detail)


# service.py
class GeneralAuthService(AuthBase):
    """
    아이디(이메일), 비밀번호로 회원가입/로그인, 인증 서비스 클래스
    """

    def __init__(self, email: str, password: str, db: Session, nickname: Optional[str] = None):
        """
        입력받은 이메일, 비밀번호, 닉네임을 초기화한다.
        """
        self.email = email
        self.password = password
        self.nickname = nickname
        self.db = db
        super().__init__()

    async def register(self) -> Dict:
        """
        비밀번호를 해쉬화해서 데이터베이스에 유저 정보를 저장하는 메소드

        Returns:
            dict: 필요한 유저 정보만 담아 반환되는 딕셔너리 자료
        """
        user = User(email=self.email, password=await self._hash_pw(), nickname=self.nickname)
        self.db.add(user)
        self.db.commit()
        
        return await self._get_login_response(user)

    async def login(self) -> JSONResponse:
        """
        이메일과 비밀번호로 유저 정보를 확인하여 로그인하는 메소드

        Returns:
            JSONResponse: 쿠키에 토큰 정보, 콘텐츠에 유저 정보를 담은 response 객체
        """
        user = self.db.query(User).filter(User.email == self.email).first()
        if (
            user
            and user.password
            and bcrypt.checkpw(self.password.encode(), user.password.encode())
        ):
            return await self._get_login_response(user)
        raise LoginNotValidIDPWException()

    async def _hash_pw(self) -> str:
        """
        비밀번호를 해쉬화 해 반환한다.

        Returns:
            str: 해쉬화 된 비밀번호
        """
        salt_value = bcrypt.gensalt()
        return bcrypt.hashpw(self.password.encode(), salt_value)

저번에 작성했던 PuzzleCreateService와 init 함수 부분이 미묘하게 다릅니다.
퍼즐 서비스에서는 DB 세션을 바로 DI 받았었는데, 여기서는 안 받고있네요.

퍼즐 서비스에서는 SIZE 파라미터를 쿼리 형태로 받아왔습니다.
하지만, 회원가입/로그인할 때 사용할 데이터를 쿼리로 받아올 수는 없겠죠.

저는 JSON Body 형태로 필요 데이터를 받아오기 위해서 의존성 주입 함수를 따로 작성해줬습니다.

# schema.py
class GeneralLoginModel(BaseModel):
    email: str
    password: str


class GeneralRegisterModel(GeneralLoginModel):
    nickname: str

# dependancy.py
def get_general_auth_service_register(
    auth_data: GeneralRegisterModel, db: Session = Depends(get_db)
) -> GeneralAuthService:
    return GeneralAuthService(
        email=auth_data.email, password=auth_data.password, nickname=auth_data.nickname, db=db
    )


def get_general_auth_service_login(
    auth_data: GeneralLoginModel, db: Session = Depends(get_db)
) -> GeneralAuthService:
    return GeneralAuthService(email=auth_data.email, password=auth_data.password, db=db)

이 함수들을 엔드포인트에서 DI 해주면, JSON Body으로 데이터를 받아오는 동시에 DB 세션을 서비스에 주입해줄 수 있습니다.

# controller.py

@router.post(
    "/general-register", status_code=status.HTTP_201_CREATED, description="EMAIL/PW로 가입"
)
async def general_register(
    auth_service: GeneralAuthService = Depends(get_general_auth_service_register),
):
    return await auth_service.register()


@router.post("/general-login", status_code=status.HTTP_200_OK, description="EMAIL/PW로 로그인")
async def general_login(auth_service: GeneralAuthService = Depends(get_general_auth_service_login)):
    return await auth_service.login()

TEST

회원가입/로그인 후에 JWT가 생성되어 쿠키에 담겨 반환되었는지 알아보기 위해
POSTMAN을 사용해서 테스트 해보도록 하겠습니다.

쿠키 access, refresh에 데이터가 잘 들어와있는 모습을 볼 수 있습니다.

2. 소셜 로그인

소셜 로그인 기능 구현에 앞서 소셜 로그인의 큰 틀을 먼저 살펴보겠습니다.

구글 소셜 로그인에 대한 예시를 그림으로 그려보겠습니다.

General Auth와 비교하면 클라이언트에게서 인가 코드를 전달받고 구글에 access token과 유저 정보를 요청하는 로직이 추가되었다고 볼 수 있겠습니다.

# auth.py
class OAuthBase(AuthBase):
    """
    GeneralAuth 서비스를 상속받아 OAuth 서비스의 기초가 되는 추상클래스
    """

    def __init__(self):
        super().__init__()

    @abstractmethod
    async def get_token(self):
        """
        OAuth 유저 정보 획득에 필요한 Token을 획득하는 추상 메소드
        """
        pass

    @abstractmethod
    async def get_userinfo(self):
        """
        Token으로 OAuth 유저 정보를 획득하는 추상 메소드
        """
        pass

기능 확장에 용이함을 주기 위해서 소셜 로그인에서 반드시 필요한 과정인
get_token과 get_userinfo 메소드를 추상메소드로 생성하는
OAuthBase 추상클래스를 작성해줍니다.

# exception.py

class EmailDuplicatedException(HTTPException):
    def __init__(self, detail: str = "중복된 이메일입니다."):
        super().__init__(status_code=400, detail=detail)


class GoogleGetTokenException(HTTPException):
    def __init__(self, detail: str = "구글 토큰 획득 과정에서 오류가 발생했습니다."):
        super().__init__(status_code=400, detail=detail)


class GoogleGetUserInfoException(HTTPException):
    def __init__(self, detail: str = "구글 유저 정보 과정에서 오류가 발생했습니다."):
        super().__init__(status_code=400, detail=detail)


class GoogleRegisterException(HTTPException):
    def __init__(self, detail: str = "구글 소셜 회원가입 과정에서 오류가 발생했습니다."):
        super().__init__(status_code=400, detail=detail)

# service.py

class GoogleOAuthService(OAuthBase):
    def __init__(self, code: str, db: Session = Depends(get_db)):
        """
        구글 소셜 로그인에 필요한 정보를 초기화 한다.
        """
        super().__init__()
        self.provider = "google"
        self.client_id = settings.GOOGLE_CLIENT_ID
        self.redirect_uri = settings.GOOGLE_REDIRECT_URI
        self.client_secret = settings.GOOGLE_CLIENT_SECRET
        self.db = db
        self.code = code
        self.token_request_url = "https://oauth2.googleapis.com/token"
        self.userinfo_endpoint = "https://www.googleapis.com/userinfo/v2/me"

    async def register(self) -> JSONResponse:
        try:
            email = self.oauth_user_info.get("email", None)
            nickname = email.split("@")[0]
            user = User(email=email, nickname=nickname)
            self.db.add(user)
            self.db.flush()

            oauth = OAuth(user_id=user.id, email=user.email, provider=self.provider)
            self.db.add(oauth)

            self.db.commit()
        except Exception:
            raise GoogleRegisterException()

        return await self._get_login_response(user)

    async def login(self) -> JSONResponse:
        """
        소셜 로그인 로직을 수행하는 기초 메소드,
        데이터베이스에 유저 존재 -> 정보 반환
        존재 X -> 저장(회원가입) 후 정보 반환

        Returns:
            JSONResponse: 쿠키에 토큰 정보, 콘텐츠에 유저 정보를 담은 response 객체
        """
        user = await self.is_registered()

        if user is None:
            return await self.register()

        return await self._get_login_response(user)

    async def get_token(self) -> str:
        """
        유저 정보 조회에 필요한 토큰을 받아온다.

        Returns:
            str: 구글 OAuth 서비스에서 받은 토큰
        """
        token_request_payload = {
            "grant_type": "authorization_code",
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "code": self.code,
            "client_secret": self.client_secret,
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(self.token_request_url, data=token_request_payload)
        result = response.json()

        if "access_token" in result:
            return result["access_token"]
        else:
            raise GoogleGetTokenException()

    async def get_userinfo(self, token) -> Dict:
        """
        구글에서 받아온 토큰으로 유저 정보를 요청하고 반환한다.

        Args:
            token (str): 구글에서 받아온 토큰
        Returns:
            dict:
        """
        headers = {"Authorization": f"Bearer {token}"}

        async with httpx.AsyncClient() as client:
            response = await client.get(self.userinfo_endpoint, headers=headers)
        if response.status_code == 200:
            return response.json()
        else:
            raise GoogleGetUserInfoException()

    async def is_registered(self) -> Optional[User]:
        """
        가입된 유저라면 유저 정보를, 아니라면 None을 반환한다.

        Returns:
            User or None: 정보가 있다면 User 객체, 아니면 None 반환
        """
        token = await self.get_token()
        user_info = await self.get_userinfo(token)
        return await self.get_user_from_db(user_info)

    async def get_user_from_db(self, user_info):
        """
        구글에서 받은 유저 정보를 데이터베이스에 검색하고
        존재하면 User 객체, 아니면 None을 반환한다.

        Args:
            user_info (dict): 구글에서 받은 유저 정보
        Returns:
            User or None: 정보가 있다면 User 객체, 아니면 None 반환
        """

        user = self.db.query(User).filter(User.email == user_info.get("email")).first()
        if user:
            oauth_user_info = (
                self.db.query(OAuth)
                .filter(OAuth.provider == self.provider, OAuth.email == user.email)
                .first()
            )
            if oauth_user_info:
                return user
            else:
                raise EmailDuplicatedException()
        else:
            self.oauth_user_info = user_info
            return None

그림의 로직대로 진행하되 code를 쿼리 파라미터로 입력받고,
DB 내 유저 정보의 유무에 따라 조회만 할지 저장 할지 결정합니다.

# controller.py

@router.get(
    "/oauth-register/google/callback",
    status_code=status.HTTP_201_CREATED,
    description="구글 소셜 로그인",
)
async def kakao_callback(auth_service: GoogleOAuthService = Depends(GoogleOAuthService)):
    return await auth_service.login()
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>Login Page</title>
    <style>
      body {
        font-family: Arial, sans-serif;
        background-color: #f5f5f5;
        display: flex;
        align-items: center;
        justify-content: center;
        height: 100vh;
        margin: 0;
      }

      .login-container {
        background-color: #fff;
        padding: 20px 30px;
        border-radius: 8px;
        box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
        text-align: center;
      }

      .google-login-btn img {
        width: 200px;
        border-radius: 3px;
        transition: opacity 0.3s;
      }

      .google-login-btn:hover img {
        opacity: 0.8;
      }
    </style>
  </head>

  <body>
    <div class="login-container">
      <a
        href="https://accounts.google.com/o/oauth2/v2/auth?client_id=639700898145-445d540qksvfnm3tg29mkht55ufkfeuv.apps.googleusercontent.com&redirect_uri=http://localhost:8000/api/v1/auth/oauth-register/google/callback&response_type=code&scope=email profile"
        class="github-login-btn"
      >
        <img
          src="https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcReH1nivRV_9yG4wz04xIz1EEh-J69U_2JRaA&s"
          alt="Social Login"
        />
      </a>
    </div>
  </body>
</html>

엔드포인트와 인가 코드를 받아올 간단한 html을 작성한 뒤 테스트 해보겠습니다.

작성한 html 파일을 오픈해서 구글 로고를 눌러줍시다.

저는 이미 로그인 해놨었기 때문에, 데이터를 GeneralAuth와 동일한 형태로 받아오는 모습입니다.
로그인이 되어 있지 않다면 구글 계정을 선택하는 화면으로 넘어갈 것 입니다.

profile
가치를 창출하는 개발자! 가 목표입니다

0개의 댓글

관련 채용 정보