[FastAPI] MySQL, MongoDB 연결

최더디·2022년 4월 18일
3
post-thumbnail

📍 목표

✅ MySQL 연결 후 데이터 추출
✅ 비동기 SQLAlchemy 사용
✅ MongoDB 연결 후 데이터 추출
✅ 데이터 가져오는 SQL 작성 후 간단한 API 생성 및 배포

📍 fastapi-sqlalchemy 패키지를 통한 MySQL DB 연결

FastAPI와 관련된 패키지들을 모아두는 Github Repo에서 fastapi-sqlalchemy 패키지가 존재하는 것을 보고 사용해보려고 한다.

필요 패키지 설치

$ pip install fastapi-sqlalchemy  # https://github.com/mfreeborn/fastapi-sqlalchemy
$ pip install pymysql

모델 선언

pharmacy, worker DB Table을 만들어 놓고 작업한 내용입니다.

# models.py

from sqlalchemy import Column, BigInteger, SmallInteger, String, DateTime
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Pharmacy(Base):
  __tablename__ = 'pharmacy'

  id = Column(BigInteger, primary_key=True, autoincrement=True)
  name = Column(String, nullable=False)
  address = Column(String, nullable=False)
  is_deleted = Column(SmallInteger, nullable=False)
  created_at = Column(DateTime, nullable=False)
  updated_at = Column(DateTime, nullable=False)

class Worker(Base):
  __tablename__ = 'worker'

  id = Column(BigInteger, primary_key=True, autoincrement=True)
  pharmacy_id = Column(BigInteger, nullable=False)
  type = Column(String, nullable=False)
  name = Column(String, nullable=False)
  is_deleted = Column(SmallInteger, nullable=False)
  created_at = Column(DateTime, nullable=False)
  updated_at = Column(DateTime, nullable=False)

fastapi-sqlalchemy 사용

# main.py

from fastapi import FastAPI
from fastapi_sqlalchemy import DBSessionMiddleware
from fastapi_sqlalchemy import db

from models import Pharmacy, Worker

HOSTNAME = ''
PORT = 
USERNAME = ''
PASSWORD = ''
DBNAME = ''
MYSQL_URL = f'mysql+pymysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DBNAME}'

app = FastAPI()

app.add_middleware(DBSessionMiddleware, db_url=MYSQL_URL)

@app.get('/worker')
async def select_worker():
  query = db.session.query(Worker)
  return query.all()

@app.get('/pharmacy')
async def select_pharmacy():
  query = db.session.query(Pharmacy)
  return query.all()

@app.get('/worker-pharmacy')
async def join_worker_pharmacy():
  query = db.session.query(
      Worker.id,
      Worker.pharmacy_id,
      Worker.name,
      Worker.type,
      Worker.created_at,
      Worker.updated_at,
      Worker.is_deleted,
      Pharmacy.id.label('ph-id'),
      Pharmacy.name.label('ph-name'),
      Pharmacy.address.label('ph-address'),
      Pharmacy.created_at.label('ph-created_at'),
      Pharmacy.updated_at.label('ph-updated_at')
  ).join(
      Pharmacy,
      Worker.pharmacy_id == Pharmacy.id
  )
  return query.all()

API 확인

📍 오류

Timeout Error

[오류 상황]

DB를 연결 및 로컬에서 테스트 했을 때, 원하는 값이 나오는 것을 확인했다.
하지만 AWS SAM을 통해 배포를 했는데, 값이 나오지 않는 에러 발생.

[오류 원인]

Timeout Error

[해결 방법]

template.yaml 파일에서 Timeout 값을 수정

Timeout 최대값

[오류 상황]

template.yaml 파일에서 Timeout 값을 3600 으로 수정한 후, 배포 명령어 실행함.

Resource handler returned message: "1 validation error detected: Value '3600' at 'timeout' failed to satisfy constraint: Member must have value less than or equal to 900

[오류 원인]

Timeout의 최대값은 900

[해결 방법]

Timeout 값을 3600 → 900 으로 수정

DB Connection Error

[오류 상황]

로컬에서 테스트 진행까지 완료 후, 배포를 진행했음.
하지만 /worker, /pharmacy, /worker-pharmacy 엔드포인트를 실행하면 오류 발생

로그를 확인해 보니 아래와 같은 문구 발생

sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2003, "Can't connect to MySQL server on 'My AWS RDS EndPoint Link' (timed out)")

[오류 원인]

DB 접근 권한이 없었음.

[해결 방법]

