Window와 MAC 설치 방법이 다르기 때문에 각각 자신에 맞는 방법으로 프로그램을 설치해야한다.
MySQL 공식 웹사이트 접속
먼저 MySQL 공식 웹사이트 (MySQL Downloads 페이지)로 이동합니다.
MySQL Community Server 선택
페이지에 접속하면 여러 MySQL 제품이 있다. 여기서 "MySQL Community (GPL) Downloads" 섹션에서 "MySQL Community Server"를 찾아 선택한다.
운영 체제 선택
MySQL Community Server를 사용할 운영 체제를 선택한다. 여기서는 Windows를 선택한다.
다운로드 페이지로 이동
선택한 운영 체제에 따라 다운로드 페이지로 이동한니다. 여기서는 Windows용이므로 "Windows (x86, 64-bit), MSI Installer"를 선택한니다.
다운로드 및 설치
MySQL Workbench(GUI 프로그램) 설치
MySQL 데이터베이스를 시각적으로 관리하기 위해 MySQL Workbench를 설치할 수 있다.
Homebrew 설치
Homebrew가 설치되어 있지 않다면 터미널을 열고 다음 명령어를 실행하여 Homebrew를 설치한다. 아래코드는 Homebrew에서 복사하여 붙여준다.
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install.sh)"
설치가 완료되면 터미널을 다시 시작한다.
MySQL 설치
Homebrew를 사용하여 MySQL을 설치한다.
brew install mysql
명령을 실행하면 Homebrew가 필요한 파일을 다운로드하고 MySQL을 설치한다.
MySQL 시작
MySQL을 설치하면 Homebrew에서 자동으로 MySQL 서버가 시작된다. 만약 자동으로 시작되지 않았다면 다음 명령어로 수동으로 시작할 수 있다.
brew services start mysql
brew services stop mysql
루트 비밀번호 설정
MySQL 서버가 시작된 후에는 루트 계정의 초기 비밀번호가 설정되어 있다. 다음 명령어로 MySQL에 로그인한다.
mysql -u root
로그인 후에는 비밀번호를 설정한다.
ALTER USER 'root'@'localhost' IDENTIFIED BY 'oz-password';
새로운 비밀번호를 설정한 후 MySQL을 나간다.
exit
mysql workbench 설치
Window에서 방법과 동일하게 MySQL :: MySQL Workbench사이트에서 프로그램을 다운받아 설치한다.
Flask와 MySQL을 연결해서 사용하려면 가상 환경에서 flask-mysqldb
모듈을 설치해야한다.
> pip install flask-mysqldb
CREATE DATABASE oz;
USE oz;
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);
DB 관련 코드 추가
(1) app.py
from flask import Flask
from flask_mysqldb import MySQL
from flask_smorest import Api
# user_routes에서 Blueprint 직접 임포트 대신 함수 임포트
from user_routes import create_user_blueprint
app = Flask(__name__)
# MySQL 연결 설정
app.config['MYSQL_HOST'] = 'localhost'
app.config['MYSQL_USER'] = 'root'
app.config['MYSQL_PASSWORD'] = '비밀번호'
app.config['MYSQL_DB'] = 'oz'
mysql = MySQL(app)
# blueprint 생성 및 등록
app.config["API_TITLE"] = "My API"
app.config["API_VERSION"] = "v1"
app.config["OPENAPI_VERSION"] = "3.1.3"
app.config["OPENAPI_URL_PREFIX"] = "/"
app.config["OPENAPI_SWAGGER_UI_PATH"] = "/swagger-ui"
app.config["OPENAPI_SWAGGER_UI_URL"] = "https://cdn.jsdelivr.net/npm/swagger-ui-dist/"
user_blp = create_user_blueprint(mysql)
api = Api(app)
api.register_blueprint(user_blp)
from flask import render_template
@app.route('/users_interface')
def users_interface():
return render_template('users.html')
if __name__ == '__main__':
app.run(debug=True)
(2) user_routes.py
: Schema 정의없이 단순 flask Blueprint를 사용한 방식
from flask_smorest import Blueprint, abort
from flask import request, jsonify
def create_user_blueprint(mysql):
user_blp = Blueprint('user_routes', __name__, url_prefix='/users')
@user_blp.route('/', methods=['GET'])
def get_users():
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall() # fetchall()의 결과 값은 튜플 데이터
cursor.close()
# 튜플을 딕셔너리로 변환
users_list = []
for user in users:
users_list.append({
'id': user[0],
'name': user[1],
'email': user[2]
})
return jsonify(users_list)
@user_blp.route('/', methods=['POST'])
def add_user():
user_data = request.json
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO users (name, email) VALUES (%s, %s)",
(user_data['name'], user_data['email']))
mysql.connection.commit()
cursor.close()
return jsonify({'message': 'User added successfully'}), 201
@user_blp.route('/<int:user_id>', methods=['PUT'])
def update_user(user_id):
user_data = request.json
cursor = mysql.connection.cursor()
cursor.execute("UPDATE users SET name = %s, email = %s WHERE id = %s",
(user_data['name'], user_data['email'], user_id))
mysql.connection.commit()
cursor.close()
return jsonify({'message': 'User updated successfully'})
@user_blp.route('/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
cursor = mysql.connection.cursor()
cursor.execute("DELETE FROM users WHERE id = %s", (user_id,))
mysql.connection.commit()
cursor.close()
return jsonify({'message': 'User deleted successfully'})
return user_blp
(3) templates/users.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Users</title>
<script>
function fetchUsers() {
fetch("/users")
.then((response) => response.json())
.then((users) => {
const usersList = document.getElementById("users-list");
usersList.innerHTML = "";
users.forEach((user) => {
const userItem = document.createElement("li");
userItem.textContent = `ID: ${user.id}, Name: ${user.name}, Email: ${user.email}`;
usersList.appendChild(userItem);
});
})
.catch((error) => console.error("Error:", error));
}
function addUser() {
const name = document.getElementById("name").value;
const email = document.getElementById("email").value;
fetch("/users", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ name: name, email: email }),
})
.then((response) => response.json())
.then((data) => {
console.log(data);
fetchUsers();
})
.catch((error) => console.error("Error:", error));
}
window.onload = fetchUsers;
function updateUser() {
const userId = document.getElementById("update-user-id").value;
const name = document.getElementById("update-name").value;
const email = document.getElementById("update-email").value;
fetch(`/users/${userId}`, {
method: "PUT",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ name: name, email: email }),
})
.then((response) => response.json())
.then((data) => {
console.log(data);
fetchUsers();
})
.catch((error) => console.error("Error:", error));
}
function deleteUser() {
const userId = document.getElementById("update-user-id").value;
fetch(`/users/${userId}`, {
method: "DELETE",
})
.then((response) => response.json())
.then((data) => {
console.log(data);
fetchUsers();
})
.catch((error) => console.error("Error:", error));
}
</script>
</head>
<body>
<h1>Users</h1>
<ul id="users-list">
<!-- 사용자 목록이 여기에 표시됩니다. -->
</ul>
<h2>Add User</h2>
<input type="text" id="name" placeholder="Name" />
<input type="email" id="email" placeholder="Email" />
<button onclick="addUser()">Add User</button>
<h2>Update/Delete User</h2>
<input type="number" id="update-user-id" placeholder="User ID" />
<input type="text" id="update-name" placeholder="New Name" />
<input type="email" id="update-email" placeholder="New Email" />
<button onclick="updateUser()">Update User</button>
<button onclick="deleteUser()">Delete User</button>
</body>
</html>
SQLAlchemy란 Python의 객체 관계 매핑(ORM) 라이브러리다. SQL이 아닌 ORM 방식으로 DB의 데이터를 조회할 수 있도록 도와준다.
ORM(Object-Relational Mapping)이란, 문자 그대로 객체와 관계형 데이터베이스 간의 매핑을 의미한다.
*객체 : 객체, 클래스, 속성의 구조 - Python (Flask, Django)
*관계형 데이터베이스 : 테이블, 로우, 컬럼과 같은 구조 - RDBMS
데이터베이스의 테이블을 객체로 매핑하고, 객체 간의 관계를 데이터베이스의 외래 키 등으로 매핑하는 방식으로 위 2개를 Mapping(연결) 시켜주는 것이다.
→ DB에 있는 데이터들을 객체처럼 사용할 수 있도록 도와준다. SQL 쿼리문 없이 데이터 CRUD가 가능하다.
기능
ORM 사용하는 이유
Flask-SQLAlchemy란?
: Flask에서 SQLAlchemy(ORM)을 쉽게 사용할 수 있도록 도와주는 라이브러리다.
> pip install SQLAlchemy
> pip install Flask-SQLAlchemy
> pip install Flask-SQLAlchemy
> pip install pymysql # SQLALCHEMY를 통해서 mysql에 연동할 때 필요
from flask import Flask
from flask_smorest import Api
from flask_sqlalchemy import SQLAlchemy
from db import db
from models import User, Board
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:oz-password@localhost/oz'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
# bluepring 설정 및 등록
app.config["API_TITLE"] = "My API"
app.config["API_VERSION"] = "v1"
app.config["OPENAPI_VERSION"] = "3.1.3"
app.config["OPENAPI_URL_PREFIX"] = "/"
app.config["OPENAPI_SWAGGER_UI_PATH"] = "/swagger-ui"
app.config["OPENAPI_SWAGGER_UI_URL"] = "https://cdn.jsdelivr.net/npm/swagger-ui-dist/"
from routes.users import user_blp
from routes.board import board_blp
api = Api(app)
api.register_blueprint(user_blp)
api.register_blueprint(board_blp)
from flask import render_template
@app.route('/manage-boards')
def manage_boards():
return render_template('boards.html')
@app.route('/manage-users')
def manage_users():
return render_template('users.html')
if __name__ == '__main__':
with app.app_context():
print("여기 실행?")
db.create_all()
app.run(debug=True)
from db import db
class User(db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), unique=True, nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
boards = db.relationship('Board', back_populates='author', lazy='dynamic')
# address = db.Column(db.String(120), unique=True, nullable=False) # 추가된 필드
class Board(db.Model):
__tablename__ = "boards"
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
content = db.Column(db.String(200))
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
author = db.relationship('User', back_populates='boards')
User
모델의 boards
필드Board
)들의 목록을 나타낸다. back_populates='author'
는 Board
모델의 author
필드와 이 관계를 양방향으로 연결한다.Board
모델의 author
필드User
)를 나타낸다. back_populates='boards'
는 User
모델의 boards
필드와 이 관계를 양방향으로 연결한다.lazy='dynamic
옵션예시) user.boards를 통해 접근하면, 즉시 모든 게시판 글을 로드하는 것이 아니라 쿼리셋을 반환한다. 이 쿼리셋을 사용하여 다양한 쿼리 연산을 수행할 수 있다.
user = User.query.get(some_user_id)
# user.boards는 즉시 데이터를 로드하지 않습니다.
boards_query = user.boards
# 필요할 때 쿼리를 실행합니다.
# 예를 들어, 최근 10개의 게시글만 가져오기
recent_boards = boards_query.order_by(Board.created_at.desc()).limit(10).all()
# 또는 특정 조건을 만족하는 게시글만 필터링
filtered_boards = boards_query.filter(Board.title.contains('Python')).all()
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
from flask import request, jsonify
from flask.views import MethodView
from flask_smorest import Blueprint
from db import db
from models import User
user_blp = Blueprint('Users', 'users', description='Operations on users', url_prefix='/users')
@user_blp.route('/')
class UserList(MethodView):
def get(self):
users = User.query.all()
user_data = [{"id":user.id, "name": user.name, "email": user.email} for user in users] # Convert to list
return jsonify(user_data)
def post(self):
print("요청은 오는가?")
user_data = request.json
new_user = User(name=user_data['name'], email=user_data['email'])
db.session.add(new_user)
db.session.commit()
return jsonify({"message": "User created"}), 201
@user_blp.route('/<int:user_id>')
class Users(MethodView):
def get(self, user_id):
user = User.query.get_or_404(user_id)
return {"name": user.name, 'email': user.email}
def put(self, user_id):
user = User.query.get_or_404(user_id)
user_data = request.json
user.name = user_data['name']
user.email = user_data['email']
db.session.commit()
return {"message": "User updated"}
def delete(self, user_id):
user = User.query.get_or_404(user_id)
db.session.delete(user)
db.session.commit()
return {"message": "User deleted"}
from flask.views import MethodView
from flask_smorest import Blueprint, abort
from marshmallow import Schema, fields
from db import db
from models import User
# 스키마 정의
class UserSchema(Schema):
id = fields.Int(dump_only=True)
name = fields.Str(required=True)
email = fields.Str(required=True)
# Flask-Smorest Blueprint 설정
user_blp = Blueprint('users', 'users', url_prefix='/users', description='Operations on users')
@user_blp.route('/')
class UserList(MethodView):
@user_blp.response(200, UserSchema(many=True))
def get(self):
# 사용자 데이터 조회 로직
@user_blp.arguments(UserSchema)
@user_blp.response(201, UserSchema)
def post(self, new_data):
# 새 사용자 추가 로직
@user_blp.route('/<int:user_id>')
class UserResource(MethodView):
@user_blp.response(200, UserSchema)
def get(self, user_id):
# 특정 사용자 데이터 조회 로직
@user_blp.arguments(UserSchema)
@user_blp.response(200, UserSchema)
def put(self, new_data, user_id):
# 사용자 데이터 수정 로직
@user_blp.response(204)
def delete(self, user_id):
# 사용자 데이터 삭제 로직
from flask import request, jsonify
from flask.views import MethodView
from flask_smorest import Blueprint
from db import db
from models import Board
board_blp = Blueprint('Boards', 'boards', description='Operations on boards', url_prefix='/board')
@board_blp.route('/')
class BoardList(MethodView):
def get(self):
boards = Board.query.all()
return jsonify([{"user_id": board.user_id,
"id": board.id,
"title": board.title, "content": board.content, "author": board.author.name} for board in boards])
def post(self):
data = request.json
new_board = Board(title=data['title'], content=data['content'], user_id=data['user_id'])
db.session.add(new_board)
db.session.commit()
return jsonify({"message": "Board created"}), 201
@board_blp.route('/<int:board_id>')
class BoardResource(MethodView):
def get(self, board_id):
board = Board.query.get_or_404(board_id)
return jsonify({"title": board.title, "content": board.content, "author": board.author.name})
def put(self, board_id):
board = Board.query.get_or_404(board_id)
data = request.json
board.title = data['title']
board.content = data['content']
db.session.commit()
return jsonify({"message": "Board updated"})
def delete(self, board_id):
board = Board.query.get_or_404(board_id)
db.session.delete(board)
db.session.commit()
return jsonify({"message": "Board deleted"})
<!DOCTYPE html>
<html>
<head>
<title>Board Management</title>
</head>
<body>
<h1>Board Management</h1>
<!-- 게시글 생성 폼 -->
<form id="createBoardForm">
<input type="text" id="boardTitle" placeholder="Title" />
<textarea id="boardContent" placeholder="Content"></textarea>
<input type="number" id="userId" placeholder="User ID" />
<button type="submit">Create Board</button>
</form>
<!-- 게시글 조회 -->
<button onclick="getBoards()">Get All Boards</button>
<div id="boards"></div>
<!-- 게시글 수정 폼 -->
<form id="updateBoardForm">
<input type="number" id="updateBoardId" placeholder="Board ID" />
<input type="text" id="updateBoardTitle" placeholder="New Title" />
<textarea id="updateBoardContent" placeholder="New Content"></textarea>
<button type="submit">Update Board</button>
</form>
<!-- 게시글 삭제 -->
<form id="deleteBoardForm">
<input type="number" id="deleteBoardId" placeholder="Board ID" />
<button type="submit">Delete Board</button>
</form>
<script>
document
.getElementById("createBoardForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const title = document.getElementById("boardTitle").value;
const content = document.getElementById("boardContent").value;
const userId = document.getElementById("userId").value;
fetch("/board", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ title, content, user_id: userId }),
})
.then((response) => response.json())
.then((data) => {
console.log(data);
getBoards(); // 게시글 목록을 다시 불러옵니다.
});
});
function getBoards() {
fetch("/board")
.then((response) => response.json())
.then((data) => {
const boardsDiv = document.getElementById("boards");
boardsDiv.innerHTML = "";
data.forEach((board) => {
boardsDiv.innerHTML += `<p>${board.title} - ${board.content} (User ID: ${board.user_id}, Board ID: ${board.id})</p>`;
});
});
}
document
.getElementById("updateBoardForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const boardId = document.getElementById("updateBoardId").value;
const title = document.getElementById("updateBoardTitle").value;
const content = document.getElementById("updateBoardContent").value;
fetch("/board/" + boardId, {
method: "PUT",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ title, content }),
})
.then((response) => response.json())
.then((data) => {
console.log(data);
getBoards(); // 게시글 목록을 다시 불러옵니다.
});
});
document
.getElementById("deleteBoardForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const boardId = document.getElementById("deleteBoardId").value;
fetch("/board/" + boardId, {
method: "DELETE",
})
.then((response) => response.json())
.then((data) => {
console.log(data);
getBoards(); // 게시글 목록을 다시 불러옵니다.
});
});
</script>
</body>
</html>
<!DOCTYPE html>
<html>
<head>
<title>User Management</title>
</head>
<body>
<h1>User Management</h1>
<!-- 사용자 생성 폼 -->
<form id="createUserForm">
<input type="text" id="name" placeholder="Username" />
<input type="text" id="email" placeholder="Email" />
<button type="submit">Create User</button>
</form>
<!-- 사용자 조회 -->
<button onclick="getUsers()">Get All Users</button>
<div id="users"></div>
<!-- 사용자 수정 폼 -->
<form id="updateUserForm">
<input type="number" id="updateUserId" placeholder="User ID" />
<input type="text" id="updateUsername" placeholder="New Username" />
<input type="text" id="updateEmail" placeholder="New Email" />
<button type="submit">Update User</button>
</form>
<!-- 사용자 삭제 -->
<form id="deleteUserForm">
<input type="number" id="deleteUserId" placeholder="User ID" />
<button type="submit">Delete User</button>
</form>
<script>
document
.getElementById("createUserForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const name = document.getElementById("name").value;
const email = document.getElementById("email").value;
fetch("/users", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ name, email }),
})
.then((response) => response.json())
.then((data) => {
console.log(data);
getUsers(); // 사용자 목록을 다시 불러옵니다.
});
});
function getUsers() {
fetch("/users")
.then((response) => response.json())
.then((data) => {
const usersDiv = document.getElementById("users");
usersDiv.innerHTML = "";
data.forEach((user) => {
usersDiv.innerHTML += `<p>ID: ${user.id}, Name: ${user.name}, Email: ${user.email}</p>`;
});
});
}
document
.getElementById("updateUserForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const userId = document.getElementById("updateUserId").value;
const name = document.getElementById("updateUsername").value;
const email = document.getElementById("updateEmail").value;
fetch("/users/" + userId, {
method: "PUT",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ name, email }),
})
.then((response) => response.json())
.then((data) => {
console.log(data);
getUsers(); // 사용자 목록을 다시 불러옵니다.
});
});
document
.getElementById("deleteUserForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const userId = document.getElementById("deleteUserId").value;
fetch("/users/" + userId, {
method: "DELETE",
})
.then((response) => response.json())
.then((data) => {
console.log(data);
getUsers(); // 사용자 목록을 다시 불러옵니다.
});
});
</script>
</body>
</html>
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class User(db.Model):
# ... User 모델 정의 ...
class Board(db.Model):
# ... Board 모델 정의 ...
위 모델을 각각 파일 별로 따로 관리를 해주게 된다면 위 방식은 사용이 불가능하고, SQLAlchemy()를 중앙에서 관리하는 방법으로 변경해줘야한다.
from flask import Flask
from models import db, User, Board
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///data.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
app.py에서 이 모델들을 임포트하고 db 인스턴스를 초기화한다.
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
그리고 models.py
파일과 app.py
에서 이 DB를 임포트합니다.
from db import db
class User(db.Model):
# ... User 모델 정의 ...
class Board(db.Model):
# ... Board 모델 정의 ...
from flask import Flask
from db import db
from models import User, Board
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///example.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
# 나머지 애플리케이션 코드...
이렇게 구조화함으로써 DB인스턴스는 모든 모델과 Flask 애플리케이션에서 공유되며, 데이터베이스 작업의 일관성을 유지할 수 있다.
VSC의 기본 터미널 (Shell)에서 python기반 ORM을 테스트 하는 방법이 있다.
> python # 파이썬 쉘로 접속
# 필요한 DB 임포트하기
from db import db
from app import app
from models import User
with app.app_context():
# 코드 작성
기본적으로 위 코드로 각 DB에 접근할 수 있고 python shell script에서 ORM을 실제 query로 작성해보고 실행할 수 있는 공간이다.
아래 코드들을 입력할 때에는 들여쓰기에 주의해주길 바란다.
with app.app_context():
new_user = User(name='newuser', email='newuser@example.com')
db.session.add(new_user)
db.session.commit()
user = User.query.filter_by(name='newuser').first()
print(user)
user.name # newuser
user.email # newuser@example.com
SQLAlchemy의 쿼리 인터페이스를 사용하고, 단일 레코드 조회, 전체 레코드 조회 등 다양한 방식이 존재한다.
(1) 모든 레코드 조회하기
with app.app_context():
# 모든 User 레코드 조회
users = User.query.all()
for user in users:
print(user.id, user.name, user.email)
(2) 조건 기반 조회하기
# 1. 이름이 코딩인 조건을 만족하는 User 전체 조회
with app.app_context():
users = User.query.filter_by(name='coding').all()
print(users)
# 2. 이름이 newuser인 조건을 만족하는 첫번째 User 조회
with app.app_context():
user = User.query.filter_by(name='newuser').first()
if user:
print(f'Name: {user.name}, Email: {user.email}')
else:
print('User not found')
데이터를 업데이트하기 위해서는 먼저 해당 데이터를 조회한 다음에 필드 값을 변경하고 데이터베이스 세션에 커밋한다.
with app.app_context():
user = User.query.filter_by(name='newuser').first()
if user:
user.email = 'updated@example.com'
db.session.commit()
print('User updated')
else:
print('User not found')
데이터를 삭제하기 위해서는 해당 데이터를 조회한 후, 데이터베이스 세션에서 삭제하고 커밋한다.
with app.app_context():
user = User.query.filter_by(name='newuser').first()
if user:
db.session.delete(user)
db.session.commit()
print('User deleted')
else:
print('User not found')
모델은 데이터베이스의 테이블을 나타내며 SQLAlchemy 등의 ORM(Object-Relational Mapping) 도구를 통해 데이터베이스와 상호작용한다.
반면, 스키마는 데이터의 직렬화(Serialization) 및 역직렬화(Deserialization)와 유효성 검증을 위해 사용된다.
이는 주로 API 요청 및 응답에서 데이터 포맷을 관리하는데 사용된다.
모델 (Model):
스키마 (Schema):
스키마의 필요성은 주로 API 개발에서 드러난다.
예를 들어, 클라이언트가 JSON 형식으로 데이터를 보낼 때, 이 데이터를 검증하고 파이썬 객체로 변환하는 역할을 스키마가 한다. 반대로 서버가 클라이언트에 데이터를 응답할 때, 파이썬 객체를 JSON 형식으로 변환하는 것도 스키마의 역할이다.
아래의 코드 예시에서 UserSchema
는 사용자(User) 데이터를 JSON 형식으로 직렬화하거나, 클라이언트로부터 받은 JSON 데이터를 역직렬화하여 사용하기 쉬운 파이썬 객체로 변환하는 역할을 한다. 이러한 과정을 통해 Flask 애플리케이션의 API는 일관된 데이터 포맷을 유지하고, 데이터의 유효성을 보장할 수 있다.
직렬화는 복잡한 데이터 구조(예: Python 객체)를 JSON과 같은 포맷으로 변환하는 과정이다. 이 변환은 데이터를 API 응답으로 전송하거나, 파일로 저장할 때 유용하게 사용된다.
User
모델의 직렬화User
모델 인스턴스를 JSON 형식으로 변환하는 것을 생각해보자.모델 인스턴스: Python에서 User 객체가 다음과 같이 있다고 가정한다.
user_instance = User(id=1, username='JohnDoe')
직렬화 과정: 이 객체를 JSON 포맷으로 변환한다. 이를 위해 Marshmallow의 스키마를 사용할 수 있다.
class UserSchema(Schema):
id = fields.Int(dump_only=True)
username = fields.Str()
결과: 직렬화된 JSON 데이터는 다음과 같다.
{
"id": 1,
"username": "JohnDoe"
}
역직렬화는 JSON과 같은 포맷의 데이터를 복잡한 데이터 구조(예: Python 객체)로 변환하는 과정이다. 이는 클라이언트에서 받은 데이터를 서버의 내부 데이터 모델로 변환할 때 사용된다.
User
모델의 역직렬화
: JSON 포맷의 데이터를 User 모델 인스턴스로 변환하는 것을 생각해보자.
JSON 데이터: 클라이언트로부터 다음과 같은 JSON 데이터를 받았다고 가정한다.
{
"username": "JaneDoe"
}
역직렬화 과정: 이 JSON 데이터를 User 모델의 인스턴스로 변환한다.
user_data = {"username": "JaneDoe"}
user_schema = UserSchema()
user_instance = user_schema.load(user_data)
결과: 역직렬화된 User 인스턴스는 다음과 같다.
User(username='JaneDoe')
API 응답에서 직렬화 사용
: 서버가 데이터베이스에서 User
인스턴스를 조회한 후, 이를 JSON 형식으로 클라이언트에게 전송할 때 사용한다.
클라이언트 요청에서 역직렬화 사용
: 클라이언트로부터 JSON 형식의 데이터를 받아, 이를 User
모델 인스턴스로 변환하여 데이터베이스에 저장할 때 사용한다.
직렬화와 역직렬화는 데이터를 안전하고 효율적으로 전송하고, API 요청 및 응답을 처리하는 데 필수적인 과정이다. 이 과정을 통해 복잡한 객체 구조를 쉽게 관리하고, 데이터의 유효성을 검증할 수 있다.
Flask-SQLAlchemy 설정
Flask 애플리케이션에 Flask-SQLAlchemy를 설정한다. 이를 위해 SQLAlchemy 객체를 생성하고 Flask 앱과 연결한다.
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///yourdatabase.db' # 여기서 데이터베이스 URI 설정
db = SQLAlchemy(app)
모델 정의
데이터베이스 테이블에 해당하는 모델 클래스를 정의한다. 이 클래스는 db.Model을 상속받아 SQLAlchemy ORM을 통해 데이터베이스 작업을 수행할 수 있다.
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
데이터 조회
데이터베이스로부터 데이터를 가져오는 과정은 주로 query 객체를 사용하여 수행된다. 다음은 몇 가지 일반적인 조회 방법이다.
users = User.query.all()
for user in users:
print(user.username)
user = User.query.filter_by(username='exampleuser').first()
if user:
print(user.username)
user = User.query.get(user_id)
if user:
print(user.username)
조회 결과 활용
조회된 결과는 Python 객체로 반환되므로, 이를 직접 사용하거나 필요한 데이터만 추출하여 다른 형식(JSON 등)으로 변환할 수 있다. 예를 들어, Flask 뷰에서 이러한 조회 결과를 클라이언트에 JSON 형태로 응답할 수 있다.
from flask import jsonify
@app.route('/users')
def list_users():
users = User.query.all()
return [{'id': user.id, 'username': user.username} for user in users]
> pip install Flask-Migrate
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
db = SQLAlchemy(app)
migrate = Migrate(app, db)
Migrate
객체를 생성하고 Flask 애플리케이션 및 SQLAlchemy 데이터베이스 인스턴스와 연결한다.처음으로 데이터베이스 마이그레이션을 사용하기 위해 초기 마이그레이션 파일 생성이 필요하다.
> flask db init
flask db init
): 앱을 처음 설정할 때 한 번만 수행합니다. 마이그레이션을 관리할 수 있는 migrations
디렉토리를 생성한다.migrations
디렉토리 생성
: flask db init
명령어를 실행하면, 마이그레이션 스크립트가 저장될 migrations
디렉토리가 프로젝트 루트에 생성된다.
설정 파일 생성
: Alembic의 설정 파일(alembic.ini
)이 프로젝트 루트에 생성된다. 이 파일은 데이터베이스 마이그레이션을 위한 설정을 담고 있다.
versions
디렉토리 생성
: 마이그레이션 스크립트 파일들이 저장될 versions
디렉토리가 migrations
디렉토리 내에 생성된다.
이렇게 초기화된 마이그레이션 구조는 데이터베이스 스키마를 변경할 때 사용된다. flask db init
명령어는 프로젝트에서 한 번만 실행하면 된다. 초기화 이후에는 새로운 마이그레이션을 생성하기 위해 flask db migrate
명령어를 사용한다.
모델에 변경사항이 있을 때마다 새로운 마이그레이션을 생성해야한다. 이는 다음 명령어로 수행된다.
> flask db migrate
> flask db migrate -m "Your migration message"
flask db migrate
명령어는 Flask-Migrate를 사용하여 데이터베이스 마이그레이션을 생성하는 역할이다.migrations/versions
디렉토리에 저장된다.flask db upgrade
명령어를 사용하여 실제로 데이터베이스에 마이그레이션을 적용할 수 있다.생성된 마이그레이션을 적용하여 데이터베이스를 업그레이드한다.
flask db upgrade
flask db upgrade
명령어마이그레이션 스크립트 실행
: flask db upgrade
명령어를 실행하면 migrations/versions
디렉토리에 있는 마이그레이션 스크립트가 실행된다. 이 스크립트는 SQLAlchemy를 사용하여 데이터베이스의 스키마를 변경하고 모델의 변경 사항을 적용한다.
데이터베이스 업그레이드
: 스크립트의 실행 결과, 데이터베이스는 새로운 스키마로 업그레이드된다. 이는 모델에 대한 변경 사항이 실제 데이터베이스에 적용되어 데이터베이스가 최신의 상태로 유지되도록 하는 과정이다.
flask db upgrade
명령어를 통해 데이터베이스를 업그레이드하면 애플리케이션의 모델이나 데이터베이스 스키마에 변경이 있을 때, 해당 변경 사항을 적용할 수 있다.
flask db upgrade
)필요한 경우 마이그레이션을 되돌릴 수 있다.
flask db downgrade
위 명령어는 이전 마이그레이션 상태로 데이터베이스를 되돌린다.
from db import db
class User(db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), unique=True, nullable=False)
email = db.Column(db.String(100), unique=True, nullable=False)
boards = db.relationship('Board', back_populates='author', lazy='dynamic')
# address = db.Column(db.String(120), unique=True, nullable=False) # 추가된 필드
class Board(db.Model):
__tablename__ = "boards"
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(100), nullable=False)
content = db.Column(db.String(200))
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
author = db.relationship('User', back_populates='boards')
마이그레이션 파일 생성
변경 사항을 포함한 새로운 마이그레이션 파일을 생성한다.
flask db migrate -m "Added email to User and created_at to Board."
마이그레이션 실행
생성된 마이그레이션을 데이터베이스에 적용한다.
flask db upgrade
확인
데이터베이스에 새로운 필드가 정상적으로 추가되었는지 확인하다. 필요한 경우, Python 쉘을 사용하거나 데이터베이스 관리 도구를 통해 테이블 구조를 검토할 수 있다.
이러한 업그레이드 관리 방법은 Django에서도 동일하기 때문에 방법에 익숙해지면 도움이 된다.
Flask와 MySQL 데이터베이스를 사용하여 간단한 블로그 시스템을 구축하는 실습을 해본다. 이 시스템은 사용자가 블로그 포스트를 생성, 조회, 수정, 삭제할 수 있게 한다.
blog
데이터베이스를 생성한다.posts
테이블을 다음과 같이 정의한다.id
: INT, Primary Key, 자동 증가title
: VARCHAR(100), 게시글 제목content
: TEXT, 게시글 내용created_at
: TIMESTAMP, 기본값 CURRENT_TIMESTAMPdb.yaml
파일에 데이터베이스 연결 정보를 저장하고, Flask 애플리케이션에서 이를 로드한다.GET /posts
: 모든 게시글을 조회한다.GET /posts/<id>
: 특정 ID를 가진 게시글을 조회한다.POST /posts
: 새 게시글을 생성한다.PUT /posts/<id>
: 특정 ID를 가진 게시글을 수정한다.DELETE /posts/<id>
: 특정 ID를 가진 게시글을 삭제한다.db.yaml
)에 저장하고, 이를 읽어서 사용한다.my_flask_app/
│
├── app.py
├── posts_routes.py
├── db.yaml
└── templates/
└── posts.html
먼저 MySQL에서 blog 데이터베이스와 posts 테이블을 생성합니다. MySQL 콘솔 또는 관리 도구를 사용하여 다음 쿼리를 실행한다.
USE oz;
CREATE TABLE posts (
id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(100),
content TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Flask 애플리케이션을 설정하고 Flask-MySQL 연결을 구성한다. db.yaml
파일에 데이터베이스 연결 정보를 저장하고, Flask 애플리케이션에서 이를 로드한다.
db.yaml
mysql_host: 'localhost'
mysql_user: 'root'
mysql_password: '비밀번호'
mysql_db: 'oz'
app.py
from flask import Flask, render_template
from flask_mysqldb import MySQL
import yaml # pip install pyyaml
from flask_smorest import Api
from posts_routes import create_posts_blueprint
app = Flask(__name__)
db = yaml.load(open('db.yaml'), Loader=yaml.FullLoader)
app.config['MYSQL_HOST'] = db['mysql_host']
app.config['MYSQL_USER'] = db['mysql_user']
app.config['MYSQL_PASSWORD'] = db['mysql_password']
app.config['MYSQL_DB'] = db['mysql_db']
mysql = MySQL(app)
# blueprint 설정 및 등록
app.config["API_TITLE"] = "My API "
app.config["API_VERSION"] = "v1" # 1.0도 가능
app.config["OPENAPI_VERSION"] = "3.1.3"
app.config["OPENAPI_URL_PREFIX"] = "/"
app.config["OPENAPI_SWAGGER_UI_PATH"] = "/swagger-ui"
app.config["OPENAPI_SWAGGER_UI_URL"] = "https://cdn.jsdelivr.net/npm/swagger-ui-dist/"
api = Api(app)
posts_blp = create_posts_blueprint(mysql)
api.register_blueprint(posts_blp)
@app.route('/blogs')
def manage_posts():
return render_template('posts.html')
if __name__ == "__main__":
app.run(debug=True)
게시글 CRUD 기능에 대한 라우트와 뷰 함수를 구현한다.
posts_routes.py
from flask import request, jsonify
from flask_smorest import Blueprint, abort
def create_posts_blueprint(mysql):
posts_blp = Blueprint("posts", __name__, description="posts api", url_prefix="/posts")
@posts_blp.route("/", methods=["GET", "POST"])
def posts():
cursor = mysql.connection.cursor()
# 게시글 조회
if request.method == "GET":
cursor.execute("SELECT * FROM posts")
posts = cursor.fetchall()
cursor.close()
post_list = []
for post in posts:
post_list.append(
{
"id": post[0],
"title": post[1],
"content": post[2],
}
)
return jsonify(post_list)
# 게시글 생성
elif request.method == "POST":
title = request.json.get("title")
content = request.json.get("content")
if not title or not content:
abort(400, message="Title or Content is required.")
cursor.execute("INSERT INTO posts(title, content) VALUES(%s, %s)", (title, content))
mysql.connection.commit()
return jsonify({"message": "successfully created post data", "title": title, "content": content}), 201
@posts_blp.route("/<int:id>", methods=["GET", "PUT", "DELETE"])
def post(id):
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM posts WHERE id = %s", (id))
post = cursor.fetchone()
# 게시글 상세 조회
if request.method == "GET":
if not post:
abort(404, "Not found post.")
return jsonify({
"id": post[0],
"title": post[1],
"content": post[2],
})
# 게시글 수정
elif request.method == "PUT":
# data = request.json
# title = data['title']
title = request.json.get("title")
content = request.json.get("content")
if not title or not content:
abort(400, message="Not found title or content.")
if not post:
abort(404, message="Not found post.")
cursor.execute(f"UPDATE posts SET title=%s, content=%s WHERE id=%s", (title, content, id))
mysql.connection.commit()
return jsonify({"message": "Successfully updated title & content"})
# 게시글 삭제
elif request.method == "DELETE":
if not post:
abort(404, message="Not found post.")
cursor.execute("DELETE FROM posts WHERE id=%s", (id))
mysql.connection.commit()
return jsonify({"message": "Successfully deleted title & content"})
return posts_blp
#### (4) 프론트엔드 HTML 템플릿 생성
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<title>Blog Posts</title>
<script src="https://cdn.jsdelivr.net/npm/axios/dist/axios.min.js"></script>
</head>
<body>
<h1>Blog Posts</h1>
<h2>Create a New Post</h2>
<form id="postForm">
<input type="hidden" id="postId" value="" />
Title: <input type="text" id="title" /><br />
Content:<br />
<textarea id="content"></textarea><br />
<input type="submit" value="Create Post" id="createButton" />
<input
type="button"
value="Update Post"
id="updateButton"
onclick="submitUpdateForm()"
style="display: none"
/>
</form>
<hr />
<h2>All Posts</h2>
<ul id="postsList"></ul>
<script>
let currentEditingId = null;
document
.getElementById("postForm")
.addEventListener("submit", function (e) {
e.preventDefault();
const title = document.getElementById("title").value;
const content = document.getElementById("content").value;
if (currentEditingId === null) {
createNewPost(title, content);
} else {
updatePost(currentEditingId, title, content);
}
});
function createNewPost(title, content) {
axios
.post("/posts", { title, content })
.then(function (response) {
console.log(response);
loadPosts();
resetForm();
})
.catch(function (error) {
console.error(error);
});
}
function updatePost(id, title, content) {
axios
.put("/posts/" + id, { title, content })
.then(function (response) {
console.log(response);
loadPosts();
resetForm();
})
.catch(function (error) {
console.error(error);
});
}
function loadPosts() {
axios
.get("/posts")
.then(function (response) {
const posts = response.data;
const postsList = document.getElementById("postsList");
postsList.innerHTML = "";
posts.reverse().forEach((post) => {
const listItem = document.createElement("li");
listItem.innerHTML = `
<h3>ID: ${post.id}</h3>
<h3>TITLE: ${post.title}</h3>
<p>CONTENT: ${post.content}</p>
<buttontoken interpolation">${post.id})">Delete</button>
<buttontoken interpolation">${post.id})">Edit</button>
`;
postsList.appendChild(listItem);
});
})
.catch(function (error) {
console.error(error);
});
}
function loadPostForEditing(id) {
axios
.get("/posts/" + id)
.then(function (response) {
const post = response.data;
console.log("post", post);
currentEditingId = post.id;
document.getElementById("title").value = post.title;
document.getElementById("content").value = post.content;
document.getElementById("createButton").style.display = "none";
document.getElementById("updateButton").style.display = "inline";
})
.catch(function (error) {
console.error(error);
});
}
function resetForm() {
currentEditingId = null;
document.getElementById("title").value = "";
document.getElementById("content").value = "";
document.getElementById("createButton").style.display = "inline";
document.getElementById("updateButton").style.display = "none";
}
function submitUpdateForm() {
const title = document.getElementById("title").value;
const content = document.getElementById("content").value;
console.log("currentEditingId", currentEditingId);
if (currentEditingId) {
updatePost(currentEditingId, title, content);
} else {
console.error("No post is currently being edited.");
}
}
function deletePost(id) {
if (confirm("Are you sure you want to delete this post?")) {
axios
.delete("/posts/" + id)
.then(function (response) {
console.log(response);
loadPosts();
})
.catch(function (error) {
console.error(error);
});
}
}
loadPosts();
</script>
</body>
</html>
애플리케이션을 실행하고 localhost:5000/blogs
주소로 접속하여 게시글 CRUD를 해본다.
강의랑 강의자료랑 차이가 좀 있어서 실행이 안되는게 화가난다... 하^^ 그래도 난 극복!!