https://github.com/craftsangjae/sqlalchemy-tutorial 레포지토리를 학습한 내용입니다.
해당 래포지토리는 sqlalchemy 패키지를 이용하여 다음과 같은 동작을 이해하는 목적을 가진다.
보통의 3티어 형태의 서비스에선 대부분의 동작 시간을 데이터베이스와의 연결이 차지. 때문에 서비스의 성능이 데이터베이스와의 연결이 얼마나 빠르고 정확하게 이뤄지는지에의해 결정.
# pool_pre_ping 설정
test_engine = create_async_engine(
'postgresql+asyncpg://user:password@localhost:5434/testdb',
echo=True,
pool_pre_ping=True)
# pool_recycle 설정
test_engine = create_async_engine(
'postgresql+asyncpg://user:password@localhost:5434/testdb',
echo=True,
pool_recycle=1)
커넥션 풀을 통해 DB와 연결하는 방법에는 여러 필요 설정이 있음
# pool_timeout 설정
# 10의 커넥션을 poool에 저장하고 11개째의 요청은 1초만에 timeout
test_engine = create_async_engine(
'postgresql+asyncpg://user:password@localhost:5434/testdb',
echo=True,
pool_size=10,
max_overflow=0,
pool_timeout=1)
# statement_timeout 설정
# 1000ms -> 1s 이상 걸리는 쿼리 요청에 대해 timeout 에러
test_engine = create_async_engine(
'postgresql+asyncpg://user:password@localhost:5434/testdb',
echo=True,
connect_args={"server_settings": {"statement_timeout": "1000"}} # 단위가 ms, 보통 15s 정도로 설정함
)
sqlalchemy의 sessionmaker 를 이용하여 세션을 생성, 관리
test_engine = create_async_engine('postgresql+asyncpg://user:password@localhost:5434/testdb', echo=True)
session_factory = sessionmaker(test_engine, class_=AsyncSession, expire_on_commit=False)
AsyncScopedSession = scoped_session(session_factory)
세션은 하나의 coroutine에 의해 처리됨. 요청이 들어온 테스크를 coroutine 큐에 넣어놓고 순차적으로 처리. 처리중 awaite를 만나면 처리중엔 task는 다시 큐의 가장 윗단에 넣어놓고 다음 task를 처리.
위의 과정에서 동일 세션에 대한 task가 비동기 적으로 동시에 처리된다면 에러 발생. 기본적으로 세션은 쓰레드 별로 할당되기 때문.
이를 위해 세션을 테스크 별 할당으로 전환 하여 사용한다.
async_session_factory = async_sessionmaker(test_engine, expire_on_commit=False)
AsyncScopedSession = async_scoped_session(async_session_factory, scopefunc=asyncio.current_task)
transient : 객체가 생성되었지만 아직 세션에 추가되지 않은 상태, entity 만 만든 상태
given_user0 = UserEntity(id=None, name="test0")
pending : 세션에 추가된 상태, orm에서만 관리하고 있음
session.add(given_user0)
persistent : 세션에 추가되어서 관리되는 상태,
session.flush([given_user0])
flush : session에 추가된 Entity의 상태(CRUD)에 대한 모든 명령을 DB에 전달한다.
expired : 변경이 적용된 Entity에 대한 상태를 보장할 수 없는 상태
session.commit()
commit: flush 통해 전달된 상태를 DB에 적용한다.
expire_on_commit=False 로 설정하면, commit 이후에도 expired = true가 되지않아서 데이터에 대한 접근이 가능함
다만 이 방법은 expire 문제를 야기할수 있음
detached : 세션 아웃
session.refresh(given_user0)
관계형 테이블을 어떻게 entity로 표현할것인가
@dataclass
class User:
id: Optional[int]
name: str
posts: List['Post']
@staticmethod
def new(name: str):
return User(id=None, name=name, posts=[])
@dataclass
class Post:
id: Optional[int]
user_id: str
title: str
@staticmethod
def new(user_id: str, title: str):
return Post(id=None, user_id=user_id, title=title)
UserEntity 만 조회. user_entity.awaitable_attrs.posts 를 통해 posts가 필요할 때만 다시 조회
단일
async with test_scoped_session() as session:
stmt = select(UserEntity)
result = await session.execute(stmt)
user_entities = result.scalars().all()
for user_entity in user_entities:
post_entities = await user_entity.awaitable_attrs.posts
select 시 options을 추가하여 where in , 또는 join 등으로 조회
테이블 마다 한번의 쿼리문이 발생하게 됨, 모든 데이터를 가져옴
SELECT users.id, users.name FROM users
SELECT posts.user_id AS posts_user_id, posts.id AS posts_id, posts.title AS posts_title FROM posts WHERE posts.user_id IN ($1::INTEGER, $2::INTEGER)
async with test_scoped_session() as session:
stmt = select(UserEntity).options(selectinload(UserEntity.posts))
result = await session.execute(stmt)
user_entities = result.scalars().all()
for user_entity in user_entities:
post_entities = user_entity.posts
join을 통해 한번에 가져옴, 1대N의 경우 중복된 값이 많아져 네트워크 비용이 늘어남. 중복 제거가 필요함
SELECT users.id, users.name, posts_1.id AS id_1, posts_1.title, posts_1.user_id
FROM users LEFT OUTER JOIN posts AS posts_1 ON users.id = posts_1.user_id
async with test_scoped_session() as session:
stmt = select(UserEntity).options(joinedload(UserEntity.posts))
result = await session.execute(stmt)
user_entities = result.scalars().unique().all() # 중복 제거
for user_entity in user_entities:
post_entities = user_entity.posts