rust Websocket

wangki·2025년 7월 30일
0

Rust

목록 보기
42/54

WebSocket

stream(스트림)

웹소켓에서의 스트림은 한 번 만들어진 연결 통로를 통해 데이터가 끊임없이, 그리고 양방향으로 흐르는 것 자체를 의미.
데이터의 강물이나 파이프라인을 상상하면 이해하기 쉽다.

주요 특징

  1. 양방향 (Bi-directional)
  • 클라이언트와 서버 중 어느 한쪽이 요청하고 다른 쪽이 응당하는 방식이 아니다.
  • 일단 연결되면, 클라이언트와 서버 모두 원할 때 언제든지 상대방에게 데이터를 보낼 수 있다.
    마치 양쪽에서 동시에 말을 할 수 있는 전화 통화화 같다.
  1. 지속성
  • 스트림을 만들기 위한 연결은 한 번만 맺어지며, 명시적으로 닫을 때까지 계속 유지됩니다.
  • HTTP처럼 매번 데이터를 주고받기 위해 연결을 새로 맺고 끊는 과정이 없어서 매우 효율적입니다.
  1. 메시지 기반
  • 스트림을 통해 데이터는 '메시지'라는 정해진 덩어리 단위로 전달됩니다
  • 데이터가 그냥 바이트의 연속으로 흐르는 것이 아니라, "안녕하세요" "지금 시각은 10시 52분 입니다."와 같은 의미 있는 단위로 구분되어 오고 간다.

핵심 요약

웹소켓의 스트림이란, 하나의 TCP 연결 통로 위에서 클라이언트와 서버가 지속적으로, 그리고 양방향 메시지를 주고받는 데이터의 흐름 그 자체를 가리킨다.

이러한 특징 덕분에 실시간 채팅, 주식 시세 업데이트, 온라인 게임처럼 즉각적인 데이터 교환이 필요한 서비스에 매우 효과적으로 사용된다.

소스

서버 참조
https://github.com/snapview/tokio-tungstenite/blob/master/examples/server.rs

클라이언트 참조
https://github.com/snapview/tokio-tungstenite/blob/master/examples/client.rs

클라이언트에서 연결 요청 시 urlws://를 붙여 주어야 한다.

클라이언트 소스

use std::env;

use futures_util::{future, pin_mut, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() {
    let url = env::args().nth(1).unwrap_or_else(|| "ws://127.0.0.1:9001".to_string());

    let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded();
    tokio::spawn(read_stdin(stdin_tx));

    let (ws_stream, _) = connect_async(&url).await.expect("Failed to connect");
    println!("WebSocket handshake has been successfully completed");

    // sink -> 데이터를 보내는 곳 
    // stream -> 원천 데이터에서 흘러 나오는 느낌? 
    let (write, read) = ws_stream.split();

    // forward는 stdin_rx가 닫히기 전까지 받은 데이터를 write으로 전달함
    // 메시지를 비동기적으로 기다리고 있음.
    let stdin_to_ws = stdin_rx.map(Ok).forward(write);

    let ws_to_stdout = {
        read.for_each(|message| async {
            let data = message.unwrap().into_data();
            tokio::io::stdout().write_all(&data).await.unwrap();
        })
    };

    pin_mut!(stdin_to_ws, ws_to_stdout);
    future::select(stdin_to_ws, ws_to_stdout).await;
}

// client -> terminal -> 

async fn read_stdin(tx: futures_channel::mpsc::UnboundedSender<Message>) {
    let mut stdin = tokio::io::stdin();
    loop {
        let mut buf = vec![0; 1024];
        let n = match stdin.read(&mut buf).await {
            Err(_) | Ok(0) => break,
            Ok(n) => n,
        };
        // 
        buf.truncate(n);
        // 터미널에서 입력받은 문자열을 
        // 메시지로 변환하여 채널간의 send를 해준다.
        tx.unbounded_send(Message::Binary(buf.into())).unwrap();
    }
}
let (write, read) = ws_stream.split();
let stdin_to_ws = stdin_rx.map(Ok).forward(write);

위 부분이 헷갈려서 정리해본다.

ws_stream 변수를 split()함수를 통해서 sinkstream으로 분리한다.

stdin_rx는 사용자가 terminal에서 입력한 메시지를 받는 receiver이고,
stdin_to_ws는 받은 데이터를 Ok로 감싸서 write싱크로 전달하는 하나의 퓨처(future) 객체이다.

비동기적으로 기다리고 있다가 데이터가 들어오면 다시 write(sink)으로 보내주는 역할이다.

future::select(stdin_to_ws, ws_to_stdout).await;

future::select을 통해서 두 future 객체를 동시에 실행을 한다.

결론

솔직히 완벽하게 내부를 다 확인하고 이해하려고 노력했지만 쉽지는 않은 것 같다. 그래도 최대한 분석하고 이해하려고 노력해야겠다. 이런 코드를 ai가 뚝딱 제시해 주기는 하지만 익숙해지면 스스로 생각하는 힘이 약해지는 것 같다.

0개의 댓글