Read this post, if you haven't already.
Right now, the implementation spawns only one thread to consume Runnables, so we leave cores idle even though most modern computers have several of them.
static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
let (tx, rx) = flume::unbounded::<Runnable>();
// Just one thread, and this is static!
thread::spawn(move || {
while let Ok(runnable) = rx.recv() {
println!("Received runnable");
let _ = catch_unwind(|| runnable.run());
}
});
tx
});
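(A note on LazyLock: the initializer runs on first access, so the worker thread is only spawned the first time something is scheduled onto QUEUE.)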
Since we initialize QUEUE lazily, we can simply clone the receiver and move a copy into each spawned thread:
static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
let (tx, rx) = flume::unbounded::<Runnable>();
// for loop added
for _ in 0..3 {
// rx is cloned so every worker thread gets its own receiver handle
let recv = rx.clone();
thread::spawn(move || {
while let Ok(runnable) = recv.recv() {
println!("Received runnable");
let _ = catch_unwind(|| runnable.run());
}
});
}
tx
});
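This works because flume channels are multi-producer, multi-consumer: all cloned receivers pull from the same underlying queue, and each task is delivered to exactly one worker, so the three threads naturally share the load.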
Async is not for CPU-bound jobs?
You may have heard something like "async is not for CPU-bound jobs!"
But async is just a mechanism. You can use it for whatever you want, as long as it makes sense.
Now we've successfully added multiple workers to a single queue. The next question is: do we need multiple queues?
A multi-queue setup makes sense when tasks have different priorities.
First, let's refactor the spawn_task function to accept a priority:
enum Priority {
High,
Low,
}
fn spawn_task<F, T>(future: F, priority: Priority) -> async_task::Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
//...
}
Let's provide a default priority with a macro:
macro_rules! spawn_task {
($future:expr) => {
spawn_task($future, Priority::Low) // default
};
($future:expr, $priority:expr) => {
spawn_task($future, $priority)
};
}
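Both forms work now; a quick usage sketch, reusing ExternalCall from the previous post:
let low = spawn_task!(ExternalCall::new()); // defaults to Priority::Low
let high = spawn_task!(ExternalCall::new(), Priority::High);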
Then, depending on the priority, tasks are sent to different queues:
// ...
static HIGH_QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
let (tx, rx) = flume::unbounded::<Runnable>();
// give more workers to high queue
for _ in 0..2 {
let recv = rx.clone();
add_worker(recv);
}
tx
});
static LOW_QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
let (tx, rx) = flume::unbounded::<Runnable>();
// give just one worker to low queue
add_worker(rx);
tx
});
let scheduler = match priority {
Priority::High => |runnable| HIGH_QUEUE.send(runnable).unwrap(),
Priority::Low => |runnable| LOW_QUEUE.send(runnable).unwrap(),
};
// ...
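The add_worker helper isn't shown above; a minimal sketch, assuming it simply extracts the worker loop we wrote earlier:
fn add_worker(recv: flume::Receiver<Runnable>) {
    thread::spawn(move || {
        // Each worker loops until the channel is closed
        while let Ok(runnable) = recv.recv() {
            let _ = catch_unwind(|| runnable.run());
        }
    });
}
And here's roughly how the pieces fit inside spawn_task, assuming the async_task-based spawn from the previous post:
fn spawn_task<F, T>(future: F, priority: Priority) -> async_task::Task<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    // Both arms are non-capturing closures, so they coerce to the same fn pointer type
    let scheduler = match priority {
        Priority::High => |runnable| HIGH_QUEUE.send(runnable).unwrap(),
        Priority::Low => |runnable| LOW_QUEUE.send(runnable).unwrap(),
    };
    let (runnable, task) = async_task::spawn(future, scheduler);
    // Schedule the first poll, then hand the Task back to the caller
    runnable.schedule();
    task
}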
So the main function can be refactored as follows:
fn main() {
let ext_call1 = ExternalCall::new();
let ext_call2 = ExternalCall::new();
let t1 = spawn_task!(ext_call1);
let t2 = spawn_task!(ext_call2);
let group = spawn_task!(async {
ExternalCall::new().await;
ExternalCall::new().await;
});
sleep(Duration::from_micros(2));
println!("block to ensure all tasks are done");
futures_lite::future::block_on(t1);
futures_lite::future::block_on(t2);
futures_lite::future::block_on(group);
}
Digression!
We can further simplify main with the following:
macro_rules! join {
    ($($future:expr),*) => {
        {
            let mut res = Vec::new();
            $(
                res.push(futures_lite::future::block_on($future));
            )*
            res
        }
    };
}
Note, however, that this implementation only accepts futures whose return types are all the same, since a Vec holds values of a single type.
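For example, the three block_on calls in main could collapse into one line (assuming t1, t2, and group all resolve to the same type):
let _results = join!(t1, t2, group);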
This implementation effectively demonstrates how priority is handled in asynchronous processing. However, it leaves room for optimization.
Consider a scenario where the LOW_QUEUE is overwhelmed with tasks, while the HIGH_QUEUE remains idle. How can we address this imbalance? The solution lies in work stealing.
In the next post, we'll explore how to implement work stealing to improve efficiency.