🦀 Functional Rust

330: Async Sink

Difficulty: 4 · Level: Expert

A destination that accepts values and flushes them in batches: the write side of a stream.

The Problem This Solves

If a `Stream` is a source you pull from, a `Sink` is a destination you push into. In real async systems you constantly need to write to something that can't accept items one-by-one at full speed: a database that prefers bulk inserts, a log aggregator that batches messages, a network socket that benefits from coalescing writes, or a metrics collector that flushes every N samples. Writing directly to these sinks on every item is expensive. The `Sink` pattern buffers incoming values and flushes when the buffer is full (or on explicit flush), absorbing bursts without hammering the downstream. In async Rust, `futures::Sink` gives you `send(item).await` and `flush().await`; the sink decides internally when to actually write. This example implements the same pattern synchronously using `VecDeque` as a buffer, showing the core logic without a runtime dependency.

The Intuition

Think of it like a batched database writer in Node.js:

```javascript
// Don't do one INSERT per row: buffer and flush
buffer.push(item);
if (buffer.length >= BATCH_SIZE) await db.insertMany(buffer.splice(0));
```

Rust's `BatchSink` is the same idea: `send()` buffers, and a flush triggers when capacity is reached. The async version (from `futures::SinkExt`) wraps `send` in an async call so the flush, which might wait for I/O, doesn't block the executor.

How It Works in Rust

```rust
use std::collections::VecDeque;

struct BatchSink<T> {
    buffer: VecDeque<T>,
    capacity: usize,
    flushed_batches: Vec<Vec<T>>,
}

impl<T> BatchSink<T> {
    fn send(&mut self, item: T) -> Result<(), String> {
        self.buffer.push_back(item);
        // Auto-flush when buffer reaches capacity
        if self.buffer.len() >= self.capacity {
            self.flush()?;
        }
        Ok(())
    }

    fn flush(&mut self) -> Result<(), String> {
        if !self.buffer.is_empty() {
            // drain() moves items out without cloning, so no Clone bound needed
            let batch: Vec<T> = self.buffer.drain(..).collect();
            self.flushed_batches.push(batch);
        }
        Ok(())
    }
}
```
Key: `VecDeque::drain(..)` empties the buffer in one move. Always call `flush()` explicitly at the end: the last partial batch won't auto-flush if `len < capacity`.

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
|---|---|---|
| Mutable buffer | `mutable buf: 'a list` (reversed for O(1) prepend) | `VecDeque<T>` (O(1) both ends) |
| Flush trigger | Manual check on `List.length` | `buffer.len() >= capacity` |
| Drain | `s.buf <- []` (GC frees old list) | `buffer.drain(..)` (moves into new `Vec`) |
| Error handling | Unit / exception | `Result<(), String>` |
```rust
use std::collections::VecDeque;

struct BatchSink<T> {
    buffer: VecDeque<T>,
    capacity: usize,
    flushed_batches: Vec<Vec<T>>,
}

impl<T> BatchSink<T> {
    fn new(capacity: usize) -> Self {
        Self { buffer: VecDeque::new(), capacity, flushed_batches: Vec::new() }
    }
    fn send(&mut self, item: T) -> Result<(), String> {
        self.buffer.push_back(item);
        if self.buffer.len() >= self.capacity { self.flush()?; }
        Ok(())
    }
    fn flush(&mut self) -> Result<(), String> {
        if !self.buffer.is_empty() {
            let b: Vec<T> = self.buffer.drain(..).collect();
            println!("Flushing {} items", b.len());
            self.flushed_batches.push(b);
        }
        Ok(())
    }
    fn into_batches(self) -> Vec<Vec<T>> { self.flushed_batches }
}

fn main() {
    let mut s: BatchSink<i32> = BatchSink::new(3);
    for i in 1..=8 { s.send(i).unwrap(); }
    s.flush().unwrap();
    println!("Batches: {}", s.into_batches().len());
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn flushes_in_batches() {
        let mut s: BatchSink<i32> = BatchSink::new(3);
        for i in 1..=9 { s.send(i).unwrap(); }
        let b = s.into_batches();
        assert_eq!(b.len(), 3);
        assert_eq!(b[0], vec![1, 2, 3]);
    }

    #[test]
    fn partial_flush() {
        let mut s: BatchSink<i32> = BatchSink::new(5);
        for i in 1..=3 { s.send(i).unwrap(); }
        s.flush().unwrap();
        let b = s.into_batches();
        assert_eq!(b[0], vec![1, 2, 3]);
    }
}
```
```ocaml
(* OCaml: sink-like accumulator with flush *)

type 'a sink = { mutable buf: 'a list; cap: int; flush_fn: 'a list -> unit }

let make_sink cap f = { buf = []; cap; flush_fn = f }

let send s x =
  s.buf <- x :: s.buf;
  if List.length s.buf >= s.cap then (s.flush_fn (List.rev s.buf); s.buf <- [])

let flush s = if s.buf <> [] then (s.flush_fn (List.rev s.buf); s.buf <- [])

let () =
  let out = ref 0 in
  let s = make_sink 3 (fun b -> incr out; Printf.printf "Flush %d items\n" (List.length b)) in
  List.iter (send s) [1; 2; 3; 4; 5; 6; 7; 8];
  flush s;
  Printf.printf "Batches: %d\n" !out
```