액터 모델과 동시성 제어 - 액터 구현

notJoon·2023년 6월 30일
1

액터 모델

목록 보기
2/3
post-thumbnail

액터 모델과 동시성 제어

소개

액터 모델 구현 두번째 시간입니다. 이전 포스팅에서 동시성 제어와 각각의 특징들을 살펴보았습니다. 이번 포스팅에서는 액터 모델의 특징을 알아보고 러스트로 직접 구현해보는 시간을 가지도록 하겠습니다. 전체 코드는 링크를 참고해주세요.

액터 모델

액터 모델은 동시성 프로그래밍 모델 중 하나 입니다. 이 모델은 시스템을 독립적인 액터들의 집합으로 보며, 각 액터는 비동기로 메시지를 주고받으며 상호작용합니다. 액터 모델은 다음 세가지 요소로 구성됩니다.

  • 액터(Actor) : 시스템의 기본 단위로, 메시지를 주고받으며 상호작용할 수 있는 독립적인 계산 엔티티입니다. 각 액터는 자신만의 상태를 가지고 있지만, 상태를 외부에서 직접 변경하는 것은 불가능합니다. 상태 변경은 오직 액터가 받은 메시지에 의해서만 가능합니다.
  • 메시지(Message) : 액터들 간의 상호작용은 메시지를 통해서만 이루어집니다. 메시지는 비동기적으로 전달됩니다. 어떤 액터 A가 다른 액터 B(수신 액터)로 메시지를 전송하면, 수신 액터 B는 그 메시지를 처리하기 위해 적절한 연산을 수행합니다. 메시지 전달은 액터 간에 직접적인 데이터 공유가 없기 때문에 경쟁 조건(race condition)과 같은 동시성 문제를 피할 수 있습니다.
  • 메일 박스(Mailbox) : 각 액터는 전달받은 메시지를 보관하는 메일박스를 가지고 있습니다. 메시지는 전달받은 순서대로 메일박스에 저장되며, 액터에서 처리할 준비가 됐을 때 순차적으로 처리됩니다.이러한 기능 덕분에 시스템의 병렬성이나 효율성을 높일 수 있습니다.

액터 모델은 이메일 시스템에 비유할 수 있습니다. 각 사용자(액터)는 자신의 이메일 계정(상태)과 메일 박스를 가지고 있고, 메일을 통해 서로 메시지를 주고 받습니다. 메일을 받은 사람은 자신이 편할 때 처리할 수 있지만(비동기) 보낸 사람은 알 수 없습니다(물론 메일을 읽은 시간을 표시하는 기능 가진 서비스도 있습니다). 이렇게 각 액터는 독립적으로 동작하며, 서로의 정보에 직접 접근하지 않고 메시지를 통해 상호작용합니다.

Actor Configuration

액터 모델에서 계산은 각 액터의 상태와 값(환경)을 변경하면서 실행됩니다. 따라서 특정 단계에서의 모델 상태를 파악하는 것이 중요한데, 이 상태를 표기하는 것이 바로 액터 컨피규레이션(actor configuration)입니다. 계산을 실행할 때는 액터 컨피규레이션을 조작하면서 동작합니다.

액터 컨피규레이션의 정의는 다음과 같습니다. α\alpha는 액터 이름에서 식으로의 사상(projection)이며, EE는 환경입니다. 예를 들어 함수에서 데이터 xx가 액터 α\alpha로 송신되면 환경 EE는 다음과 같이 전이(transfer)됩니다.

EE{αx}E \rightarrow E \cup \{ \langle \alpha \Leftarrow x \rangle\}

여기서 αx\langle \alpha \Leftarrow x \rangleα\alpha에 데이터 xx를 보내는 것을 의미합니다. 이 메시지를 통해 기존 환경 EE에 변화된 것을 추가해(\cup으로 표현 됨) 새로운 환경으로 전이됩니다.

특정 단계에서의 액터 모델의 상태는 다음과 같이 쓸 수 있습니다.

α  E\alpha \space \| \space E

이 경우 다음 조건이 성립합니다.

adom(α): fv(α(a))dom(α)av: {a}fv(v)dom(α)\forall a \in dom(\alpha) : \space fv(\alpha(a)) \subseteq dom(\alpha) \\ \forall \langle a \Leftarrow v \rangle : \space \{a\} \cup fv(v) \subseteq dom(\alpha)

