웹소켓에서의 스트림은 한 번 만들어진 연결 통로를 통해 데이터가 끊임없이, 그리고 양방향으로 흐르는 것 자체를 의미.
데이터의 강물
이나 파이프라인
을 상상하면 이해하기 쉽다.
주요 특징
웹소켓의 스트림이란, 하나의 TCP 연결 통로 위에서 클라이언트와 서버가 지속적으로, 그리고 양방향 메시지를 주고받는 데이터의 흐름 그 자체를 가리킨다.
이러한 특징 덕분에 실시간 채팅, 주식 시세 업데이트, 온라인 게임처럼 즉각적인 데이터 교환이 필요한 서비스에 매우 효과적으로 사용된다.
서버 참조
https://github.com/snapview/tokio-tungstenite/blob/master/examples/server.rs
클라이언트 참조
https://github.com/snapview/tokio-tungstenite/blob/master/examples/client.rs
클라이언트에서 연결 요청 시
url
에ws://
를 붙여 주어야 한다.
use std::env;
use futures_util::{future, pin_mut, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::main]
async fn main() {
let url = env::args().nth(1).unwrap_or_else(|| "ws://127.0.0.1:9001".to_string());
let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded();
tokio::spawn(read_stdin(stdin_tx));
let (ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
println!("WebSocket handshake has been successfully completed");
// sink -> 데이터를 보내는 곳
// stream -> 원천 데이터에서 흘러 나오는 느낌?
let (write, read) = ws_stream.split();
// forward는 stdin_rx가 닫히기 전까지 받은 데이터를 write으로 전달함
// 메시지를 비동기적으로 기다리고 있음.
let stdin_to_ws = stdin_rx.map(Ok).forward(write);
let ws_to_stdout = {
read.for_each(|message| async {
let data = message.unwrap().into_data();
tokio::io::stdout().write_all(&data).await.unwrap();
})
};
pin_mut!(stdin_to_ws, ws_to_stdout);
future::select(stdin_to_ws, ws_to_stdout).await;
}
// client -> terminal ->
async fn read_stdin(tx: futures_channel::mpsc::UnboundedSender<Message>) {
let mut stdin = tokio::io::stdin();
loop {
let mut buf = vec![0; 1024];
let n = match stdin.read(&mut buf).await {
Err(_) | Ok(0) => break,
Ok(n) => n,
};
//
buf.truncate(n);
// 터미널에서 입력받은 문자열을
// 메시지로 변환하여 채널간의 send를 해준다.
tx.unbounded_send(Message::Binary(buf.into())).unwrap();
}
}
let (write, read) = ws_stream.split();
let stdin_to_ws = stdin_rx.map(Ok).forward(write);
위 부분이 헷갈려서 정리해본다.
ws_stream
변수를 split()
함수를 통해서 sink
와 stream
으로 분리한다.
stdin_rx
는 사용자가 terminal
에서 입력한 메시지를 받는 receiver
이고,
stdin_to_ws
는 받은 데이터를 Ok
로 감싸서 write
싱크로 전달하는 하나의 퓨처(future)
객체이다.
비동기적으로 기다리고 있다가 데이터가 들어오면 다시 write(sink)으로 보내주는 역할이다.
future::select(stdin_to_ws, ws_to_stdout).await;
future::select을 통해서 두 future 객체를 동시에 실행을 한다.
솔직히 완벽하게 내부를 다 확인하고 이해하려고 노력했지만 쉽지는 않은 것 같다. 그래도 최대한 분석하고 이해하려고 노력해야겠다. 이런 코드를 ai가 뚝딱 제시해 주기는 하지만 익숙해지면 스스로 생각하는 힘이 약해지는 것 같다.