다른 액터에게 request를 보내고, 해당 액터로부터 response를 받아야 하는 경우가 많다. 하지면 이 경우에 tell을 사용할 경우 액터의 캡슐화를 깨뜨릴 수 있다. 따라서 비동기적으로 일들을 처리하는 방법에는 ask를 사용하는 방법이 있다.
이를 학습하기 위해 예제로 인증을 위한 액터를 엄청 간단하게 짜보려고 한다.
예제에는 크게 3가지 액터가 사용될 것이다. username
과 password
를 key-value 형태로 저장하는 UserAuthActor
, 유저가 username과 password를 입력하면 올바른 정보인지 여부를 가려내는 AuthManager
, 여기에 AuthManager
와 같은 역할을 하지만 pipe pattern
을 사용하는 PipedAuthManager
를 구현할 것이다.
// UserAuthActor.scala
object UserAuthActor {
case class Read(username:String)
case class Write(username:String, password: String)
}
class UserAuthActor extends Actor with ActorLogging{
import UserAuthActor._
override def receive: Receive = online(Map())
def online(userInfos:Map[String, String]): Receive = {
case Read(username) =>
log.info(s"${username}에 대한 password를 읽습니다...")
sender() ! userInfos.get(username)
case Write(username, password) =>
log.info(s"${username}과 그의 비밀번호 ${password}를 등록합니다...")
context.become(online(userInfo + (username -> password)))
}
}
위는 유저의 아이디와 비밀번호를 저장하고 읽는 액터이다. 물론 실제로는 복잡한 보안 과정을 거치고, 로그에 등록하려는 username과 password를 보여주진 않을 것이지만 예시를 위한 것이므로 간단하게 나타내었다.
// AuthManager.scala
import akka.pattern.ask
import scala.concurrent.duration._
object AuthManager {
case class RegisterUser(username:String, password:String)
case class Authenticate(username:String, password:String)
case class AuthFailure(message:String)
case object AuthSuccess
val AUTH_FAILURE_USERNAME_NOT_FOUND = "없는 유저명입니다."
val AUTH_FAILURE_PASSWORD_INCORRECT = "올바르지 않은 비밀번호입니다."
val AUTH_FAILURE_SYSTEM_ERROR = "시스템 에러"
}
class AuthManager extends Actor with ActorLogging {
import AuthManager._
import UserAuthActor._
protected userDB = context.actorOf(Props[UserAuthActor])
implicit val timeout:Timeout = Timeout(1 second)
implicit val executionContext: ExecutionContext = context.dispatcher
override def receive:Receive = {
case RegisterUser(username, password) =>
log.info(s"${username}님을 등록합니다.")
userDB ! Write(username, password)
case Authenticate(username, password) =>
val originalSender = sender()
log.info(s"${username}님의 비밀번호를 확인합니다.")
val future = userDB ? Read(username)
future.onComplete {
case Success(None) => originalSender ! AuthFailure(AUTH_FAILURE_USERNAME_NOT_FOUND)
case Success(Some(dbPassword)) => originalSender ! AuthSuccess
case Failure(_) => originalSender ! AuthFailure(AUTH_FAILURE_SYSTEM_ERROR)
}
}
}
위와 같이 Authenticate(username, password)
메시지를 받았을 때 ?(ask)
를 사용한 것이 보일 것이다. ask를 사용하기 위해서는 akka.pattern.ask
를 임포트시켜야 한다.
ask의 return 타입은 Future[Any]이다. 따라서 요청을 처리하는 것이 비동기적으로 이루어진다. 이때 주의해야 할 점은 sender()
를 미리 value로 빼놓았다는 점이다. 이는 sender()
가 가장 마지막으로 메시지를 보낸 발신인을 리턴하는데, 비동기적으로 실행하는 동안 sender()가 바뀔 수 있으므로 value로 따로 빼놓는 것이다.
또한, timeout
과 executionContext
를 암시적으로 선언했다. 이는 Future의 timeout 시간과 execution context를 위한 것이다.
그리고 onComplete
, 즉 Future 작업이 완료되었을 때의 동작을 선언하면 된다. 이때 주의해야 할 점은 콜백 안에서 mutable state에 접근하거나 method를 호출하는 것을 지양해야 한다는 점이다.
위의 코드의 Future 부분을 따로 빼면 다음과 같다.
// AuthManager.scala
import akka.pattern.ask
import scala.concurrent.duration._
object AuthManager {
case class RegisterUser(username:String, password:String)
case class Authenticate(username:String, password:String)
case class AuthFailure(message:String)
case object AuthSuccess
val AUTH_FAILURE_USERNAME_NOT_FOUND = "없는 유저명입니다."
val AUTH_FAILURE_PASSWORD_INCORRECT = "올바르지 않은 비밀번호입니다."
val AUTH_FAILURE_SYSTEM_ERROR = "시스템 에러"
}
class AuthManager extends Actor with ActorLogging {
import AuthManager._
import UserAuthActor._
protected userDB = context.actorOf(Props[UserAuthActor])
implicit val timeout:Timeout = Timeout(1 second)
implicit val executionContext: ExecutionContext = context.dispatcher
override def receive:Receive = {
case RegisterUser(username, password) =>
log.info(s"${username}님을 등록합니다.")
userDB ! Write(username, password)
case Authenticate(username, password) => handleAuthentication(username, password)
}
def handleAuthentication(username:String, password:String) = {
val originalSender = sender()
log.info(s"${username}님의 비밀번호를 확인합니다.")
val future = userDB ? Read(username)
future.onComplete {
case Success(None) => originalSender ! AuthFailure(AUTH_FAILURE_USERNAME_NOT_FOUND)
case Success(Some(dbPassword)) => originalSender ! AuthSuccess
case Failure(_) => originalSender ! AuthFailure(AUTH_FAILURE_SYSTEM_ERROR)
}
}
}
위의 handleAuthentication
부분에서 pipe pattern
을 적용할 수도 있다.
class PipedAuthManager extends AuthManager {
import AuthManager._
override def handleAuthentication(username: String, password: String): Unit = {
val future = userDB ? Read(username)
val passwordFuture = future.mapTo[Option[String]]
val responseFuture = passwordFuture.map{
case None => AuthFailure(AUTH_FAILURE_USERNAME_NOT_FOUND)
case Some(dbPassword) =>
if(dbPassword == password) AuthSuccess
else AuthFailure(AUTH_FAILURE_PASSWORD_INCORRECT)
}
responseFuture.pipeTo(sender())
}
}
위의 PipedAuthManager
는 handleAuthentication
만을 오버라이드하여 변경했다.
future까지는 같다. 이때, future의 데이터타입은 Future[Any]
이다. 이때, passwordFuture에서 mapTo
를 통해 Future[Option[String]]
으로 타입을 명시해준다. 그리고 String을 받아왔으면 데이터에 대해 로직을 수행한다. 그 후 pipeTo
메서드를 통해 sender()
를 사용하면 된다.