클러스터에 있는 액터 시스템 간 메시지를 보내기 위해서는 직렬화를 꼭 시켜야 한다. Serialization with Jackson은 좋은 선택지 중 하나이다.
Cluster
extenstion을 사용하기 위해서는 밑과 같은 configuration을 설정해야 한다.
akka {
actor {
provider = "cluster"
}
remote.artery {
canonical {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552"]
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
}
}
cluster extension을 사용하기 위해서는 반드시 host
와 port
를 설정해주어야 하고, akka.actor.provider = "cluster"
를 사용해야 한다. 위의 코드는 Cluster API Extension을 사용하기 위한 최소한의 설정이다.
액터들은 특정한 클러스터 이벤트들에 대한 구독자로 가입한다. 구독이 시작되었을 때, 액터들은 현재 클러스터의 상태에 대응하는 이벤트를 받는다. 그리고 클러스터가 변하는 이벤트가 발생 시 그 상태에 대응하는 이벤트를 받는다.
/*
* Copyright (C) 2018-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package scala.docs.cluster
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorLogging
import akka.actor.Actor
class SimpleClusterListener extends Actor with ActorLogging {
val cluster = Cluster(context.system)
// subscribe to cluster changes, re-subscribe when restart
override def preStart(): Unit = {
//#subscribe
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
//#subscribe
}
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case MemberUp(member) =>
log.info("Member is Up: {}", member.address)
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}", member.address, previousStatus)
case _: MemberEvent => // ignore
}
}
위의 코드는 클러스터 extension에 대한 간단한 예제 코드이다. 내부에 Cluster를 선언하고 preStart
에서 클러스터에 가입한다. 그 후, 액터가 살아있는 생명주기 동안 이벤트를 받아 처리하는 코드이다. 그리고 액터가 중지 된 후 (postStop) 클러스터의 구독을 해제한다.
액터가 살아있을 동안은 receive를 통해 이벤트를 처리한다.
클러스터에 있는 노드들에 joining할 수 있다. joining 하는 것은 아래와 같이 간단하게 할 수 있다.
import akka.actor.Address
import akka.cluster.Cluster
val cluster = Cluster(system)
val list: List[Address] = ??? //your method to dynamically get seed nodes
cluster.joinSeedNodes(list)
노드들이 들어있는 리스트를 선언해서 노드들을 직접 삽입해주면 된다. joinSeedNodes
를 통해 클러스터에 등록한다. 중복성이나 빌트인 재시도 매커니즘으로 인해 위의 joinSeedNodes
가 지향된다.
Cluster(system).substribe
를 사용함으로써 클러스터 멤버들의 변경 알림을 구독할 수 있다.
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
위와 같은 간단한 형태로 변경을 구독할 수 있다.
CurrentClusterState
에 있는 전체 상태의 스냅샷은 업데이트들을 담은 첫 메시지를 보낸다.
주의해야할 점은 만약 비어있는 CurrentClusterState
를 받았다면, MemberUp
이벤트를 담아야 한다.