🦀 Functional Rust

1002: Backpressure

Difficulty: Intermediate Category: Async / Concurrency FP Patterns Concept: Bounded channel forces producer to slow down when consumer can't keep up Key Insight: `sync_channel(N)` blocks the sender when N items are already buffered — this IS backpressure; `sync_channel(0)` is a rendezvous (CSP-style); `try_send` is the non-blocking variant that returns `TrySendError::Full`

Versions

| Directory | Description |
|-----------|-------------|
| `std/`    | Standard library version using `std::sync`, `std::thread` |
| `tokio/`  | Tokio async runtime version using `tokio::sync`, `tokio::spawn` |

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 1002: Backpressure — Bounded sync_channel blocks producer
// When consumer is slow, bounded buffer fills and producer is forced to wait

use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// --- Approach 1: sync_channel with slow consumer ---
/// Demonstrates blocking backpressure: a producer pushing 9 items through a
/// 3-slot bounded channel is throttled to the pace of a slow consumer.
///
/// Returns `(items_processed, total_elapsed)`; the count is always 9.
fn bounded_backpressure() -> (usize, Duration) {
    const BUFFER_SIZE: usize = 3;
    // sync_channel sends block once BUFFER_SIZE items sit unconsumed.
    let (tx, rx) = mpsc::sync_channel::<i32>(BUFFER_SIZE);

    let started = Instant::now();

    let send_handle = thread::spawn(move || {
        (1..=9).for_each(|n| tx.send(n).unwrap()); // blocks when buffer is full
        // tx is dropped on exit, which closes the channel for the consumer.
    });

    let recv_handle = thread::spawn(move || {
        // Draining via the receiver's iterator: ends once all senders are gone.
        for value in rx {
            thread::sleep(Duration::from_millis(5)); // simulate slow processing
            let _ = value;
        }
    });

    send_handle.join().unwrap();
    recv_handle.join().unwrap();
    (9, started.elapsed())
}

// --- Approach 2: try_send for non-blocking backpressure (drop or error) ---
/// Load-shedding variant: `try_send` never blocks, so with no consumer
/// running only the 2 buffered items are accepted and the rest are dropped.
///
/// Returns `(accepted, dropped)` counts over 10 attempted sends.
fn try_send_demo() -> (usize, usize) {
    let (tx, rx) = mpsc::sync_channel::<i32>(2);

    let (mut accepted, mut dropped) = (0, 0);

    for value in 1..=10 {
        match tx.try_send(value) {
            Ok(()) => accepted += 1,
            // Buffer full: shed the item instead of waiting.
            Err(mpsc::TrySendError::Full(_)) => dropped += 1,
            // No receiver left: nothing can ever be accepted again.
            Err(mpsc::TrySendError::Disconnected(_)) => break,
        }
    }

    drop(tx); // close the channel so the drain below terminates
    let drained: Vec<_> = rx.into_iter().collect();
    assert_eq!(drained.len(), accepted);
    (accepted, dropped)
}

// --- Approach 3: Bounded pipeline with backpressure between stages ---
/// Three-stage pipeline (feed -> double -> add one) where every link is a
/// bounded channel of capacity 2, so the slow middle stage throttles
/// everything upstream. Returns the processed values in arrival order.
fn bounded_pipeline(items: Vec<i32>) -> Vec<i32> {
    // One bounded link per stage boundary.
    let (tx1, rx1) = mpsc::sync_channel::<i32>(2);
    let (tx2, rx2) = mpsc::sync_channel::<i32>(2);
    let (tx3, rx3) = mpsc::sync_channel::<i32>(2);

    // Stage 1: double each value.
    thread::spawn(move || {
        rx1.into_iter().for_each(|v| tx2.send(v * 2).unwrap());
    });

    // Stage 2: add one; deliberately slow to exercise backpressure.
    thread::spawn(move || {
        for v in rx2 {
            thread::sleep(Duration::from_millis(1)); // simulate slow processing
            tx3.send(v + 1).unwrap();
        }
    });

    // Feeder: send blocks whenever stage 1's 2-slot buffer is full.
    let feeder = thread::spawn(move || {
        items.into_iter().for_each(|v| tx1.send(v).unwrap());
    });

    // Drain the final stage on the caller's thread; ends when tx3 drops.
    let results: Vec<i32> = rx3.into_iter().collect();
    feeder.join().unwrap();
    results
}

