๐Ÿฆ€ Functional Rust

981: Sequential Async Chain

Difficulty: Beginner Category: Async / Concurrency FP Patterns Concept: Sequential monadic binding โ€” each step waits for the previous Key Insight: `let x = f().await; let y = g(x).await` is syntactic sugar for OCaml's `let x = f () in let y = g x in ...`

Versions

DirectoryDescription
`std/`Standard library version using `std::sync`, `std::thread`
`tokio/`Tokio async runtime version using `tokio::sync`, `tokio::spawn`

Running

# Standard library version
cd std && cargo test

# Tokio version
cd tokio && cargo test
// 981: Sequential Async Chain
// Rust: sequential .await calls โ€” like OCaml's let* x = ... in let* y = ...

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

fn block_on<F: Future>(mut fut: F) -> F::Output {
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker { RawWaker::new(p, &VT) }
    static VT: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VT)) };
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("not ready"),
    }
}

// --- Simulated async data-fetch functions ---
async fn fetch_user_id() -> u32 { 42 }
async fn fetch_user_name(_id: u32) -> String { "Alice".to_string() }
async fn fetch_user_email(_name: &str) -> String { "alice@example.com".to_string() }

// --- Approach 1: Sequential let-binding with await ---
// Each .await = one let* step in OCaml
async fn full_lookup() -> (u32, String, String) {
    let id = fetch_user_id().await;
    let name = fetch_user_name(id).await;
    let email = fetch_user_email(&name).await;
    (id, name, email)
}

// --- Approach 2: Accumulating through a pipeline ---
async fn step1(x: i32) -> i32 { x + 10 }
async fn step2(x: i32) -> i32 { x * 2 }
async fn step3(x: i32) -> i32 { x - 5 }

async fn pipeline_seq(input: i32) -> (i32, i32, i32, i32) {
    let a = step1(input).await;
    let b = step2(a).await;
    let c = step3(b).await;
    (input, a, b, c)
}

// --- Approach 3: Error-aware sequence with ? operator ---
async fn guarded_div(a: i32, b: i32) -> Result<i32, &'static str> {
    if b == 0 { Err("division by zero") } else { Ok(a / b) }
}

async fn safe_pipeline() -> Result<i32, &'static str> {
    let x = 100;
    let y = guarded_div(x, 4).await?;   // let*? โ€” short-circuits on Err
    let z = guarded_div(y, 5).await?;
    Ok(z)
}

async fn bad_pipeline() -> Result<i32, &'static str> {
    let x = 100;
    let _y = guarded_div(x, 0).await?;  // short-circuits here
    Ok(999)
}

fn main() {
    let (id, name, email) = block_on(full_lookup());
    println!("full_lookup: id={} name={} email={}", id, name, email);

    let (orig, a, b, c) = block_on(pipeline_seq(5));
    println!("pipeline: {}->{}->{}->{}",  orig, a, b, c);

    println!("safe_pipeline: {:?}", block_on(safe_pipeline()));
    println!("bad_pipeline: {:?}", block_on(bad_pipeline()));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_full_lookup() {
        let (id, name, email) = block_on(full_lookup());
        assert_eq!(id, 42);
        assert_eq!(name, "Alice");
        assert_eq!(email, "alice@example.com");
    }

    #[test]
    fn test_pipeline_seq() {
        let (orig, a, b, c) = block_on(pipeline_seq(5));
        assert_eq!(orig, 5);
        assert_eq!(a, 15);   // 5+10
        assert_eq!(b, 30);   // 15*2
        assert_eq!(c, 25);   // 30-5
    }

    #[test]
    fn test_safe_pipeline() {
        assert_eq!(block_on(safe_pipeline()), Ok(5)); // 100/4=25, 25/5=5
    }

    #[test]
    fn test_bad_pipeline_short_circuits() {
        assert_eq!(block_on(bad_pipeline()), Err("division by zero"));
    }

    #[test]
    fn test_sequential_order() {
        // Values from earlier awaits are available in later ones
        let result = block_on(async {
            let a = step1(10).await;  // 20
            let b = step2(a).await;   // 40 โ€” uses a
            let c = step3(b).await;   // 35 โ€” uses b
            c
        });
        assert_eq!(result, 35);
    }
}
(* 981: Sequential Async Chain *)
(* OCaml: let* x = ... in let* y = ... using ppx_let or Lwt.( let* ) *)

type 'a future = unit -> 'a

let return_ x : 'a future = fun () -> x
let bind fut k = fun () -> k (fut ()) ()
let run f = f ()

(* Simulated let* (monadic bind) *)
let ( let* ) = bind

