Persistent store는 데이터를 읽기 위하여 사용한다. 이때, Persistence Query를 사용한다면 다양한 기능들을 사용할 수 있다.
1. SELECT persistence IDs
2. select events by persistence ID
3. select events across persistence IDs by tags
즉, 어떠한 Persistent actor가 살아있는지 확인할 수 있으며, 오래된 상태들을 재생성하거나 쫓을 수 있다. 그리고, 전체 store에서 이벤트에 대한 데이터 프로세싱을 할 수 있다.
가장 먼저, 실행할 object에 다음과 같이 작성한다.
val system = ActorSystem("persistenceQueryDemo", ConfigFactory.load().getConfig("persistenceQueryExcercise"))
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraJournal.Identifier)
readJournal
이 Persistence Query를 담당해줄 값으로, CassandraReadJournal
이다. 이때 CassandraReadJournal은 scaladsl
패키지 내에 있는 것으로 임포트 시켜야 한다. 즉, import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
이어야 한다. (인텔리제이에서 자동 import를 시키면 javadsl로 임포트 되는 경우가 종종 있다.)
persistenceIds()
라는 메서드를 사용하면 모든 Persistence Id를 불러올 수 있다.
val persistenceIds = readJournal.persistenceIds()
implicit val materializer = ActorMaterializer()(system)
persistenceIds.runForeach{
persistenceId =>
println(s"Found persistence ID : $persistenceId")
}
persistentIds에 persistenceIds() 메서드를 실행시킨 결과값을 담는다.
그렇다면 이 value의 데이터 타입은 Source[String, NotUsed]
가 된다. Source
는 Akka streams의 일부이며, string objects의 infinite collection이다.
이제 이 persistenceIds를 각각 실행하기 위해서는 runForeach
메서드를 돌려야 한다. 이때, ActorMaterializer
가 필요하므로 implicit
로 선언해준다. 그리고, runForeach
내부에 출력하는 코드를 작성하면 실행 결과는 다음과 같다.
위와 같이 Cassandra
내부에 존재하는 모든 persistentActor
의 persistenceId
가 불러와진다. (위의 persistenceId들은 이전 포스트들에서 공부하며 정리했던 액터들이다.)
이때, persistenceIds()
메서드는 실시간으로 갱신된다. 이를 알아보기 위해 간단한 액터를 추가한 코드를 실행시켜본다.
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val persistenceIds = readJournal.persistenceIds() // persistence query API is only available on these regional objects
implicit val materializer = ActorMaterializer()(system)
persistenceIds.runForeach{
persistenceId =>
println(s"Found persistence ID : $persistenceId")
}
val simpleActor = system.actorOf(Props[SimplePersistentActor], "simpleActor")
import system.dispatcher
system.scheduler.scheduleOnce(5 seconds) {
val message = "hello, persistent actor"
simpleActor ! message
}
위와 같이 "simpleActor"를 만들어 실행시킨다면 결과는 어떻게 될까?
simpleActor
에서 답변이 오고, persistenceId가 갱신된 것이 보인다. 즉, 실시간 갱신이 가능한 것을 알 수 있다.
우리는 여러 개의 PersistentActor가 존재하는 환경에서 특정 PersistenceActor의 이벤트만 사용하고 싶다. 이럴 때는 eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
메서드를 사용해준다.
val events = readJournal.eventsByPersistenceId("persistence-query-id-1", 0, Long.MaxValue)
events.runForeach{ event =>
println(s"readEvent: $event")
}
즉, 위와 같이 사용한다면 "persistence-query-id-1"을 persistenceId로 가지고 있는 PersistentActor의 이벤트들을 불러올 수 있다.
1번에서 실행된 하나의 이벤트가 잘 출력되는 것을 볼 수 있다.
이러한 eventsByPersistenceId
역시 실시간으로 실행된다. 만약 해당 persistentActor에 메시지 1개를 더 보냈다면 다음과 같이 출력이 된다.