Async Rust(3) - Task stealing implementation

Migo · December 30, 2024

Read the following if you haven't already:

1) Building async runtime

2) Worker pool and priority queue implementation for async runtime

Task Stealing

Currently we have two queues, a high-priority one and a low-priority one, and each queue has its own dedicated worker threads consuming from it.

Because each worker is tied to a single queue, it is wasteful to leave any of them idle. Imagine that the low-priority queue is receiving a lot of jobs while the high-priority queue sits empty.

In that situation, we want the threads consuming the high-priority queue to steal tasks from the low-priority queue.

Implementation

We keep both ends of each channel, the sender and the receiver, in a static, so that every queue can have multiple producers and multiple consumers:

    static HIGH_CHANNEL: LazyLock<(flume::Sender<Runnable>, flume::Receiver<Runnable>)> =
        LazyLock::new(|| flume::unbounded());

    static LOW_CHANNEL: LazyLock<(flume::Sender<Runnable>, flume::Receiver<Runnable>)> =
        LazyLock::new(|| flume::unbounded());

Now we need to refactor the logic that creates the queues:

    static HIGH_QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
        // give more workers to the high-priority queue
        for _ in 0..2 {
            let high_recv = HIGH_CHANNEL.1.clone();
            let low_recv = LOW_CHANNEL.1.clone();
            spawn_work_stealing_worker(high_recv, low_recv);
        }
        HIGH_CHANNEL.0.clone()
    });

    static LOW_QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
        // a single worker that prefers the low-priority queue
        // and steals from the high-priority queue when it is idle
        spawn_work_stealing_worker(LOW_CHANNEL.1.clone(), HIGH_CHANNEL.1.clone());
        LOW_CHANNEL.0.clone()
    });

    // Spawn a worker that steals tasks from the less preferred queue
    // whenever its preferred queue is empty.
    fn spawn_work_stealing_worker(
        preferred_recv: flume::Receiver<Runnable>,
        less_preferred_recv: flume::Receiver<Runnable>,
    ) {
        thread::spawn(move || loop {
            match preferred_recv.try_recv() {
                Ok(runnable) => {
                    let _ = catch_unwind(|| runnable.run());
                }
                // preferred queue is empty: try to steal from the other queue
                Err(_) => match less_preferred_recv.try_recv() {
                    Ok(runnable) => {
                        let _ = catch_unwind(|| runnable.run());
                    }
                    // both queues are empty: back off briefly instead of busy-spinning
                    Err(_) => {
                        thread::sleep(Duration::from_millis(1));
                    }
                },
            }
        });
    }
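To watch the stealing behaviour in isolation, here is a minimal, dependency-free sketch. It is not the runtime above: it assumes plain `Mutex<VecDeque>` queues and boxed closures in place of flume channels and `Runnable`s. The names `Job`, `Queue`, `spawn_demo_worker` and `run_demo`, as well as the `stop` flag, are made up for illustration.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

// Hypothetical stand-ins for `Runnable` and the flume channels.
type Job = Box<dyn FnOnce() + Send + 'static>;
type Queue = Arc<Mutex<VecDeque<Job>>>;

/// Spawn a worker that takes from `preferred` first and steals from
/// `less_preferred` only when `preferred` is empty.
/// `stop` is an assumed shutdown flag; the worker in the post loops forever.
fn spawn_demo_worker(preferred: Queue, less_preferred: Queue, stop: Arc<AtomicBool>) -> JoinHandle<()> {
    thread::spawn(move || {
        while !stop.load(Ordering::Acquire) {
            // Each lock is released at the end of its own statement, so the
            // worker never holds both locks at once and never runs a job
            // while holding a lock.
            let mut job = preferred.lock().unwrap().pop_front();
            if job.is_none() {
                job = less_preferred.lock().unwrap().pop_front();
            }
            match job {
                Some(job) => job(),
                // Both queues are empty: back off briefly.
                None => thread::sleep(Duration::from_millis(1)),
            }
        }
    })
}

/// Queue `n` jobs on the low queue only, then let a single worker whose
/// preferred queue is the (empty) high queue drain them by stealing.
/// Returns the number of jobs that ran.
fn run_demo(n: usize) -> usize {
    let high: Queue = Arc::new(Mutex::new(VecDeque::new()));
    let low: Queue = Arc::new(Mutex::new(VecDeque::new()));
    let done = Arc::new(AtomicUsize::new(0));

    for _ in 0..n {
        let done = Arc::clone(&done);
        low.lock().unwrap().push_back(Box::new(move || {
            done.fetch_add(1, Ordering::SeqCst);
        }));
    }

    let stop = Arc::new(AtomicBool::new(false));
    let worker = spawn_demo_worker(Arc::clone(&high), Arc::clone(&low), Arc::clone(&stop));

    // Wait until every job has run, then shut the worker down.
    while done.load(Ordering::SeqCst) < n {
        thread::sleep(Duration::from_millis(1));
    }
    stop.store(true, Ordering::Release);
    worker.join().unwrap();
    done.load(Ordering::SeqCst)
}
```

Calling `run_demo(5)` puts five jobs on the low queue only. The single worker prefers the high queue, yet it should still run all five, which is the situation task stealing is meant to handle.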

Summary

Now we have our own async runtime with task stealing. Note, however, that if we want a clear separation, say one queue for CPU-bound operations and the other for IO-bound ones, task stealing works against that goal: an idle worker from one queue would pick up tasks meant for the other.
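One way to keep that separation is to make stealing optional per worker. The sketch below is only an illustration of the idea, not part of the runtime built in this series: `StealPolicy` and `next_task_with_policy` are hypothetical names, and plain `VecDeque`s stand in for the channels.

```rust
use std::collections::VecDeque;

/// Hypothetical switch deciding whether a worker may take tasks
/// from a queue other than its own.
#[derive(Clone, Copy, Debug, PartialEq)]
enum StealPolicy {
    Allow,
    Forbid,
}

/// Take the next task from `own`; fall back to `other` only if the
/// policy allows stealing.
fn next_task_with_policy<T>(
    own: &mut VecDeque<T>,
    other: &mut VecDeque<T>,
    policy: StealPolicy,
) -> Option<T> {
    own.pop_front().or_else(|| match policy {
        StealPolicy::Allow => other.pop_front(),
        StealPolicy::Forbid => None,
    })
}
```

With `StealPolicy::Forbid`, an idle IO worker leaves the CPU queue untouched, so CPU-bound tasks never run on IO threads. The cost is that the worker stays idle.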
