464: Actor Pattern
Difficulty: 3 Level: Intermediate
Encapsulate mutable state in a thread with a message inbox: no locks needed in your own code.
The Problem This Solves
Shared mutable state is the root of most concurrency bugs. You have a cache, a counter, or a connection pool that multiple threads need to read and write. The standard fix is `Arc<Mutex<T>>`, but now you're managing lock lifetimes, worrying about deadlocks, and discovering that long critical sections destroy performance.
The actor model takes a different approach: one thread owns the state, and everyone else sends it messages. The state never moves between threads, so callers need no locks. Callers send a `Command` enum variant and optionally include a reply channel for responses. The actor processes messages one at a time, so serialization is structural rather than enforced by locks.
This is the model behind Erlang processes and Akka actors, and it closely resembles the goroutines-with-channels style common in Go. In Rust, it maps directly to `std::sync::mpsc`: an actor is a thread looping on `recv()`, pattern-matching on command variants, and responding via one-shot channels embedded in the messages.
The Intuition
An actor is a thread with a typed mailbox: all state lives inside the thread, all interaction is through message variants, and serialization is guaranteed by the single-threaded message loop, with no `Mutex` required. The trade-off: simpler reasoning about state, but every interaction pays message-passing overhead instead of direct memory access.
How It Works in Rust
use std::sync::mpsc;
use std::thread;

// Commands the actor understands
enum Command {
    Increment,
    Get(mpsc::SyncSender<i32>), // caller embeds a reply channel
    Stop,
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<Command>(32);

    // Actor thread: owns all mutable state
    let actor = thread::spawn(move || {
        let mut count = 0; // state lives here and never leaves this thread
        for cmd in rx {
            match cmd {
                Command::Increment => count += 1,
                Command::Get(reply) => reply.send(count).unwrap(),
                Command::Stop => break,
            }
        }
    });

    // Callers send messages: no locking, no sharing
    tx.send(Command::Increment).unwrap();
    tx.send(Command::Increment).unwrap();
    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    tx.send(Command::Get(reply_tx)).unwrap();
    let value = reply_rx.recv().unwrap(); // blocks until the actor replies
    println!("count = {value}");
    tx.send(Command::Stop).unwrap();
    actor.join().unwrap(); // wait for the actor thread to exit
}
What This Unlocks
- Connection pools: the actor owns the pool, hands out connections via a `Borrow` command, and takes them back via `Return`, so callers never hold a lock and cannot deadlock on lock ordering.
- Shared caches: the actor owns the `HashMap`, and all threads read and write via messages, so the map stays consistent without locks.
- Stateful protocol handlers: TCP connection state machine lives in one actor, driven by incoming packet messages.
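As an illustration of the shared-cache case, here is a minimal sketch of an actor that owns a `HashMap`. The names `CacheCmd`, `CacheHandle`, `spawn`, `put`, and `get` are hypothetical and chosen only for this example; the string key and value types are likewise an assumption.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

// Hypothetical command set for a string-keyed cache actor.
enum CacheCmd {
    Put(String, String),
    // The caller embeds a reply channel for the lookup result.
    Get(String, mpsc::SyncSender<Option<String>>),
}

// Cloneable handle: each clone is another sender into the same mailbox.
#[derive(Clone)]
struct CacheHandle {
    tx: mpsc::Sender<CacheCmd>,
}

impl CacheHandle {
    fn spawn() -> Self {
        let (tx, rx) = mpsc::channel::<CacheCmd>();
        thread::spawn(move || {
            // The map is owned by this thread and never shared.
            let mut map: HashMap<String, String> = HashMap::new();
            for cmd in rx {
                match cmd {
                    CacheCmd::Put(k, v) => {
                        map.insert(k, v);
                    }
                    CacheCmd::Get(k, reply) => {
                        // Ignore the error if the caller stopped waiting.
                        let _ = reply.send(map.get(&k).cloned());
                    }
                }
            }
            // The loop ends once every handle has been dropped.
        });
        CacheHandle { tx }
    }

    fn put(&self, key: &str, value: &str) {
        self.tx
            .send(CacheCmd::Put(key.to_string(), value.to_string()))
            .expect("cache actor has stopped");
    }

    fn get(&self, key: &str) -> Option<String> {
        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
        self.tx
            .send(CacheCmd::Get(key.to_string(), reply_tx))
            .expect("cache actor has stopped");
        reply_rx.recv().expect("cache actor dropped the reply channel")
    }
}

fn main() {
    let cache = CacheHandle::spawn();
    let writer = cache.clone();
    // A second thread writes through its own clone of the handle.
    thread::spawn(move || writer.put("lang", "rust"))
        .join()
        .unwrap();
    println!("{:?}", cache.get("lang")); // Some("rust")
    println!("{:?}", cache.get("missing")); // None
}
```

Cloning the handle rather than wrapping it in `Arc` is one possible design choice here: `mpsc::Sender` is `Clone`, so every thread can own its own sender into the same mailbox.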
Key Differences
| Concept | OCaml | Rust |
|---|---|---|
| Actor state | Mutable refs inside thread | Owned local variables in message loop |
| Message type | Variant type | `enum Command` |
| Reply mechanism | Reply `Queue` embedded in message, polled by the caller | `mpsc::SyncSender` embedded in message |
| No-reply message | Variant with no payload | Variant with no fields |
| Shutdown | `Stop` variant | `Stop` variant or drop all senders |
| Concurrency primitive | `Mutex` + `Condition` guarding a `Queue` mailbox | `mpsc` channel, no `Mutex` in user code |
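The "drop all senders" shutdown path in the table can be sketched as follows. This is a minimal example under stated assumptions: the function name `run_until_senders_drop` is hypothetical, and the `Command` enum is reduced to one variant to keep the focus on shutdown. It relies on the fact that iterating over an `mpsc::Receiver` ends once every sender has been dropped and the queue is empty.

```rust
use std::sync::mpsc;
use std::thread;

enum Command {
    Increment,
}

// Runs a counter actor and returns the count it held when its mailbox closed.
fn run_until_senders_drop(increments: usize) -> i32 {
    let (tx, rx) = mpsc::sync_channel::<Command>(32);

    let actor = thread::spawn(move || {
        let mut count = 0;
        // The loop ends when all senders are dropped and the queue is drained.
        for cmd in rx {
            match cmd {
                Command::Increment => count += 1,
            }
        }
        count // final state is handed back through the JoinHandle
    });

    for _ in 0..increments {
        tx.send(Command::Increment).unwrap();
    }
    drop(tx); // last sender gone: the actor drains the queue and exits

    actor.join().unwrap()
}

fn main() {
    println!("final count = {}", run_until_senders_drop(5)); // final count = 5
}
```

Returning the final state through the `JoinHandle` is optional; it is shown here only because it makes the shutdown observable without an extra reply channel.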
// 464. Actor model in Rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

enum Msg {
    Inc(i64),
    Dec(i64),
    Get(mpsc::SyncSender<i64>),
    Reset,
    Stop,
}

struct Actor {
    tx: mpsc::Sender<Msg>,
}

impl Actor {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<Msg>();
        thread::spawn(move || {
            let mut s = 0i64; // actor-private state
            for m in rx {
                match m {
                    Msg::Inc(n) => s += n,
                    Msg::Dec(n) => s -= n,
                    Msg::Get(reply) => { let _ = reply.send(s); }
                    Msg::Reset => s = 0,
                    Msg::Stop => break,
                }
            }
        });
        Actor { tx }
    }
    fn inc(&self, n: i64) { self.tx.send(Msg::Inc(n)).unwrap(); }
    fn dec(&self, n: i64) { self.tx.send(Msg::Dec(n)).unwrap(); }
    fn reset(&self) { self.tx.send(Msg::Reset).unwrap(); }
    fn get(&self) -> i64 {
        let (t, r) = mpsc::sync_channel(1);
        self.tx.send(Msg::Get(t)).unwrap();
        r.recv().unwrap()
    }
}

fn main() {
    let a = Actor::new();
    a.inc(10);
    a.inc(5);
    a.dec(3);
    println!("state = {}", a.get()); // 12
    a.reset();
    println!("after reset = {}", a.get()); // 0
    // Share one handle across four threads
    let a = Arc::new(a);
    let hs: Vec<_> = (0..4)
        .map(|_| {
            let a = Arc::clone(&a);
            thread::spawn(move || {
                for _ in 0..10 { a.inc(1); }
            })
        })
        .collect();
    for h in hs { h.join().unwrap(); }
    println!("after 4×10 inc = {}", a.get()); // 40
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn test_actor() {
        let a = Actor::new();
        a.inc(7);
        a.inc(3);
        assert_eq!(a.get(), 10);
        a.dec(4);
        assert_eq!(a.get(), 6);
        a.reset();
        assert_eq!(a.get(), 0);
    }
}
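The `get` method in the `Actor` above unwraps both the send and the reply, so it panics if the actor thread has already exited. One possible alternative, sketched here as an assumption rather than as part of the original design, is to report failure to the caller instead. The `try_get` and `stop` methods and the `bool` return from `inc` are hypothetical additions; `recv_timeout` is the standard `mpsc::Receiver` method.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

enum Msg {
    Inc(i64),
    Get(mpsc::SyncSender<i64>),
    Stop,
}

struct Actor {
    tx: mpsc::Sender<Msg>,
}

impl Actor {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<Msg>();
        thread::spawn(move || {
            let mut s = 0i64;
            for m in rx {
                match m {
                    Msg::Inc(n) => s += n,
                    Msg::Get(reply) => {
                        let _ = reply.send(s);
                    }
                    Msg::Stop => break,
                }
            }
        });
        Actor { tx }
    }

    // Returns false instead of panicking when the actor is gone.
    fn inc(&self, n: i64) -> bool {
        self.tx.send(Msg::Inc(n)).is_ok()
    }

    fn stop(&self) -> bool {
        self.tx.send(Msg::Stop).is_ok()
    }

    // Returns None if the actor has stopped or does not answer in time.
    fn try_get(&self, timeout: Duration) -> Option<i64> {
        let (t, r) = mpsc::sync_channel(1);
        self.tx.send(Msg::Get(t)).ok()?;
        r.recv_timeout(timeout).ok()
    }
}

fn main() {
    let a = Actor::new();
    a.inc(10);
    a.inc(5);
    println!("{:?}", a.try_get(Duration::from_secs(1)));
    a.stop();
    println!("{:?}", a.try_get(Duration::from_secs(1)));
}
```

After `stop`, `try_get` yields `None` on either path: the send fails if the mailbox is already closed, and otherwise the queued request is discarded along with its reply sender when the actor exits, which disconnects the reply channel.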
(* 464. Actor model in OCaml (requires the threads library) *)
type msg = Inc of int | Get of int Queue.t | Stop

let make_actor () =
  let q = Queue.create () in
  let m = Mutex.create () in
  let c = Condition.create () in
  (* send: enqueue under the mutex and wake the actor *)
  let send v =
    Mutex.lock m; Queue.push v q; Condition.signal c; Mutex.unlock m in
  (* recv: block until the mailbox is non-empty *)
  let recv () =
    Mutex.lock m;
    while Queue.is_empty q do Condition.wait c m done;
    let v = Queue.pop q in
    Mutex.unlock m; v in
  ignore (Thread.create (fun () ->
    let st = ref 0 and go = ref true in
    while !go do
      match recv () with
      | Inc n -> st := !st + n
      | Get rq -> Queue.push !st rq
      | Stop -> go := false
    done) ());
  send

let () =
  let send = make_actor () in
  send (Inc 10); send (Inc 5);
  let rq = Queue.create () in
  send (Get rq);
  (* spin wait until the actor pushes the reply *)
  while Queue.is_empty rq do Thread.delay 0.001 done;
  Printf.printf "state=%d\n" (Queue.pop rq);
  send Stop; Thread.delay 0.01