🦀 Functional Rust

769: Streaming Parser for Large Inputs

Difficulty: 4 | Level: Advanced

Parse a large input incrementally as an `Iterator`, processing one record at a time without loading the whole dataset into memory.

The Problem This Solves

Loading a 10GB log file into a `Vec<Record>` before processing any of it is often not an option: it may exceed available memory, or you may only need a handful of records that match a filter. Streaming parsers solve this by yielding one parsed record at a time, letting downstream code decide whether to collect, filter, aggregate, or exit early. Beyond memory, streaming is compositional: `stream.filter(...).map(...).take(100)` stops parsing after 100 records, so no work is wasted. This is the same design as `BufRead`-based line iteration in the standard library, and the foundation of streaming data pipelines in production Rust code.

The Intuition

Make the parser implement `Iterator` directly. Each call to `next()` reads the next non-empty line from the underlying `BufRead`, tries to parse it, and returns `Some(Ok(record))`, `Some(Err(error))`, or `None` (EOF). The caller never sees a `Vec<Record>`; they get a lazy sequence. The reusable `line_buf: String` is cleared and refilled on each call, so only one line is in memory at a time regardless of input size.

How It Works in Rust

The streaming struct wraps any `BufRead`:
pub struct RecordStream<R: BufRead> {
    reader:      R,
    line_buf:    String,  // reused across calls
    line_num:    usize,
    skip_errors: bool,
}
Being generic over `R: BufRead` means it works with files, network sockets, an in-memory `Cursor`, or anything else. The `Iterator` implementation reads one line per `next()` call:
impl<R: BufRead> Iterator for RecordStream<R> {
    type Item = Result<Record, ParseError>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            self.line_buf.clear();
            match self.reader.read_line(&mut self.line_buf) {
                Ok(0)  => return None,  // EOF
                Ok(_)  => {}
                Err(e) => return Some(Err(ParseError { line: self.line_num, message: e.to_string() })),
            }
            self.line_num += 1;
            let trimmed = self.line_buf.trim();
            if trimmed.is_empty() { continue; }  // skip blank lines

            match Self::parse_line(trimmed) {
                Some(rec) => return Some(Ok(rec)),
                None if self.skip_errors => continue,
                None => return Some(Err(ParseError { line: self.line_num, message: format!("bad line: {trimmed}") })),
            }
        }
    }
}
The `loop` with `continue` skips blank lines and (optionally) bad lines without returning `None` prematurely. Composition with standard iterator adaptors:
// Collect only valid records
let valid: Vec<Record> = stream_str(data)
    .skip_errors()
    .filter_map(|r| r.ok())
    .collect();

// Aggregate without collecting
let total: f64 = stream_str(data)
    .skip_errors()
    .filter_map(|r| r.ok())
    .map(|r| r.value)
    .sum();

// Early exit after first 10 high-value records
let top10: Vec<Record> = stream_str(data)
    .skip_errors()
    .filter_map(|r| r.ok())
    .filter(|r| r.value >= 90.0)
    .take(10)
    .collect();
These chains parse only as many lines as needed: `take(10)` stops the iterator after 10 matches. Builder pattern for configuration:
pub fn skip_errors(mut self) -> Self {
    self.skip_errors = true;
    self
}
Fluent configuration before iteration begins.

What This Unlocks

Key Differences

| Concept | OCaml | Rust |
| --- | --- | --- |
| Lazy sequence | `Seq.t` / `Stream.t` | `impl Iterator<Item = ...>` |
| Line-by-line reading | `input_line` in a loop | `BufRead::read_line` into a reusable `String` |
| Skip and continue | `Seq.filter` or `try_with` | `continue` inside the `Iterator::next` loop |
| Error per item | `result` in the sequence | `Item = Result<Record, ParseError>` |
// 769. Streaming Parser for Large Inputs
// Iterator-based: one record at a time, with a single reused line buffer

use std::io::{BufRead, Cursor};

// ── Record type ─────────────────────────────────────────────────

#[derive(Debug, PartialEq)]
pub struct Record {
    pub id: u64,
    pub name: String,
    pub value: f64,
}

// ── Parse error ─────────────────────────────────────────────────

#[derive(Debug)]
pub struct ParseError {
    pub line: usize,
    pub message: String,
}

// ── Streaming iterator ──────────────────────────────────────────

pub struct RecordStream<R: BufRead> {
    reader: R,
    line_buf: String,
    line_num: usize,
    skip_errors: bool,
}

impl<R: BufRead> RecordStream<R> {
    pub fn new(reader: R) -> Self {
        Self { reader, line_buf: String::new(), line_num: 0, skip_errors: false }
    }

    pub fn skip_errors(mut self) -> Self {
        self.skip_errors = true;
        self
    }

