rust thread safe closer capture & poll method

wangki·2025년 8월 8일
0

Rust

목록 보기
46/54

thread safe 하게 변수 캡처하기

스레드를 생성 시, spawn 메서드에 클로저를 넘겨주게 된다. spawn함수의 원형을 보면

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    Builder::new().spawn(f).expect("failed to spawn thread")
}

F 제네릭 타입이 Send + 'static이다. 여기서 중요한 것은 'static을 가진다는 것이다. 이 의미는 FnOnce가 호출 시 내부의 변수들은 모두 소유권을 가지고 있어야 한다는 의미이다. 왜냐면 스레드를 생성하는 함수의 라이프타임과 스레드의 라이프타임은 일치하지 않기 때문이다.

poll 메서드 예시

struct MyFuture {
    cnt: i32,
}

impl Future for MyFuture {
    type Output = Result<(), Box<dyn Error>>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        if self.cnt == 10 {
            return std::task::Poll::Ready(Ok(()));
        } else {            
            println!("cnt is not 10, cnt: {}, it will reutrn pending", self.cnt);
            println!("it i will make new thread that wakes Task after 1 seconds");

            let cx_clone = cx.waker().clone();
            std::thread::spawn(move || {
                std::thread::sleep(Duration::from_secs(1));
                cx_clone.wake();
            });
            self.get_mut().cnt += 1;
            return std::task::Poll::Pending;
        }
    }
}

비동기에 대해서 공부하고 있는데 Future trait을 구현하여 future 객체를 만들어보고자 하였다.

Poll::Pending을 반환하기 전에 중요한 것은, 미래의 어느 시점에 Waker가 호출되도록 준비해야한다.

여기서는 poll 메서드가 호출 시 스레드를 만들어서 1초 후 wake를 호출하도록 할 예정이다.
여기서 최초에

std::thread::spawn(move || {
    let cx_clone = cx.waker().clone();
    std::thread::sleep(Duration::from_secs(1));
    cx_clone.wake();
});

클로저 내부에서 cx_clone을 생성하였더니 다음과 같은 에러가 발생하였다.

`LocalWaker` cannot be shared between threads safely
the trait `Sync` is not implemented for `LocalWaker`
required for `&LocalWaker` to implement `Send`
required because it appears within the type `&mut Context<'_>`

간단하게 해석하자면 Context객체가 가지는 필드 중 LocalWaker가 threads safely하게 구현되지 않았기에 cx자체를 넘겨서 복제를 할 수 없다.

그리고 cx가 &mut으로 가변 참조 형태이기에 소유권자체가 넘어가는 것이 아니라 넘기면 안된다.
여러 이유로 매개변수인 cx 자체를 넘기면 안되고,

let cx_clone = cx.waker().clone();
std::thread::spawn(move || {
    std::thread::sleep(Duration::from_secs(1));
    cx_clone.wake();
});

이렇게 외부에서 클론을 통해서 따로 소유권을 가진 Waker를 만들어준 뒤, 클로저로 넘겨줘야한다.

MyFuture 생성

use std::{error::Error, thread, time::Duration};

#[tokio::main]
async fn main() {
    let future = MyFuture { cnt: 0 };
    future.await.unwrap();
}

// 
struct MyFuture {
    cnt: i32,
}

impl Future for MyFuture {
    type Output = Result<(), Box<dyn Error>>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        if self.cnt == 10 {
            return std::task::Poll::Ready(Ok(()));
        } else {            
            println!("cnt is not 10, cnt: {}, it will reutrn pending", self.cnt);
            println!("it i will make new thread that wake after 1 seconds");
            
            let cx_clone = cx.waker().clone();
            std::thread::spawn(move || {
                println!("it will sleep almost 1 secs");
                std::thread::sleep(Duration::from_secs(1));
                cx_clone.wake();
                println!("success wake up Task");
            });

            self.get_mut().cnt += 1;
            return std::task::Poll::Pending;
        }
    }
}

future.await을 만나면 poll 메서드가 호출이된다.
최초 1회는 호출되지만 Waker가 미래의 어느 시점에서 wake를 호출하도록 설정을 해야한다.
cnt의 값이 10이 아니라면 Pending을 반환 전에 스레드를 생성하여 1초 후 wake를 호출하도록 설정했다.

실행을 하게되면 정상적으로 실행이된다.

끝!

0개의 댓글