347: Blocking in Async
Difficulty: 3 Level: Advanced
CPU-intensive or blocking I/O work must be offloaded — never block the async executor thread.

The Problem This Solves
Async executors like tokio run many tasks on a small thread pool. If one task blocks a thread — calling `thread::sleep`, doing a CPU-intensive computation, or calling a synchronous I/O function — that thread can't run any other tasks. Tokio's default worker pool has one thread per CPU core, so on a four-core machine a single blocking call can cut async throughput by 25%. A handful of blocking calls can stall the entire runtime. The solution: offload blocking work to a separate thread pool. In tokio, `tokio::task::spawn_blocking(|| expensive_work())` submits the closure to a dedicated blocking thread pool (up to 512 threads by default), returning a `JoinHandle` you can `.await`. The async executor thread is free to run other tasks while the blocking work runs on the separate pool. This example simulates the pattern using `thread::spawn` + `mpsc::channel` — structurally identical to `spawn_blocking`.

The Intuition
Imagine Node.js: the event loop is single-threaded. Calling `fs.readFileSync` blocks the loop — no other requests can be served. That's why Node has `fs.readFile(callback)` and `worker_threads`. Rust's `spawn_blocking` is the same idea: push the blocking work off the event loop. Python's asyncio does the same thing with `await loop.run_in_executor(None, blocking_function)`. The rule: if a function might take more than 100μs, offload it.

How It Works in Rust
// CPU-intensive work — blocks a thread for significant time
fn cpu_heavy(n: u64) -> u64 {
    (1..=n).fold(0u64, |acc, x| acc.wrapping_add(x.wrapping_mul(x)))
}

// Simulates spawn_blocking: runs on a separate thread, returns a channel
fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static)
    -> mpsc::Receiver<T>
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || { let _ = tx.send(f()); });
    rx // caller awaits (or recv()) this
}

// In real async context, the pattern is:
// let result = tokio::task::spawn_blocking(|| cpu_heavy(1_000_000)).await?;

// Multiple concurrent blocking tasks — run in parallel, not sequentially
let handles: Vec<_> = vec![
    spawn_blocking(|| cpu_heavy(1_000_000)),
    spawn_blocking(|| cpu_heavy(500_000)),
    spawn_blocking(|| cpu_heavy(750_000)),
];

// All three run concurrently on separate threads
let results: Vec<u64> = handles.into_iter().map(|rx| rx.recv().unwrap()).collect();
The test `concurrent_blocking_faster` verifies that 4 tasks with 20ms each complete in <70ms total — they run in parallel, not sequentially (which would take 80ms+).
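The speedup is easy to check with a std-only sketch. It reuses the same `thread::spawn` + `mpsc` stand-in for `spawn_blocking` and compares three 20ms blocking calls run back-to-back on one thread against the same three calls offloaded to separate threads; the timings in the comments are approximate.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// std-only stand-in for tokio::task::spawn_blocking, as in the listing.
fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || { let _ = tx.send(f()); });
    rx
}

// Three 20ms blocking calls on a single thread: roughly 60ms.
fn run_sequential() -> Duration {
    let start = Instant::now();
    for _ in 0..3 {
        thread::sleep(Duration::from_millis(20));
    }
    start.elapsed()
}

// The same three calls offloaded to separate threads: roughly 20ms.
fn run_offloaded() -> Duration {
    let start = Instant::now();
    let rxs: Vec<_> = (0..3)
        .map(|_| spawn_blocking(|| thread::sleep(Duration::from_millis(20))))
        .collect();
    for rx in rxs {
        rx.recv().unwrap();
    }
    start.elapsed()
}

fn main() {
    println!("sequential: {:?}", run_sequential());
    println!("offloaded:  {:?}", run_offloaded());
}
```

This is the same shape as the `concurrent_blocking_faster` test: offloading turns total latency from the sum of the blocking calls into roughly the longest single call.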
What This Unlocks
- File system operations — `tokio::fs` uses `spawn_blocking` internally; for non-tokio I/O, use it explicitly.
- Crypto / hashing — bcrypt and Argon2 password hashing are intentionally slow; always offload.
- Database queries (synchronous drivers) — wrap sync DB calls in `spawn_blocking` until you can switch to an async driver.
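For the filesystem case, here is a minimal std-only sketch of the wrapping pattern. The helper `read_file_offloaded` and the temp-file name are illustrative, not a tokio API; in real tokio code the `recv()` would be an `.await` on the `JoinHandle`.

```rust
use std::fs;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;

// std-only stand-in for tokio::task::spawn_blocking, as in the listing.
fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || { let _ = tx.send(f()); });
    rx
}

// Wrap synchronous filesystem I/O so the "executor" thread never blocks on disk.
fn read_file_offloaded(path: PathBuf) -> std::io::Result<String> {
    let rx = spawn_blocking(move || fs::read_to_string(path));
    rx.recv().unwrap() // real async code would .await the JoinHandle here
}

fn main() {
    // Hypothetical file, created just for this demo.
    let path = std::env::temp_dir().join("blocking_demo.txt");
    fs::write(&path, "hello from disk").unwrap();
    let contents = read_file_offloaded(path.clone()).unwrap();
    println!("{contents}"); // prints "hello from disk"
    let _ = fs::remove_file(&path);
}
```

The same wrapper shape works for a synchronous DB call or a password hash: move the blocking call into the closure, keep everything before and after it async.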
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Blocking offload | `Lwt_preemptive.detach f ()` (runs in background thread) | `tokio::task::spawn_blocking(f)` |
| CPU-bound work | `Thread.create` or `Domain.spawn` (5.x) | `spawn_blocking` — dedicated blocking pool |
| Result retrieval | Thread join | `.await` on `JoinHandle` |
| Thread pool | Manual or `Thread_pool` library | Tokio manages two pools: async + blocking |
use std::thread;
use std::sync::mpsc;
use std::time::{Duration, Instant};

// CPU-bound work that would block an async thread
fn cpu_heavy(n: u64) -> u64 {
    // Simulate expensive computation
    (1..=n).fold(0u64, |acc, x| acc.wrapping_add(x.wrapping_mul(x)))
}

// Blocking I/O simulation
fn blocking_io(label: &str) -> String {
    thread::sleep(Duration::from_millis(20));
    format!("result from {label}")
}

// Offload to thread pool (analogous to tokio::task::spawn_blocking)
fn spawn_blocking<T: Send + 'static>(f: impl FnOnce() -> T + Send + 'static)
    -> mpsc::Receiver<T>
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || { let _ = tx.send(f()); });
    rx
}

// Async context: always offload blocking work
fn async_handler(id: usize) -> String {
    // BAD: blocking_io("x") -- would block executor thread
    // GOOD: offload to thread pool
    let rx = spawn_blocking(move || blocking_io(&format!("source-{id}")));
    rx.recv().unwrap() // in real async: .await
}
fn main() {
    let start = Instant::now();

    // Offload multiple blocking tasks concurrently
    let handles: Vec<_> = vec![
        spawn_blocking(|| cpu_heavy(1_000_000)),
        spawn_blocking(|| cpu_heavy(500_000)),
        spawn_blocking(|| cpu_heavy(750_000)),
    ];
    let io_handles: Vec<_> = (0..3)
        .map(|i| spawn_blocking(move || blocking_io(&format!("io-{i}"))))
        .collect();

    // Collect results (in async: await join!)
    let cpu_results: Vec<u64> = handles.into_iter()
        .map(|rx| rx.recv().unwrap())
        .collect();
    let io_results: Vec<String> = io_handles.into_iter()
        .map(|rx| rx.recv().unwrap())
        .collect();

    println!("CPU results: {:?}", cpu_results);
    println!("IO results: {:?}", io_results);
    println!("Elapsed: {:.0}ms", start.elapsed().as_secs_f64() * 1000.0);
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn cpu_heavy_correct() {
        // sum of squares 1..=5 = 1+4+9+16+25 = 55
        assert_eq!(cpu_heavy(5), 55);
    }

    #[test]
    fn spawn_blocking_returns() {
        let rx = spawn_blocking(|| 42u32);
        assert_eq!(rx.recv().unwrap(), 42);
    }

    #[test]
    fn concurrent_blocking_faster() {
        let start = Instant::now();
        let handles: Vec<_> = (0..4)
            .map(|_| spawn_blocking(|| { thread::sleep(Duration::from_millis(20)); 1 }))
            .collect();
        for rx in handles { rx.recv().unwrap(); }
        assert!(start.elapsed() < Duration::from_millis(70)); // concurrent, not sequential
    }
}
(* OCaml: compute vs I/O separation *)
let cpu_bound n =
  (* Fibonacci: CPU heavy *)
  let rec fib = function
    | 0 | 1 -> 1
    | n -> fib (n - 1) + fib (n - 2)
  in
  fib n

let blocking_io label =
  Thread.delay 0.02; (* simulate blocking I/O *)
  Printf.sprintf "result from %s" label

let run_mixed tasks =
  (* Start each task on its own thread, then wait for all of them.
     Thread.join returns unit, so each task reports its own result
     here by printing rather than by returning a value. *)
  let handles = List.map (fun f -> Thread.create f ()) tasks in
  List.iter Thread.join handles

let () =
  let start = Unix.gettimeofday () in
  run_mixed [
    (fun () -> Printf.printf "Fib(35)=%d\n" (cpu_bound 35));
    (fun () -> Printf.printf "IO1: %s\n" (blocking_io "source1"));
    (fun () -> Printf.printf "IO2: %s\n" (blocking_io "source2"));
  ];
  Printf.printf "Elapsed: %.3fs\n" (Unix.gettimeofday () -. start)