첫번째 조건은 식 안의 자유변수가 유효한 액터명인 것을 나타내며, 두번째 조건은 송신 중 메시지의 수신지가 유효한 액터명인지 나타냅니다.

이 조건들이 성립하지 않는다면, 자유변수가 존재하지 않는 액터를 가르키고 있거나 존재하지 않는 액터에 데이터를 송신하고 있다는 것을 나타냅니다. 자유 변수는 λ\lambda 추상과 letrec을 통해 판단되지만, 이 글에서는 생략하겠습니다.

다음으로 액터 α\alphann개 존재한다면 아래의 식으로 표기할 수 있습니다.

α=[A0]v0, [A1]v1, ..., [An1]vn1\alpha = [A_0]_{v_0}, \space [A_1]_{v_1}, \space ..., \space [A_{n-1}]_{v_{n-1}}

이때 Ai  i{0, ..., n1}A_i \space \vert \space i \in \{0, \space ..., \space n - 1\}는 각 액터의 식, vi  i{0, , n1}v_i \space \vert \space i \in \{0, \space …, \space n-1\}는 액터명입니다.

또한, 환경 EE는 송신 중 데이터의 다중 집합이며, mm개의 데이터를 송신 중일 때 EE는 다음과 같이 표현됩니다.

E={d0e0, d1e1, ..., dm1em1}E = \{\langle d_0 \Leftarrow e_0 \rangle, \space \langle d_1 \Leftarrow e_1 \rangle, \space ..., \space \langle d_{m-1} \Leftarrow e_{m-1} \rangle \}

여기서 dstdata\langle dst \Leftarrow data \rangle은 데이터의 송신을 나타내며, di  i{0,,m1}d_i \space \vert \space i \in \{0, …, m-1\}은 수신 액터를, ei  i{0,,m1}e_i \space \vert \space i \in \{0, …, m-1\}은 송신 데이터를 의미합니다. 또한, 송신을 Send(data,actor)Send(data, actor) 함수로 수신은 Recv(message)Recv(message)로 표기한다면, 각 함수의 정의는 다음과 같습니다.

Send(data,actor)nullRecv(λy.M)(λy.M)xSend(data, actor) \rightarrow null \newline Recv(\lambda y.M) \rightarrow (\lambda y.M) x

Send 함수는 송신할 데이터와 타겟 액터를 매개변수로 받습니다. null을 반환하는 것은 액터 모델은 비동기 처리의 특성과 Fire-and-forget 패턴 때문입니다.

Fire-and-forget 패턴
fire-and-forget 패턴은 발신자가 메시지를 수신자에게 보내고 메시지가 전달되었는지 확인하지 않는 패턴입니다. 수신자는 메시지에 대한 응답을 보낼 수 있지만, 발신자는 응답을 굳이 기다리지 않기 때문에 주로 응답이 필요하지 않거나 높은 처리량이 필요한 분야에서 사용됩니다.

액터 모델 구현

이제 액터 모델을 직접 구현해보겠습니다. 먼저 액터의 생성과 메시지 전파를 다루고, 추가 요소인 구독자(subscriber) 기능도 구현하겠습니다.

기본 설정

액터를 생성하고 메시를 전파하기에 앞서 먼저 액터의 구조체를 정의하겠습니다. Actor 구조체에는 액터를 식별할 ID와 초기 상태, 값 그리고 구독자와 메일 박스가 있습니다. condvar 필드는 조건 변수이며, 다른 스레드가 무언가를 변경하고 마무리할 때까지 스레드를 대기 상태로 만드는데 사용됩니다. 이 필드는 이후 메시지 전달에서 자세히 설명하겠습니다.

struct Actor {
	id: usize,
    state: RwLock<ActorState>,
    value: RwLock<i32>,
    subs: RwLock<HashMap<usize, Arc<Actor>>>,
    mailbox: Mutex<VecDeque<Message>>,
	condvar: Condvar,
}

그 다음 액터의 상태를 정의하는 ActorState 입니다. ActorState::Active는 말 그대로 현재 활성화 되어 있다는 것을 의미합니다. 액터의 상태는 직접적으로 변경할 수 없고, 메시지를 통해 변경해야 합니다. 예를 들어 상태를 변경하라는 메시지가 전달되거나, 액터가 특정 기준을 넘으면 상태를 전환하는 식을 고려해볼 수 있습니다.

