bevy의 ecs 프레임워크를 활용해서 websocket 통신을 구현하려고 한다.
특히 권위 서버(Authoritative Server) 형태로 만들 예정이다.
https://github.com/wangki-kyu/bevy_ecs_authoritative
지속적으로 업데이트할 예정이다.
간단하게 WebSocket을 tokio runtime을 통해서 비동기로 accept 후 처리하는 핸들러를 등록할 예정이다.
stream을 넘겨주면서 새로운 WebSocket handling Task를 만들어준다. Task에서 클라이언트의 요청을 처리하여 이벤트를 발생시킨다.Bevy에서 제공해 주는 Event를 사용하지 않고, tokio의 channel을 통해서 만들어 사용하였다.Resource로 WebSocketAcceptEvent를 선언해 놓았다.
#[derive(Resource)]
struct WebSocketAcceptEvent(Receiver<ClientEventMessage>);
// ----------------- system
fn setup_server(mut commands: Commands, tokio_runtime: Res<TokioRuntime>) {
// websocket server Message channel
let (tx, rx) = tokio::sync::mpsc::channel::<ClientEventMessage>(10);
// resource 추가
commands.insert_resource(WebSocketAcceptEvent(rx));
let handle = tokio_runtime.0.clone();
handle.spawn(async move {
handle_websocket(tx).await;
println!("finish the websocker waiting...");
});
}
setup_server가 최초에 호출이 될 때, resource를 추가시켜주었다.
추가해놓은 TokioRuntime 리소스를 활용하여 Task를 생성하여 tokio의 runtime으로 넘겨주었다. tokio runtime을 가지고 있는 상태가 아니라면 Task를 runtime에 넘기지 못해서 에러가 발생한다.
따라서 최초에 리소스로 tokio runtime handle을 가지고 있어야한다.
handle_websocket이 호출이 되는데 새로운 task로 만들어져서
새로운 클라이언트의 연결 요청이 올 때까지 논블로킹 방식으로 대기를 하고 있는다.
연결 요청에 성공을 하게 되면
Ok((stream, _)) => {
tokio::spawn(async {
handle_accept(stream, cloned_tx).await;
});
},
handle_accept을 실행시켜주는 새로운 task를 생성해주는 역할을 한다. 연결 후 실제 동작은 위 함수에서 처리해준다.
async fn handle_accept(stream: tokio::net::TcpStream, tx: Sender<ClientEventMessage>) {
println!("[Websocket Recv] start handle websocket strream");
let ws_stream = tokio_tungstenite::accept_async(stream).await.unwrap();
// -------- Entity를 생성하기 위해서 메시지를 보내준다?
match tx.send(ClientEventMessage::Connect).await {
Ok(_) => {
},
Err(e) => {
eprintln!("fail to send message that requests to make client entity, error: {}", e);
return;
},
}
let (_, stream) = ws_stream.split();
let cloned_tx = tx.clone();
let stream_future = stream.try_for_each(|msg| {
if msg.is_empty() {
return futures_util::future::ok(());
}
println!("message recevied!, msg: {}", msg);
// 여기서 만약에 msg가 연결에 대한 요청이라면 entity를 만들어주고
// 다른 내용이라면 내용에 따라서 처리를 해주어야한다.
let tx_in_future = cloned_tx.clone();
tokio::spawn(async move {
let msg_str = msg.to_text().unwrap();
if let Err(e) = tx_in_future.send(ClientEventMessage::Move(msg_str.into())).await {
eprintln!("ClientEventMessage send error: {}", e);
}
});
futures_util::future::ok(())
});
stream_future.await.unwrap();
println!("[Websocket Recv] finish handle websocket strream");
}
TCP 연결을 통해 웹소켓 핸드쉐이크를 수행하여 프로토콜을 웹소켓으로 전환한다.
websocket stream으로 split메서드를 사용하여 수신 stream을 얻는다. stream_future라는 Future를 생성하여 계속해서 데이터를 얻어올 수 있도록 하였고 mpsc 채널의 Receiver로 필요한 데이터를 보내줄 수 있도록 만들었다.
src/bin/test.rs에 client 코드를 작성해 놓았다. 웹소켓 연결이 된 후 방향 키를 누르면 웹소켓으로 데이터를 보내도록 작성했고 테스트 완료하였다. 다음 포스팅에서는 클라이언트의 엔티티의 움직임을 서버 쪽에서 처리하는 권위 서버를 만드는 것을 다룰 예정이다.