[Akka] Classic Cluster Aware Routers

smlee·2023년 9월 5일
0

Akka

목록 보기
14/50
post-thumbnail

발신자가 보낸 메시지를 수신자에게로 보낼 때, 노드의 멤버들을 클러스터가 인식할 수 있도록 할 수 있다. 만약 노드에 갈 수 없거나 경로를 떠날 수 없을 때 사용된다. 만약 새로운 노드가 클러스터에 들어온다면 configuration에 근거하여 추가적인 경로가 라우터에 추가된다.

클러스터는 이렇게 추가되고 변경되는 멤버들을 관리한다.

Router with group of Routees

Group을 사용하면 클러스터 멤버 노드들의 routee actor들을 시작시켜주어야 한다. 왜냐하면 이러한 동작은 라우터에 의해 실행되지 않기 때문이다.

routee actors들은 액터 시스템이 시작된 후 가능한 빨리 시작되는 것이 좋다. 그 이유는 멤버들의 status가 'Up'으로 되자마자 라우터는 사용하려고 하기 때문이다.

configuration

밑은 group을 위한 configuration이다.

akka.actor.deployment {
  /statsService/workerRouter {
      router = consistent-hashing-group
      routees.paths = ["/user/statsWorker"]
      cluster {
        enabled = on
        allow-local-routees = on
        use-roles = ["compute"]
      }
    }
}

액터들의 경로는 routees.paths에 넣는다. 이때, path는 프로토콜이나 주소 정보를 포함해서는 안 된다. 왜냐하면 클러스터 멤버들로부터 동적으로 읽어오기 때문이다.

메시지는 ActorSelection을 사용하여 정해진 Routee들에게 전해진다. 따라서 같은 참조(semantic)인 것이 보장된다. 또는 use-roles를 통해 구체적인 역할들을 지정할 수 있다.

여기에 추가로, max-total-nr-of-instances를 통해 클러스터의 총 Routee수를 정의할 수 있다. 이것의 기본 값은 10000이라는 값을 갖는다. 이는 노드가 클러스터에 들어오면 새로운 routee를 추가하기 때문인다. 만약 routee 수를 제한하고 싶다면 적은 값을 설정하는 것이 좋다.

굳이 configuration을 사용하고 싶지 않다면 코드 안에 정의할 수도 있다.

import akka.cluster.routing.{ ClusterRouterGroup, ClusterRouterGroupSettings }
import akka.routing.ConsistentHashingGroup

val workerRouter = context.actorOf(
  ClusterRouterGroup(
    ConsistentHashingGroup(Nil),
    ClusterRouterGroupSettings(
      totalInstances = 100,
      routeesPaths = List("/user/statsWorker"),
      allowLocalRoutees = true,
      useRoles = Set("compute"))).props(),
  name = "workerRouter2")

위에 쓴 configuration 파일의 내용을 akka 파일 내에서 액터를 선언할 때 직접 선언할 수도 있다.

Group of Routees Router 예제

위쪽에서 configuration을 작성하는 법을 공부했고, 어떨 때 사용하는지 알아보았다. 따라서 예제 코드를 보며 코드는 어떤 방식으로 작성되는지 확인할 예정이다.

예시 코드는 텍스트에 대한 통계를 계산하는 서비스 애플리케이션이다. 만약 어떠한 text가 서비스로 수신되었을 때, 서비스는 단어 단위로 나누고, 각각의 worker들(a routee of a router)에게 문자를 세는 작업을 위임한다. 각 단어에서 세어진 문자들은 단어들의 문자 수의 평균값을 계산하는 aggregator로 보내지는 로직을 가진다.

밑은 작업을 위한 메시지를 정의한 것이다.

final case class StatsJob(text: String) extends CborSerializable
final case class StatsResult(meanWordLength: Double) extends CborSerializable
final case class JobFailed(reason: String) extends CborSerializable

또한, 각 단어에 대한 문자의 수를 세는 프로그램 코드이다.

class StatsWorker extends Actor {
  var cache = Map.empty[String, Int]
  def receive = {
    case word: String =>
      val length = cache.get(word) match {
        case Some(x) => x
        case None =>
          val x = word.length
          cache += (word -> x)
          x
      }

      sender() ! length
  }
}

그리고 밑은 단어 단위로 text를 분리하고, worker들과 aggregates들에게 위임하는 코드이다.

class StatsService extends Actor {
  // This router is used both with lookup and deploy of routees. If you
  // have a router with only lookup of routees you can use Props.empty
  // instead of Props[StatsWorker.class].
  val workerRouter = context.actorOf(FromConfig.props(Props[StatsWorker]()), name = "workerRouter")

  def receive = {
    case StatsJob(text) if text != "" =>
      val words = text.split(" ")
      val replyTo = sender() // important to not close over sender()
      // create actor that collects replies from workers
      val aggregator = context.actorOf(Props(classOf[StatsAggregator], words.size, replyTo))
      words.foreach { word =>
        workerRouter.tell(ConsistentHashableEnvelope(word, word), aggregator)
      }
  }
}

class StatsAggregator(expectedResults: Int, replyTo: ActorRef) extends Actor {
  var results = IndexedSeq.empty[Int]
  context.setReceiveTimeout(3.seconds)

  def receive = {
    case wordCount: Int =>
      results = results :+ wordCount
      if (results.size == expectedResults) {
        val meanWordLength = results.sum.toDouble / results.size
        replyTo ! StatsResult(meanWordLength)
        context.stop(self)
      }
    case ReceiveTimeout =>
      replyTo ! JobFailed("Service unavailable, try again later")
      context.stop(self)
  }
}

receive에서 받은 text가 빈 문자열이 아니라면 공백 단위로 단어를 쪼갠다. 그리고, 결과값을 보내는데 타임 아웃 시간을 정해서 보내준다.

Remote Deployed Routees Pool

우리는 스레드풀이나 커넥션풀과 같은 것들에서 이란 무엇인가를 미리 만들어두고 담아두는 곳이라는 것을 공부했었다. pool을 사용하기 위해서는 다음과 같은 configuration 작성이 필요하다.

akka.actor.deployment {
  /statsService/singleton/workerRouter {
      router = consistent-hashing-pool
      cluster {
        enabled = on
        max-nr-of-instances-per-node = 3
        allow-local-routees = on
        use-roles = ["compute"]
      }
    }
}

위의 configuration 코드는 group of Routees를 설정했을 때 처럼 코드 안에서도 정의할 수 있다.

import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
import akka.routing.ConsistentHashingPool

val workerRouter = context.actorOf(
  ClusterRouterPool(
    ConsistentHashingPool(0),
    ClusterRouterPoolSettings(totalInstances = 100, maxInstancesPerNode = 3, allowLocalRoutees = false))
    .props(Props[StatsWorker]()),
  name = "workerRouter3")

이때 주의해야할 점은 사용되는 모든 Props가 직렬화되어야 한다는 점이다.

위의 코드들을 사용하여 ClusterRouterPool을 만들어 두는 전략을 채택할 수 있다.

Reference

0개의 댓글