#[derive(Debug, PartialEq, Copy, Clone)]
enum ActorState {
    Active,
    Inactive,
}

ID는 액터를 식별하는 수단이기 때문에 AtomicUsize를 이용해 고유 ID를 생성할 것입니다. Actor::new()로 액터 구조체의 값을 초기화 할 때 같이 ID를 생성하면 됩니다. 이때 .fetch_add(1, Odering::SeqCst)로 액터가 생성될 때 마다 ID가 1씩 증가하도록 했습니다.

값을 초기화 할 때 state, value, subs는 다른 스레드에서도 읽어야 하기 때문에 RwLock을 사용했고, 메일박스의 경우 해당 액터에서만 접근하도록 해야 하기 때문에 Mutex를 사용해서 초기화 했습니다.

메일박스는 VecDeque::with_capacvity()를 이용해 미리 보관할 메시지의 용량을 지정했고, 메시지를 받을 때마다 메모리를 재할당 하는 것을 피했습니다.

use std::sync::atomic::{AtomicUsize, Ordering};

// 액터의 고유 ID 생성
static ACTOR_ID: AtomicUsize = AtomicUsize::new(0);
static MAILBOX_CAPACITY: usize = 10;

impl Actor {
	fn new() -> Arc<Self> {
        const INC: usize = 1;
        let id = ACTOR_ID.fetch_add(INC, Ordering::SeqCst);

        let actor = Actor {
            id,
            state: RwLock::new(ActorState::Active),
            value: RwLock::new(INITIAL_VALUE),
            subs: RwLock::new(HashMap::new()),
            mailbox: Mutex::new(VecDeque::with_capacity(MAILBOX_CAPACITY)),
			condvar: Condvar::new(),
        };

        Arc::new(actor)
    }
}

이제 생성된 액터의 정보를 조작하는 메서드들을 정의하겠습니다. 이 메서드들은 이후 메시지 처리에서 쓰일 것입니다.

impl Actor {
		fn get_id(&self) -> usize {
        self.id
    }

    fn get_state(&self) -> Result<ActorState, ActorError> {
        let value = self.state.read().unwrap();
        Ok(value.to_owned())
    }

    fn get_value(&self) -> Result<i32, ActorError> {
        let value = self.value.read().unwrap();
        Ok(value.to_owned())
    }

	fn set_value(&self, value: i32) -> Result<(), ActorError> {
        let mut value_lock = self.value.write().unwrap();
        *value_lock = value;

        Ok(())
    }
}

이렇게 Actor에 메서드를 계속 추가하는 것도 나쁘진 않지만, 책임의 분산을 위해 액터를 조작하는 인터페이스를 추가했습니다.

ActorPool은 생성된 액터들의 정보를 저장하는 역할을 하고, 메시지 전송이나 액터의 정보를 가져오는 창구 역할을 합니다.

#[derive(Debug)]
struct ActorPool {
    actor_list: Mutex<HashMap<usize, Arc<Actor>>>,
}

해시맵 구조를 가지고 액터의 ID를 키(key)로 생성된 액터들의 정보(Actor 구조체)를 값(value)로 가집니다.

ActorPool의 구현을 정의하겠습니다. ActorPool::new로 필드를 초기화 했습니다.

impl ActorPool {
    fn new() -> Self {
        ActorPool {
	        actor_list: Mutex::new(HashMap::new()),
	    }
    }
}

이제 기본 설정은 끝냈으니 본격적으로 액터를 구현해볼 차례입니다.

액터 생성

액터를 생성하는 것은 단순합니다. 그냥 액터를 생성하고, 고유한 ID와 기본 값들이 잘 설정되있는지 확인하면 됩니다. 간단히 도표로 표현하면 다음과 같습니다.

먼저 사용자는 CLI와 같은 도구를 사용하여 액터의 생성을 요청합니다. 그 후, 이 요청은 ActorPool 인터페이스로 전달되고 적절하게 처리되어 액터가 생성됩니다. 생성된 액터는 다시 ActorPool을 통해 사용자에게 자신의 정보인 struct Actor를 제공합니다.

액터의 생성과 정보 출력을 위한 API는 각각 ActorPool::create_actorActorPool::get_actor_info(id)입니다.

ActorPool::create_actorActor::new를 사용하여 새로운 액터를 생성한 후, 해당 액터를 ActorPool.actor_list 필드에 저장합니다.

ActorPool::get_actor_info(id) 메서드는 특정 id를 가진 액터를 ActorPool::actor_list 필드에서 검색한 다음, 해당 액터의 정보를 사용자에게 제공하는 역할을 합니다. 이 정보에는 Actor 구조체에 정의된 내용이 모두 포함됩니다.

impl ActorPool {
	fn create_actor(&self) -> usize {
		let actor = Actor::new();     // 새로운 액터 객체 생성
		let id = actor.get_id();
		
		let mut actor_list = self.actor_list.lock().unwrap();
		actor_list.insert(id, actor); // 생성한 액터를 `actor_list`에 저장

		id
	}

	fn get_actor_info(&self, actor_id: usize) -> Result<Arc<Arctor>, ActorError> {
		// `actor_list`의 정보를 가져오기 위해 lock을 획득합니다.
		let actor_list = self.actor_list.lock().unwrap();
		let actor = actor_list
				.get(&actor_id)
				.ok_or(ActorError::TargetActorNotFound(actor_id.to_string()))?;

		Ok(actor.to_owned())
	}
}

actor_list는 여러 스레드에서 동시에 접근해야 하는 필드이므로, 안전한 처리를 위해 lock()을 사용하여 락을 획득해야 합니다. 이를 통해 여러 스레드가 actor_list 필드에 안전하게 접근할 수 있습니다. 예를 들어, ActorPool::create_actor에서는 필드에 생성된 액터를 추가하기 위해 접근해야 하며, ActorPool::get_actor_info에서는 필드의 정보를 가져와야 하기 때문에 lock을 획득이 필요합니다.

메시지 처리

이제 생성된 액터에 메시지를 보낼 차례입니다. 액터는 상태와 값 뿐만 아니라 받은 메시지를 저장하는 메일 박스도 갖고 있습니다. 메시지는 메일 박스에 순차적으로 쌓이며 적절한 시점에 처리됩니다. 액터는 메시지를 처리하면서 자신의 값과 상태를 업데이트하는 방식으로 동작합니다.

각 액터는 구독자(subscriber)를 가지고 있으며, 액터가 받은 메시지는 반드시 구독자에게도 보내야 합니다. 즉, 액터는 메시지를 처리하는 동안 자신을 구독하고 있는 액터에도 메시지를 전달하는 역할을 수행해야 합니다. 이렇게 함으로써 액터는 자신과 구독자 간에 통신과 상호작용이 이루어집니다.

전체적인 메시지 처리는 다음 흐름으로 나눌 수 있습니다.

  1. 메시지 생성 및 전송 (Send)
    1. ActorPool::create_actor
    2. ActorPoo::message_loop
  2. 구독 처리 (Subscribe)
    1. Actor::add_subscriber
    2. Actor::remove_subscriber
    3. ActorPool::subscribe
  3. 메시지 전파 (Propagate)
    1. Actor::send_message
    2. Actor::propagate_message
  4. 처리 (Handle)
    1. Actor::exexute_message
    2. Actor::handle_message

메시지 생성 및 전송
메시지 처리는 액터 생성 부분에서 작성한 ActorPool::create_actor 메서드에서 처리합니다. 이 메서드는 액터를 생성하는 역할을 하지만, 메시지 처리를 위해 별도의 스레드를 생성하고 각 액터의 메일박스로 부터 메시지를 처리하는 역할 또한 수행합니다. 이때 생성한 스레드 내부에서 ActorPool::execute_message를 호출합니다.

impl ActorPool {
	fn create_actor(&self) -> usize {
        let actor = Actor::new();
        let id = actor.get_id();
        let actor_clone = Arc::clone(&actor);  // +
				                               // +
        std::thread::spawn(move || {           // +
            actor_clone.execute_messages();    // +
        });                                    // +

        let mut actor_list = self.actor_list.lock().unwrap();
        actor_list.insert(id, actor);

        id
    }

	fn message_loop(&self, actor_id: usize, message: Message) -> Result<(), ActorError> {
        let actor = self.get_actor_info(actor_id)?;
        actor.send_message(message)
    }
}

Arc::clone을 이용해 액터의 복제본을 만들었습니다. 이렇게 한 이유는 다른 스레드에서도 Actor 인스턴스를 안전하게 공유하기 위해서입니다.

