Repository와 UoW(Unit of Work)

정성연·2025년 11월 15일
post-thumbnail

아키텍처 설계를 마쳤으니 실제 구현을 해볼 시간이다.
개인적으로 api를 하나 구현할 때는 아래와 같은 순서로 진행한다.

  1. 필요한 api 라우터를 만들고
  2. 대응하는 Service 로직을 구성한 뒤
  3. 필요한 데이터를 Repository에서 전달하는 함수를 구현

이렇게 진행하는 이유는 쓸데없는 코드 구성을 생략하고 정말 필요한 로직만 골라서 구현할 수 있기 때문이다.
다만 실제 흐름은 Repository -> Service -> Controller 순으로 진행되기 때문에 기록은 이 순서를 따라가는게 좋겠다는 생각이다.


session maker 구성

당연하게도 SQLAlchemy를 사용하려면 Session maker부터 구성해야한다.
별건 아니지만 전역 변수 선언과 의존성 주입 구성의 일부분을 담당하기 때문에 살짝 언급하고 싶다.

# database/session.py
import os
from functools import lru_cache
from sqlalchemy.ext.asyncio import (
    AsyncEngine, create_async_engine, async_sessionmaker, AsyncSession
)

@lru_cache(maxsize=1)
def _async_database_url() -> str:
    return (
        f"mysql+asyncmy://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}"
        f"@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT','3306')}/{os.getenv('DB_DATABASE')}?charset=utf8mb4"
    )

def _make_async_engine() -> AsyncEngine:
    return create_async_engine(
        _async_database_url(),
        pool_size=int(os.getenv("DB_POOL_SIZE", 10)),
        max_overflow=int(os.getenv("DB_MAX_OVERFLOW", 20)),
        pool_timeout=int(os.getenv("DB_POOL_TIMEOUT", 30)),
        pool_recycle=int(os.getenv("DB_POOL_RECYCLE", 3600)),
        pool_pre_ping=True,
        pool_use_lifo=True,
        isolation_level=os.getenv("DB_ISOLATION", "READ COMMITTED"),
    )

database 폴더는 세션구성을 위한 설정을 담당한다.

# infra/global_state.py
from functools import lru_cache

@lru_cache(maxsize=1)
def get_async_engine() -> AsyncEngine:
    return _make_async_engine()

@lru_cache(maxsize=1)
def get_async_sessionmaker() -> async_sessionmaker[AsyncSession]:
    return async_sessionmaker(
        bind=get_async_engine(),
        expire_on_commit=False,
        autoflush=False,
        autocommit=False,
        class_=AsyncSession,
    )

설정을 받아서 생성한 session maker는 인프라 레벨에서 딱 한번만 준비되고 공통된 session을 UoW가 받아서 트랜잭션을 관리하는 형태이다.
즉, 세션 생성/해제 책임은 UoW, 연결 수명과 풀 설정은 Session maker가 담당한다.

Repository 구성

세션이 구성됐다면 다음은 실제 DB와 통신을 하는 Repository를 구성할 차례이다.
Repository는 Orm 객체를 통해 데이터에 직접 접근한다.
Orm 객체는 외부로 노출되면 예상치 못한 데이터 수정이 일어날 수 있기때문에 Service 레이어로 데이터를 전달할 때는 Dto를 통해 전달한다.
따라서 Repository를 구성하기 위해서는 Orm, Dto, Session이 필요하다.

Orm

# database/model/user_model.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'

    id = Column(
        Integer,
        primary_key=True,
        autoincrement=True,
        nullable=False,
        comment="자동 증가 기본 키"
    )
    name = Column(
        String(36),
        nullable=True,
        comment="사용자 이름"
    )

Orm 객체는 DeclarativeBase를 상속받아 구성된다.
예전에 Declarative mapping과 Imperative mapping에 대해 공부한 적이 있는데 생각이 잘 안나서 나중에 따로 또 공부를 해야겠다.

Dto

# schemas/dto/user.py
from pydantic import BaseModel
from typing import Optional

