🦀 Functional Rust

348: Async Generator Pattern

Difficulty: 4 Level: Expert

Produce values lazily from a function body, like Python's `yield` or JavaScript's `function*`, using a channel.

The Problem This Solves

Rust doesn't have native generator syntax (no `yield` keyword in stable Rust). But the pattern is extremely useful: compute values one at a time, yielding each to the consumer, without loading all values into memory. Think of an infinite Fibonacci sequence, a prime number stream, or a file parser that yields records as it reads.

The channel-based generator pattern simulates this: a background thread runs the generator body and sends values through a `SyncSender`; the consumer iterates over the `Receiver`. Backpressure comes for free: `send` on a `SyncSender` blocks when the buffer is full, so the generator can get at most one buffer's worth of values ahead of the consumer (plus the value it is waiting to send). The consumer decides how much to pull, for example with `.take(n)`.

In async Rust, this maps to the `async-stream` crate or the `Stream` trait: the generator `yield`s items and the consumer polls the stream.
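
The backpressure claim can be checked with a small experiment. The sketch below is not part of the original example: `sent_without_consumer` is a hypothetical helper that starts a producer, deliberately receives nothing for a short while, and reports how many sends completed. Assuming the machine is not heavily loaded (the 200 ms pause is an arbitrary choice), the count should equal the buffer size, because the next `send` is blocked.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Hypothetical helper: counts how many sends succeed while the
/// consumer receives nothing at all.
fn sent_without_consumer(buffer: usize) -> usize {
    let (tx, rx) = mpsc::sync_channel::<u64>(buffer);
    let sent = Arc::new(AtomicUsize::new(0));
    let sent_in_thread = Arc::clone(&sent);

    let producer = thread::spawn(move || {
        for n in 0u64.. {
            // Blocks once `buffer` values are queued and unread.
            if tx.send(n).is_err() {
                break; // receiver dropped
            }
            sent_in_thread.fetch_add(1, Ordering::SeqCst);
        }
    });

    // Give the producer ample time to fill the buffer and block.
    thread::sleep(Duration::from_millis(200));
    let observed = sent.load(Ordering::SeqCst);

    // Dropping the receiver makes the blocked `send` return an error.
    drop(rx);
    producer.join().expect("producer thread panicked");
    observed
}

fn main() {
    println!("buffer 4: {} sends completed", sent_without_consumer(4));
}
```

A larger buffer lets the generator work further ahead at the cost of holding more values in memory; a buffer of 0 makes every `send` wait for a matching receive.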

The Intuition

Python generators:

import itertools

def fibonacci():
    a, b = 0, 1
    while True:
        yield a
        a, b = b, a + b

for n in itertools.islice(fibonacci(), 10):
    print(n)
This example builds the identical pattern in Rust: `fibonacci_gen()` returns an `impl Iterator<Item=u64>` backed by a channel. The generator runs in a thread, the consumer drives it via iteration. JavaScript's `async function*` is the async version โ€” Rust's `async-stream` crate provides the equivalent.

How It Works in Rust

use std::sync::mpsc;
use std::thread;

fn generator<T: Send + 'static>(
    body: impl FnOnce(mpsc::SyncSender<T>) + Send + 'static,
    buffer: usize,
) -> impl Iterator<Item = T> {
    let (tx, rx) = mpsc::sync_channel(buffer);
    thread::spawn(move || body(tx)); // generator runs in background
    rx.into_iter() // consumer iterates here
}

fn fibonacci_gen() -> impl Iterator<Item = u64> {
    generator(|tx| {
        let (mut a, mut b) = (0u64, 1u64);
        loop {
            if tx.send(a).is_err() { break; } // consumer dropped: stop
            // wrapping_add avoids an overflow panic, but values past F(93) wrap around
            (a, b) = (b, a.wrapping_add(b));
        }
    }, 8) // buffer up to 8 values ahead
}

// Use: infinite sequence, take only what you need
fn main() {
    let first_10: Vec<u64> = fibonacci_gen().take(10).collect();
    println!("{first_10:?}");
}
When the consumer's `.take(10).collect()` finishes, the iterator is dropped and the `Receiver` goes with it. The generator's next `tx.send(...)`, or the one it is currently blocked in, returns `Err(SendError)` and the loop `break`s, so the background thread exits: no leak, clean shutdown.
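
The shutdown behaviour described above can be observed by keeping the thread's `JoinHandle` instead of discarding it. The sketch below is illustrative only: `take_then_join` is a hypothetical helper, not part of the original `generator` function, and it inlines the channel setup so the handle is available for joining.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: runs a counting generator, takes `n` values,
/// then waits for the generator thread to exit.
/// Returns the values taken and how many sends succeeded in total.
fn take_then_join(n: usize, buffer: usize) -> (Vec<u64>, u64) {
    let (tx, rx) = mpsc::sync_channel::<u64>(buffer);

    let producer = thread::spawn(move || {
        let mut sent = 0u64;
        // `send` fails as soon as the receiver is gone, ending the loop.
        while tx.send(sent).is_ok() {
            sent += 1;
        }
        sent
    });

    // `collect` consumes the iterator; the receiver is dropped when it returns.
    let taken: Vec<u64> = rx.into_iter().take(n).collect();

    // `join` returns only because the producer loop has ended.
    let sent = producer.join().expect("producer thread panicked");
    (taken, sent)
}

fn main() {
    let (taken, sent) = take_then_join(3, 2);
    println!("taken = {taken:?}, sends completed = {sent}");
}
```

The number of completed sends can exceed the number of values taken by up to the buffer size, since values still sitting in the buffer when the receiver is dropped are simply discarded.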

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Generator / lazy sequence | `type 'a stream = Cons of 'a * (unit -> 'a stream)` | Channel-based thread + `impl Iterator` |
| Native yield | `Seq.t` (lazy sequences in 4.07+) | No `yield` in stable; use channel or `async-stream` |
| Backpressure | Thunk delays evaluation | `SyncSender` buffer blocks producer |
| Consumer control | `Seq.take n` | `.take(n)` on iterator |

use std::sync::mpsc;
use std::thread;

// Generator: a closure that yields values via a sender
fn generator<T: Send + 'static>(
    body: impl FnOnce(mpsc::SyncSender<T>) + Send + 'static,
    buffer: usize,
) -> impl Iterator<Item = T> {
    let (tx, rx) = mpsc::sync_channel(buffer);
    thread::spawn(move || body(tx));
    rx.into_iter()
}

fn fibonacci_gen() -> impl Iterator<Item = u64> {
    generator(|tx| {
        let (mut a, mut b) = (0u64, 1u64);
        loop {
            if tx.send(a).is_err() { break; }
            (a, b) = (b, a.wrapping_add(b));
        }
    }, 8)
}

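fn range_gen(start: i64, stop: i64, step: i64) -> impl Iterator<Item = i64> {
    // A non-positive step would never reach `stop`, so reject it up front.
    assert!(step > 0, "step must be positive");
    generator(move |tx| {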
        let mut i = start;
        while i < stop {
            if tx.send(i).is_err() { break; }
            i += step;
        }
    }, 4)
}

fn primes_gen() -> impl Iterator<Item = u64> {
    generator(|tx| {
        let mut primes: Vec<u64> = Vec::new();
        let mut n = 2u64;
        loop {
            let is_prime = primes.iter().all(|&p| n % p != 0);
            if is_prime {
                if tx.send(n).is_err() { break; }
                primes.push(n);
            }
            n += 1;
        }
    }, 16)
}

fn main() {
    // First 10 Fibonacci numbers
    let fibs: Vec<u64> = fibonacci_gen().take(10).collect();
    println!("Fibonacci: {fibs:?}");

    // Range generator
    let range: Vec<i64> = range_gen(0, 20, 3).collect();
    println!("Range(0,20,3): {range:?}");

    // First 10 primes
    let primes: Vec<u64> = primes_gen().take(10).collect();
    println!("Primes: {primes:?}");

    // Chain generators
    let combined: Vec<i64> = range_gen(0, 5, 1)
        .chain(range_gen(10, 15, 1))
        .collect();
    println!("Combined: {combined:?}");
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn fibonacci_first_8() {
        let f: Vec<_> = fibonacci_gen().take(8).collect();
        assert_eq!(f, vec![0, 1, 1, 2, 3, 5, 8, 13]);
    }
    #[test]
    fn range_gen_correct() {
        let r: Vec<_> = range_gen(1, 10, 2).collect();
        assert_eq!(r, vec![1, 3, 5, 7, 9]);
    }
    #[test]
    fn first_5_primes() {
        let p: Vec<_> = primes_gen().take(5).collect();
        assert_eq!(p, vec![2, 3, 5, 7, 11]);
    }
}
(* OCaml: generator via Seq (lazy sequence) *)

let fibonacci () : int Seq.t =
  let rec go a b () =
    Seq.Cons (a, go b (a + b))
  in go 0 1

let range_gen start stop step () =
  let rec go i () =
    if i >= stop then Seq.Nil
    else Seq.Cons (i, go (i + step))
  in go start

let () =
  (* Take first 10 Fibonacci numbers *)
  let fibs = fibonacci () |> Seq.take 10 |> List.of_seq in
  List.iter (Printf.printf "%d ") fibs;
  print_newline ();

  let r = range_gen 0 20 3 () |> List.of_seq in
  List.iter (Printf.printf "%d ") r;
  print_newline ()
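
For comparison with the OCaml `Seq` version above, the same two sequences can also be written in Rust as plain iterators with no thread or channel, which is closer to how the OCaml thunks behave: nothing is computed until the consumer asks for the next item. This is a minimal sketch, not part of the original example; the names `fibonacci_iter` and `range_iter` are hypothetical, and `std::iter::successors` is just one way to express it.

```rust
use std::iter;

/// Fibonacci as a plain iterator: the state lives in the closure, no thread.
/// Stops once the next pair can no longer be computed without overflowing `u64`.
fn fibonacci_iter() -> impl Iterator<Item = u64> {
    iter::successors(Some((0u64, 1u64)), |&(a, b)| {
        a.checked_add(b).map(|next| (b, next))
    })
    .map(|(a, _)| a)
}

/// Stepped range as a plain iterator; ends at `stop` or on `i64` overflow.
fn range_iter(start: i64, stop: i64, step: i64) -> impl Iterator<Item = i64> {
    assert!(step > 0, "step must be positive");
    iter::successors(Some(start), move |&i| i.checked_add(step))
        .take_while(move |&i| i < stop)
}

fn main() {
    let fibs: Vec<u64> = fibonacci_iter().take(10).collect();
    println!("Fibonacci: {fibs:?}");

    let range: Vec<i64> = range_iter(0, 20, 3).collect();
    println!("Range(0,20,3): {range:?}");
}
```

The channel-based generator earns its extra thread when the body is awkward to write as explicit state (nested loops, recursion, blocking I/O) or when producing values in parallel with consumption is the goal; for simple recurrences like these, a plain iterator is usually enough.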