https://github.com/lablup/raftify 에 나와있는 코드를 정리해 보겠습니다.
lib.rs
:lib.rs
는 라이브러리의 진입점입니다. 이 파일에서 Raft 알고리즘의 주요 모듈들이 어떻게 연결되고 있는지 확인할 수 있습니다.이 코드는 Rust로 작성된 Raft 알고리즘 기반의 분산 시스템 라이브러리의 주요 모듈을 구성하고, 필요한 기능을 외부에 노출하기 위한 설정 코드입니다. 각 부분이 무엇을 하는지 살펴보겠습니다.
#[macro_use]
extern crate async_trait;
async_trait
: 이 매크로는 비동기 트레이트를 정의할 수 있게 해줍니다. Rust에서는 트레이트 메서드에서 async
를 사용할 수 없기 때문에, 이 매크로를 사용하여 비동기 트레이트를 정의합니다.mod config;
mod error;
mod formatter;
mod log_entry;
mod peer;
mod peers;
mod raft_bootstrapper;
mod raft_client;
mod raft_node;
mod raft_server;
mod state_machine;
mod storage;
mod utils;
mod request;
mod response;
mod
키워드를 사용하여 여러 모듈을 정의하고 있습니다. 이들은 Raft 알고리즘의 다양한 부분을 구현하는 파일들로, 각각의 모듈이 특정 기능을 담당합니다.raft_node
모듈은 Raft 노드의 로직을 구현하고, storage
모듈은 데이터를 영구적으로 저장하는 방식을 정의합니다.pub mod cli;
pub mod cluster_join_ticket;
pub mod raft_service;
pub
으로 정의되어 있어 외부에서 접근할 수 있습니다. 이들은 라이브러리 사용자에게 노출되는 주요 기능들을 제공할 것입니다.cli
: 명령줄 인터페이스와 관련된 기능cluster_join_ticket
: 클러스터에 노드를 추가하는 티켓 시스템raft_service
: Raft 서비스를 관리하는 기능pub use {
async_trait::async_trait, bincode, formatter::CustomFormatter, jopemachine_raft as raft,
raft::Config as RaftConfig, tonic, tonic::transport::Channel,
};
pub use
는 외부와 내부에서 자주 사용하는 기능들을 쉽게 접근할 수 있도록 재노출하는 구문입니다. async_trait
, bincode
, tonic
같은 외부 크레이트와 라이브러리의 일부를 가져와 다시 노출합니다.jopemachine_raft
라는 외부의 Raft 라이브러리를 raft
라는 이름으로 가져와 사용하고 있습니다.pub use crate::{
cluster_join_ticket::ClusterJoinTicket,
config::Config,
error::{Error, Result},
log_entry::AbstractLogEntry,
peer::Peer,
peers::Peers,
raft_bootstrapper::Raft,
raft_client::create_client,
raft_node::{role::InitialRole, RaftNode},
raft_service::raft_service_client::RaftServiceClient,
request::common::confchange_request::ConfChangeRequest,
state_machine::AbstractStateMachine,
storage::heed_storage::HeedStorage,
storage::StableStorage,
};
RaftNode
는 Raft 알고리즘의 노드를 나타내며, RaftServiceClient
는 클라이언트와 통신하는 서비스입니다.Config
는 Raft 시스템의 설정을 다루는 구조체이고, HeedStorage
는 로그와 상태를 저장하는 스토리지 모듈입니다.pub(crate) use crate::utils::macros::macro_utils;
pub(crate)
키워드는 이 모듈을 크레이트 내부에서만 사용할 수 있도록 제한합니다. macro_utils
는 utils
모듈 내의 매크로 유틸리티를 포함하여, 프로젝트 내에서 코드의 재사용성을 높이기 위해 사용됩니다.raft_node/
폴더:이 파일들은 Raft 알고리즘의 핵심적인 부분을 구현하고 있으며, Raft 노드의 행동과 상태를 관리하는 역할을 합니다.
bootstrap.rs
함수 bootstrap_peers
는 Raft 알고리즘의 초기화 과정 중 클러스터에 있는 모든 팔로워 노드를 추가하는 작업을 수행하는 로직을 구현하고 있습니다. 주석에 따르면, 이 함수는 더 이상 권장되지 않는(deprecated) 방법으로, 클러스터 구성 변경(예: 새로운 노드 추가)을 처리하는 로직을 포함하고 있습니다.
초기 설정:
let mut initial_peers = peers.lock().await.clone();
initial_peers.remove(&raw_node.raft.id);
initial_peers
를 구성합니다.마지막 인덱스 및 임기(term) 가져오기:
let storage = raw_node.store();
let last_index = storage.last_index()?;
let last_term = storage.term(last_index)?;
구성 변경 엔트리 생성:
for (i, peer) in initial_peers.iter().enumerate() {
//...
let mut conf_change = ConfChange::default();
conf_change.set_node_id(*node_id);
conf_change.set_change_type(ConfChangeType::AddNode);
conf_change.set_context(serialize(&vec![node_addr])?);
let conf_state = raw_node.apply_conf_change(&conf_change)?;
raw_node.mut_store().set_conf_state(&conf_state)?;
//...
}
ConfChange
메시지를 생성하여 해당 피어를 클러스터에 추가합니다. ConfChange
는 구성 변경의 세부사항(예: 노드 추가)을 설명합니다.로그 엔트리 추가 및 커밋:
let commit_index = last_index + entries.len() as u64;
let unstable = &mut raw_node.raft.raft_log.unstable;
unstable.entries = entries.clone();
unstable.stable_entries(commit_index, last_term);
let store = raw_node.mut_store();
store.append(&entries)?;
스냅샷 생성 및 로그 압축:
let store = raw_node.mut_store();
let snapshot = store.snapshot(0, commit_index)?;
store.compact(last_applied)?;
store.create_snapshot(snapshot.get_data().to_vec(), last_applied, last_term)?;
리더 노드의 상태 업데이트:
let leader_id = raw_node.raft.leader_id;
let leader_pr = raw_node.raft.mut_prs().get_mut(leader_id).unwrap();
leader_pr.matched = commit_index;
leader_pr.committed_index = commit_index;
leader_pr.next_idx = commit_index + 1;
mod.rs
이 코드베이스는 Raft 알고리즘을 구현하는 RaftNode
와 RaftNodeCore
구조체의 정의 및 관련 메서드를 포함하고 있습니다. 이 코드의 목적은 분산 시스템 내에서 Raft 알고리즘을 구현하여 노드 간의 합의(consensus)를 이루는 것입니다. RaftNode
는 고수준의 인터페이스를 제공하며, RaftNodeCore
는 실제 Raft 알고리즘의 핵심 로직을 처리합니다.
RaftNode
구조체구성:
inner
: RaftNodeCore
를 감싸는 구조체로, Arc<OneShotMutex<RaftNodeCore<LogEntry, FSM>>>
타입을 가집니다. 이 락은 노드가 실행되는 동안 유지됩니다.tx_local
: mpsc::Sender
타입으로, 로컬 요청 메시지를 비동기적으로 전송하는 채널입니다.주요 메서드:
bootstrap
: 노드를 초기화하고 부트스트랩하는 메서드로, RaftNodeCore
를 생성합니다.is_leader
, get_id
, get_leader_id
, get_peers
: 노드의 상태(리더 여부, 노드 ID, 리더 ID, 피어 목록 등)를 확인하는 메서드들입니다.add_peer
, add_peers
: 새로운 피어를 클러스터에 추가합니다.propose
, change_config
: 클러스터에 새로운 제안(proposal)을 하거나, 클러스터의 구성 변경을 요청합니다.make_snapshot
, inspect
, join_cluster
: 스냅샷을 생성하거나 클러스터 상태를 점검하는 기능 등을 제공합니다.RaftNodeCore
구조체구성:
raw_node
: Raft 알고리즘의 핵심을 구현하는 RawNode
구조체입니다.fsm
: 상태 머신(State Machine)입니다.peers
: 클러스터에 속한 피어들을 관리하는 구조체입니다.logger
: 로깅 기능을 담당하는 로거입니다.주요 메서드:
bootstrap
: Raft 노드를 초기화하는 과정에서 스냅샷 복구, 로그 복구, 초기 리더 선출 등을 처리합니다.run
: Raft 노드의 메인 루프를 실행합니다. 이 루프에서 들어오는 메시지를 처리하고, 타이머 이벤트를 처리하며, Raft 알고리즘의 상태를 유지합니다.on_ready
: Raft 노드가 처리할 준비가 된 작업들을 수행합니다. 새로운 로그 항목을 커밋하고, 스냅샷을 복구하며, 다른 노드로 메시지를 전송합니다.handle_committed_entries
, handle_propose_request
, handle_confchange_request
: 커밋된 로그 항목을 처리하고, 제안된 로그 항목이나 구성 변경 요청을 처리하는 메서드들입니다.send_message
, send_messages
: 다른 노드로 메시지를 전송합니다.Raft 알고리즘 구현:
비동기 통신:
tokio
를 사용하여 비동기적으로 노드 간의 메시지를 주고받으며, Raft의 상태를 유지합니다.스냅샷 및 로그 관리:
클러스터 구성 변경:
response_sender.rs
이 코드에서는 ResponseSender
라는 열거형(enum)을 정의하고, 이를 통해 LocalResponseMsg
와 ServerResponseMsg
를 비동기적으로 전송하는 방법을 구현합니다. ResponseSender
는 Raft 노드가 처리한 결과를 클라이언트나 서버에 응답하기 위한 메커니즘으로 사용됩니다.
ResponseSender
열거형pub(crate) enum ResponseSender<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine> {
Local(oneshot::Sender<LocalResponseMsg<LogEntry, FSM>>),
Server(oneshot::Sender<ServerResponseMsg>),
}
ResponseSender
는 두 가지 변형을 가집니다:
Local
: 로컬 메시지(LocalResponseMsg
)를 전송하는 비동기 oneshot::Sender
.Server
: 서버 메시지(ServerResponseMsg
)를 전송하는 비동기 oneshot::Sender
.이 열거형은 Raft 노드가 처리한 결과를 어떤 채널을 통해 전달할지 결정하는 역할을 합니다. Local
변형은 로컬 클라이언트로 응답을 보내고, Server
변형은 원격 서버로 응답을 보냅니다.
send
메서드impl<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine> ResponseSender<LogEntry, FSM> {
pub fn send(self, response: ResponseMessage<LogEntry, FSM>) {
match self {
ResponseSender::Local(tx_local) => {
if let ResponseMessage::Local(response) = response {
tx_local.send(response).unwrap()
} else {
unreachable!()
}
}
ResponseSender::Server(tx_server) => {
if let ResponseMessage::Server(response) = response {
tx_server.send(response).unwrap()
} else {
unreachable!()
}
}
}
}
}
send
메서드는 ResponseSender
열거형을 소비하여(self
), 주어진 응답(response
)을 해당 채널을 통해 보냅니다.
ResponseSender::Local
변형을 사용할 경우, response
가 ResponseMessage::Local
타입이어야 합니다. 그렇지 않으면 unreachable!()
매크로가 호출되어, 프로그램이 패닉(panic)을 일으킵니다.ResponseSender::Server
변형을 사용할 경우, response
가 ResponseMessage::Server
타입이어야 합니다. 역시 그렇지 않으면 unreachable!()
이 호출됩니다.tx_local.send(response).unwrap()
와 tx_server.send(response).unwrap()
는 실제로 메시지를 전송하는 부분입니다. unwrap()
은 전송이 실패할 경우 프로그램이 패닉하도록 합니다. 이는 일반적으로 안전한 실행을 보장하기 위해 사용됩니다.
ResponseSender
는 비동기적으로 응답을 전송하기 위한 도구입니다. Raft 노드에서 발생하는 다양한 이벤트나 결과를 클라이언트(로컬) 또는 서버(원격)로 전송할 때 사용됩니다.send
메서드는 ResponseMessage
가 Local
또는 Server
메시지인지 확인하여, 해당 타입에 맞는 채널로만 전송할 수 있도록 보장합니다. 이는 실수로 잘못된 타입의 메시지를 전송하는 것을 방지합니다.이 코드의 목적은 Raft 노드가 처리한 결과를 비동기적으로 로컬 클라이언트 또는 서버에 전송하는 기능을 제공하는 것입니다. ResponseSender
는 두 가지 유형의 응답을 처리하며, send
메서드를 통해 타입에 맞는 응답을 안전하게 전송합니다. 이를 통해 Raft 노드의 결과를 클라이언트나 서버가 적절히 처리할 수 있도록 합니다.
role.rs
이 코드에서는 InitialRole
이라는 열거형(enum)을 정의하고, 이를 위한 몇 가지 트레이트(interfaces)를 구현하고 있습니다. InitialRole
은 Raft 알고리즘에서 각 노드가 초기화될 때 부여받는 역할을 나타냅니다.
InitialRole
열거형#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum InitialRole {
Leader,
Voter,
Learner,
}
역할: InitialRole
은 Raft 클러스터에서 노드의 초기 역할을 정의합니다. 각각의 역할은 다음과 같습니다:
Leader
: 클러스터의 리더로서, 다른 노드들을 관리하고 로그를 복제하는 역할을 합니다.Voter
: 투표 권한이 있는 노드로, 리더 선출에 참여하고 로그 복제를 통해 클러스터의 일관성을 유지합니다.Learner
: 로그를 복제받지만 투표 권한이 없는 노드로, 주로 데이터를 동기화하는 데 사용됩니다.Derive 어트리뷰트:
Debug
, Clone
, Serialize
, Deserialize
, PartialEq
, Eq
트레이트를 자동으로 구현합니다.Debug
: 디버깅 목적으로 열거형의 값을 출력할 수 있게 합니다.Clone
: 열거형의 값을 복제할 수 있게 합니다.Serialize
및 Deserialize
: 이 열거형을 직렬화 및 역직렬화할 수 있게 합니다. 예를 들어, JSON 형식으로 변환할 수 있습니다.PartialEq
, Eq
: 두 값이 같은지 비교할 수 있게 합니다.fmt::Display
트레이트 구현impl fmt::Display for InitialRole {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
InitialRole::Leader => write!(f, "Leader"),
InitialRole::Voter => write!(f, "Voter"),
InitialRole::Learner => write!(f, "Learner"),
}
}
}
Display
트레이트는 InitialRole
값을 사람이 읽을 수 있는 형식으로 포맷팅할 수 있게 합니다. 예를 들어, InitialRole::Leader
는 "Leader"
문자열로 포맷팅됩니다.fmt(&self, f: &mut fmt::Formatter) -> fmt::Result
: self
값을 fmt::Formatter
를 통해 포맷팅하여 출력합니다.FromStr
트레이트 구현impl FromStr for InitialRole {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
let s = s.to_lowercase();
match s.as_str() {
"leader" => Ok(InitialRole::Leader),
"voter" => Ok(InitialRole::Voter),
"learner" => Ok(InitialRole::Learner),
_ => Err(()),
}
}
}
FromStr
트레이트는 문자열로부터 InitialRole
을 생성할 수 있게 합니다. 문자열이 "leader"
, "voter"
, "learner"
중 하나일 경우, 해당하는 InitialRole
변형을 반환합니다.from_str(s: &str) -> Result<Self, Self::Err>
: 주어진 문자열 s
를 소문자로 변환한 후, 해당 문자열에 맞는 InitialRole
변형을 반환합니다. 일치하는 변형이 없으면 에러를 반환합니다."Leader".parse::<InitialRole>()
는 Ok(InitialRole::Leader)
를 반환합니다."voter".parse::<InitialRole>()
는 Ok(InitialRole::Voter)
를 반환합니다."unknown".parse::<InitialRole>()
는 Err(())
를 반환합니다.이 코드는 Raft 클러스터에서 노드가 가질 수 있는 초기 역할을 나타내는 InitialRole
열거형을 정의하고, 이를 다루기 위한 여러 트레이트를 구현합니다. 이 열거형은 노드의 역할을 직렬화/역직렬화하거나, 문자열로부터 변환하고, 포맷팅하여 출력하는 기능을 제공합니다.
utils.rs
이 코드는 Raft 노드의 상태를 디버깅하고, 그 상태를 사람이 읽을 수 있는 형식으로 포맷팅하는 기능을 제공합니다. 이를 통해 Raft 노드의 내부 상태를 확인하고 분석할 수 있습니다. 주요 함수는 format_debugging_info
와 inspect_raftnode
이며, 각각 Raft 노드의 상태를 포맷팅하고 검사하는 역할을 합니다.
format_debugging_info
pub fn format_debugging_info(hashmap: &HashMap<String, Value>) -> String {
let node_id = hashmap
.get("node_id")
.and_then(|v| v.as_u64())
.expect(EXPECTED_FORMAT_NOT_EXIST);
let leader_id = hashmap
.get("leader_id")
.and_then(|v| v.as_u64())
.expect(EXPECTED_FORMAT_NOT_EXIST);
let term = hashmap
.get("term")
.and_then(|v| v.as_u64())
.expect(EXPECTED_FORMAT_NOT_EXIST);
let outline = format!(
"========= Outline =========\n\
node_id: {}\n\
leader_id: {}\n\
term: {}\n",
node_id, leader_id, term
);
let storage = hashmap
.get("storage")
.and_then(|v| v.as_object())
.cloned()
.expect(EXPECTED_FORMAT_NOT_EXIST);
let hard_state = storage
.get("hard_state")
.and_then(|v| v.as_object())
.cloned()
.expect(EXPECTED_FORMAT_NOT_EXIST);
let conf_state = storage
.get("conf_state")
.and_then(|v| v.as_object())
.cloned()
.expect(EXPECTED_FORMAT_NOT_EXIST);
let last_index = storage
.get("last_index")
.and_then(|v| v.as_number())
.cloned()
.expect(EXPECTED_FORMAT_NOT_EXIST);
let snapshot = storage
.get("snapshot")
.and_then(|v| v.as_str())
.expect(EXPECTED_FORMAT_NOT_EXIST);
let hard_state_formatted = format!("{:?}", hard_state);
let conf_state_formatted = format!("{:?}", conf_state);
let snapshot_formatted = format!("{:?}", snapshot);
let persistence_info = format!(
"========= Persistence Info =========\n\
hard_state: {}\n\
conf_state: {}\n\
last_index: {}\n\
snapshot: {}\n",
hard_state_formatted, conf_state_formatted, last_index, snapshot_formatted,
);
let progress = hashmap
.get("progress")
.and_then(|v| v.as_object().cloned())
.expect(EXPECTED_FORMAT_NOT_EXIST);
let progress_formatted = format!("{:?}", progress);
let progress_info = format!(
"========= Progress Info =========\n\
{}\n",
progress_formatted,
);
let raft_log = hashmap
.get("raft_log")
.and_then(|v| v.as_object().cloned())
.expect(EXPECTED_FORMAT_NOT_EXIST);
let raft_log_formatted = format!("{:?}", raft_log);
let raft_log_info = format!(
"========= RaftLog Info =========\n\
{}\n",
raft_log_formatted,
);
let result = format!("{outline}\n{persistence_info}\n{progress_info}\n{raft_log_info}\n");
result
}
HashMap<String, Value>
형식의 데이터를 입력으로 받아, Raft 노드의 디버깅 정보를 포맷팅된 문자열로 변환합니다.hashmap
에서 node_id
, leader_id
, term
등의 필수 정보를 가져와 출력의 머리말(outline
)을 구성합니다.storage
관련 정보를 가져와 Persistence Info
섹션을 구성합니다. 여기에는 hard_state
, conf_state
, last_index
, snapshot
등이 포함됩니다.progress
와 raft_log
정보도 마찬가지로 포맷팅되어 출력됩니다.inspect_raftnode
pub fn inspect_raftnode<T: StableStorage>(raw_node: &RawNode<T>) -> Result<String> {
let id = raw_node.raft.id;
let leader_id = raw_node.raft.leader_id;
let prs = if id == leader_id {
raw_node
.raft
.prs()
.iter()
.map(|(node_id, pr)| {
(
node_id,
json!({
"matched": pr.matched,
"next_idx": pr.next_idx,
"paused": pr.paused,
"pending_snapshot": pr.pending_request_snapshot,
"pending_request_snapshot": pr.pending_request_snapshot,
"recent_active": pr.recent_active,
"commit_group_id": pr.commit_group_id,
"committed_index": pr.committed_index,
"ins": format!("{:?}", pr.ins),
"state": format!("{}", pr.state),
}),
)
})
.collect::<HashMap<_, _>>()
} else {
HashMap::new()
};
let store = raw_node.store();
let hard_state = store.hard_state()?;
let conf_state = store.conf_state()?;
let snapshot = store.snapshot(0, 0)?;
let last_index = raw_node.raft.raft_log.last_index();
let last_applied = raw_node.raft.raft_log.applied;
let last_committed = raw_node.raft.raft_log.committed;
let last_persisted = raw_node.raft.raft_log.persisted;
let result = json!({
"node_id": id,
"leader_id": leader_id,
"term": raw_node.raft.term,
"storage": {
"hard_state": {
"term": hard_state.term,
"vote": hard_state.vote,
"commit": hard_state.commit,
},
"conf_state": {
"voters": conf_state.voters,
"learners": conf_state.learners,
"voters_outgoing": conf_state.voters_outgoing,
"learners_next": conf_state.learners_next,
},
"snapshot": format_snapshot(&snapshot),
"last_index": last_index,
},
"progress": prs,
"raft_log": {
"committed": last_committed,
"applied": last_applied,
"persisted": last_persisted,
},
});
Ok(result.to_string())
}
RawNode
의 상태를 검사하고, 그 결과를 JSON 형식으로 반환합니다.id
), 리더 ID(leader_id
), 임기(term
) 등의 기본 정보를 추출합니다.prs
에 저장됩니다.hard_state
, conf_state
, snapshot
, last_index
등의 정보를 가져옵니다.raft_log
에 저장합니다.이 코드는 Raft 노드의 상태를 포맷팅하고 분석할 수 있는 도구를 제공합니다. format_debugging_info
함수는 주어진 상태 정보를 사람이 읽을 수 있는 문자열 형식으로 변환하고, inspect_raftnode
함수는 Raft 노드의 현재 상태를 JSON 형식으로 추출합니다. 이를 통해 시스템 운영자는 노드의 상태를 쉽게 파악하고 디버깅할 수 있습니다.