class UserDto(BaseModel):
	id: int
    name: Optional[str]
    
    model_config = {
        "from_attributes": True
    }

Dto는 Orm 객체에서 매핑할 컬럼을 대상으로 구성한다.
기본적으로 전체 컬럼을 전부 대응시켜고 좋지만 회사 프로젝트의 경우 데이터가 많아질 경우를 대비해 필요한 정보만 매핑할 수 있는 Dto를 여러개 구성해놨다.

from_attributesPydantic에서 제공하는 매핑 기능을 사용하기 위해 항상 True로 둔다.
Dto가 많아질 수록 매번 model_configfrom_attributes 를 설정해주기는 힘들다.
때문에 BaseDto를 만들어 모든 Dto는 BaseDto를 상속하게 한다.

# schemas/dto/base_dto.py
from pydantic import BaseModel

class BaseDto(BaseModel):
    model_config = {
        "from_attributes": True
    }

Repository

마지막 실제 레포지토리를 구성한다.

from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database.model.user_model import User
from schemas.dto.user import UserDto

class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_user_by_id(self, user_id: str) -> Optional[UserDto]:
        result = await self.db.execute(
            select(User)
            .where(User.id == user_id)
        )
        
        user_orm = result.scalars().one_or_none()
        
        if not user_orm:
            return None
            
        return UserDto.model_validate(user_orm)

레포지토리는 초기화 시, 세션을 받으며 해당 세션을 통해 쿼리문을 날린다.
단, 세션은 오직 레포지토리에만 주입되며 트랜잭션은 모두 UoW가 관리하게 된다.

UoW(Unit of Work) 구성

트랜잭션 단위

Spring에서는 @Transactional 어노테이션으로 Service 레이어에서 트랜잭션을 관리할 수 있게 해줬다.
파이썬은 그런거 없다.
대신 우리는 디자인 패턴의 힘을 빌려 비슷하게 구성할 수 있다.

# uow.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from crud.repositories.user_repository import UserRepository

class UnitOfWork:
    def __init__(self, sf: async_sessionmaker[AsyncSession]):
        self.sf = sf
        self.session: AsyncSession | None = None

    async def __aenter__(self):
        self.session = self.sf()
        return self

    async def __aexit__(self, exc_type, *_):
        try:
            if exc_type is None:
                await self.session.commit()
            else:
                await self.session.rollback()
        finally:
            await self.session.close()

    @property
    def user(self) -> UserRepository:
        return UserRepository(self.session)

context manager를 통해 자연스럽게 with 구문 내에서 진입과 탈출을 구성할 수 있다.
모든 Commit은 UoW가 닫힐 때 자연스럽게 이뤄지고 중간에 예외가 발생한다면 바로 롤백이 이뤄질수 있다.
모든 레포들은 객체 속성으로 선언된다.
즉, UoW에 등록하고 싶은 객체들은 @propery로 선언하면 된다.

Repository Registry

UoW를 운영하기에 문제가 하나 있다.
레포가 많아질수록 UoW에 등록되는 레포가 자연스럽게 많아진다는 것이고 그럼 레포를 구성할때마다 항상 UoW를 구성하는 파일이 변경되어야 한다는 것이다.
이런 패턴은 협업에 있어서 항상 문제를 일으킨다.
예컨데 동시에 같은 파일에 접근해서 충돌을 만든다거나 하는 그런 것들.
소규모에서는 그러려니하고 마는데 서비스 덩치가 커지면 커질수록 이는 엄청난 개발 오버헤드를 일으킨다.
그래서 UoW를 위한 레지스터를 만들 수 밖에 없었다.

# repo_registry.py
from typing import Dict,Type, TypeVar

T = TypeVar('T')

class RepoRegistry:
    _registry: Dict[Type[T], Type[T]] = {}

    @classmethod
    def register(cls, repo_cls: Type[T]):
        if repo_cls in cls._registry:
            raise ValueError(f"Repository {repo_cls} already registered")
        cls._registry[repo_cls] = repo_cls

    @classmethod
    def get(cls, repo_cls: Type[T]):
        try:
            return cls._registry[repo_cls]
        except KeyError:
            raise ValueError(f"Repository {repo_cls} not found (available: {list(cls._registry.keys())})")
            
