🦀 Functional Rust

464: Actor Pattern

Difficulty: 3 Level: Intermediate
Encapsulate mutable state in a thread with a message inbox: no locks needed, ever.

The Problem This Solves

Shared mutable state is the root of most concurrency bugs. You have a cache, a counter, or a connection pool that multiple threads need to read and write. The standard fix is `Arc<Mutex<T>>`, but now you're managing lock lifetimes, worrying about deadlocks, and discovering that long critical sections destroy performance.

The actor model takes a different approach: one thread owns the state, and everyone else sends it messages. The state never moves between threads, so your code needs no locks around it. Callers send a `Command` enum variant and optionally include a reply channel for responses. The actor processes messages one at a time, so serialization is structural rather than enforced by locks.

Erlang processes and Akka actors work this way, and Go's goroutines with channels follow a closely related message-passing style. In Rust, it maps directly to `std::sync::mpsc`: an actor is a thread looping on `recv()`, pattern-matching on command variants, and responding via one-shot channels embedded in the messages.
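For comparison, here is a minimal sketch of the lock-based baseline that the actor replaces. The function name `locked_count` and its parameters are illustrative assumptions, not part of the original example. It shows the `Arc<Mutex<T>>` shape in which every worker has to take the lock for each update.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Lock-based baseline: every worker locks the shared counter to update it.
/// `workers` and `per_worker` are illustrative parameters.
fn locked_count(workers: usize, per_worker: usize) -> i64 {
    let counter = Arc::new(Mutex::new(0i64));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_worker {
                    // The guard is dropped at the end of this statement,
                    // which keeps the critical section short.
                    *counter.lock().unwrap() += 1;
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    // Copy the value out before the guard goes away.
    let total = *counter.lock().unwrap();
    total
}
```

Calling `locked_count(4, 10)` yields 40. The result matches what an actor would compute, but the state is reachable from every thread and correctness depends on each caller locking properly.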

The Intuition

An actor is a thread with a typed mailbox: all state lives inside the thread, all interaction is through message variants, and serialization is guaranteed by the single-threaded message loop, with no `Mutex` required. The trade-off: simpler reasoning about state, but every interaction has message-passing overhead instead of direct memory access.

How It Works in Rust

use std::sync::mpsc;
use std::thread;

// Commands the actor understands
enum Command {
    Increment,
    Get(mpsc::SyncSender<i32>), // caller embeds a reply channel
    Stop,
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<Command>(32);

    // Actor thread: owns all mutable state
    let actor = thread::spawn(move || {
        let mut count = 0; // state lives here and never leaves this thread
        for cmd in rx {
            match cmd {
                Command::Increment => count += 1,
                // Ignore the error if the caller stopped waiting for the reply
                Command::Get(reply) => { let _ = reply.send(count); }
                Command::Stop => break,
            }
        }
    });

    // Callers send messages: no locking, no sharing
    tx.send(Command::Increment).unwrap();
    tx.send(Command::Increment).unwrap();

    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    tx.send(Command::Get(reply_tx)).unwrap();
    let value = reply_rx.recv().unwrap(); // blocks until the actor replies
    assert_eq!(value, 2);

    tx.send(Command::Stop).unwrap();
    actor.join().unwrap(); // wait for the actor thread to exit
}
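An explicit `Stop` message is one way to end the loop. Because `for cmd in rx` finishes once every sender has been dropped and the queue is drained, dropping the senders works too. The sketch below assumes a hypothetical `Msg` type with no stop variant and a helper named `sum_then_disconnect`; neither comes from the original example.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch; it has no `Stop` variant.
enum Msg {
    Add(i64),
}

/// Spawns a summing actor, sends it `values`, then shuts it down by
/// dropping the only sender. Returns the actor's final state.
fn sum_then_disconnect(values: &[i64]) -> i64 {
    let (tx, rx) = mpsc::channel::<Msg>();

    let actor = thread::spawn(move || {
        let mut total = 0i64;
        // The iterator ends once all senders are dropped and the queue is drained.
        for msg in rx {
            match msg {
                Msg::Add(n) => total += n,
            }
        }
        total // final state is handed back through the join handle
    });

    for &v in values {
        tx.send(Msg::Add(v)).unwrap();
    }

    drop(tx); // last sender gone: the actor's loop finishes
    actor.join().unwrap()
}
```

Returning the final state from the thread closure is a design choice made for this sketch: it lets the caller read the result through `join()` without a reply channel.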

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Actor state | Mutable refs inside thread | Owned local variables in message loop |
| Message type | Variant type | `enum Command` |
| Reply mechanism | Shared `Queue` + polling wait | `mpsc::SyncSender` embedded in message |
| No-reply message | Variant with no payload | Variant with no fields |
| Shutdown | `Stop` variant | `Stop` variant or drop all senders |
| Concurrency primitive | Hand-built mailbox: `Queue` guarded by `Mutex` + `Condition` | `mpsc` channel; no explicit `Mutex` needed |
// 464. Actor model in Rust
use std::sync::mpsc;
use std::thread;

enum Msg { Inc(i64), Dec(i64), Get(mpsc::SyncSender<i64>), Reset, Stop }

struct Actor { tx: mpsc::Sender<Msg> }

impl Actor {
    fn new() -> Self {
        let (tx,rx) = mpsc::channel::<Msg>();
        thread::spawn(move || {
            let mut s = 0i64;
            for m in rx { match m {
                Msg::Inc(n)  => s+=n,
                Msg::Dec(n)  => s-=n,
                Msg::Get(tx) => { let _=tx.send(s); }
                Msg::Reset   => s=0,
                Msg::Stop    => break,
            }}
        });
        Actor { tx }
    }
    fn inc(&self, n: i64)  { self.tx.send(Msg::Inc(n)).unwrap(); }
    fn dec(&self, n: i64)  { self.tx.send(Msg::Dec(n)).unwrap(); }
    fn reset(&self)         { self.tx.send(Msg::Reset).unwrap(); }
    fn get(&self) -> i64 {
        let (t, r) = mpsc::sync_channel(1); // one-shot reply channel
        self.tx.send(Msg::Get(t)).unwrap();
        r.recv().unwrap() // block until the actor answers
    }
}

fn main() {
    let a = Actor::new();
    a.inc(10); a.inc(5); a.dec(3);
    println!("state = {}", a.get()); // 12
    a.reset(); println!("after reset = {}", a.get()); // 0

    let a = std::sync::Arc::new(a);
    let hs: Vec<_> = (0..4).map(|_| {
        let a = std::sync::Arc::clone(&a);
        thread::spawn(move || { for _ in 0..10 { a.inc(1); } })
    }).collect();
    for h in hs { h.join().unwrap(); }
    println!("after 4×10 inc = {}", a.get()); // 40
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test] fn test_actor() {
        let a=Actor::new(); a.inc(7); a.inc(3); assert_eq!(a.get(),10);
        a.dec(4); assert_eq!(a.get(),6); a.reset(); assert_eq!(a.get(),0);
    }
}
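The listing above shares one `Actor` through an `Arc`, which relies on `mpsc::Sender` being `Sync` (true on recent toolchains). Since `mpsc::Sender` is also `Clone`, another option is to make the handle itself cloneable and give each thread its own copy. This is a sketch under that assumption; `CounterHandle` and `count_with_clones` are hypothetical names, not part of the original code.

```rust
use std::sync::mpsc;
use std::thread;

enum Msg {
    Inc(i64),
    Get(mpsc::SyncSender<i64>),
}

// Hypothetical handle type: cloning it clones the underlying `Sender`.
#[derive(Clone)]
struct CounterHandle {
    tx: mpsc::Sender<Msg>,
}

impl CounterHandle {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<Msg>();
        thread::spawn(move || {
            let mut state = 0i64;
            // Loop ends when every clone of the handle has been dropped.
            for msg in rx {
                match msg {
                    Msg::Inc(n) => state += n,
                    Msg::Get(reply) => {
                        let _ = reply.send(state);
                    }
                }
            }
        });
        CounterHandle { tx }
    }

    fn inc(&self, n: i64) {
        self.tx.send(Msg::Inc(n)).unwrap();
    }

    fn get(&self) -> i64 {
        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
        self.tx.send(Msg::Get(reply_tx)).unwrap();
        reply_rx.recv().unwrap()
    }
}

/// Four workers, each owning its own clone of the handle.
fn count_with_clones() -> i64 {
    let handle = CounterHandle::spawn();
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let h = handle.clone();
            thread::spawn(move || {
                for _ in 0..10 {
                    h.inc(1);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }
    handle.get()
}
```

With cloned handles there is no `Stop` message to send: the actor thread exits on its own once the last handle is dropped.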
(* 464. Actor model – OCaml *)
type msg = Inc of int | Get of int Queue.t | Stop

let make_actor () =
  let q=Queue.create () and m=Mutex.create () and c=Condition.create () in
  let send v=Mutex.lock m; Queue.push v q; Condition.signal c; Mutex.unlock m in
  let recv ()=Mutex.lock m; while Queue.is_empty q do Condition.wait c m done;
    let v=Queue.pop q in Mutex.unlock m; v in
  ignore (Thread.create (fun () ->
    let st=ref 0 and go=ref true in
    while !go do match recv () with
    | Inc n -> st := !st+n
    | Get rq -> Queue.push !st rq
    | Stop -> go:=false
    done) ());
  send

let () =
  let send=make_actor () in
  send (Inc 10); send (Inc 5);
  let rq=Queue.create () in send (Get rq);
  (* spin wait *)
  while Queue.is_empty rq do Thread.delay 0.001 done;
  Printf.printf "state=%d\n" (Queue.pop rq);
  send Stop; Thread.delay 0.01