앞에서 mysql-connector-python이라는 DBAPI로 API를 MySQL와 연결하였다.
그리고 SQLAlchemy이라는 ORM을 통해 데이터베이스를 핸들링 할 수 있도록 준비하였다.
이어서 이번에는 우리가 구현한 Minitter의 엔드포인트를 Database에 연결하여 SQLAlchemy를 통해 CRUD 즉 핸들링하고자 한다.
앞에서 블로그Flask - API-MySQL 연동 SQLAlchemy
만든 create_app
에 회원가입 엔드포인트를 넣어 구현한다.
# app.py
def create_app(test_config = None):
app = Flask(__name__)
if test_config is None:
app.config.from_pyfile("config.py")
else:
app.config.update(test_config)
database = create_engine(app.config['DB_URL'], encoding = 'utf-8', max_overflow = 0)
app.database = database
@app.route("/sign-up", methods=['POST'])
def sign_up():
new_user = request.json
new_user_id = app.database.execute(text(""" # 1)
INSERT INTO users(
name,
email,
profile,
hashed_password
) VALUES (
:name,
:email,
:profile,
:password
)
"""), new_user). lastrowid # 2)
row = current_app.database.execute(text("""
SELECT
id,
name,
email,
profile
FROM users
WHERE id =:user_id
"""), {
'user_id' : new_user_id
}).fetchone() # 3)
created_user = { # 4)
'id' : row['id'],
'name' : row['name'],
'email' : row['email'],
'profile' : row['profile']
} if row else None
return jsonify(created_user)
return app
1) HTTP요청을 통해 전달받은 회원 가입 정보를 데이터베이스에 저장한다.
app.database
는 SQLAlchemy를 통해 MySQL 데이터베이스에 연결된 Enigne 객체이다.
이 app.database
를 통해 원하는 SQL구문을 전달하고 데이터베이스에서 실핼 할 수 있다.
위에서는 INSERT
구문을 통해 사용자의 정보를 users
테이블에 저장한다.
2) 1)의 SQL구문에 사용될 실제 데이터들은 HTTP요청(request)에서 읽어 들인 데이터를 그대로 사용한다.
HTTP 요청을 통해 전송된 JSON데이터 구조가 1)의 SQL에서 필요한 필드를 모두 포함하고 있기 때문이다.
(ex. name, email등)
만일 필드의 이름이 틀리거나 필드가 부재인 경우 오류가 나게 된다.
새로 사용자가 생성되면, 새로 생선된 사용자의 id를 lastrowid
를 통해 읽어 들인다.
3) 2)에서 새로 생성된 사용자의 정보를 데이터베이스에서 SELECT
구문으로 읽어들인다.
4) 3)에서 읽어 들인 새 사용자의 정보를 딕셔너리 형태로 변경한다. 그리고 JSON으로 HTTP응답(response)보낼 수 있도록 created_user
에 담는다.
$ http -v POST localhost:5000/sign-up name=퐝 email=pang@gmail.com password=12345678 profile="hello my name is pang"
POST /sign-up HTTP/1.1
Accept: application/json, */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 105
Content-Type: application/json
Host: localhost:5000
User-Agent: HTTPie/0.9.8
{
"email": "pang@gmail.com",
"name": "퐝",
"password": "12345678",
"profile": "hello my name is pang"
}
HTTP/1.0 200 OK
Content-Length: 105
Content-Type: application/json
Date: Fri, 31 Jan 2020 06:37:25 GMT
Server: Werkzeug/0.16.1 Python/3.7.6
{
"email": "pang@gmail.com",
"id": 10,
"name": "퐝",
"profile": "hello my name is pang"
}
def create_app(test_config = None):
app = Flask(__name__)
if test_config is None:
app.config.from_pyfile("config.py")
else:
app.config.update(test_config)
database = create_engine(app.config['DB_URL'], encoding = 'utf-8', max_overflow = 0)
app.database = database
@app.route('/tweet', methods=['POST'])
def tweet():
user_tweet = request.json
tweet = user_tweet['tweet']
if len(tweet) > 300:
return '300자를 초과했습니다', 400
app.database.execute(text(""" # 1)
INSERT INTO tweets (
user_id,
tweet
) VALUES (
:id,
:tweet
)
"""), user_tweet) # 2)
return '', 200
return app
1) HTTP요청(request)을 통해 전달받은 트윗 데이터를 데이터베이스에 저장한다.
INSERT
구문을 통해 트윗 데이터를 tweets
테이블에 저장하도록 한다.
2) 1)의 SQL 구문을 통해 insert될 트윗 데이터는 HTTP 요청을 통해 전송된 JSON 데이터를 그대로 사용한다.
$ http -v POST localhost:5000/tweet id=10 tweet="My first tweet!"
POST /tweet HTTP/1.1
Accept: application/json, */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Content-Length: 40
Content-Type: application/json
Host: localhost:5000
User-Agent: HTTPie/0.9.8
{
"id": "10",
"tweet": "My first tweet!"
}
HTTP/1.0 200 OK
Content-Length: 0
Content-Type: text/html; charset=utf-8
Date: Fri, 31 Jan 2020 06:41:52 GMT
Server: Werkzeug/0.16.1 Python/3.7.6
def create_app(test_config = None):
app = Flask(__name__)
if test_config is None:
app.config.from_pyfile("config.py")
else:
app.config.update(test_config)
database = create_engine(app.config['DB_URL'], encoding = 'utf-8', max_overflow = 0)
app.database = database
@app.route('/timeline/<int:user_id>', methods=['GET'])
def timeline(user_id):
row = app.database.execute(text(""" # 1)
SELECT
t.user_id,
t.tweet
FROM tweets t
LEFT JOIN users_follow_list ufl ON ufl.user_id = :user_id
WHERE t.user_id = :user_id
OR t.user_id = ufl.follow_user.id
"""), {
'user_id' : user_id
}).fetchall()
timeline = [{ # 2)
'user_id' : row['user_id'],
'tweet' : row['tweet']
} for row in rows]
return jsonify({ # 3)
'user_id' : user_id,
'timeline' : timeline
})
return app
1) SELECT
구문과 JOIN
구문을 사용하여 해당 사용자의 트윗들과 해당 사용자가 팔로우하는 사용자들의 트윗들을 데이터베이스에서 한번의 Query로 읽어 들인다.
여기서 LEFT JOIN
을 사용하여 혹시나 사용자가 팔로우하는 사용자가 없더라도 해당 사용자의 트윗만을 읽어 들일 수 있게 한다.
읽어들인 로우(row)가 여러개일 것을 감안하여 fetchall()
메소드를 사용해서 전부 다 읽어 들인다.
2) 1)에서 읽어 들인 트윗 리스트를 딕셔너러 리스트(list) 형태로 변환한다. 즉, 하나 하나의 로우를 딕셔너리로 변환하여 전체를 for문을 활용하여 리스트에 담는 것
3) 2)에서 변환한 딕셔너리 리스트를 JSON형태로 HTTP 응답(response)보낸다.
$ http -v GET localhost:5000/timeline/10
GET /timeline/10 HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate
Connection: keep-alive
Host: localhost:5000
User-Agent: HTTPie/0.9.8
HTTP/1.0 200 OK
Content-Length: 109
Content-Type: application/json
Date: Fri, 31 Jan 2020 06:57:39 GMT
Server: Werkzeug/0.16.1 Python/3.7.6
{
"timeline": [
{
"tweet": "My first tweet!",
"user_id": 10
}
],
"user_id": 10
}
위에서 실습한 내용을 이제 하나의 create_app
에 각 API들을 넣어서 정리해볼 필요가 있다.
그리고 SQL구문들은 별도 함수로 정리하여 API들이 때에 따라서 불러서 사용하는 것도 가능하다.
from flask import Flask, request, jsonify, current_app
from flask.json import JSONEncoder
from sqlalchemy import create_engine, text
## Default JSON encoder는 set를 JSON으로 변환할 수 없다.
## 그럼으로 커스텀 엔코더를 작성해서 set을 list로 변환하여
## JSON으로 변환 가능하게 해주어야 한다.
class CustomJSONEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return JSONEncoder.default(self, obj)
def get_user(user_id):
user = current_app.database.execute(text("""
SELECT
id,
name,
email,
profile
FROM users
WHERE id = :user_id
"""), {
'user_id' : user_id
}).fetchone()
return {
'id' : user['id'],
'name' : user['name'],
'email' : user['email'],
'profile' : user['profile']
} if user else None
def insert_user(user):
return current_app.database.execute(text("""
INSERT INTO users (
name,
email,
profile,
hashed_password
) VALUES (
:name,
:email,
:profile,
:password
)
"""), user).lastrowid
def insert_tweet(user_tweet):
return current_app.database.execute(text("""
INSERT INTO tweets (
user_id,
tweet
) VALUES (
:id,
:tweet
)
"""), user_tweet).rowcount
def insert_follow(user_follow):
return current_app.database.execute(text("""
INSERT INTO users_follow_list (
user_id,
follow_user_id
) VALUES (
:id,
:follow
)
"""), user_follow).rowcount
def insert_unfollow(user_unfollow):
return current_app.database.execute(text("""
DELETE FROM users_follow_list
WHERE user_id = :id
AND follow_user_id = :unfollow
"""), user_unfollow).rowcount
def get_timeline(user_id):
timeline = current_app.database.execute(text("""
SELECT
t.user_id,
t.tweet
FROM tweets t
LEFT JOIN users_follow_list ufl ON ufl.user_id = :user_id
WHERE t.user_id = :user_id
OR t.user_id = ufl.follow_user_id
"""), {
'user_id' : user_id
}).fetchall()
return [{
'user_id' : tweet['user_id'],
'tweet' : tweet['tweet']
} for tweet in timeline]
def create_app(test_config = None):
app = Flask(__name__)
app.json_encoder = CustomJSONEncoder
if test_config is None:
app.config.from_pyfile("config.py")
else:
app.config.update(test_config)
database = create_engine(app.config['DB_URL'], encoding = 'utf-8', max_overflow = 0)
app.database = database
@app.route("/ping", methods=['GET'])
def ping():
return "pong"
@app.route("/sign-up", methods=['POST'])
def sign_up():
new_user = request.json
new_user_id = insert_user(new_user)
new_user = get_user(new_user_id)
return jsonify(new_user)
@app.route('/tweet', methods=['POST'])
def tweet():
user_tweet = request.json
tweet = user_tweet['tweet']
if len(tweet) > 300:
return '300자를 초과했습니다', 400
insert_tweet(user_tweet)
return '', 200
@app.route('/follow', methods=['POST'])
def follow():
payload = request.json
insert_follow(payload)
return '', 200
@app.route('/unfollow', methods=['POST'])
def unfollow():
payload = request.json
insert_unfollow(payload)
return '', 200
@app.route('/timeline/<int:user_id>', methods=['GET'])
def timeline(user_id):
return jsonify({
'user_id' : user_id,
'timeline' : get_timeline(user_id)
})
return app