922 Advanced

922-cancellation-token — Cancellation Token

Functional Programming

Tutorial

The Problem

Long-running operations such as large file downloads, iterative computations, and background scans must be stoppable. Forcibly stopping a thread from outside is unsafe in any language: it can leak resources and leave shared state inconsistent. The safe pattern is cooperative cancellation: the running task periodically checks a cancellation flag and exits cleanly once it is set. The same idea appears as CancellationToken in .NET, context.Context in Go, and AbortController in JavaScript. Rust's std::sync::atomic::AtomicBool provides the thread-safe flag this pattern needs without a lock. The flag itself needs no heap allocation, although sharing it between threads through an Arc, as this example does, allocates once when the token is created.
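
Before the full token type, the core idea can be sketched with nothing but a static flag. This is a minimal illustration, not the example's implementation: the names STOP, count_until_stopped, and run_demo are hypothetical, and a process-wide static is assumed only to show that the flag itself requires no heap allocation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

/// Process-wide stop flag (hypothetical name). A `static` lives in the
/// program's static memory, so no heap allocation is involved.
static STOP: AtomicBool = AtomicBool::new(false);

/// Counts upward until `STOP` is set, then returns how many iterations ran.
pub fn count_until_stopped() -> u64 {
    let mut iterations = 0;
    // Cooperative check: the worker decides when it is safe to exit.
    while !STOP.load(Ordering::Acquire) {
        iterations += 1;
        thread::sleep(Duration::from_millis(1));
    }
    iterations
}

/// Spawns the worker, lets it run briefly, requests a stop, and joins it.
pub fn run_demo() -> u64 {
    STOP.store(false, Ordering::Release);
    let worker = thread::spawn(count_until_stopped);
    thread::sleep(Duration::from_millis(20));
    STOP.store(true, Ordering::Release); // request cancellation
    worker.join().expect("worker panicked")
}
```

A single global flag can only stop everything at once, which is why the example below wraps the flag in an Arc so that independent tokens can be created and cloned.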

🎯 Learning Outcomes

  • Implement a CancellationToken using Arc<AtomicBool> for shared cancellation state
  • Choose a memory ordering for the flag: the example uses Ordering::SeqCst, and a Release store paired with an Acquire load is the weaker alternative that still gives cross-thread visibility
  • Integrate cancellation checks into long-running loops
  • Clone tokens for multi-task cancellation (one cancel, many stop)
  • Compare with OCaml's Lwt_switch and Fiber.with_cancellation

    Code Example

    //! Cooperative cancellation via a shared atomic flag.
    //!
    //! A [`CancellationToken`] lets one thread signal another to stop work.
    //! The worker periodically checks the token and exits early when
    //! cancellation has been requested.
    
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    
    /// A clonable, thread-safe cooperative cancellation flag.
    #[derive(Debug, Clone, Default)]
    pub struct CancellationToken {
        flag: Arc<AtomicBool>,
    }
    
    impl CancellationToken {
        /// Creates a new token in the non-cancelled state.
        pub fn new() -> Self {
            Self {
                flag: Arc::new(AtomicBool::new(false)),
            }
        }
    
        /// Signals cancellation. Idempotent.
        pub fn cancel(&self) {
            self.flag.store(true, Ordering::SeqCst);
        }
    
        /// Returns `true` if cancellation has been requested.
        pub fn is_cancelled(&self) -> bool {
            self.flag.load(Ordering::SeqCst)
        }
    }
    
    /// Outcome of a cancellable task run.
    #[derive(Debug, PartialEq, Eq)]
    pub enum TaskOutcome {
        /// Task finished all requested steps.
        Completed(usize),
        /// Task was cancelled at the given step index.
        Cancelled(usize),
    }
    
    /// Runs a cooperative long-running task that checks `token` between steps.
    ///
    /// `on_step` is invoked for each step that actually runs. Between steps the
    /// thread sleeps for `step_delay`, mirroring the OCaml `Thread.delay 0.01`.
    pub fn long_task<F>(
        steps: usize,
        token: &CancellationToken,
        step_delay: Duration,
        mut on_step: F,
    ) -> TaskOutcome
    where
        F: FnMut(usize),
    {
        for i in 0..steps {
            if token.is_cancelled() {
                return TaskOutcome::Cancelled(i);
            }
            on_step(i);
            thread::sleep(step_delay);
        }
        if token.is_cancelled() {
            TaskOutcome::Cancelled(steps)
        } else {
            TaskOutcome::Completed(steps)
        }
    }
    
    #[cfg(test)]
    mod tests {
        use super::*;
    
        #[test]
        fn new_token_is_not_cancelled() {
            let token = CancellationToken::new();
            assert!(!token.is_cancelled());
        }
    
        #[test]
        fn cancel_sets_the_flag() {
            let token = CancellationToken::new();
            token.cancel();
            assert!(token.is_cancelled());
        }
    
        #[test]
        fn clones_share_state() {
            let token = CancellationToken::new();
            let clone = token.clone();
            token.cancel();
            assert!(clone.is_cancelled());
        }
    
        #[test]
        fn cancel_is_idempotent() {
            let token = CancellationToken::new();
            token.cancel();
            token.cancel();
            assert!(token.is_cancelled());
        }
    
        #[test]
        fn task_runs_to_completion_when_not_cancelled() {
            let token = CancellationToken::new();
            let outcome = long_task(5, &token, Duration::from_millis(0), |_| {});
            assert_eq!(outcome, TaskOutcome::Completed(5));
        }
    
        #[test]
        fn task_stops_immediately_when_pre_cancelled() {
            let token = CancellationToken::new();
            token.cancel();
            let outcome = long_task(5, &token, Duration::from_millis(0), |_| {
                panic!("step should not run");
            });
            assert_eq!(outcome, TaskOutcome::Cancelled(0));
        }
    
        #[test]
        fn task_stops_when_cancelled_from_another_thread() {
            let token = CancellationToken::new();
            let worker_token = token.clone();
    
            let handle = thread::spawn(move || {
                long_task(100, &worker_token, Duration::from_millis(10), |_| {})
            });
    
            thread::sleep(Duration::from_millis(35));
            token.cancel();
    
            match handle.join().expect("worker panicked") {
                TaskOutcome::Cancelled(step) => assert!(step < 100),
                other => panic!("expected cancellation, got {:?}", other),
            }
        }
    
        #[test]
        fn on_step_receives_increasing_indices() {
            let token = CancellationToken::new();
            let mut seen = Vec::new();
            let outcome = long_task(3, &token, Duration::from_millis(0), |i| seen.push(i));
            assert_eq!(outcome, TaskOutcome::Completed(3));
            assert_eq!(seen, vec![0, 1, 2]);
        }
    
        #[test]
        fn zero_step_task_completes_immediately() {
            let token = CancellationToken::new();
            let outcome = long_task(0, &token, Duration::from_millis(0), |_| {
                panic!("no steps expected");
            });
            assert_eq!(outcome, TaskOutcome::Completed(0));
        }
    }
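
The "one cancel, many stop" outcome can be illustrated with several workers that each hold a clone of the same token. The sketch below is an assumption-laden illustration rather than part of the example: it repeats a compact copy of the token so that it compiles on its own, and cancel_all_workers is a hypothetical helper name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Compact copy of the token so this sketch compiles on its own.
#[derive(Clone, Default)]
pub struct CancellationToken {
    flag: Arc<AtomicBool>,
}

impl CancellationToken {
    pub fn cancel(&self) {
        self.flag.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.flag.load(Ordering::SeqCst)
    }
}

/// Spawns `workers` threads that poll until cancelled, cancels them all with
/// a single call, and returns how many observed the cancellation.
/// (`cancel_all_workers` is a hypothetical helper.)
pub fn cancel_all_workers(workers: usize) -> usize {
    let token = CancellationToken::default();
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let token = token.clone(); // each worker owns its own handle
            thread::spawn(move || {
                while !token.is_cancelled() {
                    thread::sleep(Duration::from_millis(1));
                }
                true // reached only after cancellation was observed
            })
        })
        .collect();

    thread::sleep(Duration::from_millis(10));
    token.cancel(); // one call; every clone shares the same flag

    handles
        .into_iter()
        .map(|h| h.join().expect("worker panicked"))
        .filter(|&stopped| stopped)
        .count()
}
```

Each clone only bumps the Arc reference count, so handing a token to another worker stays cheap regardless of how many workers there are.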

    Key Differences

  • Atomics vs Mutex: Rust uses AtomicBool for lock-free cancellation checking; the plain-threads OCaml version pairs a ref with a Mutex to get equivalent thread safety.
  • Memory ordering: Rust makes the ordering explicit on every atomic operation (this example uses SeqCst; a Release store with an Acquire load would also work); OCaml's Mutex provides full mutual exclusion, which is stronger but more costly.
  • Cloneability: Rust tokens clone cheaply via Arc::clone (an atomic reference-count increment); in OCaml the garbage collector manages the shared value, so the Mutex-protected flag is shared by passing the same reference around and needs no reference-counting wrapper.
  • Structured cancellation: OCaml's Lwt_switch integrates with the Lwt event loop; Rust's AtomicBool is a low-level primitive, and tokio_util::sync::CancellationToken (from the tokio-util crate) is the high-level async counterpart.
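
The memory-ordering point matters most when the flag publishes other data. The following sketch assumes a hypothetical ReasonToken that carries a numeric reason code alongside the flag; it is not part of the example, and the names ReasonToken, cancel_with, cancelled_reason, and run_demo are invented for illustration. The Release store and the Acquire load form the pair that makes the earlier Relaxed write visible.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// Shared state for the hypothetical token: a reason code plus the flag.
#[derive(Default)]
struct Shared {
    reason: AtomicU32,
    cancelled: AtomicBool,
}

/// Hypothetical token variant that carries a reason code.
#[derive(Clone, Default)]
pub struct ReasonToken {
    shared: Arc<Shared>,
}

impl ReasonToken {
    /// Writes the reason first, then publishes it with a Release store.
    pub fn cancel_with(&self, reason: u32) {
        self.shared.reason.store(reason, Ordering::Relaxed);
        self.shared.cancelled.store(true, Ordering::Release);
    }

    /// Returns the reason once cancellation is visible. The Acquire load
    /// pairs with the Release store above, so the reason written before
    /// that store is guaranteed to be visible here.
    pub fn cancelled_reason(&self) -> Option<u32> {
        if self.shared.cancelled.load(Ordering::Acquire) {
            Some(self.shared.reason.load(Ordering::Relaxed))
        } else {
            None
        }
    }
}

/// Spawns a worker that polls the token and returns the reason it observed.
pub fn run_demo(reason: u32) -> u32 {
    let token = ReasonToken::default();
    let worker_token = token.clone();
    let worker = thread::spawn(move || loop {
        if let Some(seen) = worker_token.cancelled_reason() {
            return seen;
        }
        thread::yield_now();
    });
    token.cancel_with(reason);
    worker.join().expect("worker panicked")
}
```

For a bare flag with no accompanying data, SeqCst as used in the example is simply the most conservative choice; the sketch assumes cancel_with is called at most once per token.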

    OCaml Approach

    OCaml's Lwt_switch provides cooperative cancellation for Lwt promises: Lwt_switch.create () creates a switch, Lwt_switch.add_hook switch f registers a cleanup hook, and Lwt_switch.turn_off switch turns the switch off and runs the hooks. OCaml 5 Eio uses Fiber.with_cancellation and Cancel.cancel. With plain threads, a let cancelled = ref false guarded by a Mutex plays the role of Rust's AtomicBool; the standard library's Atomic module (OCaml 4.12 and later) is a closer match. The Go-style context.Context has no direct equivalent in the OCaml standard library.
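
For comparison, the switch-with-hooks shape can be approximated in Rust on top of the same atomic flag. This is a loose sketch, not a port of Lwt_switch: the Switch type and its methods are hypothetical, the hooks are synchronous closures rather than Lwt promises, and running a hook immediately when the switch is already off is a choice made for this sketch only.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// A cleanup callback that runs at most once.
type Hook = Box<dyn FnOnce() + Send>;

#[derive(Default)]
struct Inner {
    off: AtomicBool,
    hooks: Mutex<Vec<Hook>>,
}

/// Hypothetical Rust analogue of a switch with cleanup hooks.
#[derive(Clone, Default)]
pub struct Switch {
    inner: Arc<Inner>,
}

impl Switch {
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a cleanup hook. If the switch is already off, the hook
    /// runs immediately (a design choice of this sketch).
    pub fn add_hook<F: FnOnce() + Send + 'static>(&self, hook: F) {
        let mut hooks = self.inner.hooks.lock().unwrap();
        if self.inner.off.load(Ordering::SeqCst) {
            drop(hooks); // release the lock before running user code
            hook();
        } else {
            hooks.push(Box::new(hook));
        }
    }

    /// Turns the switch off and runs every registered hook once.
    pub fn turn_off(&self) {
        // Set the flag before draining so late `add_hook` calls see it.
        self.inner.off.store(true, Ordering::SeqCst);
        let hooks = std::mem::take(&mut *self.inner.hooks.lock().unwrap());
        for hook in hooks {
            hook(); // runs outside the lock
        }
    }

    /// Returns `true` while the switch has not been turned off.
    pub fn is_on(&self) -> bool {
        !self.inner.off.load(Ordering::SeqCst)
    }
}
```

A worker would poll is_on() the same way long_task polls is_cancelled(), while resources register hooks so that turning the switch off also releases them.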

    Deep Comparison

    922-cancellation-token — Language Comparison

    std vs tokio

    Aspect          | std version                   | tokio version
    Runtime         | OS threads via std::thread    | Async tasks on tokio runtime
    Synchronization | std::sync::Mutex, Condvar     | tokio::sync::Mutex, channels
    Channels        | std::sync::mpsc (unbounded)   | tokio::sync::mpsc (bounded, async)
    Blocking        | Thread blocks on lock/recv    | Task yields, runtime switches tasks
    Overhead        | One OS thread per task        | Many tasks per thread (M:N)
    Best for        | CPU-bound, simple concurrency | I/O-bound, high-concurrency servers

    Exercises

  • Implement a timeout_token that cancels itself automatically after a specified Duration, combining CancellationToken with thread::sleep.
  • Add a cancel_after(n: usize) method to CancellationToken that cancels the token after n calls to is_cancelled().
  • Write a CancellableIter<I: Iterator> wrapper that checks the token on each .next() call.