스레드를 생성 시, spawn
메서드에 클로저를 넘겨주게 된다. spawn
함수의 원형을 보면
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(f).expect("failed to spawn thread")
}
F
제네릭 타입이 Send
+ 'static
이다. 여기서 중요한 것은 'static
을 가진다는 것이다. 이 의미는 FnOnce
가 호출 시 내부의 변수들은 모두 소유권을 가지고 있어야 한다는 의미이다. 왜냐면 스레드를 생성하는 함수의 라이프타임과 스레드의 라이프타임은 일치하지 않기 때문이다.
struct MyFuture {
cnt: i32,
}
impl Future for MyFuture {
type Output = Result<(), Box<dyn Error>>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
if self.cnt == 10 {
return std::task::Poll::Ready(Ok(()));
} else {
println!("cnt is not 10, cnt: {}, it will reutrn pending", self.cnt);
println!("it i will make new thread that wakes Task after 1 seconds");
let cx_clone = cx.waker().clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
cx_clone.wake();
});
self.get_mut().cnt += 1;
return std::task::Poll::Pending;
}
}
}
비동기에 대해서 공부하고 있는데 Future
trait을 구현하여 future 객체를 만들어보고자 하였다.
Poll::Pending
을 반환하기 전에 중요한 것은, 미래의 어느 시점에 Waker
가 호출되도록 준비해야한다.
여기서는 poll
메서드가 호출 시 스레드를 만들어서 1초 후 wake
를 호출하도록 할 예정이다.
여기서 최초에
std::thread::spawn(move || {
let cx_clone = cx.waker().clone();
std::thread::sleep(Duration::from_secs(1));
cx_clone.wake();
});
클로저 내부에서 cx_clone
을 생성하였더니 다음과 같은 에러가 발생하였다.
`LocalWaker` cannot be shared between threads safely
the trait `Sync` is not implemented for `LocalWaker`
required for `&LocalWaker` to implement `Send`
required because it appears within the type `&mut Context<'_>`
간단하게 해석하자면 Context
객체가 가지는 필드 중 LocalWaker
가 threads safely하게 구현되지 않았기에 cx자체를 넘겨서 복제를 할 수 없다.
그리고 cx가 &mut
으로 가변 참조 형태이기에 소유권자체가 넘어가는 것이 아니라 넘기면 안된다.
여러 이유로 매개변수인 cx 자체를 넘기면 안되고,
let cx_clone = cx.waker().clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
cx_clone.wake();
});
이렇게 외부에서 클론을 통해서 따로 소유권을 가진 Waker
를 만들어준 뒤, 클로저로 넘겨줘야한다.
use std::{error::Error, thread, time::Duration};
#[tokio::main]
async fn main() {
let future = MyFuture { cnt: 0 };
future.await.unwrap();
}
//
struct MyFuture {
cnt: i32,
}
impl Future for MyFuture {
type Output = Result<(), Box<dyn Error>>;
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
if self.cnt == 10 {
return std::task::Poll::Ready(Ok(()));
} else {
println!("cnt is not 10, cnt: {}, it will reutrn pending", self.cnt);
println!("it i will make new thread that wake after 1 seconds");
let cx_clone = cx.waker().clone();
std::thread::spawn(move || {
println!("it will sleep almost 1 secs");
std::thread::sleep(Duration::from_secs(1));
cx_clone.wake();
println!("success wake up Task");
});
self.get_mut().cnt += 1;
return std::task::Poll::Pending;
}
}
}
future
는 .await
을 만나면 poll 메서드가 호출이된다.
최초 1회는 호출되지만 Waker
가 미래의 어느 시점에서 wake
를 호출하도록 설정을 해야한다.
cnt
의 값이 10이 아니라면 Pending
을 반환 전에 스레드를 생성하여 1초 후 wake
를 호출하도록 설정했다.
실행을 하게되면 정상적으로 실행이된다.
끝!