๐Ÿฆ€ Functional Rust

1000: Reactive Stream

Difficulty: Advanced Category: Async / Concurrency FP Patterns Concept: Push-based Observable<T> with composable operators Key Insight: An `Observable` wraps a `subscribe_fn` โ€” it's a lazy push source; `map/filter/take` create new Observables that wrap the previous; this is the Rx pattern in pure std Rust without any crates

Versions

DirectoryDescription
`std/`Standard library version using `std::sync`, `std::thread`
`tokio/`Tokio async runtime version using `tokio::sync`, `tokio::spawn`

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 1000: Reactive Stream
// Push-based Observable<T> with map/filter/take/subscribe

use std::cell::RefCell;
use std::rc::Rc;

// --- Observer trait ---
trait Observer<T> {
    fn on_next(&mut self, value: T);
    fn on_error(&mut self, err: &str);
    fn on_complete(&mut self);
}

// --- Simple functional observer ---
struct FnObserver<T> {
    on_next_fn: Box<dyn FnMut(T)>,
    on_complete_fn: Box<dyn FnMut()>,
}

impl<T> FnObserver<T> {
    fn new(on_next: impl FnMut(T) + 'static) -> Self {
        FnObserver {
            on_next_fn: Box::new(on_next),
            on_complete_fn: Box::new(|| {}),
        }
    }

    fn with_complete(mut self, f: impl FnMut() + 'static) -> Self {
        self.on_complete_fn = Box::new(f);
        self
    }
}

impl<T> Observer<T> for FnObserver<T> {
    fn on_next(&mut self, value: T) { (self.on_next_fn)(value); }
    fn on_error(&mut self, _err: &str) {}
    fn on_complete(&mut self) { (self.on_complete_fn)(); }
}

// --- Observable: a lazy push source ---
struct Observable<T> {
    subscribe_fn: Box<dyn Fn(&mut dyn Observer<T>)>,
}

impl<T: Clone + 'static> Observable<T> {
    fn new(f: impl Fn(&mut dyn Observer<T>) + 'static) -> Self {
        Observable { subscribe_fn: Box::new(f) }
    }

    fn subscribe(&self, observer: &mut dyn Observer<T>) {
        (self.subscribe_fn)(observer);
    }

    fn from_iter(items: Vec<T>) -> Self {
        Observable::new(move |obs| {
            for item in &items {
                obs.on_next(item.clone());
            }
            obs.on_complete();
        })
    }
}

// --- Operators as free functions (return new Observable) ---

// Adapter structs allow borrowing `observer` without 'static
struct MapAdapter<'a, U, F> {
    inner: &'a mut dyn Observer<U>,
    f: &'a F,
}

impl<'a, T, U, F: Fn(T) -> U> Observer<T> for MapAdapter<'a, U, F> {
    fn on_next(&mut self, value: T) { self.inner.on_next((self.f)(value)); }
    fn on_error(&mut self, err: &str) { self.inner.on_error(err); }
    fn on_complete(&mut self) { self.inner.on_complete(); }
}

fn obs_map<T: Clone + 'static, U: Clone + 'static>(
    source: Observable<T>,
    f: impl Fn(T) -> U + 'static,
) -> Observable<U> {
    Observable::new(move |observer| {
        let mut adapter = MapAdapter { inner: observer, f: &f };
        source.subscribe(&mut adapter);
    })
}

struct FilterAdapter<'a, T, P> {
    inner: &'a mut dyn Observer<T>,
    pred: &'a P,
}

impl<'a, T, P: Fn(&T) -> bool> Observer<T> for FilterAdapter<'a, T, P> {
    fn on_next(&mut self, value: T) {
        if (self.pred)(&value) { self.inner.on_next(value); }
    }
    fn on_error(&mut self, err: &str) { self.inner.on_error(err); }
    fn on_complete(&mut self) { self.inner.on_complete(); }
}

fn obs_filter<T: Clone + 'static>(
    source: Observable<T>,
    pred: impl Fn(&T) -> bool + 'static,
) -> Observable<T> {
    Observable::new(move |observer| {
        let mut adapter = FilterAdapter { inner: observer, pred: &pred };
        source.subscribe(&mut adapter);
    })
}

