snapshot을 정리한 포스트에서 우리는 leveldb
를 사용하여 로컬 journal에 데이터들을 저장했었다. 이번 포스트에서는 이에 대해 조금 더 보충할 예정이다.
스냅샷을 위해 다음과 같이 간단한 PersistentActor
를 선언하자. 이 액터는 "snapshot"이라는 메시지로 그동안 받은 메시지의 개수를 저장하며, "print"라는 메시지를 받으면 현재까지의 메시지 개수를 출력한다. 그 외의 모든 메시지들은 이벤트화하여 Persist시키는 액터이다.
class SimpleActor extends PersistentActor with ActorLogging {
var nMessages = 0
override def persistentId: String = "simple-persistent-actor"
override def receiveCommand: Receive = {
case SaveSnapshotSuccess(metadata) =>
log.info(s"Saving snapshot was successful: $metadata")
case SavesnapshotFailure(_, cause) =>
log.info(s"Saving snapshot failed because of $cause")
case "snapshot" =>
saveSnapshot(nMessages)
case "print" =>
log.info(s"I have persisted $nMessages so far")
case message =>
persist(message){ e =>
log.info(s"Persisting $e")
nMessages += 1
}
}
override def receiveRecover: Receive = {
case SnapshotOffer(metadata, payload) =>
log.info(s"Recovered Snapshot: $payload")
nMessages = payload
case RecoveryCompleted => log.info("Recovery Done")
case message =>
log.info(s"Recovered Message: $message")
nMessages += 1
}
}
위와 같이 간단하게 액터를 선언한다. 그리고 application.conf
에 들어가 다음과 같이 configuration을 작성한다.
localStores {
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/localStores/journal"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/localStores/snapshots"
akka.persistence.journal.leveldb.compaction-intervals {
# start compactions per persistenceId
simple-persistent-actor = 1000 # 1000개의 메시지 - compaction 시작
"*" = 5000
}
}
구글에서 제공하는 가벼운 Key-value 형태의 로컬 db로 leveldb
를 사용하며 해당 데이터들이 어디에 저장될 지 정하는 config이다. 또한, 너무 많은 양의 snapshot data가 축적되면 compaction이 진행되도록 설정한다. 이는 akka.persistence.journal.leveldb.compaction-intervals
를 통해 진행한다.
이제 이것을 ActorSystem
에 해당 Config를 넣어야하므로 ConfigFactory.load().getConfig("localStores")
를 넣는다. 따라서 다음과 같은 코드가 된다.
object LocalStores extends App {
val system = ActorSystem("localStoreSystem", ConfigFactory.load().getConfig("localStores"))
val persistentActor = system.actorOf(Props[SimplePersistentActor], "simplePersistentActor")
(1 to 10).foreach(num => persistentActor ! s"I love akka $num")
persistentActor ! "print"
(11 to 20).foreach(num => persistentActor ! s"I love akka $num")
persistentActor ! "snapshot"
}
이제 실제 코드를 실행해보면 다음과 같다.
최초로 실행한 결과는 snapshot이 저장되었다는 로그를 볼 수 있다. 이제 snapshot들이 잘 저장된것이 맞는지 재실행해보면 Recovered Snapshot: 20
이라는 문구와 함께 제대로 로컬 스냅샷들을 가져온 것을 확인할 수 있다.