def register_repo():
    def decorator(repo_cls: Type[T]):
        RepoRegistry.register(repo_cls)
        return repo_cls
    return decorator

RepoRegistry는 캐싱을 위한 레포지토리 딕셔너리를 속성으로 갖는다.
클래스 매서드로 registerget을 구성해 인스턴스 없이 전역으로 레지스트리에 접근 가능하게 구성한다.

마지막으로는 register_repo() 데코레이터를 구성해 레포지토리 클래스에서 간단하게 레지스트리에 등록할 수 있도록 구성한다.

이제 아래와 같이 UoW에 간단하게 등록가능하다.

@register_repo()
class UserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

레지스트리에 맞춰 UoW도 조금 코드를 바꿔준다.

T = TypeVar('T')

class UnitOfWork:
    def __init__(self, sf: async_sessionmaker[AsyncSession]):
        self.sf = sf
        self.session: AsyncSession | None = None
        self._cache = {}
        self._active = False

    async def __aenter__(self):
        self.session = self.sf()
        self._active = True
        return self

    async def __aexit__(self, exc_type, *_):
        try:
            if exc_type is None:
                await self.session.commit()
            else:
                await self.session.rollback()
        finally:
            await self.session.close()
            self._active = False
            self.session = None
            self._cache.clear()

    def _ensure_active(self):
        if not self._active or self.session is None:
            raise RuntimeError("UnitOfWork is not active. Use 'async with uow_factory() as uow:'")
        
        return self.session
    
    def get_repo(self, repo_cls: Type[T]) -> T:
        if repo_cls not in self._cache:
            session = self._ensure_active()
            repo_cls = RepoRegistry.get(repo_cls)
            self._cache[repo_cls] = repo_cls(session)
        return self._cache[repo_cls]

달라진 점은 레포지토리 캐시와 활성화 여부를 체크해주는 private 속성이 추가되었다는 것이다.

캐시를 통해 컨텍스트 내에서 한번 초기화된 레포지토리는 다음에 좀 더 빠르게 접근이 가능해지고 캐시에 등록되지 않은 레포지토리는 레지스트리에서 가져와서 캐시에 등록한다.

UoW의 활성화 여부에 대한 체크는 개발단계에서의 실수를 줄이기 위한 설계로 async context manager를 통한 접근을 강제한다.

마지막으로 async with문이 닫히면 UoW를 비활성화하고 캐시를 비움으로써 메모리의 부담을 줄인다.

실제 UoW는 아래와 같이 사용할 수 있다.

async with UnitOfWork(get_async_sessionmaker()) as uow:
	user_repo = uow.get_repo(UserRepository)
    
    user_dto = user_repo.get_user_by_id(user_id)

FastAPI DI 구성

구성된 UoW를 실제 라우터에 전달하기 위한 두 가지의 방법이 있다.

  1. 활성화된 UoW 객체를 라우터에 직접 주입
  2. UoW 객체를 생성할 수 있는 Factory를 주입

UoW 객체 주입

deps.py에 UoW 객체를 활성화 시킨 상태로 구성해두고 라우터에서 Depens()를 통해 주입받는 형태이다.

# api/deps.py
async def get_uow():
	async with UnitOfWork(get_async_sessionmaker()) as uow:
    	yield uow

yield를 통해 Generator를 구성한다.
Depends()는 Generator를 구성하는 Callable 객체를 전달 받을 시 라우터의 동작이 끝나면 Generator를 구성했던 자원들을 정리한다.
즉, 라우터가 종료될 때 자연스럽게 UoW의 커밋이 이뤄진다는 것이다.
때문에 라우터 단위의 트랜잭션을 구성할 수 있다.

@router("/user/{user_id}")
async def get_user(
	request: Request,
    user_id: int = Path(),
    user_service: UserService = Depends(get_user_service),
    uow: UnitOfWork = Depends(get_uow)
):
	...

