인증은 user의 identification을 확인하는 절차, 일반적으로 웹사이트에서 사용자가 로그인을 하여 아이디와 비번을 확인하는 즉 로그인 기능 endpoint를 의미한다.
시스템적으로 프론트엔드와 벡엔드 api 상에서의 로그인의 다음의 절차를 통해 구현
- 1) 먼저, 사용자 가입 절차를 진행해서 사용자의 아이디와 비밀번호를 생성해야함. (예시로 들고있는 미니터 API의 경우 sign-up 엔드포인트를 통해서 사용자 가입을 할 수 있음.)
사용자의 비밀번호를 데이터베이스에 저장할 때 암호화해서 저장해야하는가?
외부의 해킹 (hacking) 공격에 의해 데이터베이스가 노출되었을 경우에 대비
내부 인력에 의해 데이터베이스가 노출되었을 경우에 대비
사용자의 비밀번호를 암호활 때는 단방향 해쉬함수 (one-way hash function)가 일반적으로 사용
단방향 해쉬함수는 복호화를 할 수 없는 암호화 알고리즘
예를 들어, test password라는 비밀번호를 hash256이라는 단방향 해쉬 함수를 사용하면 0b47~~~4e가 나온다.
이 해쉬 값 (암호화된 값)은 원본 비밀번호 값인 test password로 복호화될 수 없음
import hashlib
m = hashlib.sha256()
m.update(b"test password")
m.hexdigest()
>>> '0b47c69b1033498d5f33f5f7d97bb6a3126134751629f4d0185c115db44c094e'
단방향 해쉬함수 알고리즘은 rainbow attack이라는 방법으로 해킹이 가능하다.
rainbow attack은 미리 해쉬값들을 계산해놓은 테이블인 rainbow table을 먼저 생성 ->
해쉬 함수값을 역추적해서 본래값을 찾아냄
해쉬 함수의 실행 속도가 굉장히 빠르므로 공격자는 매우 빠른 속도로 임의의 문자열의 값을 해쉬화해서 해킹할 대상의 해쉬값과 비교할 수 있고, 이런 방식으로 패스워드를 추측하면 패스워드가 충분히 길거나 복잡하지 않은 경우에는 그리 긴 시간이 걸리지 않음. 따라서 단방향 해쉬 함수의 취약점들을 보완하기위해 일반적으로 salting과 key stretching이라는 두 가지 방법이 사용된다.
salting
실제 비밀번호 값 이외에 랜덤 값을 더해서 해쉬값을 계산
본래 비밀번호에 랜덤 값을 더해서 해쉬화를 하기 때문에 해킹을 당한 경우에도 어느 부분이 실제 비밀번호 값이고 어느 부분이 랜덤 값인 지 알 수가 없다. 이 방식으로 rainbow attack 처럼 미리 해쉬값들을 계산하여 해킹하는 공격들을 무효화시킬 수 있따.
key stretching
비밀번호의 값에 단방향 해쉬값을 계산 한 후, 그 해쉬값에 대하여 다시 단방향 해쉬값을 계산한다.
pip install bcrypt
import bcrypt # 해시화하고자 하는 값과, salt 값을 파라미터로 받음 bcrypt.hashpw(password=b"secrete password", salt=bcrypt.gensalt()) >> b'$2b$12$pFtEBNg2rYhpQLVUT0BEde01qVaJtmzd7XAUW6u.oS2x2.ObzZ9SW'
사용자가 로그인에 성공한 후에는 백엔드 API 서버는 access token이라고 하는 데이터를 프론트엔드에 전송하고, 프론트엔드에서는 해당 사용자의 access token을 HTTP 요청 (request)에 첨부해서 서버에 전송함.
HTTP는 stateless로 각각의 HTTP 통신은 독립적이며 그러므로 이전에 어떠한 HTTP 통신이 실행되었는 지 알 수 없음. 즉, 이미 인증이 진행되었는 지 알 수 없음.
위와 같은 이유로 HTTP 통신을 할 때는 해당 HTTP 요청 (request)를 처리하기위해서 필요한 모든 데이터를 첨부해서 요청을 보내야함.
로그인 정보도 마찬가지로 로그인 정보 또한 HTTP 요청에 첨부해서 보내야 API 서버에서 해당 사용자가 이미 로그인된 상태인 것을 알 수 있음
api 서버에서 사용자의 로그인 정보를 access token의 형태로 생성하여 프론트엔드 서버에 전송하면 프론트엔드 서버는 백엔드 서버로부터 전송받은 acess token을 그대로 다시 백엔드 서버에 HTTP 요청을 보낼 때 첨부해서 보내며, 이 access token을 통해 백엔드 api 서버에서는 해당 사용자의 로그인 여부를 알 수 있음.
acess token을 생성해내는 방법중 가장 많이 사용되는 JSON 데이터를 token으로 변환하는 방식
어떤 사용자가 로그인하기위해 다음과 같은 HTTP 요청을 백엔드 API 서버에 전송했다고 가정
사용자 아이디는 "joe"이고, 비밀번호는 "password"
HTTP Post 요청을 /auth 엔드포인트에 전송함.
POST /auth HTTP/1.1
Host: localhost:5000
Content-Type: application/json
{
"username": "joe",
"password": "password"
}
백엔드 API 서버에서는 전송받은 사용자 아이디와 비밀번호를 확인하는 절차를 거친 후, 인증이 되면 해당 사용자의 아이디를 다음과 같은 JSON 형태로 생성
로그인한 사용자 아이디를 JSON 형태로 저장함으로써 어느 사용자가 이미 로그인한 상태인 지 알 수 있도록 함.
{
"user_id": 1
}
해당 정보를 네트워크상에서 주고받아야하므로 JSON 데이터를 token 데이터로 변환시켜서 HTTP 응답을 다음과 같이 보냄.
access token:
HTTP/1.1 200 OK
Content-Type: application/json
{
"access_token": "eyJ....._E"
}
access token을 받은 프론트엔드는 쿠키 등에 access token을 저장하고 있다가 해당 사용자를 위한 HTTP 요청을 백엔드 서버 API에 보낼 때, access token을 첨부해서 보냄.
백엔드 API 서버는 프론트엔드 사용자로부터 받은 access token을 복호화해서 JSON 데이터를 얻으므로, 이를 통해 이미 로그인한 사용자 아이디를 읽어 들임으로써 사용자가 이미 로그인한 상태임을 확인함.
JWT는 header, payload, signature로 구성되어 있으며 일반적으로 아래와 같은 형태임.
xxxxx.yyyyy.zzzzz
x에 해당하는 부분은 헤더 (header), y에 해당하는 부분은 페이로드 (payload), z에 해당하는 부분은 시그니쳐 (signature)
header
헤더 (header)는 두 부분으로 되어있으며, 토큰 타입 (JWT) 그리고 사용되는 해쉬 알고리즘 (hashing algorithm)을 지정함.
헤더를 Base64URL 방식으로 코드화해서 JWT의 첫 부분을 구성
{
"alg": "HS256",
"typ": "JWT"
}
payload
페이로드 (payload)는 JWT를 통해 실제로 서버 간에 전송하고자 하는 데이터 부분. HTTP 메시지의 body 부분과 비슷함.
페이로드도 Base64URL 방식으로 코드화되어서 JWT의 두 번째 부분을 구성하며, Base64URL은 코드화 (encoding) 시키는 것이지 암호화가 아님을 상기, 그러므로 실제 사용자의 개인정보 등은 절대로 넣지않도록 한다.
{
"user_id": 2,
"exp": 1539517391
}
signature
JWT가 원본 그대로라는 것을 확인할 때 사용하는 부분으로 시그니쳐는
- Base64URL 코드화된 header
- payload
- JWT secret을 헤더에 지정된 암호 알고리즘으로 암호화하여 전송 (복호화가 가능한 암호)
프론트엔드가 JWT를 백엔드 API 서버로 전송하면 서버에서는 전송받은 JWT의 시그니쳐 부분을 복호화하여, 서버에서 생성한 JWT가 맞는 지 확인함.
이 시그니쳐 부분이 잘못되어 있으면 JWT를 누군가 임의적으로 바꾼 것이거나 해킹의 목적 등으로 임의로 생성한 것이라고 간주할 수 있음.
pip install PyJWT
```![](https://media.vlpt.us/images/eclat12450/post/f7bfbb50-93c5-46ec-92c8-a0ac15dc51ef/image.png)
```python
import jwt
data_to_encode = {"some": "payload"}
encryption_secret = "secrete"
algorithm = "HS256"
encoded = jwt.encode(payload=data_to_encode,
key=encryption_secret,
algorithm=algorithm)
decoded = jwt.decode(jwt=encoded,
key=encryption_secret,
algorithm=[algorithm])
print(encoded)
print(decoded)
JWT를 사용할 때 주의해야할 점은 시그니쳐 부분 외에 다른 부분들은 암호화가 아닌 Base64URL 방식으로 코드화되어있다는 것, 즉 누구나 원본 데이터를 볼 수 있으므로 민감한 데이터는 저장하지 않도록 함.
sign-up endpoint
import bycrypt
@app.route("/sign-up", methods=["POST"]) # 4
def sign_up():
new_user = request.json
#bycrypt 모듈을 사용하여, 암호화하며, hashpw 함수는 byte 값을 받으므로, 사용자의 비밀번호를 utf-8로 인코딩
new_user["password"] = bcrypt.hashpw(password=new_user["password"].encode("utf-8"), # 2
salt=bcrypt.gensalt())
new_user_id = insert_user(new_user)
new_user = get_user(new_user_id)
return jsonify(new_user)
인증 endpoint
인증 엔드포인트는 HTTP POST request에 JSON 데이터로 해당 사용자의 아이디 (예제인 미니터의 경우 사용자의 이메일)와 비밀번호를 전송받아서 데이터베이스에 저장되어 있는 해당 사용자의 비밀번호와 동일한 지 확인하는 기능을 수행하면된다. JWT access token을 생성해서 전달
import jwt
def get_user_id_and_password(email):
row = current_app.database.execute(text("""
SELECT
id,
hashed_password
FROM users
WHERE email = :email
"""), {'email' : email}).fetchone()
return {
'id' : row['id'],
'hashed_password' : row['hashed_password']
} if row else None
...
# Flask가 create_app이라는 이름의 함수를 자동으로 factory 함수로 인식, Flask 실행함.
# test_config이라는 parameter는 unit test를 실행시킬 때, 테스트용 데이터베이스의 설정 정보를 적용하기 위함.
def create_app(test_config=None):
app = Flask(__name__)
if not test_config:
app.config.from_pyfile("config.py")
else:
app.config.update(test_config)
database = create_engine(app.config["DB_URL"], encoding="utf-8", max_overflow=0) # 데이터베이스와 연결
app.database = database # Flask instance의 attribute로 가리킴
...
@app.route("/login", methods=["POST"])
def login():
credential = request.sjon
email = credential["email"] #2
password = credential["password"] #3
user_credential = get_user_id_and_password(email) #4
#5
if user_credential and bcrypt.checkpw(password=password.encode("utf-8"),
hashed_password=user_credential["hashed_password"].encode("utf-8")):
user_id = user_credential["id"]
payload = { #6
"user_id": user_id,
"exp": datetime.utcnow() + timedelta(seconds = 60 * 60 * 24)
}
#7
token = jwt.encode(payload=payload,
key=app.config["JWT_SECRET_KEY"],
algorithm="HS256")
#8
return jsonify({
"access_token": token.decode("utf-8")
})
else:
return "", 401 #9
인증 엔드포인트 (endpoint)를 구현하였으므로, 인증 엔드포인트를 통해 인증을 하고 생성된 access token (JWT)를 통하여 인증된 사용자만 사용 가능하도록 다른 엔드포인트들에 적용한다. 예시로 계속 사용하고 있는 미니터 API의 경우 인증 절차가 필요한 엔드포인트들은 아래와 같다.
서로 다른 3개의 엔드포인트들이 공통적으로 인증 절차를 필요로하므로 이를 파이썬의 decorator를 이용해 인증 절차를 각 엔드포인트에 추가
인증 decorator 함수
인증 엔드포인트를 통해 생성된 JWT access token을 프론트엔드는 HTTP 요청 (request)에서 헤더 (header)에 포함시켜 보내게된다.
Authorization 헤더에 포함시켜 보내게 되는 데, 인증 decorator 함수는 전송된 HTTP 요청에서 Authorization 헤더 값을 읽어 들여서 JWT access token을 읽어들임.
읽어들인 후 복호화하여 사용자 아이디를 읽어 들임으로써 해당 사용자의 로그인 여부를 결정함.
import os
import bcrypt
import jwt
from functools import wraps
from flask import Flask, jsonify, current_app, request, Response, g
from flask.json import JSONEncoder
from datetime import datetime, timedelta
from sqlalchemy import create_engine, text
...
#########################################################
# Decorators
#########################################################
def login_required(f): #1
@wraps(f) #2
def decorated_function(*args, **kwargs):
access_token = request.headers.get('Authorization') #3
if access_token is not None: #4
try:
payload = jwt.decode(access_token, current_app.config['JWT_SECRET_KEY'], #5
'HS256')
except jwt.InvalidTokenError:
payload = None #6
if payload is None:
return Response(status=401) #7
user_id = payload['user_id'] #8
g.user_id = user_id
g.user = get_user(user_id) if user_id else None
else:
return Response(status=401) #9
return f(*args, **kwargs)
return decorated_function
인증 decorator 함수 적용
...
@app.route("/tweet", methods=["POST"])
@login_required
def tweet():
user_tweet = request.json
user_tweet['id'] = g.user_id
tweet = user_tweet["tweet"]
if len(tweet) > 300:
return "300자를 초과했습니다", 400
insert_tweet(user_tweet)
return "", 200
...
@app.route("/follow", methods=["POST"])
@login_required
def follow():
payload = request.json
payload['id'] = g.user_id
insert_follow(payload)
return "", 200
@app.route("/unfollow", methods=["POST"])
@login_required
def unfollow():
payload = request.json
payload['id'] = g.user_id
insert_unfollow(payload)
return "", 200