๐Ÿฆ€ Functional Rust

998: Circuit Breaker

Difficulty: Advanced Category: Async / Concurrency FP Patterns Concept: Fault tolerance state machine โ€” stop hammering failing services Key Insight: Three states: `Closed` (normal), `Open` (fail fast), `HalfOpen` (test recovery); transitions driven by failure count and elapsed time since opening

Versions

DirectoryDescription
`std/`Standard library version using `std::sync`, `std::thread`
`tokio/`Tokio async runtime version using `tokio::sync`, `tokio::spawn`

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 998: Circuit Breaker
// Open/Half-Open/Closed state machine for fault tolerance

use std::sync::Mutex;
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq, Clone)]
enum BreakerState {
    Closed,
    Open { opened_at: Instant },
    HalfOpen,
}

pub struct CircuitBreaker {
    state: Mutex<BreakerState>,
    failures: Mutex<u32>,
    failure_threshold: u32,
    recovery_timeout: Duration,
}

#[derive(Debug, PartialEq)]
pub enum CallResult<T, E> {
    Success(T),
    Failure(E),
    CircuitOpen,
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, recovery_timeout: Duration) -> Self {
        CircuitBreaker {
            state: Mutex::new(BreakerState::Closed),
            failures: Mutex::new(0),
            failure_threshold,
            recovery_timeout,
        }
    }

    fn maybe_transition_to_half_open(&self) {
        let mut state = self.state.lock().unwrap();
        if let BreakerState::Open { opened_at } = *state {
            if opened_at.elapsed() >= self.recovery_timeout {
                *state = BreakerState::HalfOpen;
            }
        }
    }

    pub fn call<T, E, F>(&self, f: F) -> CallResult<T, E>
    where
        F: FnOnce() -> Result<T, E>,
    {
        self.maybe_transition_to_half_open();

        let current_state = self.state.lock().unwrap().clone();
        match current_state {
            BreakerState::Open { .. } => CallResult::CircuitOpen,
            BreakerState::Closed | BreakerState::HalfOpen => {
                match f() {
                    Ok(v) => {
                        // Success: reset failures, close circuit
                        *self.failures.lock().unwrap() = 0;
                        *self.state.lock().unwrap() = BreakerState::Closed;
                        CallResult::Success(v)
                    }
                    Err(e) => {
                        let mut failures = self.failures.lock().unwrap();
                        *failures += 1;
                        if *failures >= self.failure_threshold {
                            *self.state.lock().unwrap() = BreakerState::Open {
                                opened_at: Instant::now(),
                            };
                        }
                        CallResult::Failure(e)
                    }
                }
            }
        }
    }

    pub fn state_name(&self) -> &'static str {
        match *self.state.lock().unwrap() {
            BreakerState::Closed => "Closed",
            BreakerState::Open { .. } => "Open",
            BreakerState::HalfOpen => "HalfOpen",
        }
    }

    pub fn reset(&self) {
        *self.state.lock().unwrap() = BreakerState::Closed;
        *self.failures.lock().unwrap() = 0;
    }
}

fn main() {
    let cb = CircuitBreaker::new(3, Duration::from_millis(50));
    println!("initial state: {}", cb.state_name());

    // Fail 3 times
    for i in 0..3 {
        let r = cb.call(|| Err::<i32, &str>("error"));
        println!("call {}: {:?}, state: {}", i+1, r, cb.state_name());
    }

    // Circuit open โ€” rejected
    let r = cb.call(|| Ok::<i32, &str>(42));
    println!("while open: {:?}", r);

    // Wait for recovery
    std::thread::sleep(Duration::from_millis(60));

    // Half-open โ€” let one through
    let r = cb.call(|| Ok::<i32, &str>(99));
    println!("after recovery: {:?}, state: {}", r, cb.state_name());
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_initial_state_closed() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(1));
        assert_eq!(cb.state_name(), "Closed");
    }

    #[test]
    fn test_opens_after_threshold() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(10));
        for _ in 0..3 {
            cb.call(|| Err::<i32, &str>("err"));
        }
        assert_eq!(cb.state_name(), "Open");
    }

    #[test]
    fn test_rejects_when_open() {
        let cb = CircuitBreaker::new(2, Duration::from_secs(10));
        cb.call(|| Err::<i32, &str>("e"));
        cb.call(|| Err::<i32, &str>("e")); // trip breaker
        let r = cb.call(|| Ok::<i32, &str>(42));
        assert_eq!(r, CallResult::CircuitOpen);
    }

    #[test]
    fn test_recovers_after_timeout() {
        let cb = CircuitBreaker::new(2, Duration::from_millis(20));
        cb.call(|| Err::<i32, &str>("e"));
        cb.call(|| Err::<i32, &str>("e")); // open
        assert_eq!(cb.state_name(), "Open");

        std::thread::sleep(Duration::from_millis(30));

        let r = cb.call(|| Ok::<i32, &str>(99));
        assert_eq!(r, CallResult::Success(99));
        assert_eq!(cb.state_name(), "Closed");
    }

    #[test]
    fn test_success_resets_failures() {
        let cb = CircuitBreaker::new(3, Duration::from_secs(1));
        cb.call(|| Err::<i32, &str>("e"));
        cb.call(|| Err::<i32, &str>("e")); // 2 failures
        cb.call(|| Ok::<i32, &str>(1));    // success โ€” reset
        cb.call(|| Err::<i32, &str>("e")); // 1 failure โ€” not open yet
        assert_eq!(cb.state_name(), "Closed");
    }
}
(* 998: Circuit Breaker *)
(* Open/Half-Open/Closed state machine for fault tolerance *)

