액터 모델 구현 두번째 시간입니다. 이전 포스팅에서 동시성 제어와 각각의 특징들을 살펴보았습니다. 이번 포스팅에서는 액터 모델의 특징을 알아보고 러스트로 직접 구현해보는 시간을 가지도록 하겠습니다. 전체 코드는 링크를 참고해주세요.
액터 모델은 동시성 프로그래밍 모델 중 하나 입니다. 이 모델은 시스템을 독립적인 액터들의 집합으로 보며, 각 액터는 비동기로 메시지를 주고받으며 상호작용합니다. 액터 모델은 다음 세가지 요소로 구성됩니다.
액터 모델은 이메일 시스템에 비유할 수 있습니다. 각 사용자(액터)는 자신의 이메일 계정(상태)과 메일 박스를 가지고 있고, 메일을 통해 서로 메시지를 주고 받습니다. 메일을 받은 사람은 자신이 편할 때 처리할 수 있지만(비동기) 보낸 사람은 알 수 없습니다(물론 메일을 읽은 시간을 표시하는 기능 가진 서비스도 있습니다). 이렇게 각 액터는 독립적으로 동작하며, 서로의 정보에 직접 접근하지 않고 메시지를 통해 상호작용합니다.
액터 모델에서 계산은 각 액터의 상태와 값(환경)을 변경하면서 실행됩니다. 따라서 특정 단계에서의 모델 상태를 파악하는 것이 중요한데, 이 상태를 표기하는 것이 바로 액터 컨피규레이션(actor configuration)입니다. 계산을 실행할 때는 액터 컨피규레이션을 조작하면서 동작합니다.
액터 컨피규레이션의 정의는 다음과 같습니다. 는 액터 이름에서 식으로의 사상(projection)이며, 는 환경입니다. 예를 들어 함수에서 데이터 가 액터 로 송신되면 환경 는 다음과 같이 전이(transfer)됩니다.
여기서 은 에 데이터 를 보내는 것을 의미합니다. 이 메시지를 통해 기존 환경 에 변화된 것을 추가해(으로 표현 됨) 새로운 환경으로 전이됩니다.
특정 단계에서의 액터 모델의 상태는 다음과 같이 쓸 수 있습니다.
이 경우 다음 조건이 성립합니다.
첫번째 조건은 식 안의 자유변수가 유효한 액터명인 것을 나타내며, 두번째 조건은 송신 중 메시지의 수신지가 유효한 액터명인지 나타냅니다.
이 조건들이 성립하지 않는다면, 자유변수가 존재하지 않는 액터를 가르키고 있거나 존재하지 않는 액터에 데이터를 송신하고 있다는 것을 나타냅니다. 자유 변수는 추상과 letrec
을 통해 판단되지만, 이 글에서는 생략하겠습니다.
다음으로 액터 가 개 존재한다면 아래의 식으로 표기할 수 있습니다.
이때 는 각 액터의 식, 는 액터명입니다.
또한, 환경 는 송신 중 데이터의 다중 집합이며, 개의 데이터를 송신 중일 때 는 다음과 같이 표현됩니다.
여기서 은 데이터의 송신을 나타내며, 은 수신 액터를, 은 송신 데이터를 의미합니다. 또한, 송신을 함수로 수신은 로 표기한다면, 각 함수의 정의는 다음과 같습니다.
Send
함수는 송신할 데이터와 타겟 액터를 매개변수로 받습니다. null
을 반환하는 것은 액터 모델은 비동기 처리의 특성과 Fire-and-forget 패턴 때문입니다.
Fire-and-forget 패턴
fire-and-forget 패턴은 발신자가 메시지를 수신자에게 보내고 메시지가 전달되었는지 확인하지 않는 패턴입니다. 수신자는 메시지에 대한 응답을 보낼 수 있지만, 발신자는 응답을 굳이 기다리지 않기 때문에 주로 응답이 필요하지 않거나 높은 처리량이 필요한 분야에서 사용됩니다.
이제 액터 모델을 직접 구현해보겠습니다. 먼저 액터의 생성과 메시지 전파를 다루고, 추가 요소인 구독자(subscriber) 기능도 구현하겠습니다.
액터를 생성하고 메시를 전파하기에 앞서 먼저 액터의 구조체를 정의하겠습니다. Actor
구조체에는 액터를 식별할 ID와 초기 상태, 값 그리고 구독자와 메일 박스가 있습니다. condvar
필드는 조건 변수이며, 다른 스레드가 무언가를 변경하고 마무리할 때까지 스레드를 대기 상태로 만드는데 사용됩니다. 이 필드는 이후 메시지 전달에서 자세히 설명하겠습니다.
struct Actor {
id: usize,
state: RwLock<ActorState>,
value: RwLock<i32>,
subs: RwLock<HashMap<usize, Arc<Actor>>>,
mailbox: Mutex<VecDeque<Message>>,
condvar: Condvar,
}
그 다음 액터의 상태를 정의하는 ActorState
입니다. ActorState::Active
는 말 그대로 현재 활성화 되어 있다는 것을 의미합니다. 액터의 상태는 직접적으로 변경할 수 없고, 메시지를 통해 변경해야 합니다. 예를 들어 상태를 변경하라는 메시지가 전달되거나, 액터가 특정 기준을 넘으면 상태를 전환하는 식을 고려해볼 수 있습니다.
#[derive(Debug, PartialEq, Copy, Clone)]
enum ActorState {
Active,
Inactive,
}
ID는 액터를 식별하는 수단이기 때문에 AtomicUsize
를 이용해 고유 ID를 생성할 것입니다. Actor::new()
로 액터 구조체의 값을 초기화 할 때 같이 ID를 생성하면 됩니다. 이때 .fetch_add(1, Odering::SeqCst)
로 액터가 생성될 때 마다 ID가 1씩 증가하도록 했습니다.
값을 초기화 할 때 state
, value
, subs
는 다른 스레드에서도 읽어야 하기 때문에 RwLock
을 사용했고, 메일박스의 경우 해당 액터에서만 접근하도록 해야 하기 때문에 Mutex
를 사용해서 초기화 했습니다.
메일박스는 VecDeque::with_capacvity()
를 이용해 미리 보관할 메시지의 용량을 지정했고, 메시지를 받을 때마다 메모리를 재할당 하는 것을 피했습니다.
use std::sync::atomic::{AtomicUsize, Ordering};
// 액터의 고유 ID 생성
static ACTOR_ID: AtomicUsize = AtomicUsize::new(0);
static MAILBOX_CAPACITY: usize = 10;
impl Actor {
fn new() -> Arc<Self> {
const INC: usize = 1;
let id = ACTOR_ID.fetch_add(INC, Ordering::SeqCst);
let actor = Actor {
id,
state: RwLock::new(ActorState::Active),
value: RwLock::new(INITIAL_VALUE),
subs: RwLock::new(HashMap::new()),
mailbox: Mutex::new(VecDeque::with_capacity(MAILBOX_CAPACITY)),
condvar: Condvar::new(),
};
Arc::new(actor)
}
}
이제 생성된 액터의 정보를 조작하는 메서드들을 정의하겠습니다. 이 메서드들은 이후 메시지 처리에서 쓰일 것입니다.
impl Actor {
fn get_id(&self) -> usize {
self.id
}
fn get_state(&self) -> Result<ActorState, ActorError> {
let value = self.state.read().unwrap();
Ok(value.to_owned())
}
fn get_value(&self) -> Result<i32, ActorError> {
let value = self.value.read().unwrap();
Ok(value.to_owned())
}
fn set_value(&self, value: i32) -> Result<(), ActorError> {
let mut value_lock = self.value.write().unwrap();
*value_lock = value;
Ok(())
}
}
이렇게 Actor
에 메서드를 계속 추가하는 것도 나쁘진 않지만, 책임의 분산을 위해 액터를 조작하는 인터페이스를 추가했습니다.
ActorPool
은 생성된 액터들의 정보를 저장하는 역할을 하고, 메시지 전송이나 액터의 정보를 가져오는 창구 역할을 합니다.
#[derive(Debug)]
struct ActorPool {
actor_list: Mutex<HashMap<usize, Arc<Actor>>>,
}
해시맵 구조를 가지고 액터의 ID를 키(key)로 생성된 액터들의 정보(Actor
구조체)를 값(value)로 가집니다.
ActorPool
의 구현을 정의하겠습니다. ActorPool::new
로 필드를 초기화 했습니다.
impl ActorPool {
fn new() -> Self {
ActorPool {
actor_list: Mutex::new(HashMap::new()),
}
}
}
이제 기본 설정은 끝냈으니 본격적으로 액터를 구현해볼 차례입니다.
액터를 생성하는 것은 단순합니다. 그냥 액터를 생성하고, 고유한 ID와 기본 값들이 잘 설정되있는지 확인하면 됩니다. 간단히 도표로 표현하면 다음과 같습니다.
먼저 사용자는 CLI와 같은 도구를 사용하여 액터의 생성을 요청합니다. 그 후, 이 요청은 ActorPool
인터페이스로 전달되고 적절하게 처리되어 액터가 생성됩니다. 생성된 액터는 다시 ActorPool
을 통해 사용자에게 자신의 정보인 struct Actor
를 제공합니다.
액터의 생성과 정보 출력을 위한 API는 각각 ActorPool::create_actor
와 ActorPool::get_actor_info(id)
입니다.
ActorPool::create_actor
는 Actor::new
를 사용하여 새로운 액터를 생성한 후, 해당 액터를 ActorPool.actor_list
필드에 저장합니다.
ActorPool::get_actor_info(id)
메서드는 특정 id를 가진 액터를 ActorPool::actor_list
필드에서 검색한 다음, 해당 액터의 정보를 사용자에게 제공하는 역할을 합니다. 이 정보에는 Actor
구조체에 정의된 내용이 모두 포함됩니다.
impl ActorPool {
fn create_actor(&self) -> usize {
let actor = Actor::new(); // 새로운 액터 객체 생성
let id = actor.get_id();
let mut actor_list = self.actor_list.lock().unwrap();
actor_list.insert(id, actor); // 생성한 액터를 `actor_list`에 저장
id
}
fn get_actor_info(&self, actor_id: usize) -> Result<Arc<Arctor>, ActorError> {
// `actor_list`의 정보를 가져오기 위해 lock을 획득합니다.
let actor_list = self.actor_list.lock().unwrap();
let actor = actor_list
.get(&actor_id)
.ok_or(ActorError::TargetActorNotFound(actor_id.to_string()))?;
Ok(actor.to_owned())
}
}
actor_list
는 여러 스레드에서 동시에 접근해야 하는 필드이므로, 안전한 처리를 위해 lock()
을 사용하여 락을 획득해야 합니다. 이를 통해 여러 스레드가 actor_list
필드에 안전하게 접근할 수 있습니다. 예를 들어, ActorPool::create_actor
에서는 필드에 생성된 액터를 추가하기 위해 접근해야 하며, ActorPool::get_actor_info
에서는 필드의 정보를 가져와야 하기 때문에 lock을 획득이 필요합니다.
이제 생성된 액터에 메시지를 보낼 차례입니다. 액터는 상태와 값 뿐만 아니라 받은 메시지를 저장하는 메일 박스도 갖고 있습니다. 메시지는 메일 박스에 순차적으로 쌓이며 적절한 시점에 처리됩니다. 액터는 메시지를 처리하면서 자신의 값과 상태를 업데이트하는 방식으로 동작합니다.
각 액터는 구독자(subscriber)를 가지고 있으며, 액터가 받은 메시지는 반드시 구독자에게도 보내야 합니다. 즉, 액터는 메시지를 처리하는 동안 자신을 구독하고 있는 액터에도 메시지를 전달하는 역할을 수행해야 합니다. 이렇게 함으로써 액터는 자신과 구독자 간에 통신과 상호작용이 이루어집니다.
전체적인 메시지 처리는 다음 흐름으로 나눌 수 있습니다.
ActorPool::create_actor
ActorPoo::message_loop
Actor::add_subscriber
Actor::remove_subscriber
ActorPool::subscribe
Actor::send_message
Actor::propagate_message
Actor::exexute_message
Actor::handle_message
메시지 생성 및 전송
메시지 처리는 액터 생성 부분에서 작성한 ActorPool::create_actor
메서드에서 처리합니다. 이 메서드는 액터를 생성하는 역할을 하지만, 메시지 처리를 위해 별도의 스레드를 생성하고 각 액터의 메일박스로 부터 메시지를 처리하는 역할 또한 수행합니다. 이때 생성한 스레드 내부에서 ActorPool::execute_message
를 호출합니다.
impl ActorPool {
fn create_actor(&self) -> usize {
let actor = Actor::new();
let id = actor.get_id();
let actor_clone = Arc::clone(&actor); // +
// +
std::thread::spawn(move || { // +
actor_clone.execute_messages(); // +
}); // +
let mut actor_list = self.actor_list.lock().unwrap();
actor_list.insert(id, actor);
id
}
fn message_loop(&self, actor_id: usize, message: Message) -> Result<(), ActorError> {
let actor = self.get_actor_info(actor_id)?;
actor.send_message(message)
}
}
Arc::clone
을 이용해 액터의 복제본을 만들었습니다. 이렇게 한 이유는 다른 스레드에서도 Actor
인스턴스를 안전하게 공유하기 위해서입니다.
그 다음, std::thread::spawn
을 사용하여 새 스레드를 생성했습니다. 이 스레드는 Actor
의 메시지 처리(Actor::execute_message
)하는 역할을 담당합니다. 즉, Actor
가 생성되는 즉시 독립적인 스레드에서 실행되며, 자신의 메일박스에서 메시지를 읽고 처리합니다.
독립적인 스레드에서 각 Actor
가 실행되면, Actor
간에 상호 작용이 발생할 때 서로를 차단(blocking)하지 않습니다. 예를 들어, 하나의 액터가 긴 작업을 수행하는 동안에도 다른 액터들은 계속해서 메시지를 처리할 수 있습니다.
업데이트된 Actor::create_actor()
함수를 이용하면 동적으로 액터를 생성하고 메시지를 처리할 수 있습니다. 액터 컨피규레이션 표기법을 사용하면 이 함수의 전이를 다음과 같이 작성할 수 있습니다.
Actor::create_actor
함수는 이라는 액터를 생성하고 반환합니다. 단, 실행 시점의 액터 컨피규레이션이 이면, 이 됩니다. 즉, 위 식을 로 간략히 표현했을때, 이것은 이고 가 되어 새로운 액터 가 액터 컨피규레이션에 추가됐음을 나타냅니다.
생성된 액터는 을 이용해 즉시 메시지를 처리할 수 있는 상태가 됩니다. ActorPool::execute_message
가 인수 의 역할을 한다고 보면 됩니다.
간단히 말해, 이제 Actor::create_actor
는 새로운 액터를 생성하고, 동시에 즉시 메시지를 처리할 수 있게 합니다.
ActorPool::message_loop
는 특정 액터에게 메시지를 전송하는 역할을 합니다. 이 함수는 사용자에게 메시지와 어느 액터에 메시지를 전송할지 ID를 받고, 해당 액터의 Actor::send_message
를 호출하면서 메시지를 전송합니다.
구독 처리
구현할 액터는 메시지를 처리할 때 자신을 구독하고 있는 다른 액터에게도 동일한 메시지를 전송해야 합니다. 이를 위해 액터의 subs
필드에 구독자를 업데이트하는 함수를 작성하겠습니다.
구독자에 가중치를 추가하여 메시지 전송에 우선순위를 두고 싶다면 BTreeMap
이나 우선순위 큐를 사용하여 가중치가 높은 순서대로 정렬한 다음 차례로 처리할 수 있습니다. 하지만, 다행히 여기서 구현할 액터는 이런 시스템이 없기 때문에 단순히 HashMap
을 사용해도 충분합니다.
메시지 업데이트는 간단합니다. 구독 리스트의 락을 획득하고 액터를 해시맵에 추가(삭제)한 다음, 락을 반환하면 됩니다. 이때 subs
필드는 RwLock
으로 래핑되어 있으므로 쓰기 락(.write
)을 사용하여 수정 권한을 얻어야 합니다.
구독자 추가, 삭제는 각각 Actor::add_subscriber
, Actor::remove_subscriber
에 대응됩니다.
impl Actor {
fn add_subscriber(&self, actor: Arc<Actor>) -> Result<Option<Arc<Actor>>, ActorError> {
let mut subs = self.subs.write().unwrap();
if subs.contains_key(&actor.get_id()) {
return Err(ActorError::ActorAlreadyExists(actor.get_id().to_string()));
}
Ok(subs.insert(actor.get_id(), actor))
}
fn remove_subscriber(&self, actor_id: usize) -> Result<(), ActorError> {
let mut subs = self.subs.write().unwrap();
if subs.contains_key(&actor_id) {
subs.remove(&actor_id);
}
Err(ActorError::TargetActorNotFound(actor_id.to_string()))
}
}
구독자 리스트에 넣으려는 액터가 이미 있다면 따로 에러 처리를 했습니다. 반대로 삭제하려는 액터가 없을 때도 에러를 반환하도록 했습니다.
메시지 전파
메시지를 받은 액터는 자신의 값을 업데이트하고, 다른 액터들에게도 메시지를 전송해야 합니다. 메시지를 전송하는 방법은 두 가지가 있습니다. 첫 번째 방법은 특정 조건을 만족하면 메시지를 전송하는 방법이고, 두 번째 방법은 자신과 연결된 액터들에게 동일한 메시지를 전달하는 방법입니다. 이번 구현에서는 두 번째 방법을 사용할 것입니다.
메시지는 Message
타입의 열거형으로 정의되어 있으며, 각각 증가(increment)와 감소(decrement)를 포함합니다. 메시지 타입은 패턴 매칭으로 종류를 파악하고, 메시지에 맞는 함수를 호출합니다. 호출되는 함수는 각각 Actor::increment
, Actor::decrement
에 대응합니다.
Actor::update_value
는 코드 중복을 줄이기 위한 함수입니다. 이 함수는 FnOnce
로 메시지에 대응하는 함수를 호출하고, 액터의 값을 업데이트합니다.
#[derive(Debug, Clone)]
enum Message {
Increment(i32),
Decrement(i32),
}
impl Actor {
fn update_value<F>(&self, modifier: F) -> Result<(), ActorError>
where
// `FnOnce` 트레이트는 클로저를 한 번 호출 할 수 있는 타입으로 제한하는 역할을 합니다.
// 특히, 이 트레이트는 클로저가 소유권을 소비(consume)하는 형태의 호출을 수행할 수 있도록 합니다.
F: FnOnce(i32) -> Result<i32, ActorError>,
{
let mut value = self.get_value()?;
value = modifier(value)?;
self.set_value(value)
}
fn increment(&self, n: i32) -> Result<(), ActorError> {
self.update_value(|value| Ok(value + n))
}
fn decrement(&self, n: i32) -> Result<(), ActorError> {
self.update_value(|value| Ok(value - n))
}
}
이제 메시지를 전달하고, 자신의 구독자에게 전파하는 함수들을 작성해보겠습니다. 각각 Actor::send_message
, Actor::propagate_message
입니다.
impl Actor {
fn send_message(&self, message: Message) -> Result<(), ActorError> {
// 메일박스의 용량이 꽉 찼는지 확인
let mut mailbox = self.mailbox.lock().unwrap();
if mailbox.len() >= mailbox.capacity() {
return Err(ActorError::MailboxOverflow(self.id.to_string()));
}
// 액터가 비활성화(`ActorState::Inactive`) 상태면 메시지를 그냥 보관함.
if self.state.read().unwrap().to_owned() == ActorState::Inactive {
self.mailbox.lock().unwrap().push_back(message.clone());
}
// 메일박스에 보관된 메시지 처리
mailbox.push_back(message.clone());
// 자신을 구독하고 있는 액터에도 메시지 전파
self.propagate_message(message)?;
// 조건 변수를 이용해 메일박스에 메시지가 추가되면
// 스레드를 깨워 `execute_message`가 실행되도록 처리
self.condvar.notify_all();
Ok(())
}
fn propagate_message(&self, message: Message) -> Result<(), ActorError> {
// 현재 액터의 구독 리스트를 가져옴
let subs = self.subs.read().unwrap();
// 구독 리스트에 있는 각 액터에 메시지를 전달
for (_, actor) in subs.iter() {
actor.send_message(message.clone())?;
}
Ok(())
}
}
마지막으로 메시지 처리입니다. 메시지 처리는 Actor::execute_message
와 Actor::handle_message
메서드를 통해 처리됩니다.
Actor::execute_messages
메서드는 루프를 돌면서 메일박스의 보관된 메시지를 읽고 처리하는 함수입니다. 이때도 조건 변수를 사용해 메일박스에 메시지가 없으면 스레드를 대기 상태로 두고, 메시지가 있는 경우에만 처리하도록 처리했습니다.
impl Actor {
fn execute_messages(&self) {
loop {
let mut mailbox = self.mailbox.lock().unwrap();
// 메일박스에 메시지가 들어올 때까지 대기 상태 유지
if mailbox.is_empty() {
mailbox = self.condvar.wait(mailbox).unwrap();
}
// 메시지가 들어오면 가장 먼저 들어온 메시지 소비(consume)
while let Some(msg) = mailbox.pop_front() {
self.handle_message(msg).unwrap();
}
}
}
}
Actor::execute_messages
에서 메시지를 읽을 때 메시지의 종류를 매핑하는 함수는 Actor::handle_messages
입니다. 단순하게 패턴 매칭으로 처리하고, 각 메시지에 해당하는 핸들러 함수를 호출합니다. 각각 이전에 작성한 Actor::increment
와 Actor::decrement
입니다.
메시지를 송수신 하는 것을 액터 컨피규레이션으로 표현하면 다음과 같이 표현할 수 있습니다. 여기서는 액터 A가 액터 B로 메시지를 보내는 상황을 가정했습니다.
impl Actor {
fn handle_message(&self, message: Message) -> Result<(), ActorError> {
// 패턴 매칭을 통해 각 메시지에 해당하는 함수 호출
let result = match message {
Message::Increment(n) => self.increment(n),
Message::Decrement(n) => self.decrement(n),
};
// 조건 변수를 이용해 메시지가 처리되었음을
// `execute_messages` 스레드에 알림
self.condvar.notify_all();
result
}
}
이제 액터끼리 메시지를 주고받을 수 있습니다. 그러나 액터가 서로 구독하고 있다면 서로 무한히 메시지를 전달할 가능성이 있으며, 이는 잠재적으로 오버플로우를 발생시킬 수 있습니다. 이 문제를 해결하려면 액터가 일정 값에 도달하면 값을 리셋하거나, 메시지 처리를 중단하거나, 메시지에 ID를 추가하여 메시지당 처리 횟수를 한 번만 하도록 할 수 있습니다.
그러나 저는 액터를 구독 리스트에 추가할 때 미리 서로 구독하고 있는지 확인하는 알고리즘을 추가하기로 했습니다. 각 액터를 노드로, 구독하고 있는 것을 간선으로 둔다면 유향 그래프 구조가 만들어집니다. 그런 다음 이 그래프에 대해 사이클을 판별하는 알고리즘을 작성하고, 구독 추가 로직에 추가하기만 하면 됩니다. 그러나 여기서 사이클 감지 알고리즘을 설명하면 내용이 길어지기 때문에 이 내용은 다음 편으로 미루겠습니다.
이것으로 액터 모델의 구현의 특징과 동시성, 액터 컨피규레이션 표기법 그리고 구현하는 방법을 알아봤습니다.
먼저 액터 모델의 특징에 대해 알아보았습니다. 액터 모델은 동시성 처리를 위한 모델이며, 각각 독립적으로 동작하는 개체들로 구성되며, 서로 메시지를 주고받으며 상호작용합니다.
액터 모델을 동시성 처리에 사용하면 확장성과 비동기 처리에 큰 이점을 얻을 수 있습니다. 왜냐하면 액터 모델은 수평적으로 확장이 가능하며, 여러 인스턴스를 생성할 수 있기 때문에 처리량을 선형적으로 늘릴 수 있기 때문입니다.
또한 기본적으로 비동기 메시지 전달을 기반으로 동작하기 때문에 블로킹 되지 않고 작업을 수행할 수 있다는 특징도 있었습니다.
이 정도 구현만 해도 액터는 충분히 돌아가지만, 다음 글에서는 값의 오버플로우를 방지할 순환 참조(구독) 탐지 알고리즘을 작성하겠습니다.