모두 접근 가능하도록 수정했을 때는 원하는 결과 추출됨.
인바운드 규칙에 넣어줘야 함. Lambda를 VPC에 넣어줘야 할 것 같음.

📍 비동기 SQLAlchemy 사용하기

FastAPI의 장점 중 하나는 '비동기'로 작동한다는 것인데, fastapi-sqlalchemy를 사용하게 된다면 여전히 DB 관련 API는 '동기'로 작동하게 될 것이다. 그래서 비동기로 DB 관련 API를 만들어 보고 싶었다.

필요한 패키지 설치

$ pip install 'sqlalchemy[asyncio]'
$ pip install aiomysql

모델 선언

기존과 동일

Async SQLAlchemy 사용하기

  • SQLAlchemy 2.0 스타일로 작성해야 함.
# main.py

from asyncio import current_task
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_scoped_session
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker

from models import Pharmacy, Worker

HOSTNAME = ''
PORT = 
USERNAME = ''
PASSWORD = ''
DBNAME = ''
MYSQL_URL = f'mysql+aiomysql://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DBNAME}'

app = FastAPI()

engine = create_async_engine(MYSQL_URL, echo=True)
async_session = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
session = async_scoped_session(async_session, scopefunc=current_task)

@app.get('/worker')
async def select_worker():
  query = select(Worker)
  result = await session.execute(query)
  return result.scalars().all()

@app.get('/pharmacy')
async def select_pharmacy():
  query = select(Pharmacy)
  result = await session.execute(query)
  return result.scalars().all()

@app.get('/worker-pharmacy')
async def join_worker_pharmacy():
  query = select(
      Worker.id,
      Worker.pharmacy_id,
      Worker.name,
      Worker.type,
      Worker.created_at,
      Worker.updated_at,
      Worker.is_deleted,
      Pharmacy.id.label('ph-id'),
      Pharmacy.name.label('ph-name'),
      Pharmacy.address.label('ph-address'),
      Pharmacy.created_at.label('ph-created_at'),
      Pharmacy.updated_at.label('ph-updated_at')
  ).join(
      Pharmacy,
      Worker.pharmacy_id == Pharmacy.id
  )
  result = await session.execute(query)
  return result.all()

API 확인

참고

📍 오류

패키지 오류

[오류 상황]

ValueError: the greenlet library is required to use this function. No module named 'greenlet’

[오류 원인]

SQLAlchemy 1.4.x 에서 async 관련 작업을 하기 위해서는 greenlet 패키지가 필요함.

[해결 방법]

SQLAlchemy 패키지를 설치할 때 async 관련 패키지를 같이 설치해준다.

# asyncio 안 붙이면 greenlet 설치 안 됨
$ pip install sqlalchemy
...
Installing collected packages: sqlalchemy
Successfully installed sqlalchemy-1.4.35

# 이와 같이 설치해야 함
$ pip install 'sqlalchemy[asyncio]'
...
Installing collected packages: sqlalchemy, greenlet
Successfully installed greenlet-1.1.2 sqlalchemy-1.4.35

📍 pymongo를 사용해 MongoDB 연결

MySQL과 더불어 MongoDB 연결도 해보았다.

패키지 설치

$ pip install pymongo

DB 생성 및 데이터 적재

해당 부분은 생략

pymongo 사용하기

from asyncio import current_task
from fastapi import FastAPI
from pymongo import MongoClient

app = FastAPI()

HOST = ''
PORT = 

client = MongoClient(HOST, PORT)
# client = MongoClient(f'mongodb://{HOST}:{PORT}')

db = client['mediscountDB']
# db = client.mediscountDB

@app.get('/mongo')
async def get_users_in_mongo():
  users = db['users']
  return list(users.find().limit(10))

TODO

☑️ Python에서 MongoDB ORM 확인하기

📍 마무리

간단하게 MySQL, MongoDB를 연결해 봤다. 우선 저장된 데이터가 추출되는지만 확인을 진행했는데, 앞으로 더 많은 기능들을 사용해보려고 한다.

'비동기 SQLAlchemy' 사용하는 부분에서 내가 익숙했던 db.session.query() 문구를 쓰지 못한다는 것이 어색했는데, 2.0버전으로 계속 쓰다보면 익숙해질 거라고 생각한다:)

pymongo는 처음 사용해봤는데, 아직 MongoDB 문법에도 어색해서 그런지 낯선 느낌이 강하다. 하지만 이것 또한 계속 쓰다보면 익숙해질 거라고 생각한다.

profile
focus on why

0개의 댓글