Raft 코드 뜯어보기 2)

Tasker_Jang·2024년 8월 17일
0

1. raft_client.rs:

이 코드는 비동기 함수 create_client를 정의하여 Raft 서비스에 대한 gRPC 클라이언트를 생성합니다. 이 클라이언트는 주어진 주소로 연결된 gRPC 서버와 통신할 수 있습니다. 함수는 HTTP 프로토콜을 사용하여 gRPC 채널을 생성하고, 그 채널을 통해 RaftServiceClient를 반환합니다.

주요 구성 요소

1. create_client 함수

pub async fn create_client<A: ToSocketAddrs>(
    addr: A,
) -> Result<RaftServiceClient<Channel>, TonicError> {
    let addr = addr
        .to_socket_addrs()
        .expect("Invalid socket address format")
        .next()
        .unwrap();
    let addr = format!("http://{}", addr);
    let addr = Bytes::copy_from_slice(addr.as_bytes());

    let channel = Channel::from_shared(addr).unwrap().connect().await?;
    let client = RaftServiceClient::new(channel);

    Ok(client)
}
  • 입력 파라미터:

    • addr: 네트워크 주소를 나타내며, ToSocketAddrs 트레이트를 구현해야 합니다. 이 타입은 IP 주소와 포트를 포함한 네트워크 주소로 변환될 수 있습니다.
  • 동작 과정:

    1. addr.to_socket_addrs()를 호출하여 입력된 주소를 소켓 주소로 변환합니다. 변환된 주소 중 첫 번째 주소를 사용합니다.
      • expect("Invalid socket address format")는 주소 변환이 실패할 경우 패닉을 일으키며, 오류 메시지를 출력합니다.
    2. 주소를 "http://<addr>" 형식의 문자열로 변환합니다.
    3. 이 문자열을 Bytes로 변환하여 gRPC 채널을 생성하는 데 사용합니다.
    4. Channel::from_shared(addr).unwrap().connect().await?을 통해 gRPC 채널을 생성하고, 해당 채널을 사용하여 RaftServiceClient 인스턴스를 만듭니다.
      • connect().await?은 비동기적으로 채널 연결을 시도하며, 실패할 경우 TonicError를 반환합니다.
    5. 성공적으로 클라이언트를 생성하면 Ok(client)로 반환합니다.
  • 출력:

    • Result<RaftServiceClient<Channel>, TonicError> 타입을 반환합니다. 성공 시 RaftServiceClient를 반환하며, 실패 시 TonicError를 반환합니다.

이 함수는 주어진 네트워크 주소를 이용해 gRPC 클라이언트를 생성하는 역할을 합니다. 주로 Raft 클러스터 내에서 노드 간의 통신을 위해 사용될 수 있으며, HTTP 프로토콜을 기반으로 gRPC 서버와 연결을 설정합니다. 주소 변환, 채널 생성, 클라이언트 생성 등의 단계를 거쳐 비동기적으로 실행되며, 성공 시 RaftServiceClient 객체를 반환합니다.

2. raft_server.rs:

이 코드에서는 RaftServer 구조체와 gRPC를 통한 Raft 서비스 구현을 다룹니다. RaftServer는 Raft 클러스터에서 서버 역할을 하며, 클라이언트와의 상호작용을 처리하고, 다른 노드와의 통신을 관리합니다. 주요 부분을 설명드리겠습니다.

주요 구성 요소

1. RaftServer 구조체

#[derive(Clone)]
pub struct RaftServer<LogEntry: AbstractLogEntry, FSM: AbstractStateMachine> {
    tx: mpsc::Sender<ServerRequestMsg<LogEntry, FSM>>,
    raft_addr: SocketAddr,
    config: Config,
    logger: Arc<dyn Logger>,
}
  • 구성 요소:

    • tx: Raft 노드에 메시지를 보내기 위한 mpsc::Sender. 이 채널을 통해 서버가 Raft 노드에게 요청을 전달합니다.
    • raft_addr: 서버가 리스닝할 소켓 주소.
    • config: Raft 서버의 설정을 담고 있는 구조체.
    • logger: 로그를 기록하기 위한 로거 객체.
  • new 메서드:

    • 서버 인스턴스를 생성하는 역할을 합니다.
    • raft_addrToSocketAddrs 트레이트를 사용하여 SocketAddr로 변환합니다.
  • run 메서드:

    • Raft 서버를 시작하여 gRPC 요청을 수신합니다.
    • Server::builder().add_service(RaftServiceServer::new(self)).serve_with_shutdown(raft_addr, quit_signal)을 통해 서버를 실행합니다.
    • 서버는 종료 신호(rx_quit_signal)를 수신하면 안전하게 종료됩니다.

2. Raft 서비스 구현 (tonic::async_trait)

이 부분에서는 gRPC를 통해 제공되는 Raft 서비스의 메서드를 구현합니다. 각 메서드는 클라이언트 요청을 처리하고, 해당 요청을 Raft 노드로 전달합니다.

  • request_id:

    • 클라이언트가 노드 ID를 요청할 때 호출됩니다.
    • Raft 노드에게 ID 요청을 전달하고, 응답을 클라이언트에게 반환합니다.
  • change_config:

    • 클러스터의 구성 변경 요청을 처리합니다.
    • 예를 들어, 새로운 노드를 추가하거나 기존 노드를 제거하는 요청을 처리합니다.
    • 요청이 리더 노드에서 처리되지 않은 경우, 리더에게 요청을 전달합니다.
  • send_message:

    • Raft 노드 간의 메시지 전송을 처리합니다.
    • 클러스터 내의 다른 노드로부터 전송된 메시지를 Raft 노드로 전달합니다.
  • propose:

    • 클라이언트로부터 로그 제안을 받아 Raft 노드로 전달합니다.
    • 리더가 아닌 경우, 리더에게 요청을 전달합니다.
  • debug_node:

    • 디버깅 목적으로 노드의 상태를 요청합니다.
    • Raft 노드의 상태 정보를 JSON 형식으로 반환합니다.
  • get_peers:

    • 현재 클러스터의 피어 목록을 요청합니다.
    • Raft 노드로부터 피어 정보를 받아 클라이언트에게 반환합니다.
  • leave_joint:

    • 공동 구성 상태에서 노드를 탈퇴시키는 요청을 처리합니다.
  • set_peers:

    • 클러스터의 피어 구성을 설정합니다.
    • 클러스터의 피어 목록을 설정하는 요청을 처리합니다.
  • create_snapshot:

    • 스냅샷 생성 요청을 처리합니다.
    • Raft 노드에게 스냅샷을 생성하도록 지시합니다.

3. 에러 처리 및 로깅

fn print_send_error(&self, function_name: &str) {
    self.logger.error(&format!(
        "Error occurred in sending message ('RaftServer --> RaftNode'). Function: '{}'",
        function_name
    ));
}
  • 역할:
    • 메시지를 Raft 노드로 보내는 도중 오류가 발생할 경우, 로그에 오류 메시지를 기록합니다.
    • function_name!() 매크로를 사용하여 오류가 발생한 함수의 이름을 로그에 포함시킵니다.
profile
터널을 지나고 있을 뿐, 길은 여전히 열려 있다.

0개의 댓글