// 1002: Backpressure — Bounded sync_channel blocks producer
// When the consumer is slow, the bounded buffer fills and the producer is forced to wait
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
// --- Approach 1: sync_channel with slow consumer ---
/// Demonstrates blocking backpressure on a bounded channel.
///
/// A producer thread sends the integers 1 through 9 into a `sync_channel`
/// with room for 3 items while a consumer thread sleeps 5 ms per item, so the
/// producer blocks in `send` whenever the buffer is full.
///
/// Returns the number of items the consumer actually received, together with
/// the wall-clock time from just before the threads were spawned until both
/// had finished.
///
/// # Panics
///
/// Panics if either thread panics, or if the consumer hangs up before the
/// producer has sent every item.
fn bounded_backpressure() -> (usize, Duration) {
    const BUFFER_SIZE: usize = 3;
    const ITEM_COUNT: i32 = 9;
    const CONSUMER_DELAY: Duration = Duration::from_millis(5);

    // sync_channel(N): the sender blocks once N items are buffered.
    let (tx, rx) = mpsc::sync_channel::<i32>(BUFFER_SIZE);
    let start = Instant::now();

    let producer = thread::spawn(move || {
        for i in 1..=ITEM_COUNT {
            // Blocks while the buffer is full.
            tx.send(i)
                .expect("consumer hung up before all items were sent");
        }
        // `tx` is dropped when this closure ends, which terminates the
        // consumer's iteration.
    });

    let consumer = thread::spawn(move || {
        // Count what really arrives instead of assuming every item made it.
        let mut consumed = 0_usize;
        for _item in rx.iter() {
            thread::sleep(CONSUMER_DELAY); // slow consumer
            consumed += 1;
        }
        consumed
    });

    producer.join().expect("producer thread panicked");
    let consumed = consumer.join().expect("consumer thread panicked");
    (consumed, start.elapsed())
}
// --- Approach 2: try_send for non-blocking backpressure (drop or error) ---
/// Demonstrates non-blocking backpressure with `try_send`.
///
/// Offers the integers 1 through 10 to a channel of capacity 2 that nobody is
/// reading from yet. Sends that find the buffer full are counted as dropped
/// rather than waited on; a disconnected channel stops the loop early.
///
/// Returns `(accepted, dropped)`.
///
/// # Panics
///
/// Panics if the number of items left in the channel differs from the number
/// of accepted sends.
fn try_send_demo() -> (usize, usize) {
    let (sender, receiver) = mpsc::sync_channel::<i32>(2);
    let (mut accepted, mut dropped) = (0_usize, 0_usize);

    for value in 1..=10 {
        match sender.try_send(value) {
            Ok(()) => accepted += 1,
            Err(mpsc::TrySendError::Full(_)) => dropped += 1,
            Err(mpsc::TrySendError::Disconnected(_)) => break,
        }
    }

    // Hang up the sending side so draining the receiver terminates.
    drop(sender);
    let buffered = receiver.iter().count();
    assert_eq!(buffered, accepted);

    (accepted, dropped)
}
// --- Approach 3: Bounded pipeline with backpressure between stages ---
/// Runs `items` through a two-stage pipeline whose stages are connected by
/// bounded channels, so a slow stage throttles everything upstream of it.
///
/// Stage 1 doubles each value; stage 2 sleeps 1 ms and then adds one. Each
/// stage runs on its own detached thread, and a separate feeder thread pushes
/// `items` into the first channel.
///
/// Returns the processed values in the order they left the last stage.
///
/// # Panics
///
/// Panics if the feeder thread panics.
fn bounded_pipeline(items: Vec<i32>) -> Vec<i32> {
    // Stage channels — each bounded to 2 items.
    let (source_tx, source_rx) = mpsc::sync_channel::<i32>(2);
    let (doubled_tx, doubled_rx) = mpsc::sync_channel::<i32>(2);
    let (output_tx, output_rx) = mpsc::sync_channel::<i32>(2);

    // Stage 1: double.
    thread::spawn(move || {
        source_rx
            .iter()
            .for_each(|value| doubled_tx.send(value * 2).unwrap());
    });

    // Stage 2: add 1 (slow).
    thread::spawn(move || {
        while let Ok(value) = doubled_rx.recv() {
            thread::sleep(Duration::from_millis(1)); // simulate slow processing
            output_tx.send(value + 1).unwrap();
        }
    });

    // Feeder: blocks whenever the first stage's buffer is full.
    let feeder = thread::spawn(move || {
        items
            .into_iter()
            .for_each(|value| source_tx.send(value).unwrap());
    });

    // Collecting ends once stage 2 has dropped its sender.
    let collected: Vec<i32> = output_rx.iter().collect();
    feeder.join().unwrap();
    collected
}
// --- Approach 4: Measure backpressure effect ---
/// Compares how long a producer needs to send 20 items through an unbounded
/// channel versus a channel bounded to one item with a slow consumer.
///
/// The unbounded run has no reader at all, so its sends never wait. In the
/// bounded run a consumer sleeps 1 ms per item, which forces the producer to
/// proceed at the consumer's pace.
///
/// Returns `true` when the bounded run took strictly longer.
///
/// # Panics
///
/// Panics if either producer thread panics.
fn measure_backpressure_effect() -> bool {
    // Baseline: unbounded channel, nothing slows the producer down.
    let (unbounded_tx, unbounded_rx) = mpsc::channel::<i32>();
    let baseline_timer = Instant::now();
    thread::spawn(move || {
        (0..20).for_each(|n| unbounded_tx.send(n).unwrap());
    })
    .join()
    .unwrap();
    let unbounded_elapsed = baseline_timer.elapsed();
    // The receiver had to stay alive until here so the sends above succeed.
    drop(unbounded_rx);

    // Bounded run: a buffer of one slot plus a slow consumer.
    let (bounded_tx, bounded_rx) = mpsc::sync_channel::<i32>(1);
    let bounded_timer = Instant::now();
    let bounded_producer = thread::spawn(move || {
        (0..20).for_each(|n| bounded_tx.send(n).unwrap());
    });
    // Slow consumer, left detached; it stops once the producer hangs up.
    thread::spawn(move || {
        while bounded_rx.recv().is_ok() {
            thread::sleep(Duration::from_millis(1));
        }
    });
    bounded_producer.join().unwrap();
    let bounded_elapsed = bounded_timer.elapsed();

    // Backpressure should make the bounded run the slower one.
    unbounded_elapsed < bounded_elapsed
}
/// Runs each backpressure demo in turn and prints a one-line summary of its
/// result to standard output.
fn main() {
    let blocking = bounded_backpressure();
    println!("bounded: {} items in {:?}", blocking.0, blocking.1);

    let non_blocking = try_send_demo();
    println!("try_send: accepted={} dropped={}", non_blocking.0, non_blocking.1);

    let pipeline_output = bounded_pipeline(vec![1, 2, 3, 4, 5]);
    println!("bounded pipeline: {:?}", pipeline_output);

    let bounded_is_slower = measure_backpressure_effect();
    println!("backpressure slower: {}", bounded_is_slower);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The blocking demo must report all nine items as processed.
    #[test]
    fn test_bounded_backpressure_processes_all() {
        let processed = bounded_backpressure().0;
        assert_eq!(processed, 9);
    }

    /// With nobody reading, only as many sends succeed as the buffer holds.
    #[test]
    fn test_try_send_drops_when_full() {
        let outcome = try_send_demo();
        assert_eq!(outcome.0, 2); // buffer size = 2
        assert_eq!(outcome.1, 8); // the remaining 8 are dropped
        assert_eq!(outcome.0 + outcome.1, 10);
    }

    /// Every input x must come out of the pipeline as 2 * x + 1.
    #[test]
    fn test_bounded_pipeline_correctness() {
        // 1 -> 3, 2 -> 5, 3 -> 7
        let mut output = bounded_pipeline(vec![1, 2, 3]);
        output.sort();
        assert_eq!(output, vec![3, 5, 7]);
    }

    /// `sync_channel(0)` is a rendezvous: the sender blocks until the
    /// receiver takes the value.
    #[test]
    fn test_sync_channel_zero_buffer_rendezvous() {
        let (sender, receiver) = mpsc::sync_channel::<i32>(0);
        let sending_thread = thread::spawn(move || {
            sender.send(42).unwrap(); // blocks until the receiver calls recv()
        });
        let received = receiver.recv().unwrap();
        assert_eq!(received, 42);
        sending_thread.join().unwrap();
    }

    /// The bounded, slow-consumer run must take longer than the unbounded one.
    #[test]
    fn test_backpressure_is_slower() {
        let bounded_is_slower = measure_backpressure_effect();
        assert!(bounded_is_slower);
    }

    /// A `try_send` into a full buffer must fail with `TrySendError::Full`.
    #[test]
    fn test_try_send_error_type() {
        let (sender, _receiver) = mpsc::sync_channel::<i32>(1);
        sender.try_send(1).unwrap(); // fills the buffer
        let second_attempt = sender.try_send(2);
        assert!(matches!(second_attempt, Err(mpsc::TrySendError::Full(_))));
    }
}
(* 1002: Backpressure — Bounded Channel Blocks Producer *)
(* When the consumer is slow, the bounded buffer fills up and blocks the producer *)
(* --- Bounded queue (simulates sync_channel) --- *)
(* A bounded FIFO channel shared between threads. Every operation on it in
   this file takes [m] before touching the other fields. *)
type 'a bounded_chan = {
(* Items that have been sent but not yet received, oldest first. *)
q: 'a Queue.t;
(* Maximum number of items [q] may hold before a send has to wait or fail. *)
capacity: int;
(* Mutex guarding [q] and [closed]; also the mutex paired with both
   condition variables. *)
m: Mutex.t;
(* Waited on by a blocked sender; signalled after a receive and broadcast
   on close. *)
not_full: Condition.t;
(* Waited on by a blocked receiver; signalled after a successful send and
   broadcast on close. *)
not_empty: Condition.t;
(* Set to [true] by [close_bounded]; stops blocked senders and receivers
   from waiting any longer. *)
mutable closed: bool;
}
(* [make_bounded_chan capacity] builds an open, empty channel that buffers at
   most [capacity] items, with its own fresh mutex and condition variables. *)
let make_bounded_chan capacity =
  let q = Queue.create () in
  let m = Mutex.create () in
  let not_full = Condition.create () in
  let not_empty = Condition.create () in
  { q; capacity; m; not_full; not_empty; closed = false }
(* [send_bounded c v] appends [v] to channel [c], blocking the calling thread
   while the buffer is at capacity — this wait is the backpressure. If the
   channel is closed (before or during the wait) the value is discarded and
   the call returns normally. *)
let send_bounded c v =
  Mutex.lock c.m;
  (* Sleep until there is room or the channel has been closed. *)
  let rec wait_for_room () =
    if not c.closed && Queue.length c.q >= c.capacity then begin
      Condition.wait c.not_full c.m;
      wait_for_room ()
    end
  in
  wait_for_room ();
  if c.closed then ()
  else begin
    Queue.add v c.q;
    (* Wake one receiver that may be waiting for an item. *)
    Condition.signal c.not_empty
  end;
  Mutex.unlock c.m
(* [recv_bounded c] removes and returns the oldest item of [c] as [Some v],
   blocking while the channel is empty and still open. It returns [None] once
   the channel is both closed and drained. *)
let recv_bounded c =
  Mutex.lock c.m;
  (* Sleep until an item is available or the channel has been closed. *)
  let rec wait_for_item () =
    if Queue.is_empty c.q && not c.closed then begin
      Condition.wait c.not_empty c.m;
      wait_for_item ()
    end
  in
  wait_for_item ();
  let item =
    match Queue.is_empty c.q with
    | true -> None
    | false -> Some (Queue.take c.q)
  in
  (* Wake one sender that may be waiting for room. *)
  Condition.signal c.not_full;
  Mutex.unlock c.m;
  item
(* [close_bounded c] marks [c] as closed and wakes every thread blocked on
   it, senders first and then receivers, so they can observe the flag. *)
