Flow
Flow module: backpressure-aware pipeline abstraction.
A Flow pipeline is a chain of stages: producers → transformers → consumers. Demand flows backward (consumers pull from producers), which prevents fast producers from overwhelming slow consumers (backpressure).
DESIGN: In the interpreter, Flow is implemented as a functional pipeline over lazy Seq(a) values (fold-based, church-encoded). The Stage(a) type wraps a Seq(a), and all transformations delegate to the Seq module.
In the compiled runtime, each Stage would be backed by an actor. The consumer sends a Demand(n) message upstream; each stage forwards demand to its source and buffers at most n items before sending them downstream. This is the GenStage/Flow model from Elixir, adapted for March actors.
with_concurrency/2 is a no-op in the interpreter but signals that the stage may execute with n concurrent workers in the compiled runtime.
See stdlib/docs/flow.md for design rationale, usage examples, and a comparison with Elixir's GenStage.
Quick start:
Flow.from_list([1, 2, 3, 4, 5]) |> Flow.map(fn x -> x * 2) |> Flow.filter(fn x -> x > 4) |> Flow.collect -- [6, 8, 10]
Types
Functions
Create a stage from a list of items.
Create a stage from an existing Seq.
Create a stage by unfolding a seed value. next : seed -> Option((item, new_seed)) Produces items until next returns None.
Example — integers from 0 to 4: Flow.unfold(0, fn s -> if s >= 5 do None else Some((s, s + 1))) end
Create a stage over the integer range [start, stop).
Apply f to every item.
Keep only items where pred returns true.
Apply f to each item (f returns a list) and flatten the results.
Keep at most n items.
Skip the first n items.
Group items into lists of at most n elements.
Parallelism hint: in the compiled runtime, spawn n worker actors for this stage. No-op in the interpreter (single-threaded execution).
Flow.from_list(urls) |> Flow.map(fn url -> Http.get(url)) |> Flow.with_concurrency(8) |> Flow.collect
Collect all items into a List.
Run the pipeline for side effects; discard all items.
Reduce all items to a single value using f(accumulator, item) -> accumulator.
Consume with f(acc, item) -> acc (alias for reduce).
Run f on each item for its side effects.
Count the total number of items produced.
Return the first item satisfying pred, or None.
True if any item satisfies pred.
True if all items satisfy pred.