[Akka] persistAsync

smlee·2023년 10월 20일
0

Akka

목록 보기
39/50
post-thumbnail

우리는 Akka의 Event Sourcing을 정리하며 persistpersistAll 메서드를 사용하여 이벤트들을 저장하고 recovery 시 재생될 수 있도록 하였다. (이 포스트에 해당 내용을 정리해놓았다.)

persist와 persistAll 메서드에 대해 정리를 하며 persist가 호출되는 시점과 persis의 인자로 들어온 콜백 함수가 실행되는 시점이 다르다는 것을 공부했었다.

persist(이벤트)/*		TIME	GAP			*/(처리 콜백)

최대한 눈으로 볼 수 있도록 주석을 가운데에 넣어놓았다. 위처럼 persist가 직접 호출되는 시점과 처리 콜백 사이의 시간 차가 존재하고, 일반 persistpersistAll은 Persist와 콜백 호출 시간의 차이 동안 들어오는 메시지들은 stash 처리가 되어 기다리게 될 수 밖에 없다.

하지만, persistAsyncpersistAllAsync를 사용하면 persist 호출 시점과 실제 콜백의 실행 시점의 시간 차 동안에도 들어오는 메시지들의 stash가 이루어지지 않고, 바로바로 메시지들이 처리가 된다. 즉, 실행 순서가 보장되지 않지만 많은 작업들을 동시간 내에 처리할 수 있다는 장점이 있다.

이를 알아보기 위해 간단한 예제를 보자.

class EventAggregator extends Actor with ActorLogging{
	def receive:Receive = {
    	case message => log.info(s"Aggregating $message")
    }
}

위는 메시지를 받으면 바로 출력하는 액터이며, 이 이벤트 처리 액터에게 메시지를 보낼 액터는 다음과 같다.

object CriticalStreamProcessor {
	case class Command(contents: String)
    case class Event(contents: String)
}

class CriticalStreamProcessor(eventAggregator: ActorRef) extends PersistentActor with ActorLogging {

	import CriticalStreamProcessor._
	
	override def persistentId: String = "critical-stream-processor"
    override def receiveRecover: Receive = {
    // 현재 포스트에서는 receiveRecover가 중요하지 않으므로 로깅 처리만 진행하도록 하였다.
    	case message => log.info(s"Recovered $message")
    }
    
    override def receiveCommand: Receive = {
    	case Command(contents) =>
        	      eventAggregator ! s"Processing $contents"

        val event = Event(contents)

        persist(event){
          record =>
            eventAggregator ! record
        }

        val processedContents = contents + "_processed"
        persist(Event(processedContents)){
          record =>
            eventAggregator ! record
        }
    }
}

위와 같이 Command(contents)를 받는다면 해당 이벤트를 저장시키고, 이벤트 aggregator로 해당 메시지를 보낸다. 그리고 컨텐츠를 프로세싱하여 프로세싱된 데이터 역시 event aggregator로 보내는 코드이다. 위의 액터에게 밑과 같은 메시지를 보낸 후 실행하면

criticalStreamProcessor ! Command("command1")
criticalStreamProcessor ! Command("command2")


위와 같은 결과를 얻을 수 있다. 즉, persist가 순서대로 처리되며, 메시지 역시 순차적으로 처리됨을 알 수 있다. 만약 위의 PersistentActor의 persist를 persistAsync로 변경한다면 어떻게 될까?

Aggregating Event(command2)Event(command1)보다 먼저 들어온 것을 확인할 수 있다. 즉, persist와 콜백 사이의 시간 차에 다른 메시지가 처리된 것을 확인할 수 있다.

따라서 높은 처리율이 필요하며, 이벤트가 순서가 덜 중요할 때 => persistAsyncpersistAllAsync를 사용해야 하며, 처리율이 떨어지더라도 이벤트의 순서가 중요할 때 persistpersistAll을 사용해야 한다.

0개의 댓글