async fn handle_websocket(tx: UnboundedSender<TcpStream>, tcp_stream: TcpStream) {
// ws_stream을 얻은 후
// split을 통해서
let ws_stream = match tokio_tungstenite::accept_async(tcp_stream).await {
Ok(ws_stream) => Some(ws_stream),
Err(e) => {
eprintln!("fail to accept websocket stream, error: {}", e);
None
},
};
if let Some(stream) = ws_stream {
let (outgoing, incoming) = stream.split();
let received_ws = incoming.try_for_each(|t| {
});
} else {
return;
}
}
여기서 incoming
변수의 메서드인 try_for_each
를 호출하는데 이게 어떻게 가능한지에 대해서 분석하다가 정리를 해야 할 것 같아서 작성한다.
일단 try_for_each
의 원형은 아래처럼 생김
fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = Self::Error>,
Self: Sized,
{
assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
}
매개변수로 F
타입을 받음. 즉, 제네릭 타입임.
그럼 F: FnMut(Self::Ok) -> Fut
인데 이 부분이 이해가 어려웠음.
먼저 Self
는 TryStream + ?Sized
을 구현한 타입이어야 함.
Self::Ok
가 있다는 건 연관 타입이 있다고 생각할 수 있음.
TryStream
의 시그니처를 보면 아래와 같음
pub trait TryStream: Stream + private_try_stream::Sealed {
/// The type of successful values yielded by this future
type Ok;
/// The type of failures yielded by this future
type Error;
/// Poll this `TryStream` as if it were a `Stream`.
///
/// This method is a stopgap for a compiler limitation that prevents us from
/// directly inheriting from the `Stream` trait; in the future it won't be
/// needed.
fn try_poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
}
연관 타입이 Ok
와 Error
가 있음.
SplitStream이 TryStream
을 구현하고 있기 때문에 TryStreamEx
가 블랭킷 구현이 되어서 .try_for_each
메서드를 쓸 수 있는 거임
https://docs.rs/futures-util/0.3.31/futures_util/stream/struct.SplitStream.html
이 문서에서 찾으면 나옴
impl<S, T, E> TryStream for S
where
197 S: ?Sized + Stream<Item = Result<T, E>>,
198{
199 type Ok = T;
200 type Error = E;
201
202 fn try_poll_next(
203 self: Pin<&mut Self>,
204 cx: &mut Context<'_>,
205 ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
206 self.poll_next(cx)
207 }
208}
209
이거 보면 S
에 대해서 TryStream
을 구현하고 있는데 S
의 조건을 보면 Stream<Item = Result<T, E>>
이다.
impl<S: Stream> Stream for SplitStream<S> {
38 type Item = S::Item;
39
40 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
41 ready!(self.0.poll_lock(cx)).as_pin_mut().poll_next(cx)
42 }
43}
Stream
을 SplitStream
에 대해서 구현한걸 보면 type Item = S::Item
이다. 위에서 S
는 WebSocketStream
이다.
따라서 WebSocketStream
의 Stream
에 대한 구현을 확인해야한다.
impl<T> Stream for WebSocketStream<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
type Item = Result<Message, WsError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
trace!("{}:{} Stream.poll_next", file!(), line!());
// The connection has been closed or a critical error has occurred.
// We have already returned the error to the user, the `Stream` is unusable,
// so we assume that the stream has been "fused".
if self.ended {
return Poll::Ready(None);
}
match futures_util::ready!(self.with_context(Some((ContextWaker::Read, cx)), |s| {
trace!("{}:{} Stream.with_context poll_next -> read()", file!(), line!());
cvt(s.read())
})) {
Ok(v) => Poll::Ready(Some(Ok(v))),
Err(e) => {
self.ended = true;
if matches!(e, WsError::AlreadyClosed | WsError::ConnectionClosed) {
Poll::Ready(None)
} else {
Poll::Ready(Some(Err(e)))
}
}
}
}
}
연관타입인 Item
을 보면 Result<Message, WsError>
라고 나온다.
종합해보면 SplitStream
의 연관 타입인 Item
은 Result<Message, WsError>
가 되는거고, TryStream
에 대해서
impl<S, T, E> TryStream for S
where
S: ?Sized + Stream<Item = Result<T, E>>,
{
type Ok = T;
type Error = E;
S
타입이 Stream
을 구현한 타입인데 SplitStream
이 Stream
을 구현했고 연관 타입인 Item
이 Result<Message, WsError>
이므로 T
는 Message
가 되고 E
는 WsError
가 된다.
Ok
와 Error
도 타입이 정해지게 된다.
다시 try_for_each
로 돌아가게 되면
fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
where
F: FnMut(Self::Ok) -> Fut,
Fut: TryFuture<Ok = (), Error = Self::Error>,
Self: Sized,
위 메서드에서 Self::Ok
가 Message
가 되는 것이고, Self::Erorr
가 WsError
가 된다.
let received_ws = incoming.try_for_each(|m| {
});
여기서 closure의 매개변수인 m
은
Message
인게 확인 되었다.
let received_ws = incoming.try_for_each(|m| {
println!("[recv] msg: {}", m.to_text().unwrap());
future::ok(())
});
received_ws.await.unwrap();
내가 설명하고도 모르겠다... 너무 복잡한 것 같다. 도대체 이걸 누가 만들었는지 경이롭다.
일단 이해한 데로 작성을 했는데 또 수정을 해야겠다.