๐Ÿฆ€ Functional Rust

992: Actor Pattern

Difficulty: Intermediate Category: Async / Concurrency FP Patterns Concept: Encapsulate mutable state in a thread with a typed message inbox Key Insight: An actor is just `thread + mpsc::channel<Message>` โ€” the enum `Message` is the actor's API; request-response uses a `Sender<Reply>` embedded in the message

Versions

DirectoryDescription
`std/`Standard library version using `std::sync`, `std::thread`
`tokio/`Tokio async runtime version using `tokio::sync`, `tokio::spawn`

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 992: Actor Pattern
// Rust: enum Message + thread + mpsc channel mailbox

use std::sync::mpsc;
use std::thread;

// --- Approach 1: Counter actor ---
#[derive(Debug)]
enum CounterMsg {
    Increment(i64),
    Decrement(i64),
    GetValue(mpsc::Sender<i64>),
    Shutdown,
}

struct CounterActor {
    tx: mpsc::Sender<CounterMsg>,
}

impl CounterActor {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<CounterMsg>();
        thread::spawn(move || {
            let mut state: i64 = 0;
            for msg in rx.iter() {
                match msg {
                    CounterMsg::Increment(n) => state += n,
                    CounterMsg::Decrement(n) => state -= n,
                    CounterMsg::GetValue(reply) => { reply.send(state).ok(); }
                    CounterMsg::Shutdown => break,
                }
            }
        });
        CounterActor { tx }
    }

    fn increment(&self, n: i64) { self.tx.send(CounterMsg::Increment(n)).unwrap(); }
    fn decrement(&self, n: i64) { self.tx.send(CounterMsg::Decrement(n)).unwrap(); }

    fn get_value(&self) -> i64 {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx.send(CounterMsg::GetValue(reply_tx)).unwrap();
        reply_rx.recv().unwrap()
    }

    fn shutdown(self) { self.tx.send(CounterMsg::Shutdown).ok(); }
}

// --- Approach 2: Generic actor with request-response ---
#[derive(Debug)]
enum AdderMsg {
    Add { a: i32, b: i32, reply: mpsc::Sender<i32> },
    Stop,
}

struct AdderActor {
    tx: mpsc::Sender<AdderMsg>,
}

impl AdderActor {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<AdderMsg>();
        thread::spawn(move || {
            for msg in rx.iter() {
                match msg {
                    AdderMsg::Add { a, b, reply } => { reply.send(a + b).ok(); }
                    AdderMsg::Stop => break,
                }
            }
        });
        AdderActor { tx }
    }

    fn add(&self, a: i32, b: i32) -> i32 {
        let (reply_tx, reply_rx) = mpsc::channel();
        self.tx.send(AdderMsg::Add { a, b, reply: reply_tx }).unwrap();
        reply_rx.recv().unwrap()
    }

    fn stop(self) { self.tx.send(AdderMsg::Stop).ok(); }
}

// --- Approach 3: State machine actor ---
#[derive(Debug, PartialEq, Clone)]
enum TrafficLight { Red, Yellow, Green }

#[derive(Debug)]
enum TrafficMsg {
    Next,
    GetState(mpsc::Sender<TrafficLight>),
    Stop,
}

struct TrafficActor { tx: mpsc::Sender<TrafficMsg> }

impl TrafficActor {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<TrafficMsg>();
        thread::spawn(move || {
            let mut state = TrafficLight::Red;
            for msg in rx.iter() {
                match msg {
                    TrafficMsg::Next => {
                        state = match state {
                            TrafficLight::Red => TrafficLight::Green,
                            TrafficLight::Green => TrafficLight::Yellow,
                            TrafficLight::Yellow => TrafficLight::Red,
                        };
                    }
                    TrafficMsg::GetState(reply) => { reply.send(state.clone()).ok(); }
                    TrafficMsg::Stop => break,
                }
            }
        });
        TrafficActor { tx }
    }

    fn next(&self) { self.tx.send(TrafficMsg::Next).unwrap(); }

    fn state(&self) -> TrafficLight {
        let (r, rx) = mpsc::channel();
        self.tx.send(TrafficMsg::GetState(r)).unwrap();
        rx.recv().unwrap()
    }

    fn stop(self) { self.tx.send(TrafficMsg::Stop).ok(); }
}

fn main() {
    let actor = CounterActor::spawn();
    actor.increment(10);
    actor.increment(5);
    actor.decrement(3);
    println!("counter: {}", actor.get_value());
    actor.shutdown();

    let adder = AdderActor::spawn();
    println!("17 + 25 = {}", adder.add(17, 25));
    adder.stop();

    let traffic = TrafficActor::spawn();
    traffic.next(); traffic.next();
    println!("traffic state: {:?}", traffic.state());
    traffic.stop();
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_actor() {
        let actor = CounterActor::spawn();
        actor.increment(10);
        actor.increment(5);
        actor.decrement(3);
        assert_eq!(actor.get_value(), 12);
        actor.shutdown();
    }

    #[test]
    fn test_adder_actor() {
        let adder = AdderActor::spawn();
        assert_eq!(adder.add(17, 25), 42);
        assert_eq!(adder.add(1, 1), 2);
        adder.stop();
    }

    #[test]
    fn test_traffic_light_actor() {
        let t = TrafficActor::spawn();
        assert_eq!(t.state(), TrafficLight::Red);
        t.next();
        assert_eq!(t.state(), TrafficLight::Green);
        t.next();
        assert_eq!(t.state(), TrafficLight::Yellow);
        t.next();
        assert_eq!(t.state(), TrafficLight::Red);
        t.stop();
    }

    #[test]
    fn test_counter_negative() {
        let actor = CounterActor::spawn();
        actor.decrement(5);
        assert_eq!(actor.get_value(), -5);
        actor.shutdown();
    }
}
(* 992: Actor Pattern *)
(* Actor = thread + channel mailbox. Messages are enum variants *)

