Python으로 클린 아키텍처 적용하기2

Jepeto·2020년 9월 13일
5
post-thumbnail
post-custom-banner
  • 📝 Implementing the Clean Architecture in Python 을 읽고 정리한 글입니다.
  • Python으로 클린 아키텍처 적용하기의 후속편으로 책을 여러번 읽으면서 microservices에 대해 더 깊이 이해하게 된것들을 정리하고자 글로 작성하게 되었습니다.
  • 작성 일자: 2020-09-13

1. Write Input DTO

  • Data Validation을 Input DTO가 하는 역할이 맞는 것일까. Checking data correctness may not be a role of Input DTOs. => Value Object을 통해 해결 가능
  • Input DTO의 모든 field를 Value Object으로 작성한다.

1) Value Object

  • domain 영역에 속함
  • Input DTO 내부에서 쓰일 Type을 정의하는 역할을 하고, Data Validation 역할을 같이 수행한다.

특징

1. Immutability: 값이나 타입이 변하지 않음. Exposing these fields as read-only properties may still be a good idea
2. Data Validation: invalid value로 만들 수 없음. Value Object은 Initialization을 할때 Data Validation을 동반하여 수행한다.
3. No Identity: Identity를 갖고 있지 않아서, 타입 및 value값이 서로 같다면 같은 값으로 생각한다.

Example

  • Money라는 value object을 만들기 위한 base가 되는 value object ~ currency
class Currency:
    decimal_precision = 2
    iso_code = "OVERRIDE"
    symbol = "OVERRIDE"


class USD(Currency):
    iso_code = "USD"
    symbol = "$"
  • Money라는 value object을 만드는 데 있어서, currency와 amount는 read_only로만 사용
class Money:
    def __init__(self, currency: Type[Currency], amount: str) -> None:
        self._currency = currency
        self._amount = Decimal(amount)
    @property
    def currency(self) -> Type[Currency]:
        return self._currency
    @property
    def amount(self) -> Decimal:
        return self._amount
  • Money라는 value object이 만들어 질때 Data Validation을 진행함.
class Money:
    def __init__(self, currency: Type[Currency], amount: str) -> None:
        if not inspect.isclass(currency) or not issubclass(currency, Currency):
            raise ValueError(f'{currency} is not a subclass of Currency!')
        try:
            decimal_amount = Decimal(amount)
        except decimal.DecimalException:
            raise ValueError(f'"{amount}" is not a valid amount!')
        else:
            decimal_tuple = decimal_amount.as_tuple()
            if decimal_tuple.sign:
                raise ValueError(f'amount must not be negative!')
            elif -decimal_tuple.exponent > currency.decimal_precision:
                raise ValueError(
                    f'given amount has invalid precision! It should have '
                    'no more than {currency.decimal_precision} decimal places!'
             )
            self._currency = currency
            self._amount = decimal_amount
  • Money라는 value object은 크기 비교, 값 비교가 가능
class Money:
    ...
    def __eq__(self, other: Money) -> bool:
        if not isinstance(other, Money):
            raise TypeError
        return self.currency == other.currency and self.amount == other.amount
        
Money(USD, 1) == Money(USD, '1.00')  # True
Money(USD, 1) == Money(USD, '5.00')  # False

Entity와 Value Object의 차이점:
Entity: 'AuctionId', 'BidderId' 와 같은 ID를 가지고 있어서 ID를 통해 구분이 가능함(Identity 속성을 가짐)
Value Object: ID같은 개념이 없어서, 타입 및 값을 통해 서로 같거나 다름을 비교할 수 있음.


2. Write Entity

