Cancellation Safety in Rust

pandawithcat·2021년 10월 23일
0

Rust 잡동사니

목록 보기
4/5

최근에 업무에서 Rust를 사용하기보다는 CI/CD 및 운영에 집중하냐고 실제 코드를 쓰지 못해 몸이 근질근질거리더라구요. 그래서 주말을 사용해서 Rust 개인 프로젝트를 다시 진행하면서 여러가지 문서를 읽던 도중 제가 평소에 즐겨 사용하는 macro 인 tokio::select 에 https://docs.rs/tokio/1.12.0/tokio/macro.select.html#cancellation-safety 부분이 눈에 들어와서 이를 살펴보았는데 Rust에서 비동기 프로그래밍을 하신다면 꼭 아셔야 하는 부분인것 같아서 이렇게 글로 작성하게 되었습니다. 비동기 프로그래밍을 할때 두 개 이상의 future가 있을때 이 중 먼저 완료된것을 return 해야하는 상황에 마주친적이 있는 분이라면 이 글을 쉽고 재미있게 읽으실수 있습니다. 글 자체는 https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/ 의 내용의 번역에 조금 더 자세하게 설명을 추가했습니다.

항상 그렇듯, 글에 틀린점이나 궁금한점이 있으시면 pandawithcat@gmail.com 으로 보내주시면 감사하겠습니다.

Tokio::select! 파헤치기

tokio::select 는 두 개 이상의 future 가 있을때 먼저 완료되는 future의 값을 return 하게 해주는 macro 입니다. 예를 들면 다음과 같은 상황에서 select! 문은 parse_line과 channel의 recv() 중 먼저 완료되는 future 의 값을 리턴하게 됩니다.

async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
    let len = socket.read_u32().await?;
    let mut line = vec![0; len];
    socket.read_exact(&mut line).await?;
    let line = str::from_utf8(line)?;
    Ok(line)
}

fn broadcase_line(s: String){
	println!("{}", s);
}

async fn handle_connection(socket: TcpStream, channel: Channel) {
  loop {
		select! {
        line_in = parse_line(&socket) => {
            if let Some(line_in) = line_in {
                broadcast_line(line_in);
            } else {
                // connection closed, exit loop
                break;
            }
        }
        line_out = channel.recv() => {
            write_line(&socket, line_out).await;
        }
  	}
  }
}

근데 select! 문서를 보다 보면 cancellation safety 의 부분이 있습니다. select! 매크로는 내부적 여러개의 branch에 대해서 wait를 하다가 그 중 하나의 branch가 complete가 되면 나머지 branch들은 cancel 하는 로직을 가지고 있습니다. 즉, 하나의 future가 있을때 이것이 완료되지 않았다면 중간에 cancel 되는 상황이죠. 일반적으로 저희가 생각할때 "어 그럼 future가 완료되기 전에 cancel 되면 문제가 없지 않을까?" 라고 생각을 할수 있는데 그렇지 않습니다. 위의 예시를 조금 더 자세히 보죠. 위 예시를 아무생각 없이 실행을 하다보면 어느 순간 소켓에서 데이터가 없어지는 마술을 목격할수 있습니다. 데이터는 분명 socket 으로 들어왔는데 데이터는 사라지는 마법이죠. cancellation safety에 대해 알지 못한다면 이를 디버깅 하는것은 거의 불가능에 가깝습니다. tokio::select! 문서를 보면 다음과 같은 함수들은 cancellation safe하지 않고 데이터 손실이 발생할수 있다고 나오네요.

이해를 돕기 위해 먼저 data loss가 발생할수 있는 함수들중 read_exact의 소스 코드를 보겠습니다.

pub(crate) fn read_exact<'a, A>(reader: &'a mut A, buf: &'a mut [u8]) -> ReadExact<'a, A>
where
    A: AsyncRead + Unpin + ?Sized,
{
    ReadExact {
        reader,
        buf: ReadBuf::new(buf),
        _pin: PhantomPinned,
    }
}

pin_project! {
    /// Creates a future which will read exactly enough bytes to fill `buf`,
    /// returning an error if EOF is hit sooner.
    ///
    /// On success the number of bytes is returned
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ReadExact<'a, A: ?Sized> {
        reader: &'a mut A,
        buf: ReadBuf<'a>,
        // Make this future `!Unpin` for compatibility with async trait methods.
        #[pin]
        _pin: PhantomPinned,
    }
}

fn eof() -> io::Error {
    io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")
}

impl<A> Future for ReadExact<'_, A>
where
    A: AsyncRead + Unpin + ?Sized,
{
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<usize>> {
        let mut me = self.project();

        loop {
            // if our buffer is empty, then we need to read some data to continue.
            let rem = me.buf.remaining();
            if rem != 0 {
                ready!(Pin::new(&mut *me.reader).poll_read(cx, &mut me.buf))?;
                if me.buf.remaining() == rem {
                    return Err(eof()).into();
                }
            } else {
                return Poll::Ready(Ok(me.buf.capacity()));
            }
        }
    }
}

보시면 read_exact 는 ReadExact라는 struct 를 만들게 되고 이 ReadExact는 Future를 implement 하고 있습니다(Rust에서 비동기 함수들은 future를 implement 하기 때문에 이처럼 struct를 하나 만들고 이 struct에 대해서 Future를 구현하게 됩니다. 만약 비동기의 poll, Future 등이 헷갈리신다면 여기서 잠시 멈추시고 제 이전 글들을 읽어주시면 감사하겠습니다) 여기서 주목해서 봐야 할 부분은 fn poll() 함수 안에서 loop 부분입니다. 보시다시피 여기서 buf가 다 찰때까지 data를 계속 읽어주게 됩니다. 만약 데이터를 다 읽지 못했는데 select! 문의 다른 브랜치가 먼저 complete 되서 read_exact future가 cancell 되게 된다면 저 buf에 남아있는 데이터는 그냥 drop이 되게 됩니다. 즉, socket 에서 읽어서 buf 에 넣었지만 buf 에 있다가 drop 이 됨으로써 데이터가 사라지게 되는 케이스이죠. 러스트가 "안전한" 언어임을 강조하는셈치고는 매우 무서운 일이 발생할수 있다는거죠.

이게 다가 아닙니다. tokio 공식문서에서 다음과 같은 함수들은 내부적으로 queue를 사용하기 때문에 cancellation safe 하지 않다고 명시하고 있습니다.

보시다시피 Mutex, RwLock 등 Lock에 관련된 함수들입니다. 즉, 일반적으로 lock 들에 대해서 lock() 함수가 여러번 호출될때 Rust는 이 호출한 순서를 queue로 관리하기 때문에 발생하는 문제입니다. 일반적으로 mutex는 kernel space 에 queue data structure(Linux에는 futux)를 가지고 있고 이 queue에 lock을 사용하려는 threads 들의 순서를 저장하게 됩니다. 즉, queue에서 자기 순서가 되는 순간 queue에서 자기 순서는 빠지면서 실제 object 에 대한 Lock 권한을 가지게 되는데 이 상태에서 만약 future가 cancel 이 된다면 다시 queue의 마지막으로 자신의 순서가 이동되는 안 좋은 상황이 발생하기 때문입니다.

select! 안전하게 사용하기

여기까지 읽으신 분들 중 많이 당황하신 분들도 있으실거라고 생각합니다. 만약 위에 언급된 cancellation safe가 아닌 함수들을 cancel safe 하게 사용하려면 어떻게 해야 할까요? 간단한 답은 cancel safe하게 cancellatoin safe 하지 않은 함수들을 wrapping 하면 됩니다. 예시로 자세히 살펴보죠.

async fn handle_connection(socket: TcpStream, channel: Channel) {
    let reader = Arc::new(socket);
    let writer = reader.clone();
    
    let read_task = task::spawn(async move {
        while let Some(line_in) in parse_line(&reader).await? {
            broadcast_line(line_in)?;
        }
        Ok(())
    });
    
    loop {
        // `channel` and JoinHandle are both "channel-like" types.
        select! {
            _ = read_task.join() => {
                // The connection closed or we encountered an error,
                // exit the loop
                break;
            }
            line_out = channel.recv() => {
                if write_line(&writer, line_out).await.is_err() {
                    read_task.cancel();
                    read_task.join();
                }
            }
        }
    }
}

즉, 이전에 parse_line 을 select! 안에서 직접 호출하는 대신 parse_line(및 broadcast_line)을 하는것을 다른 task로 분리하고 해당 task 가 만약 종료된다면(socket connection이 끊겨서 종료되거나 broadcase_line에서 에러가 발생할수 있습니다) read_task.join() 함수가 호출되면서 전체 loop이 멈추게 됩니다. 그리고 모종의 이유로 write_line에서 에러가 발생할 경우 read_task를 cancel 함으로써 socket connection 을 닫아버리게 됩니다. 즉, 이전에 parse_line이 select!안에서 직접 호출되어서 데이터 손실로 인한 장애를 겪기 보다는 parse_line을 다른 task로 분리함으로써 문제 발생시 socket connection 연결을 끊어서 데이터 손실로 인한 장애를 미연에 방지를 하는 방식입니다.

마무리하며

오늘은 tokio의 select! 문이 가지고 있는 cancellation safety 문제와 이를 해결하는 방법을 알아보았습니다. 안전한 언어로 알려진 Rust 에 이런 문제가 있는 건 상당히 곤란한 문제이므로 상당히 다양한 해결방안들이 제시되고 있습니다(대표적으로는 AsyncDrop을 추가함으로써 안전하게 future를 drop할수 있는 방법이 있습니다). 그러나 아직 Rust community에서 결론이 나지 않아서 이 문제가 공식적으로 해결되기에는 시간이 조금 더 걸릴 것 같습니다. 결국 그 전까지 async rust를 안전하기 쓰기 위해서는 공식 문서를 꼼꼼히 읽고 혹시나 발생할수 있는 문제를 미리 확인하는 방법 밖에 없습니다. 다음번에도 만약 비슷한 문제를 보게 된다면 더 재미있는 글로 돌아오도록 하겠습니다. 감사합니다.

Reference

https://carllerche.com/2021/06/17/six-ways-to-make-async-rust-easier/

https://gist.github.com/Matthias247/ffc0f189742abf6aa41a226fe07398a8

profile
개발이 어려운 개발자

0개의 댓글