[Flask] Flask-JWT-Extended JWT ํ† ํฐ

ํ•˜๋ณต์น˜ยท2023๋…„ 4์›” 26์ผ

PYTHON

๋ชฉ๋ก ๋ณด๊ธฐ
2/5

Flask-JWT-Extended ๊ธฐ๋ณธ ์„ค์ •

  • ์„ค์น˜ pip install flask-jwt-extended
  • config.py์— jwt ํ† ํฐ์„ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•œ Secret Key์™€ ํ† ํฐ ๋งŒ๋ฃŒ์‹œ๊ฐ„์„ ๋“ฑ๋กํ•œ๋‹ค.
JWT_SECRET_KEY = ''  # string ์œผ๋กœ ์ง์ ‘ ์ •์˜
JWT_ACCESS_TOKEN_EXPIRES = datetime.timedelta(minutes=int)  # access ํ† ํฐ ๋งŒ๋ฃŒ ์‹œ๊ฐ„
JWT_REFRESH_TOKEN_EXPIRES = datetime.timedelta(minutes=int) # refresh ํ† ํฐ ๋งŒ๋ฃŒ ์‹œ๊ฐ„

๋กœ๊ทธ์ธ

# ๋กœ๊ทธ์ธ ์‹œ๋„ ํšŸ์ˆ˜
login_attempts = {}
login_attempts_limit = 5 

@bp.route('/login', methods=['POST', 'GET'])
def login():
    if request.method == 'POST':
        user_id = request.get_json()['userId']
        user_pw = request.get_json()['pw']

        # ํ•ด๋‹น ์œ ์ € ์•„์ด๋”” ์กฐํšŒ
        user = User.query.filter_by(user_id=user_id).first() 

        # ๋กœ๊ทธ์ธ 5ํšŒ ์‹คํŒจ ์‹œ 
        login_attempts[user_id] = login_attempts.get(user_id, 0)
        if login_attempts[user_id] >= login_attempts_limit:
            return 'Login attempts exceeded limit'

        if user:
            if check_password_hash(user.pwd.decode('utf-8'), user_pw):
                # ๋กœ๊ทธ์ธ ์„ฑ๊ณต ์‹œ ํ•ด๋‹น ์œ ์ € ์•„์ด๋”” ๋กœ๊ทธ์ธ ์‹œ๋„ ํšŸ์ˆ˜ ์ดˆ๊ธฐํ™”
                login_attempts.pop(user_id, None)

                # ํ† ํฐ ๋ฐœ๊ธ‰
                access_token = create_access_token(identity=user_id)
                refresh_token = create_refresh_token(identity=user_id)

                # ๋ฆฌํ”„๋ ˆ์‹œ ํ† ํฐ ์ •๋ณด
                refresh_token_data = decode_token(refresh_token)  # ํ† ํฐ ๋””์ฝ”๋”ฉ
                created_at = datetime.fromtimestamp(refresh_token_data['iat'])  # ํ† ํฐ ์ƒ์„ฑ์ผ
                expired_at = datetime.fromtimestamp(refresh_token_data['exp'])  # ํ† ํฐ ๋งŒ๋ฃŒ์ผ
                is_revoked = False  # ํ† ํฐ ์ทจ์†Œ ์—ฌ๋ถ€

                # ๋ฆฌํ”„๋ ˆ์‹œ ํ† ํฐ ์ •๋ณด DB์ €์žฅ  
                existing_token = Token.query.filter_by(user_id=user_id).first()  
                if existing_token:  # ํ•ด๋‹น ์•„์ด๋””๋กœ ์ €์žฅ๋œ ํ† ํฐ์ด ์ด๋ฏธ ์žˆ์œผ๋ฉด ์—…๋ฐ์ดํŠธ
                    existing_token.token = refresh_token
                    existing_token.created_at = created_at
                    existing_token.expired_at = expired_at
                    existing_token.is_revoked = is_revoked
                    db.session.commit()
                else: 
                    new_token = Token(token=refresh_token, user_id=user_id, created_at=created_at, expired_at=expired_at, is_revoked=is_revoked)
                    db.session.add(new_token)
                    db.session.commit()
                    
                response={
                    'loginRes': "Y",
                    'access': access_token,
                    'refresh': refresh_token,
                    'resultCode': 200,
                    'resultDesc': "Success",
                    'resultMsg': f'{user.user_name}๋‹˜ ํ™˜์˜ํ•ฉ๋‹ˆ๋‹ค.'}
                return  Response(response=json.dumps(response), status=200, mimetype="application/json")
            else:
                # ๋กœ๊ทธ์ธ ์‹คํŒจ ์‹œ ๋กœ๊ทธ์ธ ์‹œ๋„ ํšŸ์ˆ˜ +1
                login_attempts[user_id] = login_attempts.get(user_id, 0) + 1
                response={
                'loginRes': "N",
                'access': "",
                'refresh': "",
                'resultCode': 401,
                'resultDesc': "Unauthorized",
                'resultMsg': "๋น„๋ฐ€๋ฒˆํ˜ธ๋ฅผ ๋‹ค์‹œ ํ™•์ธํ•ด์ฃผ์„ธ์š”."
                }
                return Response(response=json.dumps(response), status=401, mimetype="application/json")

        else: 
            # ๋กœ๊ทธ์ธ ์‹คํŒจ ์‹œ ๋กœ๊ทธ์ธ ์‹œ๋„ ํšŸ์ˆ˜ +1
            login_attempts[user_id] = login_attempts.get(user_id, 0) + 1
            
            response={
            'loginRes': "N",
            'access': "",
            'refresh': "",
            'resultCode': 404,
            'resultDesc': "Not Found",
            'resultMsg': "์ผ์น˜ํ•˜๋Š” ํšŒ์› ์ •๋ณด๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค."
            }
            return Response(response=json.dumps(response), status=404, mimetype="application/json")
    else:
        return 'login.html ํŒŒ์ผ ์š”์ฒญ'
  • ๋กœ๊ทธ์ธ ์„ฑ๊ณต