    fn parse_line(s: &str) -> Option<Record> {
        let parts: Vec<&str> = s.splitn(3, ',').collect();
        if parts.len() < 3 { return None; }
        let id   = parts[0].trim().parse().ok()?;
        let name = parts[1].trim().to_string();
        let value = parts[2].trim().parse().ok()?;
        Some(Record { id, name, value })
    }
}

impl<R: BufRead> Iterator for RecordStream<R> {
    type Item = Result<Record, ParseError>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            self.line_buf.clear();
            match self.reader.read_line(&mut self.line_buf) {
                Ok(0) => return None,   // EOF
                Ok(_) => {}
                Err(e) => return Some(Err(ParseError {
                    line: self.line_num,
                    message: e.to_string(),
                })),
            }
            self.line_num += 1;
            let trimmed = self.line_buf.trim();
            if trimmed.is_empty() { continue; }     // skip blank lines

            match Self::parse_line(trimmed) {
                Some(rec) => return Some(Ok(rec)),
                None if self.skip_errors => continue,  // bad line, skip
                None => return Some(Err(ParseError {
                    line: self.line_num,
                    message: format!("could not parse: '{trimmed}'"),
                })),
            }
        }
    }
}

// ── Convenience constructor from &str ───────────────────────────

pub fn stream_str(s: &str) -> RecordStream<Cursor<&[u8]>> {
    RecordStream::new(Cursor::new(s.as_bytes()))
}

// ── Aggregation: process without collecting ─────────────────────

pub fn sum_values(stream: impl Iterator<Item = Result<Record, ParseError>>) -> f64 {
    stream.filter_map(|r| r.ok()).map(|r| r.value).sum()
}

fn main() {
    let data = "1, Alice, 95.5\n2, Bob, 87.0\n3, Carol, 100.0\nbad line\n4, Dave, 72.3\n";

    println!("=== With error reporting ===");
    for item in stream_str(data) {
        match item {
            Ok(r)  => println!("  OK  : {r:?}"),
            Err(e) => println!("  ERR line {}: {}", e.line, e.message),
        }
    }

    println!("\n=== Skip errors ===");
    let valid: Vec<Record> = stream_str(data)
        .skip_errors()
        .filter_map(|r| r.ok())
        .collect();
    println!("Valid records: {}", valid.len());

    println!("\n=== Sum without collecting ===");
    let total = sum_values(stream_str(data).skip_errors());
    println!("Sum of values: {total:.1}");

    println!("\n=== Chained iterator ops ===");
    let top = stream_str(data)
        .skip_errors()
        .filter_map(|r| r.ok())
        .filter(|r| r.value >= 90.0)
        .count();
    println!("Records with value >= 90: {top}");
}

#[cfg(test)]
mod tests {
    use super::*;

    const DATA: &str = "1,Alice,95.5\n2,Bob,87.0\n3,Carol,100.0\n";

    #[test]
    fn parses_all_valid() {
        let records: Vec<_> = stream_str(DATA).filter_map(|r| r.ok()).collect();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].name, "Alice");
    }

    #[test]
    fn reports_bad_line() {
        let data = "1,A,1.0\nbad\n2,B,2.0\n";
        let results: Vec<_> = stream_str(data).collect();
        assert!(results[1].is_err());
    }

    #[test]
    fn skip_errors_skips_bad() {
        let data = "1,A,1.0\nbad\n2,B,2.0\n";
        let records: Vec<_> = stream_str(data).skip_errors().filter_map(|r| r.ok()).collect();
        assert_eq!(records.len(), 2);
    }

    #[test]
    fn sum_works() {
        let total = sum_values(stream_str(DATA).skip_errors());
        assert!((total - 282.5).abs() < 0.001);
    }
}
(* Streaming parser in OCaml using Seq (lazy sequences) *)

type record = { id: int; name: string; value: float }

let parse_record line =
  match String.split_on_char ',' line with
  | [id_s; name; val_s] ->
    (try Some { id = int_of_string (String.trim id_s);
                name = String.trim name;
                value = float_of_string (String.trim val_s) }
     with Failure _ -> None)
  | _ -> None

(* Sequence of lines from a string, simulating a file. Note that
   split_on_char is eager; a real file would read lazily via input_line. *)
let lines_of_string s =
  String.split_on_char '\n' s |> List.to_seq

let parse_stream input =
  lines_of_string input
  |> Seq.filter (fun l -> String.trim l <> "")
  |> Seq.filter_map parse_record

let () =
  let data = {|1, Alice, 95.5
2, Bob, 87.0
3, Carol, 100.0
bad,line
4, Dave, 72.3|} in
  Printf.printf "Streaming parse:\n";
  let count = ref 0 in
  Seq.iter (fun r ->
    incr count;
    Printf.printf "  Record %d: id=%d name=%s value=%.1f\n"
      !count r.id r.name r.value
  ) (parse_stream data);
  Printf.printf "Total valid records: %d\n" !count