1. Immutability: 값이나 타입이 변하지 않음. Exposing these fields as read-only properties may still be a good idea
2. Data Validation: invalid value로 만들 수 없음. Value Object은 Initialization을 할때 Data Validation을 동반하여 수행한다.
3. No Identity: Identity를 갖고 있지 않아서, 타입 및 value값이 서로 같다면 같은 값으로 생각한다.
class Currency:
decimal_precision = 2
iso_code = "OVERRIDE"
symbol = "OVERRIDE"
class USD(Currency):
iso_code = "USD"
symbol = "$"
class Money:
def __init__(self, currency: Type[Currency], amount: str) -> None:
self._currency = currency
self._amount = Decimal(amount)
@property
def currency(self) -> Type[Currency]:
return self._currency
@property
def amount(self) -> Decimal:
return self._amount
class Money:
def __init__(self, currency: Type[Currency], amount: str) -> None:
if not inspect.isclass(currency) or not issubclass(currency, Currency):
raise ValueError(f'{currency} is not a subclass of Currency!')
try:
decimal_amount = Decimal(amount)
except decimal.DecimalException:
raise ValueError(f'"{amount}" is not a valid amount!')
else:
decimal_tuple = decimal_amount.as_tuple()
if decimal_tuple.sign:
raise ValueError(f'amount must not be negative!')
elif -decimal_tuple.exponent > currency.decimal_precision:
raise ValueError(
f'given amount has invalid precision! It should have '
'no more than {currency.decimal_precision} decimal places!'
)
self._currency = currency
self._amount = decimal_amount
class Money:
...
def __eq__(self, other: Money) -> bool:
if not isinstance(other, Money):
raise TypeError
return self.currency == other.currency and self.amount == other.amount
Money(USD, 1) == Money(USD, '1.00') # True
Money(USD, 1) == Money(USD, '5.00') # False
Entity와 Value Object의 차이점:
Entity: 'AuctionId', 'BidderId' 와 같은 ID를 가지고 있어서 ID를 통해 구분이 가능함(Identity 속성을 가짐)
Value Object: ID같은 개념이 없어서, 타입 및 값을 통해 서로 같거나 다름을 비교할 수 있음.
Dependency는 다음과 같이 구성되어야한다.
(external world->infrastructure->appliction->domain)
1) domain: enitity, value object
2) appliction: usecase, quries(abstract), repositories(abstract)
3) infrastructure: quries(implementation), repositories(implementation)
4) external world: database, 3rd party api..
class Auction: def __init__(self, id: AuctionId, title: str, starting_price: Money, bids: List[Bid] ) -> None:
self.id = id
self.title = title
self.starting_price = starting_price
self.bids = sorted(bids, key=lambda bid: bid.amount)
def place_bid(self, bidder_id: BidderId, amount: Money) -> None:
if amount > self.current_price:
self.bids.append(Bid(id=None, bidder_id=bidder_id, amount=amount))
@property
def current_price(self) -> Money: if not self.bids:
return self.starting_price
else:
return self._highest_bid.amount
@property
def winners(self) -> List[BidderId]:
if not self.bids:
return []
return [self._highest_bid.bidder_id]
@property
def _highest_bid(self) -> Bid:
return self.bids[-1]
class PlacingBid:
def __init__(self, output_boundary: PlacingBidOutputBoundary, auctions_repo:AuctionsRepository) -> None:
self._output_boundary = output_boundary
self._auctions_repo = auctions_repo
def execute(self, input_dto: PlacingBidInputDto) -> None:
auction = self._auctions_repo.get(input_dto.auction_id)
auction.place_bid(bidder_id=input_dto.bidder_id, amount=input_dto.amount)
self._auctions_repo.save(auction)
output_dto = PlacingBidOutputDto(
is_winning=input_dto.bidder_id in auction.winners,
current_price=auction.current_price,
)
self._output_boundary.present(output_dto)
어떤 sub시스템의 일련의 인터페이스에 대한 통합된 인터페이스를 제공합니다. Facade에서 고수준 인터페이스를 정의하기 때문에 sub시스템을 더 쉽게 사용할 수 있습니다.
Read Model Facade interface will be a bunch of methods for each model/entity.
# payments/payments/facade.py
from sqlalchemy.engine import Connection
from foundation.events import EventBus
from foundation.value_objects import Money
from payments import dao
from payments.api import ApiConsumer, PaymentFailedError
from payments.config import PaymentsConfig
from payments.events import PaymentCaptured, PaymentCharged, PaymentFailed, PaymentStarted
class PaymentsFacade:
def __init__(self, config: PaymentsConfig, connection: Connection, event_bus: EventBus) -> None:
self._api_consumer = ApiConsumer(config.username, config.password)
self._connection = connection
self._event_bus = event_bus
def get_pending_payments(self, customer_id: int) -> List[dao.PaymentDto]:
return dao.get_pending_payments(customer_id, self._connection)
def start_new_payment(self, payment_uuid: UUID, customer_id: int, amount: Money, description: str) -> None:
dao.start_new_payment(payment_uuid, customer_id, amount, description, self._connection)
self._event_bus.post(PaymentStarted(payment_uuid, customer_id))
def charge(self, payment_uuid: UUID, customer_id: int, token: str) -> None:
payment = dao.get_payment(payment_uuid, customer_id, self._connection)
if payment.status != dao.PaymentStatus.NEW.value:
raise Exception(f"Can't pay - unexpected status {payment.status}")
try:
charge_id = self._api_consumer.charge(payment.amount, token)
except PaymentFailedError:
dao.update_payment(payment_uuid, customer_id, {"status": dao.PaymentStatus.FAILED.value}, self._connection)
self._event_bus.post(PaymentFailed(payment_uuid, customer_id))
else:
update_values = {"status": dao.PaymentStatus.CHARGED.value, "charge_id": charge_id}
dao.update_payment(payment_uuid, customer_id, update_values, self._connection)
self._event_bus.post(PaymentCharged(payment_uuid, customer_id))
def capture(self, payment_uuid: UUID, customer_id: int) -> None:
charge_id = dao.get_payment_charge_id(payment_uuid, customer_id, self._connection)
assert charge_id, f"No charge_id available for {payment_uuid}, aborting capture"
self._api_consumer.capture(charge_id)
dao.update_payment(payment_uuid, customer_id, {"status": dao.PaymentStatus.CAPTURED.value}, self._connection)
self._event_bus.post(PaymentCaptured(payment_uuid, customer_id))
#auctions/auctions/applications/queries/auctions.py
@dataclass
class AuctionDto:
id: int
title: str
current_price: Money
starting_price: Money
ends_at: datetime
class GetSingleAuction(abc.ABC):
@abc.abstractmethod
def query(self, auction_id: int) -> AuctionDto:
pass
class GetActiveAuctions(abc.ABC):
@abc.abstractmethod
def query(self) -> List[AuctionDto]:
pass
# auctions_infrastructure/auctions_infrastructure/queries/auctions.py
class SqlGetActiveAuctions(GetActiveAuctions, SqlQuery):
def query(self) -> List[AuctionDto]:
return [
_row_to_dto(row) for row in self._conn.execute(auctions.select().where(auctions.c.ends_at > func.now()))
]
class SqlGetSingleAuction(GetSingleAuction, SqlQuery):
def query(self, auction_id: int) -> AuctionDto:
row = self._conn.execute(auctions.select().where(auctions.c.id == auction_id)).first()
return _row_to_dto(row)
@dataclass
class BidderHasBeenOverbid:
auction_id: AuctionId
bidder_id: BidderId
new_price: Money
- HTTP request is received
- A database transaction is started
- The Auction is pulled from a storage using AuctionsRepository
- A new bid is placed
- The auction emits an event
- Subscriber invokes background job to send an e-mail
- The Auction is saved back to database
- HTTP response is built
- The transaction is committed
- HTTP response is sent to the client.
다음과 같은 일련의 과정을 거칠 때 발생할 수 있는 문제가 있다.
1. If we react to the event right after the Entity/Repository emits it, we will be sending e- mail before a transaction is committed, meaning that overbid may still fail. The Auction would look like as if an e-mail receiver has not been overbid at all
2. if the background job is triggered before commit of the original transaction AND it reaches to the database for data it may happen it will not find simply because the first transaction is still in progress and changes it made are invisible to other connections. A classic race condition.
class UnitOfWork(abc.ABC):
@abc.abstractmethod
def begin(self) -> None:
pass
@abc.abstractmethod
def rollback(self) -> None:
pass
@abc.abstractmethod
def commit(self) -> None:
pass
@abc.abstractmethod
def register_callback_after_commit(self, callback: typing.Callable) -> None:
pass
def setup_event_subscriptions(event_bus: EventBus) -> None:
event_bus.subscribe(
BidderHasBeenOverbid,
lambda event: (
inject.instance(UnitOfWork).register_callback_after_commit(
lambda event: send_email.delay(event.auction_id, event.bidder_id, event.money.amount
)
)
)
Optimistic Locking:
- 모든 entitiy에 version을 할당
- data store에서 fetch할때 이전에 무슨 version의 entity를 들고 있었는지 기억하고, 처음 시작할때 들고 있던 entitiy와 같은지 비교
- 만약 version이 다르다면 전체과정을 retry 또는 fail시킴
Optimistict Locking 과 Pessimistic Locking의 차이점
참고자료:
[MSSQL] Pessimistic, Optimistic Locking - 비관적 잠금과 낙관적 잠금 (DB)
- Upon AuctionEnded we call Facades of Payments and Customer Relationship
- Upon PaymentCaptured we call Facades of Customer Relationship and Shipping
- Upon PackageShipped we call Facade of Customer Relationship
# process/process/paying_for_one_item/saga.py
class State(Enum):
PAYMENT_STARTED = "PAYMENT_STARTED"
TIMED_OUT = "TIMED_OUT"
FINISHED = "FINISHED"
@dataclass
class PayingForWonItemData:
process_uuid: uuid.UUID
state: Optional[State] = None
timeout_at: Optional[datetime] = None
winning_bid: Optional[Money] = None
auction_title: Optional[str] = None
auction_id: Optional[int] = None
winner_id: Optional[int] = None
class PayingForWonItem:
def __init__(self, payments: PaymentsFacade, customer_relationship: CustomerRelationshipFacade) -> None:
self._payments = payments
self._customer_relationship = customer_relationship
def timeout(self, data: PayingForWonItemData) -> None:
assert data.timeout_at is not None and datetime.now() >= data.timeout_at
assert data.state == State.PAYMENT_STARTED
data.state = State.TIMED_OUT
@method_dispatch
def handle(self, event: Any, data: PayingForWonItemData) -> None:
raise Exception(f"Unhandled event {event}")
@handle.register(AuctionEnded)
def handle_auction_ended(self, event: AuctionEnded, data: PayingForWonItemData) -> None:
assert data.state is None
payment_uuid = uuid.uuid4()
self._payments.start_new_payment(payment_uuid, event.winner_id, event.winning_bid, event.auction_title)
self._customer_relationship.send_email_about_winning(event.winner_id, event.winning_bid, event.auction_title)
data.state = State.PAYMENT_STARTED
data.auction_title = event.auction_title
data.winning_bid = event.winning_bid
data.timeout_at = datetime.now() + timedelta(days=3)
data.auction_id = event.auction_id
data.winner_id = event.winner_id
@handle.register(PaymentCaptured)
def handle_payment_captured(self, event: PaymentCaptured, data: PayingForWonItemData) -> None:
assert data.state == State.PAYMENT_STARTED
self._customer_relationship.send_email_after_successful_payment(
event.customer_id, data.winning_bid, data.auction_title
)
data.state = State.FINISHED
data.timeout_at = None
@handle.registered(event)
을 통해서 event를 handle한다.# process/process/paying_for_one_item/saga_handler.py
class PayingForWonItemHandler:
LOCK_TIMEOUT = 30
@injector.inject
def __init__(
self, process_manager: PayingForWonItem, repo: ProcessManagerDataRepo, lock_factory: LockFactory
) -> None:
self._process_manager = process_manager
self._repo = repo
self._lock_factory = lock_factory
@method_dispatch
def __call__(self, event: Event) -> None:
raise NotImplementedError
@__call__.register(PaymentCaptured)
def handle_payment_captured(self, event: PaymentCaptured) -> None:
data = self._repo.get(event.payment_uuid, PayingForWonItemData)
lock_name = f"pm-lock-{data.auction_id}-{data.winner_id}"
self._run_process_manager(lock_name, data, event)
@__call__.register(AuctionEnded)
def handle_beginning(self, event: AuctionEnded) -> None:
data = PayingForWonItemData(process_uuid=uuid.uuid4())
lock_name = f"pm-lock-{event.auction_id}-{event.winner_id}"
self._run_process_manager(lock_name, data, event)
def _run_process_manager(self, lock_name: str, data: PayingForWonItemData, event: Event) -> None:
lock = self._lock_factory(lock_name, self.LOCK_TIMEOUT)
with lock:
self._process_manager.handle(event, data)
self._repo.save(data.process_uuid, data)
1) handler의 속성 정의(동기처리, 비동기처리)
binder.multibind(
Handler[AuctionEnded],
to=...SYNCHRONOUS HANDLER..
)
binder.multibind(
AsyncHandler[AuctionEnded],
to=...ASYNCHRONOUS HANDLER..
)
2) handler함수 정의 ~ BidderHasBeenOverbid
event가 발생시 처리하는 handler, __call__
이라는 magic method를 통해 클래스를 부르면 함수처럼 사용할 수 있다.
class BidderHasBeenOverbidHandler:
@injector.inject
def __init__(self, facade: CustomerRelationshipFacade) -> None:
self._facade = facade
def __call__(self, event: BidderHasBeenOverbid) -> None:
self._facade.do_something(...)
3) Event bus - handler 중재
#foundation/foundation/events.py
class EventBus(abc.ABC):
@abc.abstractmethod
def post(self, event: Event) -> None:
raise NotImplementedError
class InjectorEventBus(EventBus):
"""A simple Event Bus that leverages injector.
It requires Injector to be created with auto_bind=False.
Otherwise UnsatisfiedRequirement is not raised. Instead,
TypeError is thrown due to usage of `Handler` and `AsyncHandler` generics.
"""
def __init__(self, injector: Injector, run_async_handler: RunAsyncHandler) -> None:
self._injector = injector
self._run_async_handler = run_async_handler
def post(self, event: Event) -> None:
try:
handlers = self._injector.get(Handler[type(event)]) # type: ignore
except UnsatisfiedRequirement:
pass
else:
assert isinstance(handlers, list)
for handler in handlers:
handler(event)
try:
async_handlers = self._injector.get(AsyncHandler[type(event)]) # type: ignore
except UnsatisfiedRequirement:
pass
else:
assert isinstance(async_handlers, list)
for async_handler in async_handlers:
self._run_async_handler(async_handler, event)
4) payment event 처리 종합example: event발생시, event_bus를 통해서 그 event에 맞는 handler를 찾아가도록 하고, 그 event를 수신중인handler는 event발생시 __call__
함수를 통해 그에 맞는 함수를 실행하게됨
class PaymentsFacade:
def charge(self, payment_uuid: UUID, customer_id: int, token: str) -> None:
payment = self._dao.get_payment(payment_uuid, customer_id)
try:
charge_id = self._api_consumer.charge(payment.amount, token)
except PaymentFailedError:
self._event_bus.post(PaymentFailed(payment_uuid, customer_id))
else:
... # code skipped
self._event_bus.post(PaymentCharged(payment_uuid, customer_id))
class PaymentChargedHandler:
@injector.inject
def __init__(self, facade: PaymentsFacade) -> None:
self._facade = facade
def __call__(self, event: PaymentCharged) -> None:
self._facade.capture(event.payment_uuid, event.customer_id)
class Payments(injector.Module):
def configure(self, binder: injector.Binder) -> None:
binder.multibind(
AsyncHandler[PaymentCharged],
to=AsyncEventHandlerProvider(PaymentChargedHandler)
)
5) Auction모듈에서 발생한 event를 Customer Relationship모듈에서 handle하는 example
record_event
함수를 통해 event발생시 기록해둠class Auction:
def place_bid(self, bidder_id: BidderId, amount: Money) -> None:
old_winner = self.winners[0] if self.bids else None
if amount > self.current_price:
...
if old_winner:
self._record_event(
BidderHasBeenOverbid(self.id, old_winner, amount, self.title)
)
BidderHasBeenOverbid
event가 발생하였고, 이를 handle하는 BidderHasBeenOverbidHandler
가 그 event를 처리함class BidderHasBeenOverbidHandler:
@injector.inject
def __init__(self, facade: CustomerRelationshipFacade) -> None:
self._facade = facade
def __call__(self, event: BidderHasBeenOverbid) -> None:
self._facade.send_email_about_overbid(
event.bidder_id, event.new_price, event.auction_title
)
🏝이 글이 도움이 되셨다면 추천 클릭을 부탁드립니다 :)
참고자료: