March Docs

Flow

Flow module: backpressure-aware pipeline abstraction.

A Flow pipeline is a chain of stages: producers → transformers → consumers. Demand flows backward (consumers pull from producers), which prevents fast producers from overwhelming slow consumers (backpressure).

DESIGN: In the interpreter, Flow is implemented as a functional pipeline over lazy Seq(a) values (fold-based, Church-encoded). The Stage(a) type wraps a Seq(a), and every transformation delegates to the Seq module.
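To make "fold-based, Church-encoded" concrete, here is a minimal Python model of the idea. It is not March code and does not reflect the actual internals of the Seq module. The names seq_from_list, seq_map, seq_filter, seq_to_list, map_stage and filter_stage are hypothetical. In this sketch a sequence is represented by its own fold function, and a stage is a thin wrapper around it.

```python
# Python model of a fold-based (Church-encoded) sequence; NOT March's Seq module.
# A sequence is a function fold(acc, step) -> acc that feeds its items to `step`.

def seq_from_list(xs):
    def fold(acc, step):
        for x in xs:
            acc = step(acc, x)
        return acc
    return fold

def seq_map(seq, f):
    # Transform each item before it reaches the downstream step function.
    return lambda acc, step: seq(acc, lambda a, x: step(a, f(x)))

def seq_filter(seq, pred):
    # Pass an item downstream only if pred holds; otherwise keep the accumulator.
    return lambda acc, step: seq(acc, lambda a, x: step(a, x) if pred(x) else a)

def seq_to_list(seq):
    return seq([], lambda a, x: a + [x])

class Stage:
    """Hypothetical stand-in for Stage(a): a thin wrapper around a sequence."""
    def __init__(self, seq):
        self.seq = seq

def from_list(xs):
    return Stage(seq_from_list(xs))

def map_stage(stage, f):
    return Stage(seq_map(stage.seq, f))

def filter_stage(stage, pred):
    return Stage(seq_filter(stage.seq, pred))

def collect(stage):
    return seq_to_list(stage.seq)

# Mirrors the quick-start pipeline; no work happens until collect runs the fold.
result = collect(
    filter_stage(map_stage(from_list([1, 2, 3, 4, 5]), lambda x: x * 2),
                 lambda x: x > 4)
)
print(result)  # [6, 8, 10]
```

Because each transformation only wraps the upstream fold, building the pipeline is cheap and items are processed one at a time when a consumer finally folds over it.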

In the compiled runtime, each Stage would be backed by an actor. The consumer sends a Demand(n) message upstream; each stage forwards demand to its source and buffers at most n items before sending them downstream. This is the GenStage/Flow model from Elixir, adapted for March actors.
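The demand protocol described above can be modelled synchronously in a few lines of Python. This is only a sketch of the idea, under the assumption that a demand of n yields at most n items per round. It uses plain method calls instead of actors and messages, and the names Producer, MapStage, demand and consume are hypothetical rather than part of the March runtime.

```python
from itertools import islice

class Producer:
    """Source stage: hands out at most n items per demand."""
    def __init__(self, items):
        self._it = iter(items)

    def demand(self, n):
        return list(islice(self._it, n))

class MapStage:
    """Transformer stage: forwards demand upstream, buffers at most n items."""
    def __init__(self, source, f):
        self._source = source
        self._f = f
        self.max_buffered = 0  # largest buffer observed, for illustration

    def demand(self, n):
        buffer = [self._f(x) for x in self._source.demand(n)]
        self.max_buffered = max(self.max_buffered, len(buffer))
        return buffer

def consume(stage, n):
    """Consumer: repeatedly asks for n items until the pipeline is exhausted."""
    out = []
    while True:
        batch = stage.demand(n)
        if not batch:
            return out
        out.extend(batch)

stage = MapStage(Producer(range(10)), lambda x: x * 2)
items = consume(stage, 3)
print(items)               # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(stage.max_buffered)  # 3
```

The producer never runs ahead of the consumer: it only produces when asked, and no stage ever holds more than the requested n items.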

with_concurrency/2 is a no-op in the interpreter but signals that the stage may execute with n concurrent workers in the compiled runtime.

See stdlib/docs/flow.md for design rationale, usage examples, and a comparison with Elixir's GenStage.

Quick start:

    Flow.from_list([1, 2, 3, 4, 5])
    |> Flow.map(fn x -> x * 2)
    |> Flow.filter(fn x -> x > 4)
    |> Flow.collect  -- [6, 8, 10]

Types

type Stage(a) = Stage(Seq(a))

Functions

fn from_list(xs)

Create a stage from a list of items.

fn from_seq(seq)

Create a stage from an existing Seq.

fn unfold(seed, next)

Create a stage by unfolding a seed value.

next : seed -> Option((item, new_seed))

Produces items until next returns None.

Example (integers from 0 to 4):

    Flow.unfold(0, fn s -> if s >= 5 do None else Some((s, s + 1)) end)
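The unfold contract can be modelled with a Python generator. This is a sketch of the semantics only, not March code: Python's None stands in for March's None, and a plain (item, new_seed) tuple stands in for Some((item, new_seed)).

```python
def unfold(seed, next_fn):
    """Yield items until next_fn(seed) returns None."""
    while True:
        step = next_fn(seed)
        if step is None:          # models next returning None: stop producing
            return
        item, seed = step         # models Some((item, new_seed))
        yield item

# Integers from 0 to 4, mirroring the example above.
numbers = list(unfold(0, lambda s: None if s >= 5 else (s, s + 1)))
print(numbers)  # [0, 1, 2, 3, 4]
```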

fn range(start, stop)

Create a stage over the integer range [start, stop).

fn map(stage, f)

Apply f to every item.

fn filter(stage, pred)

Keep only items where pred returns true.

fn flat_map(stage, f)

Apply f to each item (f returns a list) and flatten the results.
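As an illustration of the flattening behaviour, here is a small Python model that uses a generator over a plain iterable in place of a stage. It is not March code.

```python
def flat_map(items, f):
    """Apply f to each item (f returns a list) and yield the results one by one."""
    for x in items:
        yield from f(x)

pairs = list(flat_map([1, 2, 3], lambda x: [x, x * 10]))
print(pairs)  # [1, 10, 2, 20, 3, 30]
```

In this model, an item for which f returns an empty list contributes nothing to the output.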

fn take(stage, n)

Keep at most n items.

fn drop(stage, n)

Skip the first n items.

fn batch(stage, n)

Group items into lists of at most n elements.
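A Python model of the grouping follows. It assumes that a final, shorter group is still emitted when the item count is not a multiple of n. That reading is suggested by "at most n" but not confirmed here. Rejecting a non-positive n is also an assumption of this sketch.

```python
def batch(items, n):
    """Group items into lists of at most n elements."""
    if n <= 0:
        raise ValueError("n must be positive")  # assumption of this sketch
    chunk = []
    for x in items:
        chunk.append(x)
        if len(chunk) == n:
            yield chunk
            chunk = []
    if chunk:  # assumed: the trailing partial group is emitted
        yield chunk

groups = list(batch(range(7), 3))
print(groups)  # [[0, 1, 2], [3, 4, 5], [6]]
```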

fn with_concurrency(stage, _n) do stage end

Parallelism hint: in the compiled runtime, spawn n worker actors for this stage. No-op in the interpreter (single-threaded execution).

    Flow.from_list(urls)
    |> Flow.map(fn url -> Http.get(url))
    |> Flow.with_concurrency(8)
    |> Flow.collect

fn collect(stage)

Collect all items into a List.

fn run(stage)

Run the pipeline for side effects; discard all items.

fn reduce(stage, start, f)

Reduce all items to a single value using f(accumulator, item) -> accumulator.

fn into(stage, start, f)

Consume with f(acc, item) -> acc (alias for reduce).
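The accumulator contract shared by reduce and into can be modelled in Python with functools.reduce, which also calls its function as f(accumulator, item). This sketch uses a plain iterable in place of a stage and is not March code.

```python
from functools import reduce as _reduce

def reduce(items, start, f):
    """Fold items left to right: acc = f(acc, item), starting from start."""
    return _reduce(f, items, start)

into = reduce  # modelled as a plain alias, as the description above states

total = reduce([1, 2, 3, 4], 0, lambda acc, x: acc + x)
print(total)  # 10

# The accumulator need not have the same type as the items.
lengths = into(["a", "bb", "ccc"], [], lambda acc, s: acc + [len(s)])
print(lengths)  # [1, 2, 3]
```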

fn each(stage, f)

Run f on each item for its side effects.

fn count(stage)

Count the total number of items produced.

fn find(stage, pred)

Return the first item satisfying pred, or None.

fn any(stage, pred)

True if any item satisfies pred.

fn all(stage, pred)

True if all items satisfy pred.
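The three predicate consumers find, any and all can be modelled in Python as follows. This is a sketch over plain iterables, not March code, and the names any_item and all_items are hypothetical, chosen to avoid shadowing Python's built-ins. Python's versions stop at the first decisive item and treat an empty input as all = True and any = False. Whether Flow behaves the same way in either respect is not stated on this page.

```python
def find(items, pred):
    """Return the first item satisfying pred, or None if there is none."""
    return next((x for x in items if pred(x)), None)

def any_item(items, pred):
    """True if at least one item satisfies pred."""
    return any(pred(x) for x in items)

def all_items(items, pred):
    """True if every item satisfies pred."""
    return all(pred(x) for x in items)

data = [1, 2, 3, 4]
print(find(data, lambda x: x > 2))       # 3
print(any_item(data, lambda x: x > 3))   # True
print(all_items(data, lambda x: x > 1))  # False
```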