type state = Closed | Open of float | HalfOpen

type 'a circuit_breaker = {
  mutable state: state;
  mutable failures: int;
  failure_threshold: int;
  recovery_timeout_s: float;
  m: Mutex.t;
}

let make_breaker ?(failure_threshold=3) ?(recovery_timeout_s=1.0) () = {
  state = Closed;
  failures = 0;
  failure_threshold;
  recovery_timeout_s;
  m = Mutex.create ();
}

type 'a breaker_result = BrResult of 'a | CircuitOpen | CallError of string

let call breaker f =
  Mutex.lock breaker.m;

  (* Transition from Open to HalfOpen if timeout elapsed *)
  (match breaker.state with
  | Open since when Unix.gettimeofday () -. since >= breaker.recovery_timeout_s ->
    breaker.state <- HalfOpen
  | _ -> ());

  let state = breaker.state in
  Mutex.unlock breaker.m;

  match state with
  | Open _ -> CircuitOpen
  | Closed | HalfOpen ->
    (match (try Ok (f ()) with e -> Error (Printexc.to_string e)) with
    | Ok v ->
      Mutex.lock breaker.m;
      breaker.failures <- 0;
      breaker.state <- Closed;
      Mutex.unlock breaker.m;
      BrResult v
    | Error e ->
      Mutex.lock breaker.m;
      breaker.failures <- breaker.failures + 1;
      if breaker.failures >= breaker.failure_threshold then
        breaker.state <- Open (Unix.gettimeofday ());
      Mutex.unlock breaker.m;
      CallError e)

let state_name b = match b.state with
  | Closed -> "Closed"
  | Open _ -> "Open"
  | HalfOpen -> "HalfOpen"

(* --- Approach 1: Fail 3 times โ†’ Open, then recover --- *)

let () =
  let b = make_breaker ~failure_threshold:3 ~recovery_timeout_s:0.05 () in
  assert (state_name b = "Closed");

  (* Fail 3 times *)
  for _ = 1 to 3 do
    let _ = call b (fun () -> failwith "simulated error") in ()
  done;
  assert (state_name b = "Open");
  Printf.printf "Approach 1: after 3 failures: %s\n" (state_name b);

  (* Circuit is open โ€” calls rejected *)
  (match call b (fun () -> 42) with
  | CircuitOpen -> Printf.printf "Approach 1: call rejected (circuit open)\n"
  | _ -> assert false);

  (* Wait for recovery timeout *)
  Unix.sleepf 0.06;
  (* Next call should go through (HalfOpen) *)
  (match call b (fun () -> 99) with
  | BrResult v ->
    assert (v = 99);
    assert (state_name b = "Closed");
    Printf.printf "Approach 1: recovered, got %d, state: %s\n" v (state_name b)
  | _ -> assert false)

let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

Circuit Breaker โ€” Comparison

Core Insight

The circuit breaker is an automatic state machine that protects callers from cascading failures. Like an electrical circuit breaker: too many failures "trip" it to Open, then it tests recovery in HalfOpen, then resets to Closed on success.

OCaml Approach

  • Mutable record fields for `state`, `failures`, `failure_threshold`
  • `Mutex` to protect state transitions (thread-safe)
  • `Unix.gettimeofday()` float for wall-clock timing
  • `Open of float` carries the timestamp when it opened
  • State check and transition in `call` function

Rust Approach

  • `Mutex<BreakerState>` + `Mutex<u32>` for state and failures
  • `BreakerState::Open { opened_at: Instant }` โ€” `Instant` for elapsed time
  • `Instant::now().elapsed() >= recovery_timeout` for timeout check
  • `call<T, E, F>(&self, f: F) -> CallResult<T, E>` โ€” generic over result type
  • `maybe_transition_to_half_open()` for clean separation

Comparison Table

ConceptOCamlRust
State enum`type state = Closed \Open \HalfOpen``enum BreakerState { Closed, Open { at: Instant }, HalfOpen }`
Thread safety`Mutex.t` + explicit lock/unlock`Mutex<BreakerState>` RAII
Timing`Unix.gettimeofday ()` (f64 secs)`Instant::now()` / `.elapsed()`
Call result`BrResult v \CircuitOpen \CallError e``CallResult<T,E>` enum
Transition logicPattern match in `call`Separate `maybe_transition` method
Generic over types`'a circuit_breaker`Generic `<T, E, F>` on `call`
ProductionManual or library (retrying-oc)`failsafe-rs`, `tower::limit`

std vs tokio

Aspectstd versiontokio version
RuntimeOS threads via `std::thread`Async tasks on tokio runtime
Synchronization`std::sync::Mutex`, `Condvar``tokio::sync::Mutex`, channels
Channels`std::sync::mpsc` (unbounded)`tokio::sync::mpsc` (bounded, async)
BlockingThread blocks on lock/recvTask yields, runtime switches tasks
OverheadOne OS thread per taskMany tasks per thread (M:N)
Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers