여기는 보이지 않지만 project 폴더안에 아래 구조를 갖고 있습니다.
project folider
routes/auth.py
from datetime import datetime, timedelta
import bcrypt
import jwt
from fastapi import APIRouter, Depends
# TODO:
from sqlalchemy.orm import Session
from starlette.responses import JSONResponse
from app.common.consts import JWT_SECRET, JWT_ALGORITHM
from app.database.conn import db
from app.database.schema import Users
from app.models import SnsType, Token, UserToken
"""
1. 구글 로그인을 위한 구글 앱 준비 (구글 개발자 도구)
2. FB 로그인을 위한 FB 앱 준비 (FB 개발자 도구)
3. 카카오 로그인을 위한 카카오 앱준비( 카카오 개발자 도구)
4. 이메일, 비밀번호로 가입 (v)
5. 가입된 이메일, 비밀번호로 로그인, (v)
6. JWT 발급 (v)
7. 이메일 인증 실패시 이메일 변경
8. 이메일 인증 메일 발송
9. 각 SNS 에서 Unlink
10. 회원 탈퇴
11. 탈퇴 회원 정보 저장 기간 동안 보유(법적 최대 한도차 내에서, 가입 때 약관 동의 받아야 함, 재가입 방지 용도로 사용하면 가능)
"""
router = APIRouter()
@router.post("/register/{sns_type}", status_code=200, response_model=Token)
async def register(sns_type: SnsType, reg_info: models.UserRegister, session: Session = Depends(db.session)):
"""
회원가입 API
:param sns_type:
:param reg_info:
:param session:
:return:
"""
if sns_type == SnsType.email:
is_exist = await is_email_exist(reg_info.email)
if not reg_info.email or reg_info.pw:
return JSONResponse(status_code=400, content=dict(msg="Email and PW must be provided'"))
if is_exist:
return JSONResponse(status_code=400, content=dict(msg="EMAIL_EXISTS"))
hash_pw = bcrypt.hashpw(reg_info.pw.encode("utf-8"), bcrypt.gensalt())
new_user = Users.create(session, auto_commit=True, pw=hash_pw, email=reg_info.email)
token = dict(Authorization=f"Bearer {create_access_token(data=UserToken.from_orm(new_user).dict(exclude={'pw', 'marketing_agree'}),)}")
return token
return JSONResponse(status_code=400, content=dict(msg="NOT_SUPPORTED"))
@router.post("/login/{sns_type}", status_code=200)
async def login(sns_type: SnsType, user_info: models.UserRegister):
...
async def is_email_exist(email: str):
get_email = Users.get(email=email)
if get_email:
return True
return False
def create_access_token(*, data: dict = None, expires_delta: int = None):
to_encode = data.copy()
if expires_delta:
to_encode.update({"exp": datetime.utcnow() + timedelta(hours=expires_delta)})
encoded_jwt = jwt.encode(to_encode, JWT_SECRET, algorithm=JWT_ALGORITHM)
return encoded_jwt
register/{sns_type} 엔드 포인트가 보일텐데요. path 파라미터로 받았어요.
register함수의 매개변수의 reg_info는 models.py에서 Pydantic.main.BaseModel을 상속 받아서 작성된 UserRegister클래스를 data validation으로 검증을 하게되요.
참고로 파일들 왔다갔다 하는 점 유의해주세요.
models.py로 한번 가서 구체적으로 코드를 볼게요.
정의된 클래스들이 실제적으로 작동하는 이유는 Json으로 들어오고 나가는 모든 데이터를 BaseModel로 객체화 시켜 사용할 수 있게 하기위해 만들어 주는 것이에요.
오해 할 수 있는 부분이 있는데요. 파이썬에서 "email: str = 'email'"과 같이 타입을 지정한다고 해서 속도가 빨라지는 것은 절대 아닙니다. 왜냐?! 인터프리터 언어이기 때문이조.
가장 큰 목적은 가독성 때문이 큽니다.
pip installl 'pydantic[email]'을 콘솔에서 다운로드 해줘야해요. 그리고 따옴표는 필수로 입력할게요.
더불어 pip install PyJWT 도 실행시켜 jwt도 사용할 수 있도록 할게요.
그리고 pip install bcrypt도 설치할게요.(비밀번호를 단방향 해쉬함수인 bcrypt패키지를 통해서 암호화하기 위함)
from enum import Enum
from pydantic.main import BaseModel
from pydantic.networks import EmailStr
class UserRegister(BaseModel):
# pip install 'pydantic[email]'
email: EmailStr = None
pw: str = None
class SnsType(str, Enum):
email: str = "email"
facebook: str = "facebook"
google: str = "google"
kakao: str = "kakao"
class Token(BaseModel):
Authorization: str = None
class UserToken(BaseModel):
id: int
pw: str = None
email: str = None
name: str = None
phone_number: str = None
profile_img: str = None
sns_type: str = None
class Config:
orm_mode = True
Token클래스는 Response모델이에요. 요청이 왔을때 해당 응답으로 던져주는 데이터의 형식이 바로 Token클래스라는점. auth.py를 보게되면 regiser 함수의 @router.post()의 매개변수의 response_model=Token
이 보 일거에요. 즉, register 메서드가 반환할 값의 틀을 정해주는 격이조.
다시 auth.py에 대한 설명이어 갈게요.
임포트된 부분이
import bcrypt
import jwt
from fastapi import APIRouter, Depends
# TODO:
from sqlalchemy.orm import Session
from starlette.responses import JSONResponse
from app.common.consts import JWT_SECRET, JWT_ALGORITHM
from app.database.conn import db
from app.database.schema import Users
from app.models import SnsType, Token, UserToken
...
...
jwt에 대한 설명은 생략하고 혹시 필요하다면 구글링을 통해서 설명과 사용방법을 익히는 것으로 합니다.
common/consts.py 파일작성을 할게요.
consts.py의 용도는 항상 변하지 않는 값들을 정의하기 위함이에요.
이와 반대로 config.py는 환경별(프로덕션, 로컬, 개발, 스테이징)로 다르게 적용하기 위함과는 대비되조.
하지만 추후 consts.py에는 JWT_SECRET을 넣지 않으거에요.
AWS의 Secret manager라는 서비스를 이용해서 jwt secret를 동적으로 가져오게 할 수 있어요.
# jwt을 encode, decode하기 위한 상수값을 정의할게요.
JWT_SECRET = "1q2w3e4r!@#"
JWT_ALGORITHM = "HS256"
이제 register 함수의 비즈니스 로직을 살펴볼게요.
...
...
...
router = APIRouter()
@router.post("/register/{sns_type}", status_code=200, response_model=Token)
async def register(sns_type: SnsType, reg_info: models.UserRegister, session: Session = Depends(db.session)):
"""
회원가입 API
:param sns_type:
:param reg_info:
:param session:
:return:
"""
if sns_type == SnsType.email:
is_exist = await is_email_exist(reg_info.email)
if not reg_info.email or reg_info.pw:
return JSONResponse(status_code=400, content=dict(msg="Email and PW must be provided'"))
if is_exist:
return JSONResponse(status_code=400, content=dict(msg="EMAIL_EXISTS"))
hash_pw = bcrypt.hashpw(reg_info.pw.encode("utf-8"), bcrypt.gensalt())
new_user = Users.create(session, auto_commit=True, pw=hash_pw, email=reg_info.email)
token = dict(Authorization=f"Bearer {create_access_token(data=UserToken.from_orm(new_user).dict(exclude={'pw', 'marketing_agree'}),)}")
return token
return JSONResponse(status_code=400, content=dict(msg="NOT_SUPPORTED"))
...
...
...
if sns_type == SnsType.email: 부분은 SnsType클래스를 Enum클래스를 이용해서 만들었다는 사실을 일단 참고하세요.
만약 '/register/{sns_type}'에서 K(kakao)가 들어가게 되면 SnsType.email의 값은 SnsType.K or SnsType.G(github) or SnsType.FB(facebook)과 같이 값을 뱉어내요.
다음 코드, is_exist = await is_email_exist(reg_info.email)
우항은 호출부인데요. 정의한 부분은 아래와 같아요. DB에서 해당 email일이 있는지 없는지 별도의 헬퍼 함수로 빼버립니다. 왜냐? 다른 곳에서 자주 사용할 것으로 예측되기 때문입니다.
async def is_email_exist(email: str):
get_email = Users.get(email=email)
if get_email:
return True
return False
r그리고 아래 reg_info
가 있는데요. 함수의 매개변수에서 가져오게되요.
UserRegister를 통해서 직렬화하여 python 객체로 가져올테고 말이조.
혹여 둘중 email or pw 입력이 되지 않았다면 JSONResponse()로 status_code=400과 content키워드 인자를 딕셔너리 값으로 return 키워드를 이용하여 돌려주게 됩니다.
(추후 return대신 raise로 오류를 띄워 미들웨어에서 처리하도록 할 거에요.)
bcrypt를 통한 hash알고리즘의 자세한 설명은 생략하도록 하겠습니다.
대신 짧게 말하자면, hashpw()는 매개변수를 첫번째에 바이트값, 두번째 salting, 세번째 사용할 알고리즘(ex. HS256)를 넣어 shakit shakeit해서 단방향 헤쉬함수로써 만든 복호화가 불가한 비밀번호를 만들게 되요.
new_user = Users.create(session, auto_commit=True, pw=hash_pw, email=reg_info.email)
이 코드는 기존에는
new_user = Users().create(~~) 이렇게 클래스를 호출하고 인스턴스 메소드를 호출해서 사용했는데요. 하지만 그냥 클래스메소드로 호출하는 것이 더 효율적이라 개선했어요.
왼쪽이 개선한 것 오른쪽이 이전 작성 코드에요.
@classmethod 데코레이터를 메서드 위에 위치시키고 기존 self 키워드를 cls로 대체하고 또한 obj 변수를 cls()를 통해서 할당하여 사용 할수 있게 했어요.
어찌보면 django처럼 보일 수 있지만 여러 패턴을 익히는 측면에서 매우 좋아요.
token = dict(Authorization=f"Bearer {create_access_token(data=UserToken.from_orm(new_user).dict(exclude={'pw', 'marketing_agree'}),)}")
전 이 줄을 보고 뭐이리 길어! 했지만 뜯어보니깐.
django의 ORM과 닮은 부분이 많더군요.
우선 아래 pydantic의 문법을 많이 이용했더군요. 왜냐하면 UserToken의 DNA(부모)가 BaseModel이기 때문이조.
from_orm()의 인자를 db에서 반환 받은 객체를 위치시키고.객체의 dict()메서드를 사용하여 비밀번호나 marketing_agree과 같은 클래스 변수를 제외시켜 나머지 정보를 반환 하게되요.
참고 링크 - https://pydantic-docs.helpmanual.io/usage/exporting_models/
그리고 내부 로직중 string부분에서 create_access_token()호출부가 있는데요. 해당 정의부를 볼게요. 처음 매개변수가 *가 왔는데. 이건 다음에 오게되는 매개변수는 반드시 키워드인자 name='이름'과 같은 형식으로 키와 벨류 형식으로 와야 해요 그렇지 않으면 오류를 뿜어냅니다.
uvicorn app.main:app --reload
서버를 구동하고 endpoint로 넘어간 다음 스웨거에서 아래와 같이 테스트하면 토큰이 생성되요.
디비에서도 물론 데이터가 이상없이 잘 저장된 것이 확인되네요.
회원가입을 통해서 디비에 정보를 저장하고 로그인 인증을 위한 토큰을 발급 받은 것까지 했어요.
우선 app/database/schema.py로 갈게요.
get 클래스 메소드를 만들었어요. 지난 번 create()클래스 메소드를 만든것과 크게 다르지는 않게 구조적으로 짯지만.차이점은 군데 군데 있어요.
...
...
...
@classmethod
def get(cls, **kwargs):
"""
Simply get a Row
:param kwargs:
:return:
"""
session = next(db.session())
query = session.query(cls)
for key, val in kwargs.items():
col = getattr(cls, key)
query = query.filter(col == val)
if query.count() > 1:
raise Exception("Only one row is supposed to be returned, but got more than one.")
return query.first()
...
...
여기서 없는 이유는 commit작업이 이루어 지지 않고 조회만 하기에 그렇습니다.
query()메서드에 cls를 넣어서 해당 클래스와 매핑된 정보를 받아와서 query 변수에 할당합니다.
특히 query.count()가 1보다 큰 값일 경우 데이터 무결성에 위배되기에 오류를 발생시키게 소스코드를 짜야해요.
현재는 get까지만 되었지만 filter class method 구현을 통해 더 많은 쿼리셋 정보를 가져 오게 할거에요.
@router.post("/login/{sns_type}", status_code=200)
async def login(sns_type: SnsType, user_info: UserRegister):
if sns_type == SnsType.email:
is_exist = await is_email_exist(user_info.email)
if not user_info.email or not user_info.pw:
return JSONResponse(status_code=400, content=dict(msg="Email and PW must be provided'"))
if not is_exist:
return JSONResponse(status_code=400, content=dict(msg="NO_MATCH_USER"))
user = Users.get(email=user_info.email)
is_verified = bcrypt.checkpw(user_info.pw.encode("utf-8"), user.pw.encode('utf-8'))
if not is_verified:
return JSONResponse(status_code=400, content=dict(msg='NO_MATCH_USER'))
token = dict(Authorization=f"Bearer {create_access_token(data=UserToken.from_orm(user).dict(exclude={'pw', 'marketig_agree'}),)}")
return token
return JSONResponse(status_code=400, content=dict(msg="NOT_SUPPORTED"))
회원 가입 비즈니스 로직과 매우 비슷한데요. 하지만
차이점은 bcrypt.checkpw를 사용한 부분인데요. 즉, request body에서 가져온 데이터와 db에서 가져온 데이터를 byte로 변환하여 비교하는 메서드에요.
잘 나오네요.