// --- Approach 4: Measure backpressure effect ---
/// Compares how long a producer takes to push 20 items through an unbounded
/// channel versus a `sync_channel(1)` drained by a slow consumer.
///
/// Returns `true` when the bounded (backpressured) producer was slower,
/// which it essentially always is: ~20 ms of consumer sleeps vs. microseconds
/// of unbounded sends. (Still timing-based, so technically load-sensitive.)
fn measure_backpressure_effect() -> bool {
    let (tx_fast, rx_fast) = mpsc::channel::<i32>();             // unbounded
    let (tx_bounded, rx_bounded) = mpsc::sync_channel::<i32>(1); // bounded=1

    // Unbounded: sends never block, so this finishes almost instantly.
    let fast_start = Instant::now();
    let fast_producer = thread::spawn(move || {
        for i in 0..20 { tx_fast.send(i).unwrap(); }
    });
    fast_producer.join().unwrap();
    let fast_time = fast_start.elapsed();
    drop(rx_fast); // receiver only had to stay alive for the sends to succeed

    // Bounded: every send waits for the slow consumer to free the single slot.
    let bounded_start = Instant::now();
    let bounded_producer = thread::spawn(move || {
        for i in 0..20 { tx_bounded.send(i).unwrap(); }
    });
    let consumer = thread::spawn(move || {
        for _ in rx_bounded.iter() {
            thread::sleep(Duration::from_millis(1)); // slow consumer
        }
    });
    bounded_producer.join().unwrap();
    let bounded_time = bounded_start.elapsed();
    // Fix: join the consumer so no detached thread outlives this function
    // (the original leaked it). iter() ends once tx_bounded is dropped by
    // the producer closure above, so this join always completes.
    consumer.join().unwrap();

    // Bounded (backpressure) should be slower than unbounded.
    bounded_time > fast_time
}

/// Entry point: runs each backpressure demonstration and prints its result.
fn main() {
    // Approach 1: blocking backpressure with a slow consumer.
    let (count, elapsed) = bounded_backpressure();
    println!("bounded: {} items in {:?}", count, elapsed);

    // Approach 2: non-blocking load shedding via try_send.
    let (accepted, dropped) = try_send_demo();
    println!("try_send: accepted={} dropped={}", accepted, dropped);

    // Approach 3: multi-stage pipeline throttled by bounded links.
    println!("bounded pipeline: {:?}", bounded_pipeline(vec![1, 2, 3, 4, 5]));

    // Approach 4: quantify the slowdown backpressure imposes.
    println!("backpressure slower: {}", measure_backpressure_effect());
}

#[cfg(test)]
mod tests {
    use super::*;

    // Approach 1: every one of the 9 items survives the bounded buffer.
    #[test]
    fn test_bounded_backpressure_processes_all() {
        let (count, _) = bounded_backpressure();
        assert_eq!(count, 9);
    }

    // Approach 2: with no consumer running, only the buffered items fit.
    #[test]
    fn test_try_send_drops_when_full() {
        let (accepted, dropped) = try_send_demo();
        assert_eq!(accepted, 2); // buffer size = 2
        assert_eq!(dropped, 8); // remaining 8 are dropped
        assert_eq!(accepted + dropped, 10);
    }

    // Approach 3: pipeline computes x * 2 + 1 for every input.
    #[test]
    fn test_bounded_pipeline_correctness() {
        // 1*2+1=3, 2*2+1=5, 3*2+1=7
        let mut results = bounded_pipeline(vec![1, 2, 3]);
        results.sort();
        assert_eq!(results, vec![3, 5, 7]);
    }

    #[test]
    fn test_sync_channel_zero_buffer_rendezvous() {
        // sync_channel(0) = rendezvous — sender blocks until receiver takes
        let (tx, rx) = mpsc::sync_channel::<i32>(0);
        let h = thread::spawn(move || {
            tx.send(42).unwrap(); // blocks until receiver calls recv()
        });
        assert_eq!(rx.recv().unwrap(), 42);
        h.join().unwrap();
    }

