메시지들은 어떠한 종류의 객체도 가능하나, 반드시 불변(immutable)해야 한다. 스칼라는 강제로 immutability를 집행할 수 없으므로 컨벤션으로 immutability를 지킨다.
추천하는 접근 방법은 Scala case class
들을 사용하여 immutable하게 만들어 패턴 매칭을 활용하는 것이다.
case class User(name: String)
case class Register(user: User)
val user = User("Mike")
val message = Register(user)
위의 예시 코드처럼 case class를 만들어 immutable하게 사용을 해야한다.
메시지를 보내는 메서드에는 !
와 ?
가 있다.
이전에 공부 내용들을 정리하면서 해당 메서드를 사용하여 코드를 작성했었다.
// Main.scala
object Main extends App{
val system = ActorSystem("ping-pong")
val pinger = system.actorOf(Props[PingActor](), "pinger")
val ponger = system.actorOf(classOf(Props[PongActor], pinger), "ponger")
system.scheduler.scheduleOnce(500 millis) {
ponger ! Ping
}
}
위의 Main.scala에서 스케줄링을 시작할 때 ponger
에게 Ping
이라는 메시지를 !
를 통해 전달하였다.
!
의 의미는 메시지를 보낸 후 잊는다(fire-and-forget)는 뜻이다. 즉, 메시지를 비동기적으로 보내고 다시 돌아온다는 의미이다. tell
이라고도 알려져 있다.
Tell : fire-and-forget
!
, 즉 tell은 메시지를 전송할 때 선호되는 방식이다. 메시지를 처리하느라 블록킹을 하지 않기 때문이다. 이러한tell
은 확장성과 동시성에 가장 좋은 특징들을 제공해준다.
ActorRef ! message
를 하면, Actor로부터 발생하여 Actor reference를 메시지와 함께 보낸다. 따라서sender():ActorRef
가 동반된다. 따라서 메시지 수신인이 발신자에게 답변을 보낼 때sender() ! replyMsg
와 같은 형태로 응답할 수 있다.
만약, Actor가 아닌 것에서부터 메시지가 발생한다면deadLetters
라는 Actor reference를 디폴트로 제공한다.
?
역시 메시지를 보내는 메서드이다. 이 메서드 역시 비동기적으로 메시지를 보내고, 가능한 답변을 나타내는 Future
를 리턴한다. 이러한 ?
메서드는 ask
라고도 알려져 있다.
import akka.pattern.{ ask, pipe }
import system.dispatcher
final case class Result(x: Int, s: String, d: Double)
case object Request
implicit val timeout: Timeout = 5.seconds
val f: Future[Result] =
for {
x <- ask(actorA, Request).mapTo[Int]
s <- actorB.ask(Request).mapTo[String]
d <- (actorC ? Request).mapTo[Double]
} yield Result(x, s, d)
f.pipeTo(actorD)
pipe(f) to actorD
위에서 결과값이 Future
타입인 것을 확인할 수 있다.
Ask: Send-And-Receive-Future
ask
패턴은 액터 뿐만 아니라 futures를 가지고 있다. 따라서, ActorRef보다는패턴을 사용
하는 것이 권장된다. 위의 코드에서pipeTo
라는 패턴을 통해 사용한 것처럼 말이다. 위의 코드는 non-blocking asynchronous code이다. ask가 Future을 만들고, x, s, d라는 3가지 요소가 새로운 future를 만들어 내기 때문이다.
ask
를 사용하는 방식은 타임 아웃이 되었을 때 tell
에 비해 작업해야 하는 양이 많다. 따라서 꼭 필요한 경우를 제외하고서는 tell(!)
을 사용하는 것이 권장된다.
앞쪽에서 정리한 액터 코드를 다시 한 번 가져와서 확인해보면
// PingActor.scala
package actors
import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import akka.event.Logging
case object Pong
case object Ping
class PingActor extends Actor{
private val log = Logging(context.system, this)
private var countDown:Int = 100
override def receive: Receive = {
case Pong =>
log.info(s"${self.path} received pong, count down $countDown")
if (countDown > 0) {
countDown -= 1
sender() ! Ping
} else {
sender() ! PoisonPill
self ! PoisonPill
}
}
}
// PongActor.scala
package actors
import akka.actor.{Actor, ActorRef}
import akka.event.Logging
class PongActor(pinger: ActorRef) extends Actor{
private val log = Logging(context.system, this)
override def receive: Receive = {
case Ping =>
log.info(s"${self.path} received ping")
pinger ! Pong
}
}
위의 액터들의 공통점은 모두 Actor
라는 trait를 확장했다는 점과 receive
메서드를 오버라이드 해서 재정의했다는 점이다. 실제로, Actor trait를 상속하면 receive 메서드를 반드시 구현해야한다.
이때, 이러한 receive 메서드는 받은 메시지를 어떻게 처리하는지 다루는 블럭이다.
Receive
라는 PartialFunction을 리턴하므로 패턴매칭을 통해 메시지를 분류해서 어떻게 반응할지 작성할 수 있다.
만약 메시지에 대한 답을 보내고 싶으면 sender
메서드를 통해 메시지의 발신인을 ActorRef 형태로 가져오고. !
메서드를 통해 메시지를 보내면 된다.
sender() ! replyMsg
형태의 코드를 작성하면 답장을 보내는 코드이다.
Receive time out
메시지가 전송된 후 inactivity time out
되는 것을 뜻한다. 자세하게 말하면, akka.actor.ReceiveTimeout
메시지로 핸들하는 것이다. 타임아웃 설정시간 중 최소시간은 1millisecond이다.
이때 주의해야할 점은 receive time out이 발생하고 ReceiveTimeout
을 enqueue되는 시점은 다른 메시지가 enqueue되는 시점이다. 그러므로 receive time out에 대한 reception을 보장하지 않는다.
import akka.actor.ReceiveTimeout
import scala.concurrent.duration._
class MyActor extends Actor {
// To set an initial delay
context.setReceiveTimeout(30 milliseconds)
def receive = {
case "Hello" =>
// To set in a response to a message
context.setReceiveTimeout(100 milliseconds)
case ReceiveTimeout =>
// To turn it off
context.setReceiveTimeout(Duration.Undefined)
throw new RuntimeException("Receive timed out")
}
}
위와 같이 사용할 수 있다. 만약, receive timeout에 관한 알림을 받고 싶지 않다면, cancelReceiveTimeout
을 사용하면 된다. 그리고, NotInfluenceReceiveTimeout
으로 마킹된 메시지들은 타이머를 리셋시키지 않는다.