그 다음, std::thread::spawn을 사용하여 새 스레드를 생성했습니다. 이 스레드는 Actor의 메시지 처리(Actor::execute_message)하는 역할을 담당합니다. 즉, Actor가 생성되는 즉시 독립적인 스레드에서 실행되며, 자신의 메일박스에서 메시지를 읽고 처리합니다.

독립적인 스레드에서 각 Actor가 실행되면, Actor 간에 상호 작용이 발생할 때 서로를 차단(blocking)하지 않습니다. 예를 들어, 하나의 액터가 긴 작업을 수행하는 동안에도 다른 액터들은 계속해서 메시지를 처리할 수 있습니다.

업데이트된 Actor::create_actor() 함수를 이용하면 동적으로 액터를 생성하고 메시지를 처리할 수 있습니다. 액터 컨피규레이션 표기법을 사용하면 이 함수의 전이를 다음과 같이 작성할 수 있습니다.

[new(M)]a  E[a]a, [recv(M)]a  E[new(M)]_a \space || \space E \rightarrow [a']_a, \space [recv(M)]_{a'} \space || \space E

Actor::create_actor 함수는 aa’이라는 액터를 생성하고 반환합니다. 단, 실행 시점의 액터 컨피규레이션이 α  E\alpha \space \| \space E이면, adom(α)a’ \notin dom(\alpha)이 됩니다. 즉, 위 식을 α  E[new:a, a’] α  E\alpha \space \| \space E \overset{\text{[new:a, a']}}{\longrightarrow} \space \alpha' \space \| \space E로 간략히 표현했을때, 이것은 adom(α)a’ \notin dom(\alpha)이고 adom(α)a’ \in dom(\alpha')가 되어 새로운 액터 aa’가 액터 컨피규레이션에 추가됐음을 나타냅니다.

생성된 액터는 MM을 이용해 즉시 메시지를 처리할 수 있는 상태가 됩니다. ActorPool::execute_message가 인수 MM의 역할을 한다고 보면 됩니다.

간단히 말해, 이제 Actor::create_actor는 새로운 액터를 생성하고, 동시에 즉시 메시지를 처리할 수 있게 합니다.

ActorPool::message_loop는 특정 액터에게 메시지를 전송하는 역할을 합니다. 이 함수는 사용자에게 메시지와 어느 액터에 메시지를 전송할지 ID를 받고, 해당 액터의 Actor::send_message를 호출하면서 메시지를 전송합니다.


구독 처리

구현할 액터는 메시지를 처리할 때 자신을 구독하고 있는 다른 액터에게도 동일한 메시지를 전송해야 합니다. 이를 위해 액터의 subs 필드에 구독자를 업데이트하는 함수를 작성하겠습니다.

구독자에 가중치를 추가하여 메시지 전송에 우선순위를 두고 싶다면 BTreeMap이나 우선순위 큐를 사용하여 가중치가 높은 순서대로 정렬한 다음 차례로 처리할 수 있습니다. 하지만, 다행히 여기서 구현할 액터는 이런 시스템이 없기 때문에 단순히 HashMap을 사용해도 충분합니다.

메시지 업데이트는 간단합니다. 구독 리스트의 락을 획득하고 액터를 해시맵에 추가(삭제)한 다음, 락을 반환하면 됩니다. 이때 subs 필드는 RwLock으로 래핑되어 있으므로 쓰기 락(.write)을 사용하여 수정 권한을 얻어야 합니다.

구독자 추가, 삭제는 각각 Actor::add_subscriber, Actor::remove_subscriber에 대응됩니다.

impl Actor {
		fn add_subscriber(&self, actor: Arc<Actor>) -> Result<Option<Arc<Actor>>, ActorError> {
	      let mut subs = self.subs.write().unwrap();
	
	      if subs.contains_key(&actor.get_id()) {
	          return Err(ActorError::ActorAlreadyExists(actor.get_id().to_string()));
	      }
	
	      Ok(subs.insert(actor.get_id(), actor))
	  }
	
	  fn remove_subscriber(&self, actor_id: usize) -> Result<(), ActorError> {
	      let mut subs = self.subs.write().unwrap();
	
	      if subs.contains_key(&actor_id) {
	          subs.remove(&actor_id);
	      }
	
	      Err(ActorError::TargetActorNotFound(actor_id.to_string()))
	  }
}

구독자 리스트에 넣으려는 액터가 이미 있다면 따로 에러 처리를 했습니다. 반대로 삭제하려는 액터가 없을 때도 에러를 반환하도록 했습니다.


메시지 전파

메시지를 받은 액터는 자신의 값을 업데이트하고, 다른 액터들에게도 메시지를 전송해야 합니다. 메시지를 전송하는 방법은 두 가지가 있습니다. 첫 번째 방법은 특정 조건을 만족하면 메시지를 전송하는 방법이고, 두 번째 방법은 자신과 연결된 액터들에게 동일한 메시지를 전달하는 방법입니다. 이번 구현에서는 두 번째 방법을 사용할 것입니다.

메시지는 Message 타입의 열거형으로 정의되어 있으며, 각각 증가(increment)와 감소(decrement)를 포함합니다. 메시지 타입은 패턴 매칭으로 종류를 파악하고, 메시지에 맞는 함수를 호출합니다. 호출되는 함수는 각각 Actor::increment, Actor::decrement에 대응합니다.

Actor::update_value는 코드 중복을 줄이기 위한 함수입니다. 이 함수는 FnOnce로 메시지에 대응하는 함수를 호출하고, 액터의 값을 업데이트합니다.

#[derive(Debug, Clone)]
enum Message {
	Increment(i32),
	Decrement(i32),
}

impl Actor {
	fn update_value<F>(&self, modifier: F) -> Result<(), ActorError>
    where
	// `FnOnce` 트레이트는 클로저를 한 번 호출 할 수 있는 타입으로 제한하는 역할을 합니다.
	// 특히, 이 트레이트는 클로저가 소유권을 소비(consume)하는 형태의 호출을 수행할 수 있도록 합니다.
        F: FnOnce(i32) -> Result<i32, ActorError>,
    {
        let mut value = self.get_value()?;
        value = modifier(value)?;

        self.set_value(value)
    }

    fn increment(&self, n: i32) -> Result<(), ActorError> {
        self.update_value(|value| Ok(value + n))
    }

    fn decrement(&self, n: i32) -> Result<(), ActorError> {
        self.update_value(|value| Ok(value - n))
    }
}

이제 메시지를 전달하고, 자신의 구독자에게 전파하는 함수들을 작성해보겠습니다. 각각 Actor::send_message, Actor::propagate_message입니다.

impl Actor {
	fn send_message(&self, message: Message) -> Result<(), ActorError> {
		// 메일박스의 용량이 꽉 찼는지 확인
        let mut mailbox = self.mailbox.lock().unwrap();
        if mailbox.len() >= mailbox.capacity() {
            return Err(ActorError::MailboxOverflow(self.id.to_string()));
        }

		// 액터가 비활성화(`ActorState::Inactive`) 상태면 메시지를 그냥 보관함.
        if self.state.read().unwrap().to_owned() == ActorState::Inactive {
            self.mailbox.lock().unwrap().push_back(message.clone());
        }
	
		// 메일박스에 보관된 메시지 처리
        mailbox.push_back(message.clone());

		// 자신을 구독하고 있는 액터에도 메시지 전파
        self.propagate_message(message)?;

		// 조건 변수를 이용해 메일박스에 메시지가 추가되면 
        // 스레드를 깨워 `execute_message`가 실행되도록 처리
        self.condvar.notify_all();

        Ok(())
    }

    fn propagate_message(&self, message: Message) -> Result<(), ActorError> {
		// 현재 액터의 구독 리스트를 가져옴
        let subs = self.subs.read().unwrap();

		// 구독 리스트에 있는 각 액터에 메시지를 전달
        for (_, actor) in subs.iter() {
            actor.send_message(message.clone())?;
        }

        Ok(())
    }
}

마지막으로 메시지 처리입니다. 메시지 처리는 Actor::execute_messageActor::handle_message 메서드를 통해 처리됩니다.

Actor::execute_messages 메서드는 루프를 돌면서 메일박스의 보관된 메시지를 읽고 처리하는 함수입니다. 이때도 조건 변수를 사용해 메일박스에 메시지가 없으면 스레드를 대기 상태로 두고, 메시지가 있는 경우에만 처리하도록 처리했습니다.

impl Actor {
	fn execute_messages(&self) {
        loop {
            let mut mailbox = self.mailbox.lock().unwrap();

            // 메일박스에 메시지가 들어올 때까지 대기 상태 유지
            if mailbox.is_empty() {
                mailbox = self.condvar.wait(mailbox).unwrap();
            }
						
			// 메시지가 들어오면 가장 먼저 들어온 메시지 소비(consume)
            while let Some(msg) = mailbox.pop_front() {
                self.handle_message(msg).unwrap();
            }
        }
    }
}

Actor::execute_messages에서 메시지를 읽을 때 메시지의 종류를 매핑하는 함수는 Actor::handle_messages입니다. 단순하게 패턴 매칭으로 처리하고, 각 메시지에 해당하는 핸들러 함수를 호출합니다. 각각 이전에 작성한 Actor::incrementActor::decrement입니다.

메시지를 송수신 하는 것을 액터 컨피규레이션으로 표현하면 다음과 같이 표현할 수 있습니다. 여기서는 액터 A가 액터 B로 메시지를 보내는 상황을 가정했습니다.

[Send(x,b)]a, [Recv(λy.M)]b  { }[null]a, [Recv(λy.M)]b  {bx}[null]a, [(λy.Mx)]b  { }[Send(x, b)]_a, \space [Recv(\lambda y.M)]_b \space \| \space \{\space\} \rightarrow [null]_a, \space [Recv(\lambda y.M)]_b \space \| \space \{\langle b \Leftarrow x \rangle\} \newline \rightarrow [null]_a, \space [(\lambda y.M x)]_b \space \| \space \{\space\}
impl Actor {
		fn handle_message(&self, message: Message) -> Result<(), ActorError> {
		// 패턴 매칭을 통해 각 메시지에 해당하는 함수 호출
        let result = match message {
            Message::Increment(n) => self.increment(n),
            Message::Decrement(n) => self.decrement(n),
        };

		// 조건 변수를 이용해 메시지가 처리되었음을 
		// `execute_messages` 스레드에 알림
        self.condvar.notify_all();

        result
    }
}

이제 액터끼리 메시지를 주고받을 수 있습니다. 그러나 액터가 서로 구독하고 있다면 서로 무한히 메시지를 전달할 가능성이 있으며, 이는 잠재적으로 오버플로우를 발생시킬 수 있습니다. 이 문제를 해결하려면 액터가 일정 값에 도달하면 값을 리셋하거나, 메시지 처리를 중단하거나, 메시지에 ID를 추가하여 메시지당 처리 횟수를 한 번만 하도록 할 수 있습니다.

그러나 저는 액터를 구독 리스트에 추가할 때 미리 서로 구독하고 있는지 확인하는 알고리즘을 추가하기로 했습니다. 각 액터를 노드로, 구독하고 있는 것을 간선으로 둔다면 유향 그래프 구조가 만들어집니다. 그런 다음 이 그래프에 대해 사이클을 판별하는 알고리즘을 작성하고, 구독 추가 로직에 추가하기만 하면 됩니다. 그러나 여기서 사이클 감지 알고리즘을 설명하면 내용이 길어지기 때문에 이 내용은 다음 편으로 미루겠습니다.

정리

이것으로 액터 모델의 구현의 특징과 동시성, 액터 컨피규레이션 표기법 그리고 구현하는 방법을 알아봤습니다.

먼저 액터 모델의 특징에 대해 알아보았습니다. 액터 모델은 동시성 처리를 위한 모델이며, 각각 독립적으로 동작하는 개체들로 구성되며, 서로 메시지를 주고받으며 상호작용합니다.

액터 모델을 동시성 처리에 사용하면 확장성과 비동기 처리에 큰 이점을 얻을 수 있습니다. 왜냐하면 액터 모델은 수평적으로 확장이 가능하며, 여러 인스턴스를 생성할 수 있기 때문에 처리량을 선형적으로 늘릴 수 있기 때문입니다.

또한 기본적으로 비동기 메시지 전달을 기반으로 동작하기 때문에 블로킹 되지 않고 작업을 수행할 수 있다는 특징도 있었습니다.

이 정도 구현만 해도 액터는 충분히 돌아가지만, 다음 글에서는 값의 오버플로우를 방지할 순환 참조(구독) 탐지 알고리즘을 작성하겠습니다.


같이 보기

  1. 동시성 프로그래밍 (다카노 유키, 옮긴이: 김모세, 2022, 한빛미디어, p.313-340)
profile
Uncertified Quasi-polyglot pseudo dev

0개의 댓글