Persistent FSM은 Persistence Typed를 사용하여 나타내어진다. 스냅샷 어댑터와 이벤트 어댑터를 사용하는 EventSourcedBehavior
를 사용하여 Persistence FSM에 저장된 데이터를 읽을 수 있다.
FSM이 유저 데이터를와 스냅샷들을 직접적으로 저장하지 않으므로 어댑터들이 필요하다.
Classic Persistence
Akka Persistence는at-least-once
메시지 전달 과정을 통해point-to-point
통신을 지원한다.
이러한 Akka Persistence를 사용하기 위해서는akka-persistence
라는 dependency를 추가해야 한다. (자세한 내용은 맨 밑 Reference에 있는 Classic Persistence 링크를 참고하면 된다.)
replayTo
라는 ActorRef를 포함하도록 새로운 커맨드를 만들거나 커맨드를 변경한다.PersistentFSM
을 모방한 EventSourcedBehavior
를 생성한다.Behaviors.withTimers
를 사용하여 상태 타임아웃을 하드 코딩이나 저장된 상태에 대하여 설정한다.EventAdapter
를 추가하여 PersistentFSM
에 의해 추가된 상태 이전 이벤트를 private event로 변경한다.SnapshotAdapter
를 추가하여 PersistentFSM 스냅샷들을 EventSourcedBehavior
의 State
로 변경한다.지난 포스트에서 정리했던 내용들
sealed trait Command
case class AddItem(item: Item) extends Command
case object Buy extends Command
case object Leave extends Command
case class GetCurrentCart(replyTo: ActorRef[ShoppingCart]) extends Command
private case object Timeout extends Command
FSM의 상태들은 EventSourcedBehavior
의 상태 파라미터와 더불어 커맨드 핸들러와 이벤트를 사용하여 나타낸다. 다음은 State
를 나타낸 코드이다.
sealed trait State
case class LookingAround(cart: ShoppingCart) extends State
case class Shopping(cart: ShoppingCart) extends State
case class Inactive(cart: ShoppingCart) extends State
case class Paid(cart: ShoppingCart) extends State
커맨드 핸들러는 PersitentFSM의 상태들 각각에 대해 분리된 영역을 가지고 있다.
def commandHandler(timers: TimerScheduler[Command])(state: State, command: Command): Effect[DomainEvent, State] =
state match {
case LookingAround(cart) =>
command match {
case AddItem(item) =>
Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
case GetCurrentCart(replyTo) =>
replyTo ! cart
Effect.none
case _ =>
Effect.none
}
case Shopping(cart) =>
command match {
case AddItem(item) =>
Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
case Buy =>
Effect.persist(OrderExecuted).thenRun(_ => timers.cancel(StateTimeout))
case Leave =>
Effect.persist(OrderDiscarded).thenStop()
case GetCurrentCart(replyTo) =>
replyTo ! cart
Effect.none
case Timeout =>
Effect.persist(CustomerInactive)
}
case Inactive(_) =>
command match {
case AddItem(item) =>
Effect.persist(ItemAdded(item)).thenRun(_ => timers.startSingleTimer(StateTimeout, Timeout, 1.second))
case Timeout =>
Effect.persist(OrderDiscarded)
case _ =>
Effect.none
}
case Paid(cart) =>
command match {
case Leave =>
Effect.stop()
case GetCurrentCart(replyTo) =>
replyTo ! cart
Effect.none
case _ =>
Effect.none
}
}
위과 같이 상태에 따라 분리하여 원하는 작업을 실행시킬 수 있다.
.receiveSignal {
case (state, RecoveryCompleted) =>
state match {
case _: Shopping | _: Inactive =>
timers.startSingleTimer(StateTimeout, Timeout, 1.second)
case _ =>
}
}
위는 recovery와 state에 따른 행동을 나눈것이다.
def eventHandler(state: State, event: DomainEvent): State = {
state match {
case la @ LookingAround(cart) =>
event match {
case ItemAdded(item) => Shopping(cart.addItem(item))
case _ => la
}
case Shopping(cart) =>
event match {
case ItemAdded(item) => Shopping(cart.addItem(item))
case OrderExecuted => Paid(cart)
case OrderDiscarded => state // will be stopped
case CustomerInactive => Inactive(cart)
}
case i @ Inactive(cart) =>
event match {
case ItemAdded(item) => Shopping(cart.addItem(item))
case OrderDiscarded => i // will be stopped
case _ => i
}
case Paid(_) => state // no events after paid
}
}
위는 마이그레이션 과정 중에서 5번에 해당하는 것으로, 이벤트 어댑터를 선언한 것이다. 들어온 이벤트의 종류에 따라 나누어 해야할 일을 정한 코드이다.
class PersistentFsmEventAdapter extends EventAdapter[DomainEvent, Any] {
override def toJournal(e: DomainEvent): Any = e
override def manifest(event: DomainEvent): String = ""
@nowarn("msg=deprecated")
override def fromJournal(journalEvent: Any, manifest: String): EventSeq[DomainEvent] = {
journalEvent match {
case _: StateChangeEvent =>
// In this example the state transitions can be inferred from the events
// Alternatively the StateChangeEvent can be converted to a private event if either the StateChangeEvent.stateIdentifier
// or StateChangeEvent.timeout is required
// Many use cases have the same timeout so it can be hard coded, otherwise it cane be stored in the state
EventSeq.empty
case other =>
// If using a new domain event model the conversion would happen here
EventSeq.single(other.asInstanceOf[DomainEvent])
}
}
}
val persistentFSMSnapshotAdapter: SnapshotAdapter[State] = PersistentFSMMigration.snapshotAdapter[State] {
case (stateIdentifier, data, _) =>
val cart = data.asInstanceOf[ShoppingCart]
stateIdentifier match {
case "Looking Around" => LookingAround(cart)
case "Shopping" => Shopping(cart)
case "Inactive" => Inactive(cart)
case "Paid" => Paid(cart)
case id => throw new IllegalStateException(s"Unexpected state identifier $id")
}
}
위의 과정은 마이그레이션 마지막에 해당하는 단계로, FSM 상태에 따라 해야할 메서드를 나타낸 것이다. 이때, 상태는 stateIdentifier
에 저장되어 있으므로, 이것을 패턴매칭을 통해 해야할 일을 설정한 것이다.