특징

  • domain 영역에 속함
  • id를 통해 구분이 가능해야함
  • Entity관련 함수들을 같이 담아야하고, Entity값을 변경(update)하는 함수는 Entity 내부에 있어야한다.(외부 함수가 Entity값을 바꾸는 것은 전형적인 anti-pattern으로 안좋은 패턴)
  • Entity는 Business Rule에 관련된 함수들로만 있어야한다.
  • No dependencies - 어디에도 의존성이 있으면 안됨(Domain 영역이기 때문)
  • 어떤 외부 라이브러리를 써서 Entity를 구현하는 것은 지양해야함. 순수한 python 표준 라이브러리로만 구성되어야함.

    Dependency는 다음과 같이 구성되어야한다.
    (external world->infrastructure->appliction->domain)
    1) domain: enitity, value object
    2) appliction: usecase, quries(abstract), repositories(abstract)
    3) infrastructure: quries(implementation), repositories(implementation)
    4) external world: database, 3rd party api..

Example

class Auction: def __init__(self, id: AuctionId, title: str, starting_price: Money, bids: List[Bid] ) -> None:
	self.id = id
	self.title = title
	self.starting_price = starting_price
	self.bids = sorted(bids, key=lambda bid: bid.amount)
    
	def place_bid(self, bidder_id: BidderId, amount: Money) -> None: 
    		if amount > self.current_price:
			self.bids.append(Bid(id=None, bidder_id=bidder_id, amount=amount))
            
	@property
	def current_price(self) -> Money: if not self.bids:
		return self.starting_price
	else:
		return self._highest_bid.amount
        
	@property
	def winners(self) -> List[BidderId]: 
    		if not self.bids:
			return []
		return [self._highest_bid.bidder_id]

	@property
	def _highest_bid(self) -> Bid: 
    		return self.bids[-1]

3. Write Use Case

특징

  • application 영역에 속함
  • 비즈니스 로직을 수행하는 함수라고 생각하면 됨
  • input DTO를 받아서, 관련 entity를 가져오고, entity의 business rule이 담긴 함수를 실행시키고, 최종적으로 output dto을 출력
  • 이때 필요한 output boundary(abstract),repository(abstract)나, query(abstract)를 dependency injection을 통해 decoupled 시킨 상태로 use case를 구성한다.
  • 각각의 도메인, entity등을 분리해서 test code를 작성하기보다는 test code 작성시 use case를 위주로 test code를 작성한다.

Example


class PlacingBid:
    def __init__(self, output_boundary: PlacingBidOutputBoundary, auctions_repo:AuctionsRepository) -> None:
        self._output_boundary = output_boundary
        self._auctions_repo = auctions_repo
        
    def execute(self, input_dto: PlacingBidInputDto) -> None:
        auction = self._auctions_repo.get(input_dto.auction_id)
        auction.place_bid(bidder_id=input_dto.bidder_id, amount=input_dto.amount)
        self._auctions_repo.save(auction)
        output_dto = PlacingBidOutputDto(
            is_winning=input_dto.bidder_id in auction.winners,
            current_price=auction.current_price,
        )
        self._output_boundary.present(output_dto)
  • DDD: "도메인 모델(상태와 동작을 가진 entity)이 가진 행위를 풍부하게 하고, <서비스 객체(동작만 있는 클래스)= UseCase>는 도메인의 행위를 사용하게 한다"

4. Read-Side를 구축하는 패턴

1) Facade 패턴

특징

  • 어떤 sub시스템의 일련의 인터페이스에 대한 통합된 인터페이스를 제공합니다. Facade에서 고수준 인터페이스를 정의하기 때문에 sub시스템을 더 쉽게 사용할 수 있습니다.

  • Read Model Facade interface will be a bunch of methods for each model/entity.

Example

  • ApiConsumer, dao와 같은 subsystem의 interface들 조합하여 Facade 패턴으로 구축
# payments/payments/facade.py

from sqlalchemy.engine import Connection

from foundation.events import EventBus
from foundation.value_objects import Money

from payments import dao
from payments.api import ApiConsumer, PaymentFailedError
from payments.config import PaymentsConfig
from payments.events import PaymentCaptured, PaymentCharged, PaymentFailed, PaymentStarted


class PaymentsFacade:
    def __init__(self, config: PaymentsConfig, connection: Connection, event_bus: EventBus) -> None:
        self._api_consumer = ApiConsumer(config.username, config.password)
        self._connection = connection
        self._event_bus = event_bus

    def get_pending_payments(self, customer_id: int) -> List[dao.PaymentDto]:
        return dao.get_pending_payments(customer_id, self._connection)

    def start_new_payment(self, payment_uuid: UUID, customer_id: int, amount: Money, description: str) -> None:
        dao.start_new_payment(payment_uuid, customer_id, amount, description, self._connection)
        self._event_bus.post(PaymentStarted(payment_uuid, customer_id))

    def charge(self, payment_uuid: UUID, customer_id: int, token: str) -> None:
        payment = dao.get_payment(payment_uuid, customer_id, self._connection)
        if payment.status != dao.PaymentStatus.NEW.value:
            raise Exception(f"Can't pay - unexpected status {payment.status}")

        try:
            charge_id = self._api_consumer.charge(payment.amount, token)
        except PaymentFailedError:
            dao.update_payment(payment_uuid, customer_id, {"status": dao.PaymentStatus.FAILED.value}, self._connection)
            self._event_bus.post(PaymentFailed(payment_uuid, customer_id))
        else:
            update_values = {"status": dao.PaymentStatus.CHARGED.value, "charge_id": charge_id}
            dao.update_payment(payment_uuid, customer_id, update_values, self._connection)
            self._event_bus.post(PaymentCharged(payment_uuid, customer_id))

    def capture(self, payment_uuid: UUID, customer_id: int) -> None:
        charge_id = dao.get_payment_charge_id(payment_uuid, customer_id, self._connection)
        assert charge_id, f"No charge_id available for {payment_uuid}, aborting capture"
        self._api_consumer.capture(charge_id)
        dao.update_payment(payment_uuid, customer_id, {"status": dao.PaymentStatus.CAPTURED.value}, self._connection)
        self._event_bus.post(PaymentCaptured(payment_uuid, customer_id))

2) Query 패턴

특징

  • application layer의 query를 abstract해서 infrastructure layer에서 implementation 구현
  • dependency injection

Example

#auctions/auctions/applications/queries/auctions.py
@dataclass
class AuctionDto:
    id: int
    title: str
    current_price: Money
    starting_price: Money
    ends_at: datetime


class GetSingleAuction(abc.ABC):
    @abc.abstractmethod
    def query(self, auction_id: int) -> AuctionDto:
        pass


class GetActiveAuctions(abc.ABC):
    @abc.abstractmethod
    def query(self) -> List[AuctionDto]:
        pass


# auctions_infrastructure/auctions_infrastructure/queries/auctions.py

class SqlGetActiveAuctions(GetActiveAuctions, SqlQuery):
    def query(self) -> List[AuctionDto]:
        return [
            _row_to_dto(row) for row in self._conn.execute(auctions.select().where(auctions.c.ends_at > func.now()))
        ]


class SqlGetSingleAuction(GetSingleAuction, SqlQuery):
    def query(self, auction_id: int) -> AuctionDto:
        row = self._conn.execute(auctions.select().where(auctions.c.id == auction_id)).first()
        return _row_to_dto(row)

5. Event/Listener 방식을 통한 Decoupling

1) Event/Listener

event_bus

2) Event 정의

  • Event: Data Transfer Object이라고 생각하면 됨. 과거 시제로 표현함.
  • domain 영역에 속함
@dataclass
class BidderHasBeenOverbid: 
  auction_id: AuctionId 
  bidder_id: BidderId 
  new_price: Money

3) 어떻게 Entitiy가 Event를 전달하게 할 수 있을까 (Events vs transactions vs side effects)

문제:

  1. HTTP request is received
  2. A database transaction is started
  3. The Auction is pulled from a storage using AuctionsRepository
  4. A new bid is placed
  5. The auction emits an event
  6. Subscriber invokes background job to send an e-mail
  7. The Auction is saved back to database
  8. HTTP response is built
  9. The transaction is committed
  10. HTTP response is sent to the client.

다음과 같은 일련의 과정을 거칠 때 발생할 수 있는 문제가 있다.
1. If we react to the event right after the Entity/Repository emits it, we will be sending e- mail before a transaction is committed, meaning that overbid may still fail. The Auction would look like as if an e-mail receiver has not been overbid at all
2. if the background job is triggered before commit of the original transaction AND it reaches to the database for data it may happen it will not find simply because the first transaction is still in progress and changes it made are invisible to other connections. A classic race condition.

해결방법: Unit of Work Pattern

unit_of_work

  • If we combine it with task queues (every event subscriber schedules a task to be executed after transaction commit), we will get the desired behaviour. A more formal approach is to use Unit of Work pattern.
    ex) kafka, rabbitmq등을 써서 taskq queueus구현, 'at-least-delivery' guarantee
  • Example 1: unit_of_work method~
    begin, commit,rollback,register_callback_after_commit
class UnitOfWork(abc.ABC):
  @abc.abstractmethod
  def begin(self) -> None:
  	pass
  @abc.abstractmethod
  def rollback(self) -> None:
  	pass
  @abc.abstractmethod
  def commit(self) -> None:
  	pass
  @abc.abstractmethod
  def register_callback_after_commit(self, callback: typing.Callable) -> None:
  	pass
  • example 2
def setup_event_subscriptions(event_bus: EventBus) -> None:
    event_bus.subscribe(
        BidderHasBeenOverbid,
        lambda event: (

    inject.instance(UnitOfWork).register_callback_after_commit(
    lambda event: send_email.delay(event.auction_id, event.bidder_id, event.money.amount
    )
   )
  )

4) Relation between Unit Of Work and Event Bus

  • Event Bus has to be aware of the Unit Of Work existence.
  • subscriber를 synchronously (Unit Of Work) 하게 할지, asynchronously(in a background task queue) 하게 할지는 개발자의 몫
  • synchronously하게 한다면, Event가 발생하면 바로 실행시키면 됨
  • asynchronously하게 한다면, Unit of Work’s register_callback_after_commit to ensure subscriber gets Event 하게 Even Bus를 설정해줘야함.
  • IoC Container을 통해서 설정

5) Optimistic Locking:

  • race condition을 방지하기 위한 방법: optimistic locking

Optimistic Locking:

  • 모든 entitiy에 version을 할당
  • data store에서 fetch할때 이전에 무슨 version의 entity를 들고 있었는지 기억하고, 처음 시작할때 들고 있던 entitiy와 같은지 비교
  • 만약 version이 다르다면 전체과정을 retry 또는 fail시킴

Optimistict Locking 과 Pessimistic Locking의 차이점
참고자료:
[MSSQL] Pessimistic, Optimistic Locking - 비관적 잠금과 낙관적 잠금 (DB)

6) Dependency 관계 정리

Dependency 관계 정리


6. 모듈간 Communication방법들

1) Direct Dependency

1-1) Use Case를 추상화한 Input Boundary 사용한 호출

  • A모듈에서 B모듈의 Use Case를 사용하고 싶을 때, Input Boundary를 사용하여 추상화 시켜서 decoupled된 채로 사용

input boundary

1-2) Port-Adapter 패턴

  • A모듈이 B모듈의 Use Case를 부르고 싶을 때 A모듈에 Port를, B모듈에는 Adapter를 작성하여, decouple된채로 호출

port-adapter

2) Indirect dependency (= rely on events)

Event Bus

3) saga 패턴 (For a multi-stage process with cascades of events)

  • event가 연쇄적으로 발생하는 복잡한 로직이 발생할때 이를 한 모듈에서 전담해서 처리함 :saga 패턴
  • pessmestic locking 사용

Example

  1. Upon AuctionEnded we call Facades of Payments and Customer Relationship
  2. Upon PaymentCaptured we call Facades of Customer Relationship and Shipping
  3. Upon PackageShipped we call Facade of Customer Relationship
# process/process/paying_for_one_item/saga.py

class State(Enum):
    PAYMENT_STARTED = "PAYMENT_STARTED"
    TIMED_OUT = "TIMED_OUT"
    FINISHED = "FINISHED"


@dataclass
class PayingForWonItemData:
    process_uuid: uuid.UUID
    state: Optional[State] = None
    timeout_at: Optional[datetime] = None
    winning_bid: Optional[Money] = None
    auction_title: Optional[str] = None
    auction_id: Optional[int] = None
    winner_id: Optional[int] = None


