🦀 Functional Rust

336: Executor Basics

Difficulty: 5 | Level: Master

A minimal async executor: the engine that drives futures to completion by polling them.

The Problem This Solves

`async fn` and `.await` are syntax sugar. They compile into state machines that implement `Future`. But futures are inert: they do nothing on their own. Something must call `future.poll(cx)` to drive them forward. That something is an executor. Tokio, async-std, and smol are all executors.

Understanding how they work demystifies async Rust: why tasks are spawned, why `.await` doesn't block a thread, and why `block_on` is needed at the top level. This example builds a real (if minimal) executor from scratch: it has a task queue, a waker that re-schedules tasks, and a run loop that processes tasks until all are done.

This is the most foundational example in the async section. Once you understand executors, everything else (structured concurrency, cancellation, async mutexes) makes sense.

The Intuition

Imagine a to-do list manager:

1. You add tasks to a queue.
2. The manager picks a task and runs it until it says "I'm blocked, come back later" (`Poll::Pending`).
3. When the task is unblocked, it re-adds itself to the queue.
4. The manager keeps working until the queue is empty.

That's an executor. The `Waker` is the mechanism by which a blocked task says "add me back to the queue." Python's asyncio works the same way: `asyncio.get_event_loop().run_until_complete(coro)` plays exactly the role of `block_on`.

How It Works in Rust

struct Task {
 future: Mutex<Option<BoxFuture>>,      // the future to drive
 sender: mpsc::SyncSender<Arc<Task>>,   // self-reschedule channel
}

impl Task {
 fn schedule(self: &Arc<Self>) {
     // Put self back in the executor queue
     let _ = self.sender.send(Arc::clone(self));
 }
}

// Waker wraps an Arc<Task>: waking = scheduling
fn make_waker(task: Arc<Task>) -> Waker {
 // unsafe: manual vtable for clone/wake/drop on the Arc<Task>
 unsafe { Waker::from_raw(RawWaker::new(...)) }
}

impl SimpleExecutor {
 fn run(self) {
     drop(self.tx);  // when no more tasks are spawnable, recv() will end
     while let Ok(task) = self.rx.recv() {
         let mut slot = task.future.lock().unwrap();
         if let Some(mut f) = slot.take() {
             let w = make_waker(Arc::clone(&task));
             let mut cx = Context::from_waker(&w);
             if f.as_mut().poll(&mut cx) == Poll::Pending {
                 *slot = Some(f);  // put it back โ€” it'll reschedule via waker
             }
             // if Poll::Ready: task is done, slot stays empty
         }
     }
 }
}
The `mpsc::sync_channel` acts as the task queue. Dropping `self.tx` ensures `rx.recv()` returns `Err` once all tasks complete and no more can be enqueued โ€” cleanly ending the run loop.

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Executor | Lwt scheduler (implicit, always running) | Explicit: must call `block_on` or `run` |
| Task scheduling | Lwt internal queue | `mpsc::SyncSender<Arc<Task>>` |
| Wakeup mechanism | Lwt resolver / callbacks | `Waker::wake()` → re-enqueue task |
| Spawn | `Lwt.async (fun () -> ...)` | `executor.spawn(async { ... })` |
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, mpsc};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxFuture = Pin<Box<dyn Future<Output=()> + Send>>;

struct Task {
    future: Mutex<Option<BoxFuture>>,
    sender: mpsc::SyncSender<Arc<Task>>,
}

impl Task {
    fn schedule(self: &Arc<Self>) { let _ = self.sender.send(Arc::clone(self)); }
}

fn make_waker(task: Arc<Task>) -> Waker {
    unsafe fn clone_raw(p: *const ()) -> RawWaker {
        Arc::increment_strong_count(p as *const Task); // new handle: bump refcount
        RawWaker::new(p, &VTABLE)
    }
    unsafe fn wake_raw(p: *const ()) {
        Arc::from_raw(p as *const Task).schedule(); // schedule, then drop this handle
    }
    unsafe fn wake_by_ref_raw(p: *const ()) {
        let t = Arc::from_raw(p as *const Task);
        t.schedule();
        std::mem::forget(t); // borrowed, not consumed
    }
    unsafe fn drop_raw(p: *const ()) { drop(Arc::from_raw(p as *const Task)); }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw);
    unsafe { Waker::from_raw(RawWaker::new(Arc::into_raw(task) as *const (), &VTABLE)) }
}

struct SimpleExecutor {
    rx: mpsc::Receiver<Arc<Task>>,
    tx: mpsc::SyncSender<Arc<Task>>,
}

impl SimpleExecutor {
    fn new() -> Self {
        let (tx, rx) = mpsc::sync_channel(100); // bounded queue of runnable tasks
        Self { rx, tx }
    }

    fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(Box::pin(fut))),
            sender: self.tx.clone(),
        });
        task.schedule(); // queue the initial poll
    }

    fn run(self) {
        drop(self.tx); // recv() errors once every sender is gone
        while let Ok(task) = self.rx.recv() {
            let mut slot = task.future.lock().unwrap();
            if let Some(mut f) = slot.take() {
                let w = make_waker(Arc::clone(&task));
                let mut cx = Context::from_waker(&w);
                if f.as_mut().poll(&mut cx) == Poll::Pending {
                    *slot = Some(f); // not done: store it back; the waker re-enqueues
                }
            }
        }
    }
}

fn main() {
    let ex = SimpleExecutor::new();
    ex.spawn(async { println!("Task A"); });
    ex.spawn(async { println!("Task B"); });
    ex.spawn(async { println!("Task C"); });
    ex.run();
    println!("All done");
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn runs_tasks() {
        let counter = Arc::new(AtomicUsize::new(0));
        let ex = SimpleExecutor::new();
        for _ in 0..5 {
            let c = Arc::clone(&counter);
            ex.spawn(async move { c.fetch_add(1, Ordering::SeqCst); });
        }
        ex.run();
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }
}
(* OCaml: minimal cooperative scheduler *)

type task = { mutable ready: bool; f: unit -> unit }
let queue : task Queue.t = Queue.create ()
let spawn f = Queue.add { ready=true; f } queue
let run () =
  while not (Queue.is_empty queue) do
    let tasks = Queue.fold (fun a t -> t::a) [] queue in
    Queue.clear queue;
    List.iter (fun t -> if t.ready then t.f ()) (List.rev tasks)
  done

let () =
  spawn (fun () -> Printf.printf "Task A\n");
  spawn (fun () -> Printf.printf "Task B\n");
  run ()