이전 상태는… mySQL + pymysql(동기 드라이버) + SQL Model (ORM) 의 조합을 사용중임.
이걸 비동기 드라이버로 바꿔야 한다.
aiomysql, asyncmy, mysql-connector-python 등의 후보가 있음.
https://taejoone.jeju.onl/posts/2022-09-02-aiomysql-db-async-executor/
위 블로그 참고
aiomysql과 sqlmodel을 이용해 비동기를 진행해 보려고 한다.
https://github.com/fastapi/sqlmodel/issues/129
사례가 많이 없다. 그나마 위의 이슈 참고했다.
from contextlib import asynccontextmanager
import logging
import os
from dotenv import load_dotenv
from fastapi import FastAPI
from sqlalchemy import create_engine, text
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from typing import AsyncGenerator
# 환경 변수 로드
print("--------------------db.py---------------------")
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
# 비동기 엔진 생성
engine = create_async_engine(DATABASE_URL, echo=True)
# 비동기 세션 팩토리 생성
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False
)
@asynccontextmanager
async def lifespan(app: FastAPI):
print("Starting application...")
# 데이터베이스 연결 초기화
app.state.engine = engine
try:
yield
finally:
print("Shutting down application...")
await engine.dispose()
print("Database connection closed.")
# 의존성 주입을 위한 비동기 세션 제공자
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
try:
logging.info(f"💡[ 세션 생성 ] {session}")
yield session
await session.commit()
except Exception as e:
await session.rollback()
logging.error(f"Database error: {str(e)}")
raise
finally:
logging.info(f"💡[ 세션 종료 ] {session}")
await session.close()
# 테이블 초기화 함수
async def init_db():
async with engine.begin() as conn:
# 기존 테이블 삭제
await conn.run_sync(SQLModel.metadata.drop_all)
print("테이블을 삭제했습니다.")
# 새 테이블 생성
await conn.run_sync(SQLModel.metadata.create_all)
print("테이블을 생성했습니다.")
# CSV 데이터 삽입
try:
import pandas as pd
data = pd.read_csv('administrative_division.csv')
await conn.run_sync(
lambda sync_conn: data.to_sql(
'administrative_division',
con=sync_conn,
if_exists='append',
index=False
)
)
print(f"총 {len(data)}개의 행 삽입 완료.")
except Exception as e:
print(f"CSV 데이터 삽입 실패: {e}")
# 테이블 존재 여부 확인
async def check_tables():
print("---------메타데이터 테이블 목록---------")
print(SQLModel.metadata.tables)
print("--------------------------------------")
if __name__ == "__main__":
print("MySQL 연결 테스트를 시작합니다...")
try:
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL")
engine = create_engine(DATABASE_URL, echo=True)
# 엔진으로 직접 연결 테스트
with engine.connect() as connection:
print("MySQL 연결 성공!")
print("테이블 목록을 출력합니다.")
result = connection.execute(text("SHOW TABLES;"))
for row in result:
print(row)
except Exception as e:
print(f"MySQL 연결 실패: {e}")
조합이 특이하다. 세션만 sqlmodel에서, 엔진은 alchemy에서 가져온다.

pymysql은 비동기 드라이버가 아니라고 친절하게도 알려주고 있다.

Db를 붙여보니 실제로 비동기로 동작하는것 같다. 다만 지금 함수들이 전부 def로 되어있어서 코루틴을 처리 못하고 있다.

async/await 처리를 해줬건만 문제가 생기고 있다.
try:
query = select(Member).where((Member.email == email))
member = await session.exec(query).first()
return member.id if member is not None else None
except Exception as e:
print("[ memberRepository ] get_memberId_by_email() 에러 : ", e)
이를테면 이런 코드에서, first() 를 코루틴에 적용하려고 해서 생기는 문제다.
session 이 비동기 세션으로 바뀌었으니… 사용법도 다시 알아야 할 듯.
https://github.com/Me-mind-hackerthon/memind-backend/issues/43
이런 이슈도 있다.
그런데 위 이슈에서는 exec() 가 비동기를 지원하지 않는다고 하는데 다른 이슈에서는 exec() 를 사용하는경우가 있고, 내 코드에서도 비동기를 지원하긴 한다. 위 오류는 exec()가 코루틴을 반환하니까 생긴 오류임

