# Cargo.toml
[dependencies]
async-task = "*"
futures-lite = "*"
flume = "*"
This crate provides the core functionality needed to convert futures into tasks
A lightweight implementation of futures
A multi-producer, multi-consumer channel that allows tasks to be safely passed around within the runtime. With this channel, you can simply clone receiver. In addition, you can exploit unbounded channels that can hold an unlimited number of messages and implements lock-free algorithms.
fn spawn_task<F, T>(future: F) -> async_task::Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
let (tx, rx) = flume::unbounded::<Runnable>();
thread::spawn(move || {
while let Ok(runnable) = rx.recv() {
println!("Received runnable");
let _ = catch_unwind(|| runnable.run());
}
});
tx
});
let scheduler = |runnable| QUEUE.send(runnable).unwrap();
let (runnable, task) = async_task::spawn(future, scheduler);
runnable.schedule();
println!("current num of tasks: {}", QUEUE.len());
task
}
This is generic function that accept ANY type that implements both future and send traits.
'static lifetime is required because we cannot force programmers to wait for a task to finish.
static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
// ...
}
with static, we ensure that the queue is living throughout the lifetime of the program. Note that the LazyLock is lazily initialized - if this is not used, it won't be used. This has implication when we build task stealing. We will get back to this in the next post.
Runnable is a handle for runnable task. This handle has run method that polls the future by consuming the ownership. When Waker wakes the task, Runnable get scheduled onto the task queue.
static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
let (tx, rx) = flume::unbounded::<Runnable>();
thread::spawn(move || {
while let Ok(runnable) = rx.recv() {
println!("Received runnable");
let _ = catch_unwind(|| runnable.run());
}
});
tx
});
Inside queue-initializing closure, we also spawn the thread in which we pass receiver for Runnable and start accepting message. This is blocking operation because we are building async queues to handle async tasks.
catch_unwind here runs the code and catches any error that's thrown while the code is running.
Now, receiver is up and ready. We need to send runnable to the queue so it can process.
let scheduler = |runnable| QUEUE.send(runnable).unwrap();
let (runnable, task) = async_task::spawn(future, scheduler);
runnable.schedule();
We create runnable and task by using the async_task::spawn. Inside this function, you will see unsafe block that allocates the future onto the heap. The runnable and task returned have a pointer to the same future. But then runnable is scheduled to the queue only when its schedule method is called. What this means is that if we don't invoke schedule method, task will not run.
You may also want to note that whereas the function signature is generic, the queue itself is not for generic. If you are particular about type safety, this may strike you bizarre and you are not alone. In fact, if you go over to the implementation ofasync_task::spawn you'll see unsafe block
Now, basic async runtime is implemented. We need some future implementation to test.
struct ExternalCall {
cnt: u32,
}
impl ExternalCall {
fn new() -> Self {
Self { cnt: 0 }
}
}
impl Future for ExternalCall {
type Output = String;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.cnt += 1;
if self.cnt > 5 {
println!("External call done");
Poll::Ready("External call done".to_string())
} else {
println!("External call not ready yet");
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
We can now run some futures in our runtime:
fn main() {
let ext_call1 = ExternalCall::new();
let ext_call2 = ExternalCall::new();
let group = spawn_task(async {
ExternalCall::new().await;
ExternalCall::new().await;
});
sleep(Duration::from_micros(2));
println!("block to ensure all tasks are done");
futures_lite::future::block_on(ext_call1);
futures_lite::future::block_on(ext_call2);
futures_lite::future::block_on(group);
}
Now, if you run this, you'll see the result like the following:
current num of tasks: 1
current num of tasks: 1
current num of tasks: 2
Received runnable
External call not ready yet
block to ensure all tasks are done
Received runnable
External call not ready yet
Received runnable
External call not ready yet
...
...
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call done
Received runnable
External call done
Received runnable
External call done
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call done