struct TakeAdapter<'a, T> {
    inner: &'a mut dyn Observer<T>,
    remaining: usize,
}

impl<'a, T> Observer<T> for TakeAdapter<'a, T> {
    fn on_next(&mut self, value: T) {
        if self.remaining > 0 {
            self.remaining -= 1;
            self.inner.on_next(value);
        }
    }
    fn on_error(&mut self, err: &str) { self.inner.on_error(err); }
    fn on_complete(&mut self) { self.inner.on_complete(); }
}

fn obs_take<T: Clone + 'static>(source: Observable<T>, n: usize) -> Observable<T> {
    Observable::new(move |observer| {
        let mut adapter = TakeAdapter { inner: observer, remaining: n };
        source.subscribe(&mut adapter);
    })
}

// --- Collect all emitted values ---
fn collect<T: Clone + 'static>(source: Observable<T>) -> Vec<T> {
    let results = Rc::new(RefCell::new(Vec::new()));
    let results2 = Rc::clone(&results);
    let mut observer = FnObserver::new(move |v: T| {
        results2.borrow_mut().push(v);
    });
    source.subscribe(&mut observer);
    let x = results.borrow().clone(); x
}


#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_from_iter() {
        let obs = Observable::from_iter(vec![1, 2, 3]);
        assert_eq!(collect(obs), vec![1, 2, 3]);
    }

    #[test]
    fn test_map() {
        let obs = Observable::from_iter(vec![1, 2, 3]);
        let mapped = obs_map(obs, |x: i32| x * 2);
        assert_eq!(collect(mapped), vec![2, 4, 6]);
    }

    #[test]
    fn test_filter() {
        let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
        let filtered = obs_filter(obs, |x| x % 2 == 0);
        assert_eq!(collect(filtered), vec![2, 4]);
    }

    #[test]
    fn test_take() {
        let obs = Observable::from_iter(vec![1, 2, 3, 4, 5]);
        let taken = obs_take(obs, 3);
        assert_eq!(collect(taken), vec![1, 2, 3]);
    }

    #[test]
    fn test_chain() {
        let source = Observable::from_iter(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        let filtered = obs_filter(source, |x| x % 2 == 0);
        let mapped = obs_map(filtered, |x: i32| x * x);
        let taken = obs_take(mapped, 3);
        assert_eq!(collect(taken), vec![4, 16, 36]);
    }

    #[test]
    fn test_empty_observable() {
        let obs: Observable<i32> = Observable::from_iter(vec![]);
        assert_eq!(collect(obs), vec![]);
    }
}
(* 1000: Reactive Stream *)
(* Push-based Observable<T> with map/filter/subscribe *)

(* --- Observable: a source that pushes values to subscribers --- *)

type 'a observer = {
  on_next: 'a -> unit;
  on_error: exn -> unit;
  on_complete: unit -> unit;
}

type 'a observable = { subscribe: 'a observer -> unit -> unit }
(* subscribe returns an unsubscribe function *)

let make_observer ?(on_error=fun _ -> ()) ?(on_complete=fun () -> ()) on_next =
  { on_next; on_error; on_complete }

(* --- Source: emit values from a list --- *)

let from_list xs =
  { subscribe = fun observer ->
    List.iter observer.on_next xs;
    observer.on_complete ();
    fun () -> () (* unsubscribe is no-op for completed streams *)
  }

(* --- Operators --- *)

let map f obs =
  { subscribe = fun observer ->
    obs.subscribe {
      on_next = (fun v -> observer.on_next (f v));
      on_error = observer.on_error;
      on_complete = observer.on_complete;
    }
  }

let filter pred obs =
  { subscribe = fun observer ->
    obs.subscribe {
      on_next = (fun v -> if pred v then observer.on_next v);
      on_error = observer.on_error;
      on_complete = observer.on_complete;
    }
  }

let take n obs =
  { subscribe = fun observer ->
    let count = ref 0 in
    obs.subscribe {
      on_next = (fun v ->
        if !count < n then begin
          incr count;
          observer.on_next v;
          if !count = n then observer.on_complete ()
        end);
      on_error = observer.on_error;
      on_complete = observer.on_complete;
    }
  }

