348: Async Generator Pattern
Difficulty: 4 Level: Expert

Produce values lazily from a function body, like Python's `yield` or JavaScript's `function*`, using a channel.

The Problem This Solves
Stable Rust has no native generator syntax (there is no `yield` keyword). But the pattern is extremely useful: compute values one at a time and hand each to the consumer without loading the whole sequence into memory. Think of an infinite Fibonacci sequence, a prime number stream, or a file parser that yields records as it reads.

The channel-based generator pattern simulates this: a background thread runs the generator body and sends values through a `SyncSender`; the consumer iterates over the `Receiver`. Backpressure comes for free: the `SyncSender` blocks when the buffer is full, so the generator never runs more than about one buffer's worth of values ahead of the consumer. The consumer decides how many values to pull, for example with `.take(n)`.

In async Rust, this maps to the `async-stream` crate or the `Stream` trait: the generator `yield`s items and the consumer polls the stream.

The Intuition
Python generators:

```python
import itertools

def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

for n in itertools.islice(fibonacci(), 10):
    print(n)
```
This example builds the identical pattern in Rust: `fibonacci_gen()` returns an `impl Iterator<Item=u64>` backed by a channel. The generator runs in a thread, the consumer drives it via iteration.
JavaScript's `async function*` is the async version; Rust's `async-stream` crate provides the equivalent.
How It Works in Rust
```rust
use std::sync::mpsc;
use std::thread;

fn generator<T: Send + 'static>(
    body: impl FnOnce(mpsc::SyncSender<T>) + Send + 'static,
    buffer: usize,
) -> impl Iterator<Item = T> {
    let (tx, rx) = mpsc::sync_channel(buffer);
    thread::spawn(move || body(tx)); // generator runs in background
    rx.into_iter() // consumer iterates here
}

fn fibonacci_gen() -> impl Iterator<Item = u64> {
    generator(|tx| {
        let (mut a, mut b) = (0u64, 1u64);
        loop {
            if tx.send(a).is_err() { break; } // consumer dropped: stop
            (a, b) = (b, a.wrapping_add(b));
        }
    }, 8) // buffer 8 values ahead
}

fn main() {
    // Use: infinite sequence, take only what you need
    let first_10: Vec<u64> = fibonacci_gen().take(10).collect();
    println!("{first_10:?}");
}
```
When the consumer finishes `.take(10).collect()` and the iterator is dropped, the `Receiver` is dropped with it. The generator's next (or currently blocked) `tx.send(...)` returns `Err(SendError)` and `break`s the loop: the thread exits and nothing leaks.
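The shutdown behaviour described above can be observed directly. The following is a minimal sketch, not part of the original example: `generator_with_handle` and `take_then_join` are hypothetical names, and the helper differs from `generator` only in that it also hands back the producer's `JoinHandle`, so the caller can wait for the thread to exit.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical variant of `generator` that also returns the producer's
/// `JoinHandle`, so the caller can observe the thread exiting.
fn generator_with_handle<T: Send + 'static>(
    body: impl FnOnce(mpsc::SyncSender<T>) + Send + 'static,
    buffer: usize,
) -> (mpsc::Receiver<T>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::sync_channel(buffer);
    let handle = thread::spawn(move || body(tx));
    (rx, handle)
}

/// Takes `n` values from an endless counter, drops the receiver, and
/// returns the values only after the producer thread has exited.
fn take_then_join(n: usize) -> Vec<u64> {
    let (rx, handle) = generator_with_handle(
        |tx| {
            let mut i = 0u64;
            // `send` fails once the receiver is gone, which ends the loop.
            while tx.send(i).is_ok() {
                i += 1;
            }
        },
        2,
    );
    let taken: Vec<u64> = rx.iter().take(n).collect();
    drop(rx); // consumer is done: this disconnects the channel
    // `join` returns only because the endless loop above was broken.
    handle.join().expect("producer thread panicked");
    taken
}
```

If the producer did not stop when the receiver was dropped, `take_then_join(5)` would hang in `join` instead of returning `[0, 1, 2, 3, 4]`. One caveat of the design: a generator notices the disconnect only at its next `send`, so a body that computes for a long time between sends keeps running until then.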
What This Unlocks
- Infinite sequences: primes, Fibonacci, random numbers, timestamps, all without allocating the whole sequence.
- Streaming parsers: yield parsed records one at a time as a file is read, processing in constant memory.
- Test data generators: generate test cases lazily; take as many as needed per test.
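As an illustration of the streaming-parser use case, here is a hedged sketch built on the same channel technique. The `key=value` line format, the `Record` type, and the names `records_gen` and `sample_records` are all assumptions made for this example; they do not come from the original code.

```rust
use std::io::{BufRead, BufReader, Cursor, Read};
use std::sync::mpsc;
use std::thread;

/// A parsed `key=value` record (hypothetical format for this sketch).
#[derive(Debug, PartialEq)]
struct Record {
    key: String,
    value: i64,
}

/// Streams records from any reader, one line at a time.
/// Lines that fail to parse are skipped.
fn records_gen<R: Read + Send + 'static>(source: R) -> impl Iterator<Item = Record> {
    let (tx, rx) = mpsc::sync_channel(4);
    thread::spawn(move || {
        for line in BufReader::new(source).lines() {
            let Ok(line) = line else { break }; // stop on I/O error
            let Some((key, value)) = line.split_once('=') else { continue };
            let Ok(value) = value.trim().parse::<i64>() else { continue };
            let record = Record { key: key.trim().to_string(), value };
            if tx.send(record).is_err() {
                break; // consumer dropped the iterator
            }
        }
        // End of input: `tx` is dropped here, which ends the iterator.
    });
    rx.into_iter()
}

/// Example input held in memory; a `File` would work the same way.
fn sample_records() -> Vec<Record> {
    let input = Cursor::new("a=1\nnot a record\nb = 22\nc=oops\nd=-4\n");
    records_gen(input).collect()
}
```

Only the current line and at most a few buffered records are held at once, so memory use does not grow with the size of the input.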
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Generator / lazy sequence | `type 'a stream = Cons of 'a * (unit -> 'a stream)` | Channel-based thread + `impl Iterator` |
| Native yield | `Seq.t` (lazy sequences in 4.07+) | No `yield` in stable; use channel or `async-stream` |
| Backpressure | Thunk delays evaluation | `SyncSender` buffer blocks producer |
| Consumer control | `Seq.take n` | `.take(n)` on iterator |
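The backpressure row can be checked with a small experiment. This is a sketch under one assumption: the function name `sends_before_full` is invented for illustration. It uses `try_send`, the non-blocking counterpart of `send`, to count how many values a `SyncSender` accepts while nobody is receiving.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Counts how many values a producer can queue before it would block,
/// given that nobody is receiving yet.
fn sends_before_full(buffer: usize) -> usize {
    // `_rx` keeps the receiver alive so the channel stays connected.
    let (tx, _rx) = mpsc::sync_channel::<u32>(buffer);
    let mut accepted = 0;
    loop {
        // Where `send` would park the thread, `try_send` returns
        // `TrySendError::Full` instead.
        match tx.try_send(0) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    accepted
}
```

With a buffer of 8, as in `fibonacci_gen`, exactly 8 values are accepted. In the generator itself the producer then computes one more value and blocks inside `send` until the consumer takes an item.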
```rust
use std::sync::mpsc;
use std::thread;

// Generator: a closure that yields values via a sender
fn generator<T: Send + 'static>(
    body: impl FnOnce(mpsc::SyncSender<T>) + Send + 'static,
    buffer: usize,
) -> impl Iterator<Item = T> {
    let (tx, rx) = mpsc::sync_channel(buffer);
    thread::spawn(move || body(tx));
    rx.into_iter()
}

fn fibonacci_gen() -> impl Iterator<Item = u64> {
    generator(|tx| {
        let (mut a, mut b) = (0u64, 1u64);
        loop {
            if tx.send(a).is_err() { break; }
            (a, b) = (b, a.wrapping_add(b));
        }
    }, 8)
}

fn range_gen(start: i64, stop: i64, step: i64) -> impl Iterator<Item = i64> {
    generator(move |tx| {
        let mut i = start;
        while i < stop {
            if tx.send(i).is_err() { break; }
            i += step;
        }
    }, 4)
}

fn primes_gen() -> impl Iterator<Item = u64> {
    generator(|tx| {
        let mut primes: Vec<u64> = Vec::new();
        let mut n = 2u64;
        loop {
            let is_prime = primes.iter().all(|&p| n % p != 0);
            if is_prime {
                if tx.send(n).is_err() { break; }
                primes.push(n);
            }
            n += 1;
        }
    }, 16)
}

fn main() {
    // First 10 Fibonacci numbers
    let fibs: Vec<u64> = fibonacci_gen().take(10).collect();
    println!("Fibonacci: {fibs:?}");

    // Range generator
    let range: Vec<i64> = range_gen(0, 20, 3).collect();
    println!("Range(0,20,3): {range:?}");

    // First 10 primes
    let primes: Vec<u64> = primes_gen().take(10).collect();
    println!("Primes: {primes:?}");

    // Chain generators
    let combined: Vec<i64> = range_gen(0, 5, 1)
        .chain(range_gen(10, 15, 1))
        .collect();
    println!("Combined: {combined:?}");
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn fibonacci_first_8() {
        let f: Vec<_> = fibonacci_gen().take(8).collect();
        assert_eq!(f, vec![0, 1, 1, 2, 3, 5, 8, 13]);
    }

    #[test]
    fn range_gen_correct() {
        let r: Vec<_> = range_gen(1, 10, 2).collect();
        assert_eq!(r, vec![1, 3, 5, 7, 9]);
    }

    #[test]
    fn first_5_primes() {
        let p: Vec<_> = primes_gen().take(5).collect();
        assert_eq!(p, vec![2, 3, 5, 7, 11]);
    }
}
```
```ocaml
(* OCaml: generator via Seq (lazy sequence); Seq.take needs OCaml 4.14+ *)
let fibonacci () : int Seq.t =
  let rec go a b () =
    Seq.Cons (a, go b (a + b))
  in go 0 1

(* Applying the first three arguments yields an int Seq.t *)
let range_gen start stop step () =
  let rec go i () =
    if i >= stop then Seq.Nil
    else Seq.Cons (i, go (i + step))
  in go start ()

let () =
  (* Take first 10 Fibonacci numbers *)
  let fibs = fibonacci () |> Seq.take 10 |> List.of_seq in
  List.iter (Printf.printf "%d ") fibs;
  print_newline ();
  (* No trailing (): List.of_seq expects the unforced sequence *)
  let r = range_gen 0 20 3 |> List.of_seq in
  List.iter (Printf.printf "%d ") r;
  print_newline ()
```