(* --- Message type for a counter actor --- *)

type message =
  | Increment of int
  | Decrement of int
  | GetValue of int ref * Mutex.t * Condition.t
  | Shutdown

(* --- Channel helpers --- *)

type 'a chan = { q: 'a Queue.t; m: Mutex.t; cond: Condition.t }

let make_chan () = { q = Queue.create (); m = Mutex.create (); cond = Condition.create () }

let send c v =
  Mutex.lock c.m;
  Queue.push v c.q;
  Condition.signal c.cond;
  Mutex.unlock c.m

let recv c =
  Mutex.lock c.m;
  while Queue.is_empty c.q do Condition.wait c.cond c.m done;
  let v = Queue.pop c.q in
  Mutex.unlock c.m;
  v

(* --- Actor: runs in its own thread, processes messages --- *)

let make_counter_actor () =
  let mailbox = make_chan () in

  let _actor_thread = Thread.create (fun () ->
    let state = ref 0 in
    let running = ref true in
    while !running do
      match recv mailbox with
      | Increment n -> state := !state + n
      | Decrement n -> state := !state - n
      | GetValue (result, m, cond) ->
        Mutex.lock m;
        result := !state;
        Condition.signal cond;
        Mutex.unlock m
      | Shutdown -> running := false
    done
  ) () in

  mailbox

(* --- Approach 1: Send messages to actor --- *)

let () =
  let actor = make_counter_actor () in

  send actor (Increment 10);
  send actor (Increment 5);
  send actor (Decrement 3);

  (* Synchronous get: send a "reply channel" in message *)
  let result = ref 0 in
  let reply_m = Mutex.create () in
  let reply_cond = Condition.create () in
  Mutex.lock reply_m;
  send actor (GetValue (result, reply_m, reply_cond));
  Condition.wait reply_cond reply_m;
  Mutex.unlock reply_m;

  assert (!result = 12); (* 10+5-3 *)
  Printf.printf "Approach 1 (counter actor): %d\n" !result;

  send actor Shutdown

(* --- Approach 2: Multiple actors collaborating --- *)

type adder_msg = Add of int * int * (int -> unit) | Stop

let make_adder_actor () =
  let mailbox = make_chan () in
  let _ = Thread.create (fun () ->
    let rec loop () =
      match recv mailbox with
      | Add (a, b, reply) -> reply (a + b); loop ()
      | Stop -> ()
    in
    loop ()
  ) () in
  mailbox

let () =
  let adder = make_adder_actor () in
  let result = ref 0 in
  let m = Mutex.create () in
  let cond = Condition.create () in
  Mutex.lock m;
  send adder (Add (17, 25, fun v ->
    Mutex.lock m;
    result := v;
    Condition.signal cond;
    Mutex.unlock m
  ));
  Condition.wait cond m;
  Mutex.unlock m;
  assert (!result = 42);
  Printf.printf "Approach 2 (adder actor): %d\n" !result;
  send adder Stop

let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

Actor Pattern โ€” Comparison

Core Insight

Actors replace shared mutable state with message passing to a single owner. The actor owns its state exclusively โ€” no locks needed because only one thread touches it. The channel IS the thread-safe boundary.

OCaml Approach

  • Thread + Queue + Mutex + Condition = manual mailbox
  • Message type is a variant; actor loops over `recv` calls
  • Request-response: embed `result ref` + `Condition` in message
  • More boilerplate, same concept as Rust
  • Akka/Erlang heritage โ€” functional languages pioneered this pattern

Rust Approach

  • `mpsc::channel::<Message>()` is the mailbox
  • `thread::spawn` runs the actor loop: `for msg in rx.iter() { match msg }`
  • Request-response: embed `mpsc::Sender<Reply>` in the message variant
  • Struct wraps the `Sender<Message>` โ€” provides typed API methods
  • No `Arc<Mutex<...>>` needed โ€” the actor owns all state

Comparison Table

ConceptOCamlRust
Mailbox`Queue` + `Mutex` + `Condition``mpsc::channel::<Msg>()`
Message typeVariant type / ADT`enum Message { ... }`
Actor loop`while running { match recv }``for msg in rx.iter() { match msg }`
Request-responseEmbed `result ref` + ConditionEmbed `Sender<Reply>` in variant
State ownership`ref` inside actor closureLocal variable in thread closure
Shutdown`Shutdown` variant`Shutdown` variant (or drop tx)
No lock neededYes โ€” one ownerYes โ€” one owner

std vs tokio

Aspectstd versiontokio version
RuntimeOS threads via `std::thread`Async tasks on tokio runtime
Synchronization`std::sync::Mutex`, `Condvar``tokio::sync::Mutex`, channels
Channels`std::sync::mpsc` (unbounded)`tokio::sync::mpsc` (bounded, async)
BlockingThread blocks on lock/recvTask yields, runtime switches tasks
OverheadOne OS thread per taskMany tasks per thread (M:N)
Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers