우리는 Akka의 Event Sourcing을 정리하며 persist
나 persistAll
메서드를 사용하여 이벤트들을 저장하고 recovery 시 재생될 수 있도록 하였다. (이 포스트에 해당 내용을 정리해놓았다.)
persist와 persistAll 메서드에 대해 정리를 하며 persist가 호출되는 시점과 persis의 인자로 들어온 콜백 함수가 실행되는 시점이 다르다는 것을 공부했었다.
persist(이벤트)/* TIME GAP */(처리 콜백)
최대한 눈으로 볼 수 있도록 주석을 가운데에 넣어놓았다. 위처럼 persist
가 직접 호출되는 시점과 처리 콜백 사이의 시간 차가 존재하고, 일반 persist
나 persistAll
은 Persist와 콜백 호출 시간의 차이 동안 들어오는 메시지들은 stash 처리가 되어 기다리게 될 수 밖에 없다.
하지만, persistAsync
나 persistAllAsync
를 사용하면 persist 호출 시점과 실제 콜백의 실행 시점의 시간 차 동안에도 들어오는 메시지들의 stash가 이루어지지 않고, 바로바로 메시지들이 처리가 된다. 즉, 실행 순서가 보장되지 않지만 많은 작업들을 동시간 내에 처리할 수 있다는 장점이 있다.
이를 알아보기 위해 간단한 예제를 보자.
class EventAggregator extends Actor with ActorLogging{
def receive:Receive = {
case message => log.info(s"Aggregating $message")
}
}
위는 메시지를 받으면 바로 출력하는 액터이며, 이 이벤트 처리 액터에게 메시지를 보낼 액터는 다음과 같다.
object CriticalStreamProcessor {
case class Command(contents: String)
case class Event(contents: String)
}
class CriticalStreamProcessor(eventAggregator: ActorRef) extends PersistentActor with ActorLogging {
import CriticalStreamProcessor._
override def persistentId: String = "critical-stream-processor"
override def receiveRecover: Receive = {
// 현재 포스트에서는 receiveRecover가 중요하지 않으므로 로깅 처리만 진행하도록 하였다.
case message => log.info(s"Recovered $message")
}
override def receiveCommand: Receive = {
case Command(contents) =>
eventAggregator ! s"Processing $contents"
val event = Event(contents)
persist(event){
record =>
eventAggregator ! record
}
val processedContents = contents + "_processed"
persist(Event(processedContents)){
record =>
eventAggregator ! record
}
}
}
위와 같이 Command(contents)
를 받는다면 해당 이벤트를 저장시키고, 이벤트 aggregator로 해당 메시지를 보낸다. 그리고 컨텐츠를 프로세싱하여 프로세싱된 데이터 역시 event aggregator로 보내는 코드이다. 위의 액터에게 밑과 같은 메시지를 보낸 후 실행하면
criticalStreamProcessor ! Command("command1")
criticalStreamProcessor ! Command("command2")
위와 같은 결과를 얻을 수 있다. 즉, persist
가 순서대로 처리되며, 메시지 역시 순차적으로 처리됨을 알 수 있다. 만약 위의 PersistentActor
의 persist를 persistAsync
로 변경한다면 어떻게 될까?
Aggregating Event(command2)
가 Event(command1)
보다 먼저 들어온 것을 확인할 수 있다. 즉, persist와 콜백 사이의 시간 차에 다른 메시지가 처리된 것을 확인할 수 있다.
따라서 높은 처리율이 필요하며, 이벤트가 순서가 덜 중요할 때 => persistAsync
나 persistAllAsync
를 사용해야 하며, 처리율이 떨어지더라도 이벤트의 순서가 중요할 때 persist
나 persistAll
을 사용해야 한다.