raft_client.rs
:이 코드는 비동기 함수 create_client
를 정의하여 Raft 서비스에 대한 gRPC 클라이언트를 생성합니다. 이 클라이언트는 주어진 주소로 연결된 gRPC 서버와 통신할 수 있습니다. 함수는 HTTP 프로토콜을 사용하여 gRPC 채널을 생성하고, 그 채널을 통해 RaftServiceClient
를 반환합니다.
create_client
함수pub async fn create_client<A: ToSocketAddrs>(
addr: A,
) -> Result<RaftServiceClient<Channel>, TonicError> {
let addr = addr
.to_socket_addrs()
.expect("Invalid socket address format")
.next()
.unwrap();
let addr = format!("http://{}", addr);
let addr = Bytes::copy_from_slice(addr.as_bytes());
let channel = Channel::from_shared(addr).unwrap().connect().await?;
let client = RaftServiceClient::new(channel);
Ok(client)
}
입력 파라미터:
addr
: 네트워크 주소를 나타내며, ToSocketAddrs
트레이트를 구현해야 합니다. 이 타입은 IP 주소와 포트를 포함한 네트워크 주소로 변환될 수 있습니다.동작 과정:
addr.to_socket_addrs()
를 호출하여 입력된 주소를 소켓 주소로 변환합니다. 변환된 주소 중 첫 번째 주소를 사용합니다.expect("Invalid socket address format")
는 주소 변환이 실패할 경우 패닉을 일으키며, 오류 메시지를 출력합니다."http://<addr>"
형식의 문자열로 변환합니다.Bytes
로 변환하여 gRPC 채널을 생성하는 데 사용합니다.Channel::from_shared(addr).unwrap().connect().await?
을 통해 gRPC 채널을 생성하고, 해당 채널을 사용하여 RaftServiceClient
인스턴스를 만듭니다.connect().await?
은 비동기적으로 채널 연결을 시도하며, 실패할 경우 TonicError
를 반환합니다.Ok(client)
로 반환합니다.출력:
Result<RaftServiceClient<Channel>, TonicError>
타입을 반환합니다. 성공 시 RaftServiceClient
를 반환하며, 실패 시 TonicError
를 반환합니다.이 함수는 주어진 네트워크 주소를 이용해 gRPC 클라이언트를 생성하는 역할을 합니다. 주로 Raft 클러스터 내에서 노드 간의 통신을 위해 사용될 수 있으며, HTTP 프로토콜을 기반으로 gRPC 서버와 연결을 설정합니다. 주소 변환, 채널 생성, 클라이언트 생성 등의 단계를 거쳐 비동기적으로 실행되며, 성공 시 RaftServiceClient
객체를 반환합니다.
raft_server.rs
:이 코드에서는 RaftServer
구조체와 gRPC를 통한 Raft 서비스 구현을 다룹니다. RaftServer
는 Raft 클러스터에서 서버 역할을 하며, 클라이언트와의 상호작용을 처리하고, 다른 노드와의 통신을 관리합니다. 주요 부분을 설명드리겠습니다.
RaftServer
구조체#[derive(Clone)]
pub struct RaftServer<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine> {
tx: mpsc::Sender<ServerRequestMsg<LogEntry, FSM>>,
raft_addr: SocketAddr,
config: Config,
logger: Arc<dyn Logger>,
}
구성 요소:
tx
: Raft 노드에 메시지를 보내기 위한 mpsc::Sender
. 이 채널을 통해 서버가 Raft 노드에게 요청을 전달합니다.raft_addr
: 서버가 리스닝할 소켓 주소.config
: Raft 서버의 설정을 담고 있는 구조체.logger
: 로그를 기록하기 위한 로거 객체.new
메서드:
raft_addr
를 ToSocketAddrs
트레이트를 사용하여 SocketAddr
로 변환합니다.run
메서드:
Server::builder().add_service(RaftServiceServer::new(self)).serve_with_shutdown(raft_addr, quit_signal)
을 통해 서버를 실행합니다.rx_quit_signal
)를 수신하면 안전하게 종료됩니다.tonic::async_trait
)이 부분에서는 gRPC를 통해 제공되는 Raft 서비스의 메서드를 구현합니다. 각 메서드는 클라이언트 요청을 처리하고, 해당 요청을 Raft 노드로 전달합니다.
request_id
:
change_config
:
send_message
:
propose
:
debug_node
:
get_peers
:
leave_joint
:
set_peers
:
create_snapshot
:
fn print_send_error(&self, function_name: &str) {
self.logger.error(&format!(
"Error occurred in sending message ('RaftServer --> RaftNode'). Function: '{}'",
function_name
));
}
function_name!()
매크로를 사용하여 오류가 발생한 함수의 이름을 로그에 포함시킵니다.