336: Executor Basics
Difficulty: 5 Level: Master
A minimal async executor: the engine that drives futures to completion by polling them.
The Problem This Solves
`async fn` and `.await` are syntactic sugar. They compile into state machines that implement `Future`. But futures are inert: they do nothing on their own. Something must call `future.poll(cx)` to drive them forward. That something is an executor. Tokio, async-std, and smol all provide executors.
Understanding how they work demystifies async Rust: why tasks are spawned, why `.await` doesn't block the thread, and why `block_on` is needed at the top level.
This example builds a real (if minimal) executor from scratch: it has a task queue, a waker that re-schedules tasks, and a run loop that processes tasks until all are done.
This is the most foundational example in the async section. Once you understand executors, everything else (structured concurrency, cancellation, async mutex) makes sense.
The Intuition
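To make the "futures are inert" point concrete, here is a minimal sketch that creates a future, checks that its body has not run, and then polls it once by hand. `NoopWake` and `poll_once_demo` are hypothetical names introduced for this illustration; the waker does nothing because the future never returns `Poll::Pending`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical waker that ignores wake-ups; enough for one manual poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (body ran before poll, poll result, body ran after poll).
fn poll_once_demo() -> (bool, Poll<i32>, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);

    // Creating the future executes none of its body.
    let fut = async move {
        flag.store(true, Ordering::SeqCst);
        42
    };
    let before = ran.load(Ordering::SeqCst);

    // Pin the future on the stack and poll it once, as an executor would.
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let result = fut.as_mut().poll(&mut cx);

    let after = ran.load(Ordering::SeqCst);
    (before, result, after)
}

fn main() {
    let (before, result, after) = poll_once_demo();
    println!("before={before} result={result:?} after={after}");
}
```

The body only runs inside the call to `poll`, which is the job an executor performs on your behalf.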
Imagine a to-do list manager:
1. You add tasks to a queue.
2. The manager picks a task and runs it until it says "I'm blocked, come back later" (`Poll::Pending`).
3. When the task is unblocked, it re-adds itself to the queue.
4. The manager keeps working until the queue is empty.
That's an executor. The `Waker` is the mechanism by which a blocked task says "add me back to the queue."
Python's asyncio works the same way: `asyncio.get_event_loop().run_until_complete(coro)` plays the same role as `block_on`.
How It Works in Rust
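The text mentions `block_on` without showing it, so here is a minimal sketch for contrast with the queue-based executor that follows. It drives a single future on the current thread and parks the thread while the future is pending. `ThreadWaker` is a hypothetical name, and the park/unpark approach is an assumption of this sketch rather than what any particular runtime does.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker: waking means unparking the blocked thread.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Poll one future to completion on the calling thread.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until some waker clone calls unpark(), then poll again.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let answer = block_on(async { 6 * 7 });
    println!("{answer}");
}
```

This handles one future at a time; a task queue is what lets an executor interleave many.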
struct Task {
    future: Mutex<Option<BoxFuture>>,    // the future to drive
    sender: mpsc::SyncSender<Arc<Task>>, // self-reschedule channel
}

impl Task {
    fn schedule(self: &Arc<Self>) {
        // Put self back in the executor queue
        let _ = self.sender.send(Arc::clone(self));
    }
}

// Waker wraps a Task Arc: waking = scheduling
fn make_waker(task: Arc<Task>) -> Waker {
    // unsafe: manual vtable for clone/wake/drop on the Arc<Task>
    unsafe { Waker::from_raw(RawWaker::new(...)) }
}

impl SimpleExecutor {
    fn run(self) {
        drop(self.tx); // when no more tasks are spawnable, recv() will end
        while let Ok(task) = self.rx.recv() {
            let mut slot = task.future.lock().unwrap();
            if let Some(mut f) = slot.take() {
                let w = make_waker(Arc::clone(&task));
                let mut cx = Context::from_waker(&w);
                if f.as_mut().poll(&mut cx) == Poll::Pending {
                    *slot = Some(f); // put it back; it'll reschedule via waker
                }
                // if Poll::Ready: task is done, slot stays empty
            }
        }
    }
}
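The `mpsc::sync_channel` acts as the task queue. Every task holds a clone of the sender, so dropping `self.tx` means `rx.recv()` returns `Err` as soon as the last task (and any waker pointing at it) has been dropped, which cleanly ends the run loop. Because the channel is bounded (capacity 100 in the full listing), `send` blocks when the queue is full, so spawning more than 100 tasks before calling `run` would hang.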
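The shutdown behavior described above relies only on standard channel semantics, which can be checked in isolation. In this small sketch (the function name `drain_after_close` is made up for illustration), the receive loop ends once every sender clone has been dropped and the buffered items are drained.

```rust
use std::sync::mpsc;

/// Sends two values through cloned senders, drops all senders,
/// then drains the channel until recv() reports disconnection.
fn drain_after_close() -> Vec<i32> {
    let (tx, rx) = mpsc::sync_channel::<i32>(4);
    let tx2 = tx.clone();

    tx.send(1).unwrap();
    tx2.send(2).unwrap();

    // While any sender is alive, recv() on an empty queue would block.
    drop(tx);
    drop(tx2);

    let mut seen = Vec::new();
    // Buffered items are still delivered; then recv() returns Err.
    while let Ok(v) = rx.recv() {
        seen.push(v);
    }
    seen
}

fn main() {
    println!("{:?}", drain_after_close());
}
```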
What This Unlocks
- Understanding tokio internals: tokio's work-stealing thread pool is a sophisticated version of this pattern.
- Embedded async: write a minimal executor for `no_std` environments where you can't use tokio.
- Testing async code: single-threaded executors like this make async tests deterministic and fast.
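As an illustration of the determinism point, the sketch below runs two tasks that each yield once and records the order in which their steps execute. It is a compact variant of this example's executor, not the listing itself: it swaps the hand-written vtable for the safe `std::task::Wake` trait, and `Executor`, `YieldNow`, and `run_interleaved` are hypothetical names.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

struct Task {
    future: Mutex<Option<BoxFuture>>,
    sender: SyncSender<Arc<Task>>,
}

// Safe alternative to a raw vtable: waking re-enqueues the task.
impl Wake for Task {
    fn wake(self: Arc<Self>) {
        let _ = self.sender.send(Arc::clone(&self));
    }
}

/// Hypothetical helper future: Pending on the first poll, Ready on the second.
struct YieldNow {
    yielded: bool,
}

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            return Poll::Ready(());
        }
        self.yielded = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

struct Executor {
    tx: SyncSender<Arc<Task>>,
    rx: Receiver<Arc<Task>>,
}

impl Executor {
    fn new() -> Self {
        let (tx, rx) = sync_channel(64);
        Self { tx, rx }
    }

    fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) {
        let fut: BoxFuture = Box::pin(fut);
        let task = Arc::new(Task {
            future: Mutex::new(Some(fut)),
            sender: self.tx.clone(),
        });
        let _ = self.tx.send(task);
    }

    fn run(self) {
        let Executor { tx, rx } = self;
        drop(tx); // recv() fails once every task has been dropped
        while let Ok(task) = rx.recv() {
            let mut slot = task.future.lock().unwrap();
            if let Some(mut fut) = slot.take() {
                let waker = Waker::from(Arc::clone(&task));
                let mut cx = Context::from_waker(&waker);
                if fut.as_mut().poll(&mut cx).is_pending() {
                    *slot = Some(fut);
                }
            }
        }
    }
}

/// Runs two yielding tasks and returns the order of their steps.
fn run_interleaved() -> Vec<&'static str> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let ex = Executor::new();
    for (first, second) in [("A1", "A2"), ("B1", "B2")] {
        let log = Arc::clone(&log);
        ex.spawn(async move {
            log.lock().unwrap().push(first);
            YieldNow { yielded: false }.await;
            log.lock().unwrap().push(second);
        });
    }
    ex.run();
    let order = log.lock().unwrap().clone();
    order
}

fn main() {
    println!("{:?}", run_interleaved());
}
```

With one thread and a FIFO queue, the interleaving is the same on every run, so a test can assert on the exact order.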
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Executor | Lwt scheduler (implicit, always running) | Explicit: must call `block_on` or `run` |
| Task scheduling | Lwt internal queue | `mpsc::SyncSender<Arc<Task>>` |
| Wakeup mechanism | Lwt resolver / callbacks | `Waker::wake()` re-enqueues the task |
| Spawn | `Lwt.async (fun () -> ...)` | `executor.spawn(async { ... })` |
use std::future::Future;
use std::pin::Pin;
use std::sync::{mpsc, Arc, Mutex};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

struct Task {
    future: Mutex<Option<BoxFuture>>,
    sender: mpsc::SyncSender<Arc<Task>>,
}

impl Task {
    // Put this task back on the executor queue.
    fn schedule(self: &Arc<Self>) {
        let _ = self.sender.send(Arc::clone(self));
    }
}

fn make_waker(task: Arc<Task>) -> Waker {
    // The raw pointer carries one strong reference to the task.
    let ptr = Arc::into_raw(task) as *const ();

    // clone: bump the strong count without consuming the original reference.
    unsafe fn cw(p: *const ()) -> RawWaker {
        let a = Arc::from_raw(p as *const Task);
        std::mem::forget(Arc::clone(&a));
        std::mem::forget(a);
        RawWaker::new(p, &V)
    }
    // wake: consume the reference and schedule the task.
    unsafe fn w(p: *const ()) {
        Arc::from_raw(p as *const Task).schedule();
    }
    // wake_by_ref: schedule the task but keep the reference alive.
    unsafe fn wbr(p: *const ()) {
        let a = Arc::from_raw(p as *const Task);
        a.schedule();
        std::mem::forget(a);
    }
    // drop: release the reference.
    unsafe fn dw(p: *const ()) {
        drop(Arc::from_raw(p as *const Task));
    }
    static V: RawWakerVTable = RawWakerVTable::new(cw, w, wbr, dw);

    unsafe { Waker::from_raw(RawWaker::new(ptr, &V)) }
}

struct SimpleExecutor {
    rx: mpsc::Receiver<Arc<Task>>,
    tx: mpsc::SyncSender<Arc<Task>>,
}

impl SimpleExecutor {
    fn new() -> Self {
        let (tx, rx) = mpsc::sync_channel(100);
        Self { rx, tx }
    }

    fn spawn(&self, fut: impl Future<Output = ()> + Send + 'static) {
        let task = Arc::new(Task {
            future: Mutex::new(Some(Box::pin(fut))),
            sender: self.tx.clone(),
        });
        task.schedule();
    }

    fn run(self) {
        // Drop our sender so recv() fails once every task is gone.
        drop(self.tx);
        while let Ok(task) = self.rx.recv() {
            let mut slot = task.future.lock().unwrap();
            if let Some(mut f) = slot.take() {
                let w = make_waker(Arc::clone(&task));
                let mut cx = Context::from_waker(&w);
                if f.as_mut().poll(&mut cx) == Poll::Pending {
                    *slot = Some(f);
                }
            }
        }
    }
}

fn main() {
    let ex = SimpleExecutor::new();
    ex.spawn(async { println!("Task A"); });
    ex.spawn(async { println!("Task B"); });
    ex.spawn(async { println!("Task C"); });
    ex.run();
    println!("All done");
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn runs_tasks() {
        let counter = Arc::new(AtomicUsize::new(0));
        let ex = SimpleExecutor::new();
        for _ in 0..5 {
            let c = Arc::clone(&counter);
            ex.spawn(async move {
                c.fetch_add(1, Ordering::SeqCst);
            });
        }
        ex.run();
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }
}
(* OCaml: minimal cooperative scheduler *)
type task = { mutable ready: bool; f: unit -> unit }

let queue : task Queue.t = Queue.create ()

let spawn f = Queue.add { ready = true; f } queue

let run () =
  while not (Queue.is_empty queue) do
    (* Snapshot the queue in FIFO order, then clear it before running. *)
    let tasks = Queue.fold (fun a t -> t :: a) [] queue in
    Queue.clear queue;
    List.iter (fun t -> if t.ready then t.f ()) (List.rev tasks)
  done

let () =
  spawn (fun () -> Printf.printf "Task A\n");
  spawn (fun () -> Printf.printf "Task B\n");
  run ()