bevy의 ecs 프레임워크를 활용해서 websocket
통신을 구현하려고 한다.
특히 권위 서버(Authoritative Server)
형태로 만들 예정이다.
https://github.com/wangki-kyu/bevy_ecs_authoritative
지속적으로 업데이트할 예정이다.
간단하게 WebSocket
을 tokio
runtime
을 통해서 비동기로 accept
후 처리하는 핸들러를 등록할 예정이다.
stream
을 넘겨주면서 새로운 WebSocket handling
Task
를 만들어준다. Task
에서 클라이언트의 요청을 처리하여 이벤트를 발생시킨다.Bevy
에서 제공해 주는 Event
를 사용하지 않고, tokio
의 channel
을 통해서 만들어 사용하였다.Resource
로 WebSocketAcceptEvent
를 선언해 놓았다.
#[derive(Resource)]
struct WebSocketAcceptEvent(Receiver<ClientEventMessage>);
// ----------------- system
fn setup_server(mut commands: Commands, tokio_runtime: Res<TokioRuntime>) {
// websocket server Message channel
let (tx, rx) = tokio::sync::mpsc::channel::<ClientEventMessage>(10);
// resource 추가
commands.insert_resource(WebSocketAcceptEvent(rx));
let handle = tokio_runtime.0.clone();
handle.spawn(async move {
handle_websocket(tx).await;
println!("finish the websocker waiting...");
});
}
setup_server
가 최초에 호출이 될 때, resource
를 추가시켜주었다.
추가해놓은 TokioRuntime
리소스
를 활용하여 Task
를 생성하여 tokio
의 runtime
으로 넘겨주었다. tokio runtime
을 가지고 있는 상태가 아니라면 Task
를 runtime
에 넘기지 못해서 에러가 발생한다.
따라서 최초에 리소스로 tokio runtime handle
을 가지고 있어야한다.
handle_websocket
이 호출이 되는데 새로운 task
로 만들어져서
새로운 클라이언트의 연결 요청이 올 때까지 논블로킹 방식으로 대기를 하고 있는다.
연결 요청에 성공을 하게 되면
Ok((stream, _)) => {
tokio::spawn(async {
handle_accept(stream, cloned_tx).await;
});
},
handle_accept
을 실행시켜주는 새로운 task
를 생성해주는 역할을 한다. 연결 후 실제 동작은 위 함수에서 처리해준다.
async fn handle_accept(stream: tokio::net::TcpStream, tx: Sender<ClientEventMessage>) {
println!("[Websocket Recv] start handle websocket strream");
let ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();
// -------- Entity를 생성하기 위해서 메시지를 보내준다?
match tx.send(ClientEventMessage::Connect).await {
Ok(_) => {
},
Err(e) => {
eprintln!("fail to send message that requests to make client entity, error: {}", e);
return;
},
}
let (_, stream) = ws_stream.split();
let cloned_tx = tx.clone();
let stream_future = stream.try_for_each(|msg| {
if msg.is_empty() {
return futures_util::future::ok(());
}
println!("message recevied!, msg: {}", msg);
// 여기서 만약에 msg가 연결에 대한 요청이라면 entity를 만들어주고
// 다른 내용이라면 내용에 따라서 처리를 해주어야한다.
let tx_in_future = cloned_tx.clone();
tokio::spawn(async move {
let msg_str = msg.to_text().unwrap();
if let Err(e) = tx_in_future.send(ClientEventMessage::Move(msg_str.into())).await {
eprintln!("ClientEventMessage send error: {}", e);
}
});
futures_util::future::ok(())
});
stream_future.await.unwrap();
println!("[Websocket Recv] finish handle websocket strream");
}
TCP 연결을 통해 웹소켓 핸드쉐이크를 수행하여 프로토콜을 웹소켓으로 전환한다.
websocket stream
으로 split
메서드를 사용하여 수신 stream
을 얻는다. stream_future
라는 Future
를 생성하여 계속해서 데이터를 얻어올 수 있도록 하였고 mpsc
채널의 Receiver
로 필요한 데이터를 보내줄 수 있도록 만들었다.
src/bin/test.rs
에 client
코드를 작성해 놓았다. 웹소켓 연결이 된 후 방향 키를 누르면 웹소켓으로 데이터를 보내도록 작성했고 테스트 완료하였다. 다음 포스팅에서는 클라이언트의 엔티티의 움직임을 서버 쪽에서 처리하는 권위 서버를 만드는 것을 다룰 예정이다.