(* --- Approach 1: Chain operators --- *)

let () =
  let results = ref [] in
  let source = from_list [1;2;3;4;5;6;7;8;9;10] in
  let stream =
    source
    |> filter (fun x -> x mod 2 = 0)  (* keep even *)
    |> map (fun x -> x * x)            (* square *)
    |> take 3                          (* first 3 *)
  in
  let observer = make_observer (fun v -> results := v :: !results) in
  let _ = stream.subscribe observer in
  let results = List.rev !results in
  assert (results = [4; 16; 36]);
  Printf.printf "Approach 1 (reactive chain): [%s]\n"
    (String.concat "; " (List.map string_of_int results))

(* --- Approach 2: Subject (hot observable โ€” broadcast to multiple) --- *)

type 'a subject = {
  mutable observers: ('a observer) list;
  mutable completed: bool;
}

let make_subject () = { observers = []; completed = false }

let subject_subscribe subj obs =
  if not subj.completed then
    subj.observers <- obs :: subj.observers;
  fun () -> subj.observers <- List.filter (fun o -> o != obs) subj.observers

let subject_next subj v =
  List.iter (fun o -> o.on_next v) subj.observers

let subject_complete subj =
  subj.completed <- true;
  List.iter (fun o -> o.on_complete ()) subj.observers

let () =
  let subj = make_subject () in
  let r1 = ref [] and r2 = ref [] in
  let _ = subject_subscribe subj (make_observer (fun v -> r1 := v :: !r1)) in
  let _ = subject_subscribe subj (make_observer (fun v -> r2 := v :: !r2)) in
  List.iter (subject_next subj) [10; 20; 30];
  subject_complete subj;
  assert (List.rev !r1 = [10;20;30]);
  assert (List.rev !r2 = [10;20;30]);
  Printf.printf "Approach 2 (subject): r1=%d r2=%d items\n"
    (List.length !r1) (List.length !r2)

let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

Reactive Stream โ€” Comparison

Core Insight

Reactive streams are push-based lazy sequences: a producer pushes values to a subscriber. Unlike iterators (pull-based), the producer controls timing. Operators like `map`/`filter` wrap one observable in another โ€” forming a lazy chain that only runs when subscribed.

OCaml Approach

  • `observable = { subscribe: observer -> unsubscribe }` as a record
  • `observer = { on_next; on_error; on_complete }` callbacks
  • `map`/`filter`/`take` create new observables wrapping the old
  • `Subject` (hot observable): broadcasts to multiple subscribers
  • Closely mirrors RxJS architecture

Rust Approach

  • `Observable<T>` wraps `Box<dyn Fn(&mut dyn Observer<T>)>`
  • `Observer<T>` trait with `on_next`, `on_error`, `on_complete`
  • `FnObserver` adapter for closure-based observers
  • `Rc<RefCell<_>>` for shared state within single-threaded observable
  • `obs_map`, `obs_filter`, `obs_take` as free functions returning new Observable
  • For production: `futures::Stream` or the `rxrust` crate

Comparison Table

ConceptOCamlRust
Observable type`{ subscribe: observer -> unsub }``struct Observable<T> { subscribe_fn }`
Observer type`{ on_next; on_error; on_complete }``trait Observer<T>`
Map operator`map f obs` โ†’ new observable`obs_map(source, f)` โ†’ `Observable<U>`
Filter operator`filter pred obs``obs_filter(source, pred)`
Take operator`take n obs``obs_take(source, n)`
Hot observable`Subject` typeManual with `Arc<Mutex<Vec<...>>>`
Production`Lwt_react`, `rxocaml``futures::Stream`, `rxrust`

std vs tokio

Aspectstd versiontokio version
RuntimeOS threads via `std::thread`Async tasks on tokio runtime
Synchronization`std::sync::Mutex`, `Condvar``tokio::sync::Mutex`, channels
Channels`std::sync::mpsc` (unbounded)`tokio::sync::mpsc` (bounded, async)
BlockingThread blocks on lock/recvTask yields, runtime switches tasks
OverheadOne OS thread per taskMany tasks per thread (M:N)
Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers