use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;
// Broadcast channel: one sender, many receivers each get a copy
/// Sending half of a one-to-many broadcast channel.
///
/// Holds one bounded `mpsc::SyncSender` per subscriber behind a shared,
/// mutex-protected list. The `Clone` requirement on `T` lives on the `impl`
/// block that actually clones messages, not on the type itself, so the struct
/// can be named for any `T`.
struct BroadcastSender<T> {
    /// Producer ends of every subscriber channel, in subscription order.
    subscribers: Arc<Mutex<Vec<mpsc::SyncSender<T>>>>,
}
/// Receiving half of the broadcast channel, as returned by
/// `BroadcastSender::subscribe`.
///
/// Each receiver wraps the consumer end of its own channel.
struct BroadcastReceiver<T> {
/// Consumer end of this subscriber's channel.
rx: mpsc::Receiver<T>,
}
impl<T: Clone + Send + 'static> BroadcastSender<T> {
    /// Creates a broadcaster with no subscribers.
    fn new() -> Self {
        Self {
            subscribers: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Registers a new subscriber and returns its receiving half.
    ///
    /// `buf` is the capacity of the subscriber's bounded channel. Only
    /// messages sent after this call are delivered to the new receiver.
    /// With `buf == 0` the channel is a rendezvous channel, so a message is
    /// delivered only if the receiver is already blocked waiting for it.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber list mutex is poisoned.
    fn subscribe(&self, buf: usize) -> BroadcastReceiver<T> {
        let (tx, rx) = mpsc::sync_channel(buf);
        self.subscribers
            .lock()
            .expect("subscriber list mutex poisoned")
            .push(tx);
        BroadcastReceiver { rx }
    }

    /// Delivers a clone of `msg` to every current subscriber without blocking.
    ///
    /// * A subscriber whose buffer is full misses this message but stays
    ///   subscribed.
    /// * A subscriber whose receiver has been dropped is removed from the
    ///   list, so dead channels do not accumulate.
    ///
    /// # Panics
    ///
    /// Panics if the subscriber list mutex is poisoned.
    fn send(&self, msg: T) {
        let mut subs = self
            .subscribers
            .lock()
            .expect("subscriber list mutex poisoned");
        // `retain` visits each subscriber exactly once, in order, so every
        // live subscriber gets one delivery attempt per call.
        subs.retain(|sub| match sub.try_send(msg.clone()) {
            Ok(()) | Err(mpsc::TrySendError::Full(_)) => true,
            Err(mpsc::TrySendError::Disconnected(_)) => false,
        });
    }
}
impl<T> BroadcastReceiver<T> {
    /// Blocks until the next message is available and returns it.
    ///
    /// Returns `None` when the underlying channel reports an error, i.e. the
    /// sending side is gone and nothing is left in the buffer.
    fn recv(&self) -> Option<T> {
        match self.rx.recv() {
            Ok(value) => Some(value),
            Err(_) => None,
        }
    }

    /// Returns the next message if one is ready right now, without blocking.
    ///
    /// Returns `None` both when the buffer is currently empty and when the
    /// channel is disconnected.
    fn try_recv(&self) -> Option<T> {
        match self.rx.try_recv() {
            Ok(value) => Some(value),
            Err(_) => None,
        }
    }
}
/// Demo: broadcasts one message to three subscriber threads and verifies that
/// every thread received an identical copy.
fn main() {
    let sender: BroadcastSender<String> = BroadcastSender::new();

    // Each subscription is registered before the message is sent and its
    // channel is buffered, so the message waits in the buffer until the
    // listener thread calls `recv`; no sleep is needed for synchronisation.
    let handles: Vec<_> = (1..=3)
        .map(|id| {
            let receiver = sender.subscribe(16);
            thread::spawn(move || {
                let msg = receiver.recv().unwrap();
                println!("Receiver {id} got: {msg}");
                msg
            })
        })
        .collect();

    sender.send("hello everyone".to_string());

    // Join in spawn order and collect what each listener saw.
    let received: Vec<String> = handles
        .into_iter()
        .map(|handle| handle.join().unwrap())
        .collect();
    for pair in received.windows(2) {
        assert_eq!(pair[0], pair[1]);
    }
    println!("All received the same message ✓");
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every subscriber registered before a send gets its own copy.
    #[test]
    fn all_subscribers_get_message() {
        let sender = BroadcastSender::<i32>::new();
        let first = sender.subscribe(4);
        let second = sender.subscribe(4);

        sender.send(42);

        for receiver in [&first, &second] {
            assert_eq!(receiver.recv(), Some(42));
        }
    }

    /// Messages reach a subscriber in the order they were sent.
    #[test]
    fn multiple_messages() {
        let sender = BroadcastSender::<i32>::new();
        let receiver = sender.subscribe(8);

        (0..5).for_each(|value| sender.send(value));

        let mut received = Vec::with_capacity(5);
        for _ in 0..5 {
            received.push(receiver.recv().unwrap());
        }
        assert_eq!(received, vec![0, 1, 2, 3, 4]);
    }
}
(* OCaml: broadcast via multiple channels *)
(* [broadcast subscribers message] sends [message] on every channel in
   [subscribers], one after another in list order. Each send is synchronised
   with [Event.sync], so the call moves on to the next channel only once the
   current send has completed. *)
let broadcast subscribers message =
  let deliver channel = Event.sync (Event.send channel message) in
  List.iter deliver subscribers
(* Demo driver: starts three listener threads, each waiting on its own
   channel, broadcasts a single message to all of them, then waits for every
   listener to finish. *)
let () =
  (* Start a thread that receives one message from [ch] and prints it,
     prefixed with [label]. *)
  let spawn_listener (label, ch) =
    let run () =
      let msg = Event.sync (Event.receive ch) in
      Printf.printf "%s received: %s\n" label msg
    in
    Thread.create run ()
  in
  let ch_a = Event.new_channel () in
  let ch_b = Event.new_channel () in
  let ch_c = Event.new_channel () in
  let thread_a = spawn_listener ("A", ch_a) in
  let thread_b = spawn_listener ("B", ch_b) in
  let thread_c = spawn_listener ("C", ch_c) in
  broadcast [ch_a; ch_b; ch_c] "hello everyone";
  List.iter Thread.join [thread_a; thread_b; thread_c]