이번 강의에서는 context.become을 활용한 Counter Actor 구현을 학습했다.
이 Actor는 자신의 상태를 변경하지 않으면서 카운터를 증가시키거나 감소시키는 기능을 제공한다.
이를 위해 context.become 메서드를 사용해 현재 카운트 값을 변경하면서 자신의 수신 동작을 바꾼다.
또한 간단한 투표 시스템 구현을 하였는데 이 시스템은 시민들이 특정 후보에 투표할 수 있으며, 투표 집계기는 모든 시민들의 투표 상태를 요청하여 투표 결과를 집계하도록 만들어졌다.
[ 메시지 ]
Increment: 카운터를 증가
Decrement: 카운터를 감소
Print: 현재 카운터 값을 출력
[ 동작]
Increment나 Decrement 메시지를 받으면 context.become을 사용하여 카운트 값을 업데이트하면서 자신의 수신 동작을 바꾸는 countReceive 메서드를 호출한다.
Print 메시지를 받으면 현재 카운트 값을 출력한다.
[ 메시지 ]
Vote(candidate: String): 특정 후보에 투표
VoteStatusRequest: 현재 투표 상태를 요청
VoteStatusReply(candidate: Option[String]): 현재 투표 상태를 반환
[ 동작]
Vote 메시지를 받으면 해당 후보에 투표한 상태로 동작을 바꿈
VoteStatusRequest를 받으면 현재 투표한 후보의 이름을 반환
[ 메시지 ]
AggregateVotes(citizens: Set[ActorRef]): 주어진 시민들의 투표 결과를 집계
[ 동작 ]
AggregateVotes 메시지를 받으면 모든 시민들에게 VoteStatusRequest 메시지를 보내 투표 결과를 요청한다. 시민들로부터 투표 상태를 받으면 집계를 수행한다.
마지막 코드 부분은 각 시민들이 후보에 투표하는 것과 투표 결과를 집계하는 VoteAggregator에 투표 상태 집계를 요청하는 코드이다.
// 카운터 actor를 `context.become`과 불변 상태로 재생성하는 연습문제
object Counter {
case object Increment
case object Decrement
case object Print
}
class Counter extends Actor {
import Counter._
override def receive: Receive = countReceive(0)
// 현재 카운트를 기반으로 메시지를 처리하는 핸들러
def countReceive(currentCount: Int): Receive = {
case Increment =>
println(s"[countReceive($currentCount)] incrementing")
context.become(countReceive(currentCount + 1))
case Decrement =>
println(s"[countReceive($currentCount)] decrementing")
context.become(countReceive(currentCount - 1))
case Print => println(s"[countReceive($currentCount)] my current count is $currentCount")
}
}
import Counter._
// 이제 ActorSystem을 시작하고 `Counter` actor 인스턴스를 생성한다.
val counter = system.actorOf(Props[Counter], "myCounter")
// 카운터 증가 및 감소 테스트
(1 to 5).foreach(_ => counter ! Increment)
(1 to 3).foreach(_ => counter ! Decrement)
counter ! Print
/**
* 간단한 투표 시스템 연습문제
*/
// Citizen actor에 대한 메시지들
// Vote(후보): 투표를 위해 시민에게 보낸다.
case class Vote(candidate: String)
//VoteStatusRequest: 집계기가 각 시민에게 투표 내용을 문의하기 위해 보낸다.
case object VoteStatusRequest
//VoteStatusReply(후보 옵션): 투표한 후보를 포함하는 시민의 응답 (투표하지 않은 경우 None).
case class VoteStatusReply(candidate: Option[String])
class Citizen extends Actor {
override def receive: Receive = {
case Vote(c) => context.become(voted(c)) // 투표한 후의 상태로 변경
case VoteStatusRequest => sender() ! VoteStatusReply(None) // 아직 투표하지 않은 상태에서는 None을 반환
}
// 투표 후의 행동
def voted(candidate: String): Receive = {
case VoteStatusRequest => sender() ! VoteStatusReply(Some(candidate)) // 투표한 후에는 해당 후보자를 반환
}
}
// AggregateVotes(시민): 제공된 시민 목록에게 투표 내용을 묻기 위해 투표 집계기에 보낸다.
case class AggregateVotes(citizens: Set[ActorRef])
class VoteAggregator extends Actor {
override def receive: Receive = awaitingCommand
def awaitingCommand: Receive = {
case AggregateVotes(citizens) =>
citizens.foreach(citizenRef => citizenRef ! VoteStatusRequest)
context.become(awaitingStatuses(citizens, Map()))
}
// 투표 결과를 기다리는 핸들러
def awaitingStatuses(stillWaiting: Set[ActorRef], currentStats: Map[String, Int]): Receive = {
case VoteStatusReply(None) =>
// 아직 투표하지 않은 시민에게 다시 요청
sender() ! VoteStatusRequest
case VoteStatusReply(Some(candidate)) =>
val newStillWaiting = stillWaiting - sender()
val currentVotesOfCandidate = currentStats.getOrElse(candidate, 0)
val newStats = currentStats + (candidate -> (currentVotesOfCandidate + 1))
if (newStillWaiting.isEmpty) {
println(s"[aggregator] poll stats: $newStats")
} else {
context.become(awaitingStatuses(newStillWaiting, newStats))
}
}
}
// 시민들 생성 및 투표
val alice = system.actorOf(Props[Citizen])
val bob = system.actorOf(Props[Citizen])
val charlie = system.actorOf(Props[Citizen])
val daniel = system.actorOf(Props[Citizen])
alice ! Vote("Martin")
bob ! Vote("Jonas")
charlie ! Vote("Roland")
daniel ! Vote("Roland")
val voteAggregator = system.actorOf(Props[VoteAggregator])
voteAggregator ! AggregateVotes(Set(alice, bob, charlie, daniel))