class PayingForWonItem:
    def __init__(self, payments: PaymentsFacade, customer_relationship: CustomerRelationshipFacade) -> None:
        self._payments = payments
        self._customer_relationship = customer_relationship

    def timeout(self, data: PayingForWonItemData) -> None:
        assert data.timeout_at is not None and datetime.now() >= data.timeout_at
        assert data.state == State.PAYMENT_STARTED
        data.state = State.TIMED_OUT

    @method_dispatch
    def handle(self, event: Any, data: PayingForWonItemData) -> None:
        raise Exception(f"Unhandled event {event}")

    @handle.register(AuctionEnded)
    def handle_auction_ended(self, event: AuctionEnded, data: PayingForWonItemData) -> None:
        assert data.state is None
        payment_uuid = uuid.uuid4()
        self._payments.start_new_payment(payment_uuid, event.winner_id, event.winning_bid, event.auction_title)
        self._customer_relationship.send_email_about_winning(event.winner_id, event.winning_bid, event.auction_title)

        data.state = State.PAYMENT_STARTED
        data.auction_title = event.auction_title
        data.winning_bid = event.winning_bid
        data.timeout_at = datetime.now() + timedelta(days=3)
        data.auction_id = event.auction_id
        data.winner_id = event.winner_id

    @handle.register(PaymentCaptured)
    def handle_payment_captured(self, event: PaymentCaptured, data: PayingForWonItemData) -> None:
        assert data.state == State.PAYMENT_STARTED
        self._customer_relationship.send_email_after_successful_payment(
            event.customer_id, data.winning_bid, data.auction_title
        )

        data.state = State.FINISHED
        data.timeout_at = None
  • @handle.registered(event)을 통해서 event를 handle한다.
# process/process/paying_for_one_item/saga_handler.py

class PayingForWonItemHandler:
    LOCK_TIMEOUT = 30

    @injector.inject
    def __init__(
        self, process_manager: PayingForWonItem, repo: ProcessManagerDataRepo, lock_factory: LockFactory
    ) -> None:
        self._process_manager = process_manager
        self._repo = repo
        self._lock_factory = lock_factory

    @method_dispatch
    def __call__(self, event: Event) -> None:
        raise NotImplementedError

    @__call__.register(PaymentCaptured)
    def handle_payment_captured(self, event: PaymentCaptured) -> None:
        data = self._repo.get(event.payment_uuid, PayingForWonItemData)
        lock_name = f"pm-lock-{data.auction_id}-{data.winner_id}"
        self._run_process_manager(lock_name, data, event)

    @__call__.register(AuctionEnded)
    def handle_beginning(self, event: AuctionEnded) -> None:
        data = PayingForWonItemData(process_uuid=uuid.uuid4())
        lock_name = f"pm-lock-{event.auction_id}-{event.winner_id}"
        self._run_process_manager(lock_name, data, event)

    def _run_process_manager(self, lock_name: str, data: PayingForWonItemData, event: Event) -> None:
        lock = self._lock_factory(lock_name, self.LOCK_TIMEOUT)

        with lock:
            self._process_manager.handle(event, data)
            self._repo.save(data.process_uuid, data)
  • saga_handler.py를 통해 데이터를 구성하고, pessimistic locking을 구현한다.
  • Before one calls any handling logic of Saga, a lock has to be explicitly acquired. When we finish with the processing, the lock is released. If we fail to acquire it in the first place - we abort

7. Write Event Handler

  • Event가 발생시 각각 특정 Event를 수신중인 Event Handler가 Event처리

Example

1) handler의 속성 정의(동기처리, 비동기처리)

binder.multibind(
    Handler[AuctionEnded],
    to=...SYNCHRONOUS HANDLER..
)
binder.multibind(
    AsyncHandler[AuctionEnded],
    to=...ASYNCHRONOUS HANDLER..
)

2) handler함수 정의 ~ BidderHasBeenOverbid event가 발생시 처리하는 handler, __call__이라는 magic method를 통해 클래스를 부르면 함수처럼 사용할 수 있다.

class BidderHasBeenOverbidHandler:
   @injector.inject
   def __init__(self, facade: CustomerRelationshipFacade) -> None:
       self._facade = facade
   def __call__(self, event: BidderHasBeenOverbid) -> None:
       self._facade.do_something(...)

3) Event bus - handler 중재

#foundation/foundation/events.py

class EventBus(abc.ABC):
    @abc.abstractmethod
    def post(self, event: Event) -> None:
        raise NotImplementedError
        
        
class InjectorEventBus(EventBus):
    """A simple Event Bus that leverages injector.

    It requires Injector to be created with auto_bind=False.
    Otherwise UnsatisfiedRequirement is not raised. Instead,
    TypeError is thrown due to usage of `Handler` and `AsyncHandler` generics.
    """

    def __init__(self, injector: Injector, run_async_handler: RunAsyncHandler) -> None:
        self._injector = injector
        self._run_async_handler = run_async_handler

    def post(self, event: Event) -> None:
        try:
            handlers = self._injector.get(Handler[type(event)])  # type: ignore
        except UnsatisfiedRequirement:
            pass
        else:
            assert isinstance(handlers, list)
            for handler in handlers:
                handler(event)

        try:
            async_handlers = self._injector.get(AsyncHandler[type(event)])  # type: ignore
        except UnsatisfiedRequirement:
            pass
        else:
            assert isinstance(async_handlers, list)
            for async_handler in async_handlers:
                self._run_async_handler(async_handler, event)
        
      

4) payment event 처리 종합example: event발생시, event_bus를 통해서 그 event에 맞는 handler를 찾아가도록 하고, 그 event를 수신중인handler는 event발생시 __call__ 함수를 통해 그에 맞는 함수를 실행하게됨

class PaymentsFacade:
  def charge(self, payment_uuid: UUID, customer_id: int, token: str) -> None:
    payment = self._dao.get_payment(payment_uuid, customer_id)
    try:
      charge_id = self._api_consumer.charge(payment.amount, token)
    except PaymentFailedError:
      self._event_bus.post(PaymentFailed(payment_uuid, customer_id))
else:
      ...  # code skipped
      self._event_bus.post(PaymentCharged(payment_uuid, customer_id))
class PaymentChargedHandler:
    @injector.inject
    def __init__(self, facade: PaymentsFacade) -> None:
        self._facade = facade
    def __call__(self, event: PaymentCharged) -> None:
        self._facade.capture(event.payment_uuid, event.customer_id)
class Payments(injector.Module):
    def configure(self, binder: injector.Binder) -> None:
        binder.multibind(
            AsyncHandler[PaymentCharged],
            to=AsyncEventHandlerProvider(PaymentChargedHandler)
)

5) Auction모듈에서 발생한 event를 Customer Relationship모듈에서 handle하는 example

  • record_event함수를 통해 event발생시 기록해둠
  • 발생한 event를 수신중인 handler에 event bus가 전달함.
class Auction:
    def place_bid(self, bidder_id: BidderId, amount: Money) -> None:
        old_winner = self.winners[0] if self.bids else None
        if amount > self.current_price:
...
            if old_winner:
                self._record_event(
                    BidderHasBeenOverbid(self.id, old_winner, amount, self.title)
                )
  • BidderHasBeenOverbid event가 발생하였고, 이를 handle하는 BidderHasBeenOverbidHandler가 그 event를 처리함
class BidderHasBeenOverbidHandler:
    @injector.inject
    def __init__(self, facade: CustomerRelationshipFacade) -> None:
        self._facade = facade
    def __call__(self, event: BidderHasBeenOverbid) -> None:
        self._facade.send_email_about_overbid(
            event.bidder_id, event.new_price, event.auction_title
        )

🏝이 글이 도움이 되셨다면 추천 클릭을 부탁드립니다 :)

참고자료:

profile
데이터, 아키텍처, 클라우드와 함께 탱고춤을~!!
post-custom-banner

0개의 댓글