나는 지금까지 DB 모델 중심의 개발만 해왔다. 사실 DB 중심 개발인 것도 모르고, 그냥 첫 회사에서 그렇게 시작해서 다들 그렇게 하는지 알았다. 스타트업에서 일하다 보니 기획이 자주 변경되었고, 그때마다 DB 설계를 수정해야 했다.
이런 과정이 반복되면서 DB 모델링 중심의 개발 방식에 한계를 느꼈고, 분명 이런 문제를 해결하기 위해 연구한 방법이 있을 것이라고 생각해 검색을 하다가 여러 아키텍처 방법론에 대해 공부해보게 되었다. 그때 처음 DDD, 클린 아키텍처 등에 대해 알게되었지만 당시 프로젝트에 직접 적용해보진 않았다.
이직한 회사에서 DDD 방법론을 주로 사용해왔기 때문에, 이번 기회에 지금까지 늘 해 왔던 데이터 중심 설계에서 벗어나 새로운 방식으로 설계를 시도해보았다.
내가 담당하는 프로젝트는 사내의 흩어져 있는 데이터들을 한 곳에 모아서 관리하는 Data Lake 같은 내부 솔루션을 개발하는 것이다.
이 솔루션의 주요 기능 중 하나는 데이터 버전 관리 시스템이다. 저장된 데이터가 수정될 때마다 변경 이력이 기록되며, 필요한 경우 이전 버전으로 복원할 수 있다.
처음 이 프로젝트를 시작할 때는 내가 늘 하던 방식으로 개발했다.
class BaseModel(BASE):
__abstract__ = True
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), onupdate=func.now(), server_default=func.now()
)
# Generate __tablename__ auto
@declared_attr
def __tablename__(cls) -> str:
return cls.__name__.lower()
def to_dict(self) -> dict:
"""
모델의 기본 속성을 딕셔너리로 변환
"""
return {
column.name: getattr(self, column.name) for column in self.__table__.columns
}
class Dataset(BaseModel):
meta_data: Mapped[dict] = mapped_column(JSONB)
elements: Mapped[List["Element"]] = relationship(
"Element", back_populates="dataset", cascade="all, delete-orphan"
)
def to_dict(self, with_elements: bool = False) -> dict:
base_dict = super().to_dict()
if with_elements:
base_dict.update(
{"elements": [element.to_dict() for element in self.elements]}
)
return base_dict
class Version(BaseModel):
__tablename__ = "element_version"
element_id: Mapped[int] = mapped_column(
ForeignKey(
"element.id", ondelete="CASCADE", name="fk_version_element_id"
)
)
full_state: Mapped[dict] = mapped_column(JSONB)
checksum: Mapped[str] = mapped_column(String)
element: Mapped["Element"] = relationship(
"Element", back_populates="versions", foreign_keys=[element_id]
)
class Element(BaseModel):
dataset_id: Mapped[int] = mapped_column(
ForeignKey("dataset.id", ondelete="CASCADE", name="fk_element_dataset_id")
)
element_type: Mapped[str] = mapped_column(String)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
dataset: Mapped["Dataset"] = relationship("Dataset", back_populates="elements")
versions: Mapped[List["Version"]] = relationship(
"Version",
back_populates="element",
foreign_keys=[Version.element_id],
cascade="all, delete-orphan",
)
current_version_id: Mapped[Optional[int]] = mapped_column(
ForeignKey(
"element_version.id",
ondelete="SET NULL",
name="fk_element_current_version_id",
)
)
current_version: Mapped[Optional["Version"]] = relationship(
"Version", foreign_keys=[current_version_id]
)
def to_dict(self, with_versions: bool = False) -> dict:
base_dict = super().to_dict()
base_dict.update(
{
"current_version": (
self.current_version.to_dict() if self.current_version else None
)
}
)
if with_versions:
base_dict.update(
{"versions": [version.to_dict() for version in self.versions]}
)
return base_dict
먼저 모델을 설계하고, 모델에 맞는 데이터베이스 테이블을 생성하는 전형적인 데이터베이스 주도 방식으로 개발을 진행했다. 테스트 코드조차 실제 테스트 데이터베이스에 의존하여 작성했다.
내가 비즈니스 로직이라고 생각했던 부분들은 사실상 ORM을 활용한 데이터베이스 쿼리 로직에 불과했다. 이는 다음과 같은 문제점을 가지고 있었다
처음에는 '필요한 기능들을 가볍게 구현해보자'는 의도로 시작한 프로젝트였다. 하지만 데이터베이스 중심의 개발에 익숙했던 나에게는 '가벼운 구현'이라는 것 자체가 불가능했다.
간단한 로직이라도 테스트해보기 위해서는 DB container를 실행시키고 테이블을 실제로 생성하고 더미 데이터까지 만들어야했기 때문이다.
또한 데이터베이스 스키마에 종속된 코드를 작성하다 보니 작은 변경사항에도 전체 구조를 수정해야 하는 상황이 반복되었다.
이외에도 다른 개념들이 있지만, 나는 내가 직접 구현해본 것들만 정리해보았다.
class Dataset:
def __init__(
self,
dataset_id: int,
meta_data: dict,
created_at: datetime = datetime.now(),
updated_at: datetime = datetime.now(),
elements: Optional[List[Element]] = None,
):
self.created_at = created_at
self.updated_at = updated_at
self.dataset_id = dataset_id
self.meta_data = meta_data
self.elements = elements
def update_meta_data(self, meta_data: dict):
"""Dataset의 meta_data를 업데이트합니다."""
self.meta_data = meta_data
def add_element(self, element: Element):
"""Dataset에 Element를 추가합니다."""
self.elements.append(element)
def remove_element(self, element: Element, repository ):
"""Element를 제거합니다."""
await repository.delete_element(element)
def count_elements(self) -> int:
"""Dataset에 포함된 Element의 수를 반환합니다."""
return len(self.elements)
def to_dict(self) -> dict:
if self.elements:
return {
"created_at": self.created_at,
"updated_at": self.updated_at,
"dataset_id": self.dataset_id,
"meta_data": self.meta_data,
"elements": [element.to_dict() for element in self.elements],
}
else:
return {
"created_at": self.created_at,
"updated_at": self.updated_at,
"dataset_id": self.dataset_id,
"meta_data": self.meta_data,
}
@classmethod
def from_model(cls, model, with_elements: bool = False) -> "Dataset":
"""SQLAlchemy 모델에서 도메인 객체로 변환합니다."""
if not model:
return None
elements = [Element.from_model(em) for em in model.elements] if with_elements else []
return cls(
created_at=model.created_at,
updated_at=model.updated_at,
dataset_id=model.id,
meta_data=model.meta_data,
elements=elements,
)
class Element:
def __init__(
self,
element_id: Optional[int] = None,
element_type: Optional[str] = None,
current_version: Optional[Version] = None,
versions: Optional[Dict[str, Version]] = None,
created_at: datetime = datetime.now(),
updated_at: datetime = datetime.now()
):
self.created_at = created_at
self.updated_at = updated_at
self.element_id = element_id
self.element_type = element_type
self.current_version = current_version
self.versions = versions or {}
@classmethod
def create_with_element_type(cls, element_type: str) -> "Element":
"""element_type을 통해 Element 생성."""
return cls(element_id=None, element_type=element_type)
def add_version(self, version: Version):
"""새로운 Version을 추가합니다."""
if version.checksum in self.versions:
raise ValueError("Version with this checksum already exists.")
self.versions[version.checksum] = version
self.current_version = version
async def update_element_type(self, element_type: str):
"""Element의 타입을 업데이트합니다."""
self.element_type = element_type
async def change_current_version(self, checksum: str):
"""현재 Version을 설정합니다."""
if checksum not in self.versions:
raise ValueError("Version not found in versions.")
self.current_version = self.versions[checksum]
async def remove_version(self, checksum: str, repository):
"""특정 Version을 삭제합니다."""
if checksum not in self.versions:
raise ValueError("Version not found.")
# Version 삭제
await repository.delete_version(self.element_id, checksum)
del self.versions[checksum]
if self.current_version.checksum == checksum:
self._update_current_version()
def _update_current_version(self):
"""현재 Version을 다른 Version으로 교체합니다."""
print("self.versions", self.versions)
if self.versions:
# created_at이 가장 최신인 Version을 찾음
latest_version = max(self.versions.values(), key=lambda v: v.created_at)
self.current_version = latest_version
else:
self.current_version = None
def get_versions_in_period(
self, start_date: datetime, end_date: datetime, page: int, per_page: int
) -> Tuple[List[Version], int]:
"""특정 기간의 Version을 페이지네이션하여 조회합니다."""
# 1. 기간에 해당하는 Versions 필터링
filtered_versions = [
version
for version in self.versions.values()
if start_date <= version.created_at <= end_date
]
# 2. 전체 레코드 수 계산
total_count = len(filtered_versions)
# 3. 페이지네이션 처리
offset = (page - 1) * per_page
paginated_versions = filtered_versions[offset : offset + per_page]
return paginated_versions, total_count
def retrieve_version(self, checksum: str) -> Version:
return self.versions[checksum]
def to_dict(self) -> dict:
return {
"created_at": self.created_at,
"updated_at": self.updated_at,
"element_id": self.element_id,
"element_type": self.element_type,
"current_version": self.current_version.to_dict(),
}
@classmethod
def from_model(cls, model: ElementModel, with_versions: bool = False) -> "Element":
if not model:
return None
current_version = Version.from_model(model.current_version)
if with_versions:
versions = {version.checksum: version for version in model.versions}
else:
versions = {}
return cls(
created_at=model.created_at,
updated_at=model.updated_at,
element_id=model.id,
element_type=model.element_type,
current_version=current_version,
versions=versions,
)
from dataclasses import dataclass
from typing import Dict, Any, Optional
from datetime import datetime
@dataclass(frozen=True)
class Version:
created_at: datetime
updated_at: datetime
full_state: Dict[str, Any]
checksum: str
def __init__(
self,
full_state: Dict[str, Any],
checksum: Optional[str] = None,
created_at: datetime = datetime.now(),
updated_at: datetime = datetime.now()
):
# dataclass가 frozen=True이므로 object.__setattr__를 사용
object.__setattr__(self, 'created_at', created_at)
object.__setattr__(self, 'updated_at', updated_at)
# full_state의 불변성을 보장하기 위해 깊은 복사 수행
object.__setattr__(self, 'full_state', deepcopy(full_state))
object.__setattr__(self, 'checksum', checksum or generate_checksum(full_state))
def __eq__(self, other: "Version") -> bool:
if not isinstance(other, Version):
return False
return self.checksum == other.checksum
def __hash__(self) -> int:
return hash(self.checksum)
def contains_key(self, key: str) -> bool:
"""Version의 상태에 특정 키가 포함되어 있는지 확인합니다."""
return key in self.full_state
def to_dict(self) -> dict:
return {
"created_at": self.created_at,
"updated_at": self.updated_at,
"full_state": self.full_state,
"checksum": self.checksum,
}
@classmethod
def from_model(cls, model: VersionModel) -> Optional["Version"]:
if not model:
return None
return cls(
created_at=model.created_at,
updated_at=model.updated_at,
full_state=model.full_state,
checksum=model.checksum,
)
지금까지 나는 도메인 모델과 데이터베이스 모델을 동일시했기 때문에 이런 개념들을 정의하는 것이 쉽지 않았다. 그래서 도메인 개념 간의 관계에 집중해 이를 풀어보고자 했다.
Dataset을 Aggregate Root로 선택한 이유
Element를 Entity로 정의한 이유
Version을 Value Object로 정의한 이유
이렇게 도메인 모델을 정의한 후에, 기존 로직에서 비즈니스 로직을 분리해서 도메인 모델의 메서드로 정의했다.
비즈니스 로직 중에서 조회 관련 로직을 개발하는데 고민이 있었다. DDD의 규칙을 엄격히 따르고자 한다면, 도메인 규칙이 포함된 조회나 객체의 상태를 확인하는 조회 로직은 도메인 모델의 메서드로 정의해야 하고, 대규모 데이터 필터링이나 집계 등과 같은 순수한 데이터 조회 로직은 Repository에 정의해야 한다고 생각했다.
예를 들어, Element의 현재 버전을 조회하거나 특정 조건을 만족하는 버전을 찾는 것은 도메인 모델의 책임이고, 수천 개의 Element를 날짜별로 그룹화하거나, 복잡한 조건으로 검색하는 작업은 데이터베이스의 최적화된 쿼리 기능을 활용할 수 있도록 Repository의 책임으로 두는 것이다.
하지만 현재 프로젝트의 상황을 고려했을 때, 조회 로직을 도메인 모델과 Repository로 나누는 것보다는 Repository에 통합하는 것이 더 적절하다고 판단했다. 그 이유는 다음과 같다:
물론 이러한 결정은 도메인 모델의 순수성을 일부 훼손하는 트레이드오프가 있지만, 실용성과 개발 생산성을 위해 이러한 타협은 감수할 만한 것으로 판단했다.
두 번째 단계로, Repository의 인터페이스를 정의했다. Repository 패턴을 적용하면서 Command와 Query를 분리하는 CQRS(Command Query Responsibility Segregation) 패턴도 함께 적용했다.
class CommandRepository(ABC):
@abstractmethod
async def create_dataset(self, data: Dict[str, Any]) -> Dataset:
pass
async def add_element_to_dataset(self, dataset_id: int, element: Element) -> None:
pass
async def update_element_version(self, element_id: int, version: Version) -> None:
pass
class QueryRepository(ABC):
@abstractmethod
async def _count_records(self, model, filter: Dict[str, Any]) -> int:
pass
@abstractmethod
async def get_datasets(
self,
page: int,
per_page: int,
order_by: str = "created_at",
order: str = "desc",
) -> Tuple[List[Dataset] | int]:
pass
@abstractmethod
async def add_element_version(self, element: Element) -> None:
pass
...
class Repository(CommandRepository, QueryRepository):
pass
지금까지는 바로 구현체를 개발했으나 이번에는 처음으로 인터페이스를 먼저 개발해보았다. 처음에는 인터페이스를 먼저 정의하는 것이 추가적인 작업처럼 느껴졌지만, 실제로 해보니 이걸 왜 이제 알았나 싶었다.
다음으로 Repository 패턴을 직접 적용하면서 경험한 주요 장점들을 정리해보았다.
CQRS? (Command Query Responsibility Segregation)
DDD에서는 모든 데이터는 Aggregate Root를 통해 접근할 수 있기 때문에, Repository는 Aggregate Root를 반환하는 메서드만 정의하도록 해야 한다.
그러다 보면 Repository의 책임이 너무 많아지기 때문에, 이를 대비해 CQRS 패턴을 적용해 데이터 조회 로직과 데이터 조작 로직을 분리하는 방법을 사용한다.
아래는 위에서 정의한 Repository 인터페이스를 구현한 구현체의 예시이다.
class PgCommandRepository(ABC):
def __init__(self, session : AsyncSession):
self.session = session
@abstractmethod
async def create_dataset(self, data: Dict[str, Any]) -> Dataset:
...
async def add_element_to_dataset(self, dataset_id: int, element: Element) -> None:
...
async def update_element_version(self, element_id: int, version: Version) -> None:
...
class PgQueryRepository(ABC):
def __init__(self, session : AsyncSession):
self.session = session
@abstractmethod
async def _count_records(self, model, filter: Dict[str, Any]) -> int:
...
@abstractmethod
async def get_datasets(
self,
page: int,
per_page: int,
order_by: str = "created_at",
order: str = "desc",
) -> Tuple[List[Dataset] | int]:
...
@abstractmethod
async def add_element_version(self, element: Element) -> None:
...
class PgRepository(PgCommandRepository, PgQueryRepository):
pass
나는 늘 DB 중심의 개발을 해왔기 때문에 "비즈니스 로직과 데이터 접근 로직 분리"라는 개념이 처음에는 매우 낯설었고, 테이블 구조를 먼저 설계하고, 그에 맞춰 코드를 작성하던 방식에서 벗어나는 것이 쉽지 않았다.
시작하기 전에 블로그, 유튜브 영상, 관련 도서 등을 통해 DDD를 공부했지만, 역시 직접 해보는 것이 최고였다. 미리 공부한 개념들도 막상 실전에서 부딪혀보고 시행착오를 겪어야만 제대로 이해할 수 있었다.
특히 '도메인 모델'과 'Repository 패턴'이라는 개념이 머릿속에서 추상적으로 맴돌던 것과 실제로 구현해보는 것은 큰 차이가 있었다.
이번 DDD 도입 경험은 단순히 하나의 프로젝트를 완성한 것 이상의 의미가 있다. 새로운 패러다임을 받아들이고 적용해본 경험을 통해, 앞으로 마주하게 될 새로운 기술과 개념들도 더 열린 마음으로 수용할 수 있을 것 같다.
특히 다음 단계로 도메인 이벤트나 Aggregate 패턴 같은 더 심화된 DDD 개념들을 적용해보고 싶은 욕심이 생겼다. 고작 한번 적용해본 것 가지고는 깨달았다라고 말하기엔 아직 부족하다.
무엇보다 이번 경험을 통해 깨달은 것은, 좋은 설계란 결국 비즈니스의 본질을 얼마나 잘 담아내느냐에 달려있다는 점이다. 앞으로도 이러한 관점을 잊지 않고 개발을 해나가고 싶다.