    // NOTE(review): timing-based — assumes 20 x 1 ms consumer sleeps dwarf
    // the unbounded send time; could flake on a heavily loaded machine.
    #[test]
    fn test_backpressure_is_slower() {
        assert!(measure_backpressure_effect());
    }

    // try_send on a full buffer reports Full(_) instead of blocking.
    #[test]
    fn test_try_send_error_type() {
        let (tx, _rx) = mpsc::sync_channel::<i32>(1);
        tx.try_send(1).unwrap(); // fills the buffer
        let err = tx.try_send(2);
        assert!(matches!(err, Err(mpsc::TrySendError::Full(_))));
    }
}
(* 1002: Backpressure — Bounded Channel Blocks Producer *)
(* When consumer is slow, bounded buffer fills up and blocks the producer *)

(* --- Bounded queue (simulates sync_channel) --- *)

(* A bounded FIFO channel guarded by a single mutex.
   [not_full] wakes senders blocked on a full buffer and
   [not_empty] wakes receivers blocked on an empty one;
   [closed] lets both sides shut down cleanly. *)
type 'a bounded_chan = {
  q: 'a Queue.t;          (* buffered items, FIFO order *)
  capacity: int;          (* max buffered items before senders block *)
  m: Mutex.t;             (* guards every field of this record *)
  not_full: Condition.t;
  not_empty: Condition.t;
  mutable closed: bool;
}

(* Create an empty, open channel with the given capacity. *)
let make_bounded_chan capacity = {
  q = Queue.create ();
  capacity;
  m = Mutex.create ();
  not_full = Condition.create ();
  not_empty = Condition.create ();
  closed = false;
}

(* Blocking send: waits on [not_full] while the buffer holds [capacity]
   items — this wait IS the backpressure. If the channel is closed while
   (or before) waiting, the value is silently discarded.
   NOTE(review): with capacity = 0, [Queue.length c.q >= 0] is always true,
   so a sender would block until close — a true rendezvous needs a direct
   hand-off; confirm before using capacity 0 with this implementation. *)
let send_bounded c v =
  Mutex.lock c.m;
  while Queue.length c.q >= c.capacity && not c.closed do
    Condition.wait c.not_full c.m  (* BLOCK when full — backpressure! *)
  done;
  if not c.closed then begin
    Queue.push v c.q;
    Condition.signal c.not_empty  (* wake one blocked receiver *)
  end;
  Mutex.unlock c.m

(* Blocking receive: waits on [not_empty] until an item arrives or the
   channel is closed. Returns [None] only once closed AND fully drained,
   so items sent before close are never lost. Signalling [not_full] after
   each take is what unblocks a waiting sender. *)
let recv_bounded c =
  Mutex.lock c.m;
  while Queue.is_empty c.q && not c.closed do
    Condition.wait c.not_empty c.m
  done;
  (* Closed and empty -> None; otherwise pop the oldest buffered item. *)
  let v = if Queue.is_empty c.q then None else Some (Queue.pop c.q) in
  Condition.signal c.not_full;
  Mutex.unlock c.m;
  v

(* Mark the channel closed and wake ALL waiters on both conditions, so
   blocked senders give up and blocked receivers can observe the close. *)
let close_bounded c =
  Mutex.lock c.m;
  c.closed <- true;
  Condition.broadcast c.not_full;
  Condition.broadcast c.not_empty;
  Mutex.unlock c.m

(* --- Approach 1: Slow consumer applies backpressure --- *)

(* Producer pushes 9 items through a 3-slot channel while the consumer
   sleeps 5 ms per item, so the producer is forced to block on most sends.
   Timestamps go into shared lists guarded by a separate mutex [m]. *)
let () =
  let chan = make_bounded_chan 3 in  (* buffer of 3 *)
  let sent_times = ref [] in
  let recv_times = ref [] in
  let m = Mutex.create () in

  let producer = Thread.create (fun () ->
    for i = 1 to 9 do
      send_bounded chan i;
      Mutex.lock m;
      sent_times := Unix.gettimeofday () :: !sent_times;
      Mutex.unlock m
    done;
    close_bounded chan  (* lets the consumer's recv eventually return None *)
  ) () in

  let consumer = Thread.create (fun () ->
    let rec loop () =
      match recv_bounded chan with
      | None -> ()  (* channel closed and drained *)
      | Some _ ->
        Unix.sleepf 0.005;  (* slow consumer *)
        Mutex.lock m;
        recv_times := Unix.gettimeofday () :: !recv_times;
        Mutex.unlock m;
        loop ()
    in loop ()
  ) () in

  Thread.join producer;
  Thread.join consumer;

  (* Every sent item must have been received exactly once. *)
  assert (List.length !sent_times = 9);
  assert (List.length !recv_times = 9);
  Printf.printf "Approach 1 (backpressure): sent=%d recv=%d (producer was blocked by slow consumer)\n"
    (List.length !sent_times) (List.length !recv_times)

(* --- Approach 2: Producer detects backpressure (try_send) --- *)

(* Non-blocking send: returns [true] when the value fit in the buffer,
   [false] when it was rejected because the buffer is at capacity. *)
let try_send c v =
  Mutex.lock c.m;
  let accepted =
    if Queue.length c.q < c.capacity then begin
      Queue.push v c.q;
      Condition.signal c.not_empty;
      true
    end else
      false
  in
  Mutex.unlock c.m;
  accepted

(* Demo: 10 non-blocking sends into a 2-slot channel with no consumer;
   exactly the 2 buffered items are accepted, the other 8 are shed. *)
let () =
  let chan = make_bounded_chan 2 in
  let accepted = ref 0 and dropped = ref 0 in

  for i = 1 to 10 do
    incr (if try_send chan i then accepted else dropped)
  done;

  assert (!accepted = 2);
  assert (!dropped = 8);
  Printf.printf "Approach 2 (try_send): accepted=%d dropped=%d\n" !accepted !dropped

(* Final marker: printed only if every assertion above passed. *)
let () = print_string "โœ“ All tests passed\n"

📊 Detailed Comparison

Backpressure โ€” Comparison

Core Insight

Backpressure prevents unbounded buffering: instead of letting producers flood a buffer until it runs out of memory, the producer is forced to wait when the buffer is full. This propagates slowness upstream — the natural rate-limiting of processing pipelines.

OCaml Approach

  • Simulate bounded channel with `Queue` + `Mutex` + two `Condition` variables
  • `send_bounded`: wait while `Queue.length >= capacity` (not_full condition)
  • `recv_bounded`: signal `not_full` after each receive
  • `try_send`: non-blocking check โ€” returns bool indicating acceptance
  • More boilerplate than Rust โ€” no built-in bounded channel

Rust Approach

  • `mpsc::sync_channel(N)` creates a bounded channel with buffer of N
  • `tx.send(v)` blocks when buffer is full โ€” zero-cost backpressure
  • `tx.try_send(v)` returns `Err(TrySendError::Full(_))` immediately
  • `sync_channel(0)` is a CSP rendezvous โ€” synchronous handoff
  • Works transparently with `rx.iter()` โ€” pipeline stages auto-throttle

Comparison Table

| Concept | OCaml (simulated) | Rust |
|---|---|---|
| Bounded channel | Manual Queue + Mutex + 2 Condvar | `mpsc::sync_channel(N)` |
| Blocking send | `Condition.wait not_full` in send | `tx.send(v)` blocks automatically |
| Non-blocking send | `try_send` (custom) | `tx.try_send(v)` built-in |
| Buffer full error | `return false` from try_send | `Err(TrySendError::Full(v))` |
| Rendezvous (N=0) | capacity=0 edge case | `sync_channel(0)` first-class |
| Pipeline backpressure | Manual per-stage | Each stage's `sync_channel` auto-throttles |
| Async backpressure | N/A | `tokio::sync::mpsc` with `send().await` |

std vs tokio

| Aspect | std version | tokio version |
|---|---|---|
| Runtime | OS threads via `std::thread` | Async tasks on tokio runtime |
| Synchronization | `std::sync::Mutex`, `Condvar` | `tokio::sync::Mutex`, channels |
| Channels | `std::sync::mpsc` (`channel` unbounded, `sync_channel` bounded) | `tokio::sync::mpsc` (bounded, async) |
| Blocking | Thread blocks on lock/recv | Task yields, runtime switches tasks |
| Overhead | One OS thread per task | Many tasks per thread (M:N) |
| Best for | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers |