✅ MySQL 연결 후 데이터 추출
✅ 비동기 SQLAlchemy 사용
✅ MongoDB 연결 후 데이터 추출
✅ 데이터 가져오는 SQL 작성 후 간단한 API 생성 및 배포
FastAPI와 관련된 패키지들을 모아두는 Github Repo에서 fastapi-sqlalchemy 패키지가 존재하는 것을 보고 사용해보려고 한다.
$ pip install fastapi-sqlalchemy # https://github.com/mfreeborn/fastapi-sqlalchemy
$ pip install pymysql
pharmacy, worker DB Table을 만들어 놓고 작업한 내용입니다.
# models.py
from sqlalchemy import Column, BigInteger, SmallInteger, String, DateTime
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Pharmacy(Base):
__tablename__ = 'pharmacy'
id = Column(BigInteger, primary_key=True, autoincrement=True)
name = Column(String, nullable=False)
address = Column(String, nullable=False)
is_deleted = Column(SmallInteger, nullable=False)
created_at = Column(DateTime, nullable=False)
updated_at = Column(DateTime, nullable=False)
class Worker(Base):
__tablename__ = 'worker'
id = Column(BigInteger, primary_key=True, autoincrement=True)
pharmacy_id = Column(BigInteger, nullable=False)
type = Column(String, nullable=False)
name = Column(String, nullable=False)
is_deleted = Column(SmallInteger, nullable=False)
created_at = Column(DateTime, nullable=False)
updated_at = Column(DateTime, nullable=False)
# main.py
from fastapi import FastAPI
from fastapi_sqlalchemy import DBSessionMiddleware
from fastapi_sqlalchemy import db
from models import Pharmacy, Worker
HOSTNAME = ''
PORT =
USERNAME = ''
PASSWORD = ''
DBNAME = ''
MYSQL_URL = f'mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DBNAME}'
app = FastAPI()
app.add_middleware(DBSessionMiddleware, db_url=MYSQL_URL)
@app.get('/worker')
async def select_worker():
query = db.session.query(Worker)
return query.all()
@app.get('/pharmacy')
async def select_pharmacy():
query = db.session.query(Pharmacy)
return query.all()
@app.get('/worker-pharmacy')
async def join_worker_pharmacy():
query = db.session.query(
Worker.id,
Worker.pharmacy_id,
Worker.name,
Worker.type,
Worker.created_at,
Worker.updated_at,
Worker.is_deleted,
Pharmacy.id.label('ph-id'),
Pharmacy.name.label('ph-name'),
Pharmacy.address.label('ph-address'),
Pharmacy.created_at.label('ph-created_at'),
Pharmacy.updated_at.label('ph-updated_at')
).join(
Pharmacy,
Worker.pharmacy_id == Pharmacy.id
)
return query.all()
[오류 상황]
DB를 연결 및 로컬에서 테스트 했을 때, 원하는 값이 나오는 것을 확인했다.
하지만 AWS SAM을 통해 배포를 했는데, 값이 나오지 않는 에러 발생.
[오류 원인]
Timeout Error
[해결 방법]
template.yaml
파일에서 Timeout 값을 수정
[오류 상황]
template.yaml
파일에서 Timeout 값을 3600 으로 수정한 후, 배포 명령어 실행함.
Resource handler returned message: "1 validation error detected: Value '3600' at 'timeout' failed to satisfy constraint: Member must have value less than or equal to 900
[오류 원인]
Timeout의 최대값은 900
[해결 방법]
Timeout 값을 3600 → 900 으로 수정
[오류 상황]
로컬에서 테스트 진행까지 완료 후, 배포를 진행했음.
하지만 /worker
, /pharmacy
, /worker-pharmacy
엔드포인트를 실행하면 오류 발생
로그를 확인해 보니 아래와 같은 문구 발생
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'My AWS RDS EndPoint Link' (timed out)")
[오류 원인]
DB 접근 권한이 없었음.
[해결 방법]
모두 접근 가능하도록 수정했을 때는 원하는 결과 추출됨.
인바운드 규칙에 넣어줘야 함. Lambda를 VPC에 넣어줘야 할 것 같음.
FastAPI의 장점 중 하나는 '비동기'로 작동한다는 것인데, fastapi-sqlalchemy를 사용하게 된다면 여전히 DB 관련 API는 '동기'로 작동하게 될 것이다. 그래서 비동기로 DB 관련 API를 만들어 보고 싶었다.
$ pip install 'sqlalchemy[asyncio]'
$ pip install aiomysql
기존과 동일
# main.py
from asyncio import current_task
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_scoped_session
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker
from models import Pharmacy, Worker
HOSTNAME = ''
PORT =
USERNAME = ''
PASSWORD = ''
DBNAME = ''
MYSQL_URL = f'mysql+aiomysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DBNAME}'
app = FastAPI()
engine = create_async_engine(MYSQL_URL, echo=True)
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
session = async_scoped_session(async_session, scopefunc=current_task)
@app.get('/worker')
async def select_worker():
query = select(Worker)
result = await session.execute(query)
return result.scalars().all()
@app.get('/pharmacy')
async def select_pharmacy():
query = select(Pharmacy)
result = await session.execute(query)
return result.scalars().all()
@app.get('/worker-pharmacy')
async def join_worker_pharmacy():
query = select(
Worker.id,
Worker.pharmacy_id,
Worker.name,
Worker.type,
Worker.created_at,
Worker.updated_at,
Worker.is_deleted,
Pharmacy.id.label('ph-id'),
Pharmacy.name.label('ph-name'),
Pharmacy.address.label('ph-address'),
Pharmacy.created_at.label('ph-created_at'),
Pharmacy.updated_at.label('ph-updated_at')
).join(
Pharmacy,
Worker.pharmacy_id == Pharmacy.id
)
result = await session.execute(query)
return result.all()
[오류 상황]
ValueError: the greenlet library is required to use this function. No module named 'greenlet’
[오류 원인]
SQLAlchemy 1.4.x 에서 async 관련 작업을 하기 위해서는 greenlet
패키지가 필요함.
[해결 방법]
SQLAlchemy 패키지를 설치할 때 async 관련 패키지를 같이 설치해준다.
# asyncio 안 붙이면 greenlet 설치 안 됨
$ pip install sqlalchemy
...
Installing collected packages: sqlalchemy
Successfully installed sqlalchemy-1.4.35
# 이와 같이 설치해야 함
$ pip install 'sqlalchemy[asyncio]'
...
Installing collected packages: sqlalchemy, greenlet
Successfully installed greenlet-1.1.2 sqlalchemy-1.4.35
MySQL과 더불어 MongoDB 연결도 해보았다.
$ pip install pymongo
해당 부분은 생략
from asyncio import current_task
from fastapi import FastAPI
from pymongo import MongoClient
app = FastAPI()
HOST = ''
PORT =
client = MongoClient(HOST, PORT)
# client = MongoClient(f'mongodb://{HOST}:{PORT}')
db = client['mediscountDB']
# db = client.mediscountDB
@app.get('/mongo')
async def get_users_in_mongo():
users = db['users']
return list(users.find().limit(10))
☑️ Python에서 MongoDB ORM 확인하기
간단하게 MySQL, MongoDB를 연결해 봤다. 우선 저장된 데이터가 추출되는지만 확인을 진행했는데, 앞으로 더 많은 기능들을 사용해보려고 한다.
'비동기 SQLAlchemy' 사용하는 부분에서 내가 익숙했던 db.session.query()
문구를 쓰지 못한다는 것이 어색했는데, 2.0버전으로 계속 쓰다보면 익숙해질 거라고 생각한다:)
pymongo는 처음 사용해봤는데, 아직 MongoDB 문법에도 어색해서 그런지 낯선 느낌이 강하다. 하지만 이것 또한 계속 쓰다보면 익숙해질 거라고 생각한다.