🦀 Functional Rust

347: Blocking in Async

Difficulty: 3 · Level: Advanced. CPU-intensive or blocking I/O work must be offloaded — never block the async executor thread.

The Problem This Solves

Async executors like tokio run many tasks on a small thread pool. If one task blocks a thread — calling `thread::sleep`, doing a CPU-intensive computation, or calling a synchronous I/O function — that thread can't run any other tasks. With tokio's default 4-thread pool, one blocking call can cut async throughput by 25%. A handful of blocking calls can stall the entire runtime. The solution: offload blocking work to a separate thread pool. In tokio, `tokio::task::spawn_blocking(|| expensive_work())` submits the closure to a dedicated blocking thread pool (up to 512 threads by default), returning a `JoinHandle` you can `.await`. The async executor thread is free to run other tasks while the blocking work runs on the separate pool. This example simulates the pattern using `thread::spawn` + `mpsc::channel` — structurally identical to `spawn_blocking`.

The Intuition

Imagine Node.js: the event loop is single-threaded. Calling `fs.readFileSync` blocks the loop — no other requests can be served. That's why Node has `fs.readFile(callback)` and `worker_threads`. Rust's `spawn_blocking` is the same idea: push the blocking work off the event loop. In Python asyncio, `await loop.run_in_executor(None, blocking_function)` does the same thing. The rule of thumb: if a function might take more than 100μs, offload it.

How It Works in Rust

// CPU-intensive work — blocks a thread for significant time
fn cpu_heavy(n: u64) -> u64 {
 // Wrapping sum of squares 1..=n; wrapping ops avoid overflow panics for large n.
 (1..=n).fold(0u64, |acc, x| acc.wrapping_add(x.wrapping_mul(x)))
}

// Simulates spawn_blocking: runs on a separate thread, returns a channel
fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static)
 -> mpsc::Receiver<T>
{
 let (tx, rx) = mpsc::channel();
 // Send errors (receiver dropped) are deliberately ignored — nobody is waiting.
 thread::spawn(move || { let _ = tx.send(f()); });
 rx  // caller awaits (or recv()) this
}

// In real async context, the pattern is:
// let result = tokio::task::spawn_blocking(|| cpu_heavy(1_000_000)).await?;

// Multiple concurrent blocking tasks — run in parallel, not sequentially
let handles: Vec<_> = vec![
 spawn_blocking(|| cpu_heavy(1_000_000)),
 spawn_blocking(|| cpu_heavy(500_000)),
 spawn_blocking(|| cpu_heavy(750_000)),
];
// All three run concurrently on separate threads
let results: Vec<u64> = handles.into_iter().map(|rx| rx.recv().unwrap()).collect();
The test `concurrent_blocking_faster` verifies that 4 tasks of 20ms each complete in under 70ms total — they run in parallel, not sequentially (which would take 80ms+).

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
| --- | --- | --- |
| Blocking offload | `Lwt_preemptive.detach f ()` (runs in a background thread) | `tokio::task::spawn_blocking(f)` |
| CPU-bound work | `Thread.create` or `Domain.spawn` (5.x) | `spawn_blocking` — dedicated blocking pool |
| Result retrieval | Thread join | `.await` on `JoinHandle` |
| Thread pool | Manual or `Thread_pool` library | Tokio manages two pools: async + blocking |
use std::thread;
use std::sync::mpsc;
use std::time::{Duration, Instant};

// CPU-bound work that would block an async thread.
// Computes the wrapping sum of squares 1..=n; wrapping arithmetic keeps
// large inputs from panicking on overflow in debug builds.
fn cpu_heavy(n: u64) -> u64 {
    let mut total: u64 = 0;
    for x in 1..=n {
        total = total.wrapping_add(x.wrapping_mul(x));
    }
    total
}

// Blocking I/O simulation: parks the calling thread for 20ms, then
// returns a message naming the requested source.
fn blocking_io(label: &str) -> String {
    let simulated_latency = Duration::from_millis(20);
    thread::sleep(simulated_latency);
    let mut message = String::from("result from ");
    message.push_str(label);
    message
}