대신 다른 이슈에서 살펴본 위의 코드 예시를 보면, 코루틴이 최종 결과를 반환하면 거기서 first() 함수들을 사용한다. 이 패턴이 맞는듯.

그리고 여기서는 scalar라는 함수를 쓰라고 한다. 1.4부터 도입되었다고 하는데…
https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html

공식문서에도 이렇게 되어있네? 아오…
For Core use, the
create_async_engine()function creates an instance ofAsyncEnginewhich then offers an async version of the traditionalEngineAPI. TheAsyncEnginedelivers anAsyncConnectionvia itsAsyncEngine.connect()andAsyncEngine.begin()methods which both deliver asynchronous context managers. TheAsyncConnectioncan then invoke statements using either theAsyncConnection.execute()method to deliver a bufferedResult, or theAsyncConnection.stream()method to deliver a streaming server-sideAsyncResult:
Core 사용의 경우 create_async_engine() 함수는 AsyncEngine의 인스턴스를 생성한 다음 기존 엔진 API의 비동기 버전을 제공합니다. AsyncEngine은 비동기 컨텍스트 관리자를 제공하는 AsyncEngine.connect() 및 AsyncEngine.begin() 메서드를 통해 AsyncConnection을 제공합니다. 그런 다음 AsyncConnection은 AsyncConnection.execute() 메서드를 사용하여 버퍼링된 결과를 전달하거나 AsyncConnection.stream() 메서드를 사용하여 스트리밍 서버 측 AsyncResult를 전달하는 명령문을 호출할 수 있습니다.
Using 2.0 style querying, the
AsyncSessionclass provides full ORM functionality. Within the default mode of use, special care must be taken to avoid lazy loading or other expired-attribute access involving ORM relationships and column attributes; the next section Preventing Implicit IO when Using AsyncSession details this. The example below illustrates a complete example including mapper and session configuration:
AsyncSession 클래스는 2.0 스타일 쿼리를 사용하여 전체 ORM 기능을 제공합니다. 기본 사용 모드에서는 지연 로드 또는 ORM 관계 및 열 속성과 관련된 기타 만료된 속성 액세스를 방지하기 위해 특별한 주의를 기울여야 합니다. 이에 대한 자세한 내용은 다음 섹션인 AsyncSession 사용 시 암시적 IO 방지에서 자세히 설명합니다. 아래 예는 매퍼 및 세션 구성을 포함한 전체 예를 보여줍니다.
이런 내용이 있다.
scalars() 의 역할이 뭘까..?
이게 SQLAlchemy2.0 스타일인듯. 결과집합에서 데이터 가져오는 방식이 메서드형태로 바뀌었다. 그래서 아래처럼 사용해봤다.
async def is_exist_member_by_email(email: str, oauth: str, session: AsyncSession) -> bool:
try:
print("session type : ", type(session))
query = select(Member).where((Member.email == email) & (Member.oauth == oauth))
result = await session.exec(query)
member = result.scalars().first()
return True if not member == None else False
except Exception as e:
print("[ memberRepository ] is_exist_member_by_email() 에러 : ", e)

그런데 위와 같은 에러 발생.
query = select(Spot).where(Spot.id == spot_id)
result = await session.exec(query)
spot = result.first()
이런 패턴으로 가져온 결과집합의 타입은 전부 ScalarResult이고, 이럴때는 그냥 first()등으로 커서만 이동해 결과 가져오게끔 되어있다.
드라이버 비동기로 변경 끝!