rust trait extension

wangki·2025년 8월 5일
0

Rust

목록 보기
45/54

trait 지옥에 대해서 분석해보자.

async fn handle_websocket(tx: UnboundedSender<TcpStream>, tcp_stream: TcpStream) {
    // ws_stream을 얻은 후 
    // split을 통해서 
    let ws_stream = match tokio_tungstenite::accept_async(tcp_stream).await {
        Ok(ws_stream) => Some(ws_stream),
        Err(e) => {
            eprintln!("fail to accept websocket stream, error: {}", e);
            None
        },
    };

    if let Some(stream) = ws_stream {
        let (outgoing, incoming) = stream.split();
        let received_ws = incoming.try_for_each(|t| {

        });

    } else {
        return;
    }
}

여기서 incoming 변수의 메서드인 try_for_each를 호출하는데 이게 어떻게 가능한지에 대해서 분석하다가 정리를 해야 할 것 같아서 작성한다.

트레이트 괴물

일단 try_for_each의 원형은 아래처럼 생김

fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
    where
        F: FnMut(Self::Ok) -> Fut,
        Fut: TryFuture<Ok = (), Error = Self::Error>,
        Self: Sized,
    {
        assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
    }

매개변수로 F타입을 받음. 즉, 제네릭 타입임.
그럼 F: FnMut(Self::Ok) -> Fut인데 이 부분이 이해가 어려웠음.

먼저 SelfTryStream + ?Sized을 구현한 타입이어야 함.
Self::Ok가 있다는 건 연관 타입이 있다고 생각할 수 있음.

TryStream의 시그니처를 보면 아래와 같음

pub trait TryStream: Stream + private_try_stream::Sealed {
    /// The type of successful values yielded by this future
    type Ok;

    /// The type of failures yielded by this future
    type Error;

    /// Poll this `TryStream` as if it were a `Stream`.
    ///
    /// This method is a stopgap for a compiler limitation that prevents us from
    /// directly inheriting from the `Stream` trait; in the future it won't be
    /// needed.
    fn try_poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
}

연관 타입이 OkError가 있음.


SplitStream이 TryStream을 구현하고 있기 때문에 TryStreamEx가 블랭킷 구현이 되어서 .try_for_each 메서드를 쓸 수 있는 거임
https://docs.rs/futures-util/0.3.31/futures_util/stream/struct.SplitStream.html
이 문서에서 찾으면 나옴

impl<S, T, E> TryStream for S
where
197    S: ?Sized + Stream<Item = Result<T, E>>,
198{
199    type Ok = T;
200    type Error = E;
201
202    fn try_poll_next(
203        self: Pin<&mut Self>,
204        cx: &mut Context<'_>,
205    ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
206        self.poll_next(cx)
207    }
208}
209

이거 보면 S에 대해서 TryStream을 구현하고 있는데 S의 조건을 보면 Stream<Item = Result<T, E>>이다.

impl<S: Stream> Stream for SplitStream<S> {
38    type Item = S::Item;
39
40    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
41        ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
42    }
43}

StreamSplitStream에 대해서 구현한걸 보면 type Item = S::Item이다. 위에서 SWebSocketStream이다.

따라서 WebSocketStreamStream에 대한 구현을 확인해야한다.

impl<T> Stream for WebSocketStream<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    type Item = Result<Message, WsError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        trace!("{}:{} Stream.poll_next", file!(), line!());

        // The connection has been closed or a critical error has occurred.
        // We have already returned the error to the user, the `Stream` is unusable,
        // so we assume that the stream has been "fused".
        if self.ended {
            return Poll::Ready(None);
        }

        match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| {
            trace!("{}:{} Stream.with_context poll_next -> read()", file!(), line!());
            cvt(s.read())
        })) {
            Ok(v) => Poll::Ready(Some(Ok(v))),
            Err(e) => {
                self.ended = true;
                if matches!(e, WsError::AlreadyClosed | WsError::ConnectionClosed) {
                    Poll::Ready(None)
                } else {
                    Poll::Ready(Some(Err(e)))
                }
            }
        }
    }
}

연관타입인 Item을 보면 Result<Message, WsError>라고 나온다.
종합해보면 SplitStream의 연관 타입인 ItemResult<Message, WsError>가 되는거고, TryStream에 대해서

impl<S, T, E> TryStream for S
where
    S: ?Sized + Stream<Item = Result<T, E>>,
{
    type Ok = T;
    type Error = E;

S 타입이 Stream을 구현한 타입인데 SplitStreamStream을 구현했고 연관 타입인 ItemResult<Message, WsError>이므로 TMessage가 되고 EWsError가 된다.
OkError도 타입이 정해지게 된다.

다시 try_for_each로 돌아가게 되면

fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
    where
        F: FnMut(Self::Ok) -> Fut,
        Fut: TryFuture<Ok = (), Error = Self::Error>,
        Self: Sized,
    

위 메서드에서 Self::OkMessage가 되는 것이고, Self::ErorrWsError가 된다.

let received_ws = incoming.try_for_each(|m| {

        });

여기서 closure의 매개변수인 m
Message인게 확인 되었다.

        let received_ws = incoming.try_for_each(|m| {
            println!("[recv] msg: {}", m.to_text().unwrap());
           future::ok(()) 
        });

        received_ws.await.unwrap();

내가 설명하고도 모르겠다... 너무 복잡한 것 같다. 도대체 이걸 누가 만들었는지 경이롭다.

일단 이해한 데로 작성을 했는데 또 수정을 해야겠다.

0개의 댓글