// Offload to thread pool (analogous to tokio::task::spawn_blocking).
// Runs `f` on a fresh OS thread and returns the receiving end of a
// channel that will carry the result; `recv()` plays the role of `.await`.
fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static)
    -> mpsc::Receiver<T>
{
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        // A failed send means the caller dropped the receiver; that's fine.
        let _ = sender.send(f());
    });
    receiver
}

// Async context: always offload blocking work.
// Calling blocking_io directly here would stall the executor thread;
// instead hand it to the blocking pool and wait on the channel.
fn async_handler(id: usize) -> String {
    let label = format!("source-{id}");
    let receiver = spawn_blocking(move || blocking_io(&label));
    receiver.recv().unwrap() // in real async: .await
}

fn main() {
    let t0 = Instant::now();

    // Launch the CPU-bound work on dedicated threads — all three proceed
    // in parallel while this thread stays free.
    let cpu_rx: Vec<_> = vec![1_000_000u64, 500_000, 750_000]
        .into_iter()
        .map(|n| spawn_blocking(move || cpu_heavy(n)))
        .collect();

    // Same pattern for simulated blocking I/O.
    let io_rx: Vec<_> = (0..3)
        .map(|i| spawn_blocking(move || blocking_io(&format!("io-{i}"))))
        .collect();

    // Gather everything (in real async: await join!).
    let cpu_results: Vec<u64> = cpu_rx
        .into_iter()
        .map(|rx| rx.recv().unwrap())
        .collect();

    let io_results: Vec<String> = io_rx
        .into_iter()
        .map(|rx| rx.recv().unwrap())
        .collect();

    println!("CPU results: {:?}", cpu_results);
    println!("IO results: {:?}", io_results);
    println!("Elapsed: {:.0}ms", t0.elapsed().as_secs_f64() * 1000.0);
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cpu_heavy_correct() {
        // 1 + 4 + 9 + 16 + 25 = 55
        let expected = 55u64;
        assert_eq!(cpu_heavy(5), expected);
    }

    #[test]
    fn spawn_blocking_returns() {
        let receiver = spawn_blocking(|| 42u32);
        let value = receiver.recv().unwrap();
        assert_eq!(value, 42);
    }

    #[test]
    fn concurrent_blocking_faster() {
        let started = Instant::now();
        let receivers: Vec<_> = (0..4)
            .map(|_| {
                spawn_blocking(|| {
                    thread::sleep(Duration::from_millis(20));
                    1
                })
            })
            .collect();
        for receiver in receivers {
            receiver.recv().unwrap();
        }
        // Four 20ms tasks in parallel must finish well under the 80ms+
        // a sequential run would need.
        assert!(started.elapsed() < Duration::from_millis(70));
    }
}
(* OCaml: compute vs I/O separation *)

(* Naive doubly-recursive Fibonacci — deliberately CPU-heavy.
   Note this variant defines fib 0 = fib 1 = 1. *)
let cpu_bound n =
  let rec fib k =
    match k with
    | 0 | 1 -> 1
    | _ -> fib (k - 1) + fib (k - 2)
  in
  fib n

(* Simulated blocking I/O: sleep 20ms, then report which source answered. *)
let blocking_io label =
  let simulated_latency = 0.02 in
  Thread.delay simulated_latency;
  Printf.sprintf "result from %s" label

(* Run every task on its own thread and block until all have finished.
   [Thread.join] returns [unit], so thread return values cannot be
   collected this way — tasks communicate only through side effects
   (here, printing). The previous version carried a [results] ref that
   was never written to; it is removed, and the empty list is returned
   directly to keep the historical list-returning interface. *)
let run_mixed tasks =
  let handles = List.map (fun f -> Thread.create f ()) tasks in
  List.iter Thread.join handles;
  []

(* Entry point: run one CPU-bound task alongside two blocking-I/O tasks
   and report the wall-clock time for the whole batch. *)
let () =
  let started = Unix.gettimeofday () in
  let tasks = [
    (fun () -> Printf.printf "Fib(35)=%d\n" (cpu_bound 35));
    (fun () -> Printf.printf "IO1: %s\n" (blocking_io "source1"));
    (fun () -> Printf.printf "IO2: %s\n" (blocking_io "source2"));
  ] in
  let _ = run_mixed tasks in
  Printf.printf "Elapsed: %.3fs\n" (Unix.gettimeofday () -. started)