(* --- Approach 1: Sequential chain with let* --- *)

let fetch_user_id () = return_ 42
let fetch_user_name _id = return_ "Alice"
let fetch_user_email _name = return_ "alice@example.com"

let full_lookup () =
  let* id = fetch_user_id () in
  let* name = fetch_user_name id in
  let* email = fetch_user_email name in
  return_ (id, name, email)

let () =
  let (id, name, email) = run (full_lookup ()) in
  assert (id = 42);
  assert (name = "Alice");
  assert (email = "alice@example.com");
  Printf.printf "Approach 1 (let* chain): id=%d name=%s email=%s\n" id name email

(* --- Approach 2: Accumulating values through chain --- *)

let step1 x = return_ (x + 10)
let step2 x = return_ (x * 2)
let step3 x = return_ (x - 5)

let pipeline_seq input =
  let* a = step1 input in
  let* b = step2 a in
  let* c = step3 b in
  return_ (input, a, b, c)

let () =
  let (orig, a, b, c) = run (pipeline_seq 5) in
  (* 5 -> 15 -> 30 -> 25 *)
  assert (orig = 5);
  assert (a = 15);
  assert (b = 30);
  assert (c = 25);
  Printf.printf "Approach 2 (pipeline): %d->%d->%d->%d\n" orig a b c

(* --- Approach 3: Short-circuit with error-aware sequence --- *)

type ('a, 'e) result_future = unit -> ('a, 'e) result

let ok x : ('a, 'e) result_future = fun () -> Ok x
let fail e : ('a, 'e) result_future = fun () -> Error e
let run_r f = f ()

let ( let*? ) (fut : ('a, 'e) result_future) k =
  fun () -> match fut () with
    | Ok v -> k v ()
    | Error e -> Error e

let guarded_div a b =
  if b = 0 then fail "division by zero"
  else ok (a / b)

let safe_pipeline () =
  let*? x = ok 100 in
  let*? y = guarded_div x 4 in
  let*? z = guarded_div y 5 in
  ok z

let bad_pipeline () =
  let*? x = ok 100 in
  let*? _ = guarded_div x 0 in  (* short-circuits here *)
  ok 999

let () =
  (match run_r (safe_pipeline ()) with
  | Ok v -> assert (v = 5); Printf.printf "Approach 3 (safe): %d\n" v
  | Error _ -> assert false);
  (match run_r (bad_pipeline ()) with
  | Ok _ -> assert false
  | Error e -> Printf.printf "Approach 3 (error): %s\n" e)

let () = Printf.printf "โœ“ All tests passed\n"

๐Ÿ“Š Detailed Comparison

Sequential Async Chain โ€” Comparison

Core Insight

Sequential async chains are monadic do-notation: each step depends on the previous. Both languages provide sugar for this โ€” OCaml's `let*` (ppx_let / Lwt.Syntax) and Rust's `.await` on sequential lines. Values computed in earlier steps are in scope for later steps.

OCaml Approach

  • `let* x = fut in ...` desugars to `Lwt.bind fut (fun x -> ...)`
  • Requires `open Lwt.Syntax` or ppx_let
  • Short-circuit via `Lwt_result` and `let*?`
  • Each step is truly sequential โ€” Lwt schedules them one after another

Rust Approach

  • Sequential `.await` calls read like normal imperative code
  • Variables from earlier awaits are in scope for later ones (captures)
  • `?` operator provides short-circuit error propagation (like `let*?`)
  • The compiler generates a state machine โ€” no runtime overhead per step

Comparison Table

ConceptOCaml (Lwt)Rust
Sequential bind`let x = f () in let y = g x in โ€ฆ``let x = f().await; let y = g(x).await`
Error short-circuit`let*? x = f () in โ€ฆ``let x = f().await?;`
Later steps see earlierYes โ€” closure captures `x`Yes โ€” in same async fn scope
Sugar requires`open Lwt.Syntax`Just `async fn` + `.await`
Execution orderStrict left-to-rightStrict left-to-right
ParallelismNo (use `Lwt.both` / `Lwt.join`)No (use `join!` or threads)

std vs tokio

Aspectstd versiontokio version
RuntimeOS threads via `std::thread`Async tasks on tokio runtime
Synchronization`std::sync::Mutex`, `Condvar``tokio::sync::Mutex`, channels
Channels`std::sync::mpsc` (unbounded)`tokio::sync::mpsc` (bounded, async)
BlockingThread blocks on lock/recvTask yields, runtime switches tasks
OverheadOne OS thread per taskMany tasks per thread (M:N)
Best forCPU-bound, simple concurrencyI/O-bound, high-concurrency servers