let close_bounded c =
  Mutex.lock c.m;
  c.closed <- true;
  List.iter Condition.broadcast [c.not_full; c.not_empty];
  Mutex.unlock c.m
(* --- Approach 1: Slow consumer applies backpressure --- *)
(* Demo: a producer pushes 1..9 through a channel of capacity 3 while the
   consumer sleeps 5 ms per item. Both sides log a timestamp per item, and
   the run checks that each side logged exactly nine. *)
let () =
  let chan = make_bounded_chan 3 in (* buffer of 3 *)
  let sent_times = ref [] in
  let recv_times = ref [] in
  let m = Mutex.create () in
  (* Prepend the current time to [log] while holding [m]. *)
  let record log =
    Mutex.lock m;
    log := Unix.gettimeofday () :: !log;
    Mutex.unlock m
  in
  let producer = Thread.create (fun () ->
    for i = 1 to 9 do
      send_bounded chan i;
      record sent_times
    done;
    (* Closing lets the consumer finish once the buffer is drained. *)
    close_bounded chan
  ) () in
  let consumer = Thread.create (fun () ->
    let running = ref true in
    while !running do
      match recv_bounded chan with
      | None -> running := false
      | Some _ ->
        Unix.sleepf 0.005; (* slow consumer *)
        record recv_times
    done
  ) () in
  Thread.join producer;
  Thread.join consumer;
  assert (List.length !sent_times = 9);
  assert (List.length !recv_times = 9);
  Printf.printf "Approach 1 (backpressure): sent=%d recv=%d (producer was blocked by slow consumer)\n"
    (List.length !sent_times) (List.length !recv_times)
(* --- Approach 2: Producer detects backpressure (try_send) --- *)
(* [try_send c v] attempts to enqueue [v] on [c] without ever blocking.
   Returns [true] if the value was queued, and [false] if the buffer is at
   capacity or the channel has been closed. A closed channel refuses the
   value here just as [send_bounded] does, so nothing is queued after a
   close. *)
let try_send c v =
  Mutex.lock c.m;
  let ok = (not c.closed) && Queue.length c.q < c.capacity in
  if ok then begin
    Queue.push v c.q;
    (* Wake one receiver that may be waiting for an item. *)
    Condition.signal c.not_empty
  end;
  Mutex.unlock c.m;
  ok
(* Demo: offer 1..10 to a channel of capacity 2 that nobody reads from, and
   check that exactly two values are accepted and eight are rejected. *)
let () =
  let chan = make_bounded_chan 2 in
  (* Offer [i], [i + 1], ... up to 10, tallying accepted and rejected sends. *)
  let rec offer i (accepted, dropped) =
    if i > 10 then (accepted, dropped)
    else if try_send chan i then offer (i + 1) (accepted + 1, dropped)
    else offer (i + 1) (accepted, dropped + 1)
  in
  let accepted, dropped = offer 1 (0, 0) in
  assert (accepted = 2);
  assert (dropped = 8);
  Printf.printf "Approach 2 (try_send): accepted=%d dropped=%d\n" accepted dropped
(* Final status line; it is only reached if none of the assertions in the
   top-level demos above failed. *)
(* NOTE(review): the first character of the message looks like a mis-encoded
   symbol (possibly a check mark) — confirm the intended glyph before
   changing the literal. *)
let () = Printf.printf "โ All tests passed\n"