Async Rust (1) - building an async runtime

Migo · December 28, 2024

async-rust

Dependencies

# Cargo.toml
[dependencies]
async-task = "*"
futures-lite = "*"
flume = "*"

async-task

This crate provides the core functionality needed to convert futures into tasks.

futures-lite

A lightweight implementation of futures. In this post we only use it for futures_lite::future::block_on, which drives a future to completion on the current thread.
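For example, a trivial standalone check of what block_on does:

fn main() {
    // block_on polls the future on the current thread until it is Ready.
    let out = futures_lite::future::block_on(async { 40 + 2 });
    assert_eq!(out, 42);
}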

flume

A multi-producer, multi-consumer channel that allows tasks to be passed around safely within the runtime. With this channel you can simply clone the receiver. In addition, you can use unbounded channels, which can hold an unlimited number of messages and use lock-free algorithms.
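As a quick standalone sketch of the two properties we rely on here, cloning the receiver and unbounded sending (the numbers are just for illustration):

use std::thread;

fn main() {
    // Unbounded: send never blocks, the queue grows as needed.
    let (tx, rx) = flume::unbounded::<u32>();

    // Multi-consumer: the receiver can simply be cloned.
    let rx2 = rx.clone();

    for i in 0..4 {
        tx.send(i).unwrap();
    }
    drop(tx); // close the channel so the iterators below terminate

    let a = thread::spawn(move || rx.iter().count());
    let b = thread::spawn(move || rx2.iter().count());
    // Every message is delivered to exactly one of the two consumers.
    assert_eq!(a.join().unwrap() + b.join().unwrap(), 4);
}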

Basic implementation

spawn_task

use std::future::Future;
use std::panic::catch_unwind;
use std::sync::LazyLock;
use std::thread;

use async_task::Runnable;

fn spawn_task<F, T>(future: F) -> async_task::Task<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    // Global task queue: lazily creates the channel and spawns the worker
    // thread on first use.
    static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
        let (tx, rx) = flume::unbounded::<Runnable>();

        thread::spawn(move || {
            while let Ok(runnable) = rx.recv() {
                println!("Received runnable");
                let _ = catch_unwind(|| runnable.run());
            }
        });
        tx
    });

    // The scheduler closure is called whenever the task is woken: it pushes
    // the Runnable back onto the global queue.
    let scheduler = |runnable| QUEUE.send(runnable).unwrap();
    let (runnable, task) = async_task::spawn(future, scheduler);
    // Schedule the task for its first poll.
    runnable.schedule();
    println!("current num of tasks: {}", QUEUE.len());
    task
}

function signature

This is a generic function that accepts any type implementing both the Future and Send traits, whose output type is also Send.

The 'static lifetime is required because we cannot force the programmer to wait for a task to finish: the spawned future may outlive the function that created it, so it must not borrow anything with a shorter lifetime.
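For example, a minimal sketch using the spawn_task above (written as a standalone main just for illustration): the future must own its data, because the worker thread may still be polling it after the caller has returned.

fn main() {
    let msg = String::from("hello");

    // `async move` transfers ownership of `msg` into the future, making the
    // future 'static so it can safely outlive this stack frame.
    let task = spawn_task(async move {
        println!("{msg}");
    });

    // Without `move`, the future would only borrow `msg`, which is not
    // 'static, and the spawn would be rejected by the compiler.
    futures_lite::future::block_on(task);
}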

static QUEUE

    static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
        // ...
    });

With static, we ensure that the queue lives for the entire lifetime of the program. Note that LazyLock is lazily initialized: if the queue is never accessed, the initializing closure never runs and the worker thread is never spawned. This has implications when we build task stealing; we will get back to it in the next post.
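A tiny standalone sketch of that lazy behaviour, unrelated to the runtime itself:

use std::sync::LazyLock;

static GREETING: LazyLock<String> = LazyLock::new(|| {
    println!("initializing"); // runs exactly once, on first access
    String::from("hello")
});

fn main() {
    println!("before first use");
    println!("{}", *GREETING); // "initializing" is printed here
    println!("{}", *GREETING); // the closure is not run again
}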

Runnable is a handle to a runnable task. This handle has a run method that polls the future, consuming the handle in the process. Whenever the Waker wakes the task, a new Runnable gets scheduled onto the task queue.

static QUEUE: LazyLock<flume::Sender<Runnable>> = LazyLock::new(|| {
    let (tx, rx) = flume::unbounded::<Runnable>();

    thread::spawn(move || {
        while let Ok(runnable) = rx.recv() {
            println!("Received runnable");
            let _ = catch_unwind(|| runnable.run());
        }
    });
    tx
});

Inside the queue-initializing closure, we also spawn a thread to which we pass the receiver for Runnable, and start accepting messages. The rx.recv() loop is a blocking operation, which is fine here: it blocks only this dedicated worker thread, while the queue it drains carries our async tasks.

catch_unwind here runs the closure and catches any panic thrown while the runnable is being polled, so a single misbehaving task cannot bring down the worker thread.
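A hedged sketch of why this matters, reusing the spawn_task above (again as a standalone main): a task that panics while being polled is caught on the worker thread, and other tasks still complete.

fn main() {
    // This task panics while being polled; the panic message is still printed,
    // but catch_unwind keeps the worker thread alive.
    let bad = spawn_task(async {
        panic!("boom");
    });
    // This task is unaffected.
    let good = spawn_task(async { 7 });

    drop(bad); // never await the panicked task, just drop its handle
    assert_eq!(futures_lite::future::block_on(good), 7);
}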

Now the receiver is up and ready. We need to send Runnables to the queue so they can be processed.

    let scheduler = |runnable| QUEUE.send(runnable).unwrap();
    let (runnable, task) = async_task::spawn(future, scheduler);
    runnable.schedule();

We create a runnable and a task using async_task::spawn. Inside this function you will see an unsafe block that allocates the future onto the heap. The Runnable and Task it returns both point to that same future. The Runnable is only pushed onto the queue when its schedule method is called, which means that if we never invoke schedule, the task will never run.
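To make that relationship concrete, here is a small hand-cranked sketch with a throwaway local queue (separate from spawn_task): the task only makes progress when a Runnable is run, and a Runnable only reaches the queue because schedule was called.

use futures_lite::future;

fn main() {
    let (tx, rx) = flume::unbounded::<async_task::Runnable>();
    let schedule = move |runnable| tx.send(runnable).unwrap();

    // Runnable and Task both point at the same heap-allocated future.
    let (runnable, task) = async_task::spawn(async { 1 + 1 }, schedule);

    runnable.schedule();               // without this call, nothing ever runs
    let runnable = rx.recv().unwrap(); // a worker would normally pick this up
    runnable.run();                    // polls the future (to completion here)

    assert_eq!(future::block_on(task), 2);
}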

You may also want to note that whereas the function signature is generic, the queue itself is not: it only carries Runnables. If you are particular about type safety, this may strike you as bizarre, and you are not alone. In fact, if you go over to the implementation of async_task::spawn, you'll see unsafe blocks that erase the future's concrete type.

Implementing futures

Now that a basic async runtime is implemented, we need a future implementation to test it with.

ExternalCall

use std::pin::Pin;
use std::task::{Context, Poll};

struct ExternalCall {
    cnt: u32,
}

impl ExternalCall {
    fn new() -> Self {
        Self { cnt: 0 }
    }
}

impl Future for ExternalCall {
    type Output = String;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // ExternalCall is Unpin (it only holds a u32), so we can mutate it
        // through the Pin directly.
        self.cnt += 1;

        if self.cnt > 5 {
            println!("External call done");
            Poll::Ready("External call done".to_string())
        } else {
            println!("External call not ready yet");
            // Wake ourselves immediately so the runtime reschedules this task;
            // a real future would register the waker with an external event source.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

Let's run!

We can now run some futures in our runtime:

use std::thread::sleep;
use std::time::Duration;

fn main() {
    let ext_call1 = ExternalCall::new();
    let ext_call2 = ExternalCall::new();

    // Spawn all three futures onto our runtime's queue.
    let task1 = spawn_task(ext_call1);
    let task2 = spawn_task(ext_call2);
    let group = spawn_task(async {
        ExternalCall::new().await;
        ExternalCall::new().await;
    });

    sleep(Duration::from_micros(2));
    println!("block to ensure all tasks are done");
    futures_lite::future::block_on(task1);
    futures_lite::future::block_on(task2);
    futures_lite::future::block_on(group);
}

Now, if you run this, you'll see output like the following:

current num of tasks: 1
current num of tasks: 1
current num of tasks: 2
Received runnable
External call not ready yet
block to ensure all tasks are done
Received runnable
External call not ready yet
Received runnable
External call not ready yet
...
...
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call done
Received runnable
External call done
Received runnable
External call done
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call not ready yet
Received runnable
External call done

Visual representation of what happened