이 방식의 경우 단순히 UoW를 주입하는 것만으로 트랜잭션 단위를 묶을 수 있다.
단, 라우터가 무사히 끝나야만 커밋이 이뤄진다.
덕분에 만약 외부 Api와 통신을 하는 동작이 라우터 내부에서 이뤄진다면 예기치 못한 데이터 불일치가 발생할 수 있다.

예를 들면, "S3에 데이터 저장 -> DB에 저장 위치 및 메타데이터 기록 -> 나머지 라우터 동작" 이라는 로직이 있다고 가정해보자.
각각 a, b, c 동작이라 할때 커밋은 c가 끝나고 난 뒤에 이뤄진다.
따라서 a (성공) -> b (성공) -> c (에러) 와 같은 상황이 발생하면 c단계에서 UoW는 롤백이 이뤄진다.
덕분에 S3에는 데이터가 성공적으로 업로드 되었지만 b단계에서 DB에 저장된 정보는 c의 에러로 인해 롤백이 되어 데이터 불일치가 일어날 수 있다.

이를 방지하기 위해서는 외부 I/O 동작에 대한 추가적 예외처리, 혹은 명시적 커밋 동작이 필요하다.

UoW Factory 주입

첫 번째 방법과 다르게 UoW를 구성하는 Factory를 구성해서 이를 라우터 혹은 서비스에 전달하는 방식이다.

# api/deps.py
def get_uow_factory() -> Callable[[], UnitOfWork]:
    def factory() -> UnitOfWork:
        return UnitOfWork(get_async_sessionmaker())
    return factory

마찬가지로 라우터에서 Factory를 전달 받는다.

@router("/user/{user_id}")
async def get_user(
	request: Request,
    user_id: int = Path(),
    user_service: UserService = Depends(get_user_service),
    uow_factory: Callable[[], UnitOfWork] = Depends(get_uow_factory)
):
	...

다만 Factory의 경우 자체적으로 async with를 통해 트랜잭션을 구성해야한다.

async uow_factory() as uow:
	...

이 덕분에 트랜잭션 단위를 좀 더 세부적으로 조절할 수 있게된다.

더욱이 이렇게되면 굳이 라우터에서 UoW를 주입받을 이유가 없다.
라우터는 애초에 Orm에 접근해서는 안되니 실수를 방지하기 위해서라도 서비스에 직접 Factory를 주입하도록 구성한다.

# api/deps.py
def get_uow_factory() -> Callable[[], UnitOfWork]:
    def factory() -> UnitOfWork:
        return UnitOfWork(get_async_sessionmaker())
    return factory

def get_user_service() -> UserService:
    return UserService(get_uow_factory())

이제 라우터는 단순히 서비스만 주입받아 동작할 수 있다.
물론 서비스 레이어에서 UoW를 사용할 때는 반드시 async with문을 구성해야한다.
좀 귀찮을 수 있지만 회사 프로젝트에서는 첫 번째 방식의 단점이 더 크다고 판단해 거의 모든 서비스에서 Factory를 명시적으로 주입받도록 했다.

물론 외부 I/O 없이 단순한 동작만 있는 경우에는 UoW를 직접 주입받아도 상관없다.
결국 중요한 건 어떤 레이어가 어디까지 책임을 가져가는지, 트랜잭션 경계를 어떻게 잡는지, 협업과 유지보수에 어떤 영향을 주는지를 이해하고, 그 장단점을 알고 서비스 형태에 따라 유연하게 선택하는 것이라고 생각한다.

마무리

이번 글에서는 세션 구성, Repository, UoW, DI 방식까지 한 번에 훑으면서 데이터 접근과 트랜잭션을 어떻게 다룰 것인가에 초점을 맞춰봤다.
다음 글에서는 이 기반 위에서 Service 레이어를 어떻게 설계하고, 어디까지 역할을 줘야 하는지를 구체적인 예제와 함께 풀어보도록 하겠다.

0개의 댓글