๋กœ๊ทธ์ธ์ด ํ•„์š”ํ•œ API ์š”์ฒญํ•œ ๊ฒฝ์šฐ

jwt_required()๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ api ํ•จ์ˆ˜๋ฅผ ์‹คํ–‰ํ•˜๊ธฐ ์ „ ํ† ํฐ์ด ์œ ํšจ์„ฑ์„ ๊ฒ€์ฆํ•˜๊ณ  ์œ ํšจํ•œ ๊ฒฝ์šฐ์—๋งŒ ํ•ด๋‹น ํ•จ์ˆ˜๋ฅผ ์‹คํ–‰ํ•œ๋‹ค.

@bp.route('/my_page', methods=['GET'])
@jwt_required()  # access ํ† ํฐ์˜ ์œ ํšจ์„ฑ์„ ๊ฒ€์ฆ ํ›„ ํ† ํฐ์ด ์œ ํšจํ•˜๋ฉด ์ฝ”๋“œ ์ˆ˜ํ–‰
def my_page():
    current_user = get_jwt_identity()
    return jsonify(msg=f'Welcome, {current_user}')

ํ† ํฐ์ด ๋งŒ๋ฃŒ๋œ ๊ฒฝ์šฐ

@bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    refresh_token = request.headers['Authorization'].replace('Bearer ', '')

    # DB์— ์žˆ๋Š” ๋ฆฌํ”„๋ ˆ์‹œ ํ† ํฐ ์ƒํƒœ ๊ฐ€์ ธ์˜ค๊ธฐ
    is_revoked = Token.query.filter_by(token=refresh_token).first().is_revoked

    # ๋ฆฌํ”„๋ ˆ์‹œ ํ† ํฐ์ด DB์—์„œ ์ทจ์†Œ๋œ ๊ฒฝ์šฐ(is_revoked==True)
    if is_revoked:
        response={
            'resultCode': 401,
            'resultDesc': "Unauthorized",
            'resultMsg': "๋กœ๊ทธ์ธ์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค."
            }
        return Response(response=json.dumps(response), status=401, mimetype="application/json")
        # return url_for()
    else: 
        current_user = get_jwt_identity()
        access = create_access_token(identity=current_user)
        response={
            'resultCode': 200,
            'resultDesc': "Success",
            'access': access
            }
        return Response(response=json.dumps(response), status=200, mimetype="application/json")
  • Refresh Token ๋งŒ๋ฃŒ

  • Access Token ์žฌ๋ฐœ๊ธ‰ ์„ฑ๊ณต

  • Refresh Token ๊ถŒํ•œ ์ทจ์†Œํ•œ ๊ฒฝ์šฐ


๋กœ๊ทธ์•„์›ƒ

ํ† ํฐ์„ ์ €์žฅํ•˜๊ธฐ ์œ„ํ•œ jwt_blocklist๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ์‚ฌ์šฉํ•œ๋‹ค.

jwt_blocklist = set()

@bp.route('/logout', methods=['POST'])
@jwt_required()  
def logout():
    jti = get_jwt()['jti']  # jti : ํ† ํฐ์„ ๊ณ ์œ ID๋กœ ์ €์žฅ
    jwt_blocklist.add(jti)  # jwt_blocklist : ํ† ํฐ์˜ ๊ณ ์œ ID, ํ† ํฐ ์œ ์ง€ ๊ธฐ๊ฐ„, ํ† ํฐ ์œ ์ง€ ๊ธฐ๊ฐ„ ์„ค์ • ์—ฌ๋ถ€
    return jsonify(msg='Successfully logged out.')  # jwt_bloacklist์— jti๋งŒ ๋„ฃ์–ด์ฃผ๊ณ  ๋‚˜๋จธ์ง€ ์ƒ๋žตํ•˜๋ฉด ํ† ํฐ ์ฆ‰์‹œ ํŒŒ๊ดด
  • ๋กœ๊ทธ์•„์›ƒ ์„ฑ๊ณต

0๊ฐœ์˜ ๋Œ“๊ธ€