gloo
Stream. Transform. Compose.
Commands are functions. Pipelines are composition. Go makes it type-safe.
gloo.Run(source, sink, Grep("5xx"), Sort(), Uniq(Count))
7 Composable Patterns
Every streaming operation maps to one of seven building blocks.
Map
func Map[In, Out any](fn func(In) (Out, error)) Command[In, Out]
Transforms each input item independently. Stateless: receives one item, returns one item, and keeps no memory of previous items.
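A minimal sketch of the Map contract. It does not use gloo's implementation: `Command` here is a hypothetical stand-in modeled as a plain function over slices, and the real type is presumably stream-based.

```go
package main

import (
	"fmt"
	"strings"
)

// Command is a hypothetical stand-in for gloo's Command type, modeled
// as a function over slices. The real type may look quite different.
type Command[In, Out any] func([]In) ([]Out, error)

// Map applies fn to each item independently and stops at the first error.
func Map[In, Out any](fn func(In) (Out, error)) Command[In, Out] {
	return func(items []In) ([]Out, error) {
		out := make([]Out, 0, len(items))
		for _, item := range items {
			v, err := fn(item)
			if err != nil {
				return nil, err
			}
			out = append(out, v)
		}
		return out, nil
	}
}

func main() {
	upper := Map(func(s string) (string, error) {
		return strings.ToUpper(s), nil
	})
	out, err := upper([]string{"get /a", "post /b"})
	fmt.Println(out, err) // [GET /A POST /B] <nil>
}
```

One item in, one item out: the output always has the same length as the input unless fn fails.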
Filter
func Filter[T any](fn func(T) (bool, error)) Command[T, T]
Keeps or drops items based on a predicate. Stateless: decides per item, with no memory of previous items.
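A sketch of Filter under the same assumption: `Command` is a hypothetical slice-based stand-in, not gloo's actual type. The predicate keeps HTTP status codes of 500 and above.

```go
package main

import "fmt"

// Command is a hypothetical stand-in for gloo's Command type.
type Command[In, Out any] func([]In) ([]Out, error)

// Filter keeps the items for which fn returns true, preserving order.
func Filter[T any](fn func(T) (bool, error)) Command[T, T] {
	return func(items []T) ([]T, error) {
		var out []T
		for _, item := range items {
			keep, err := fn(item)
			if err != nil {
				return nil, err
			}
			if keep {
				out = append(out, item)
			}
		}
		return out, nil
	}
}

func main() {
	serverErrors := Filter(func(status int) (bool, error) {
		return status >= 500, nil
	})
	out, err := serverErrors([]int{200, 503, 404, 500})
	fmt.Println(out, err) // [503 500] <nil>
}
```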
Accumulate
func Accumulate[T any](fn func([]T) ([]T, error)) Command[T, T]
Collects all input, processes it as a batch, then emits the result. Produces no output until it has seen all input. Input and output share the same type.
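Sorting is the classic case: nothing can be emitted until the last item arrives. The sketch below assumes a hypothetical slice-based `Command` stand-in rather than gloo's real type, and it copies the input before handing it to fn, which is a design choice of this sketch only.

```go
package main

import (
	"fmt"
	"sort"
)

// Command is a hypothetical stand-in for gloo's Command type.
type Command[In, Out any] func([]In) ([]Out, error)

// Accumulate hands the complete input to fn and returns whatever fn emits.
func Accumulate[T any](fn func([]T) ([]T, error)) Command[T, T] {
	return func(items []T) ([]T, error) {
		// Copy first so fn can reorder freely without touching the caller's slice.
		buf := append([]T(nil), items...)
		return fn(buf)
	}
}

func main() {
	sorted := Accumulate(func(lines []string) ([]string, error) {
		sort.Strings(lines)
		return lines, nil
	})
	out, err := sorted([]string{"c", "a", "b"})
	fmt.Println(out, err) // [a b c] <nil>
}
```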
Aggregate
func Aggregate[In, Out any](fn func([]In) (Out, error)) Command[In, Out]
Reduces all input items to a single output value. Consumes everything and produces a summary. Output type can differ from input type.
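A sketch of Aggregate as a line counter: strings in, one int out. `Command` is again a hypothetical slice-based stand-in, so the single summary value comes back as a one-element slice.

```go
package main

import "fmt"

// Command is a hypothetical stand-in for gloo's Command type.
type Command[In, Out any] func([]In) ([]Out, error)

// Aggregate reduces the complete input to exactly one output value.
func Aggregate[In, Out any](fn func([]In) (Out, error)) Command[In, Out] {
	return func(items []In) ([]Out, error) {
		v, err := fn(items)
		if err != nil {
			return nil, err
		}
		return []Out{v}, nil
	}
}

func main() {
	count := Aggregate(func(lines []string) (int, error) {
		return len(lines), nil
	})
	out, err := count([]string{"a", "b", "c"})
	fmt.Println(out, err) // [3] <nil>
}
```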
Expand
func Expand[In, Out any](fn func(In) ([]Out, error)) Command[In, Out]
Maps each input item to zero or more output items. Splits, unfolds, or fans out each input. Output order follows input order.
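Splitting lines into words shows the zero-or-more behavior: an empty line contributes nothing. This is a sketch over a hypothetical slice-based `Command` stand-in, not gloo's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// Command is a hypothetical stand-in for gloo's Command type.
type Command[In, Out any] func([]In) ([]Out, error)

// Expand concatenates the outputs of fn in input order.
func Expand[In, Out any](fn func(In) ([]Out, error)) Command[In, Out] {
	return func(items []In) ([]Out, error) {
		var out []Out
		for _, item := range items {
			vs, err := fn(item)
			if err != nil {
				return nil, err
			}
			out = append(out, vs...)
		}
		return out, nil
	}
}

func main() {
	words := Expand(func(line string) ([]string, error) {
		return strings.Fields(line), nil
	})
	out, err := words([]string{"a b", "", "c"})
	fmt.Println(len(out), out, err) // 3 [a b c] <nil>
}
```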
StatefulMap
func StatefulMap[In, Out any](factory func() func(In) (Out, error)) Command[In, Out]
Transforms each item with state that persists across items. The factory creates fresh state per execution, so the command is reusable as a value.
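Line numbering illustrates why the factory matters: the counter lives in the closure and restarts on every run. The sketch assumes a hypothetical slice-based `Command` stand-in and treats each call of the command as one "execution".

```go
package main

import "fmt"

// Command is a hypothetical stand-in for gloo's Command type.
type Command[In, Out any] func([]In) ([]Out, error)

// StatefulMap calls factory once per execution, so state never leaks between runs.
func StatefulMap[In, Out any](factory func() func(In) (Out, error)) Command[In, Out] {
	return func(items []In) ([]Out, error) {
		fn := factory() // fresh state for this execution
		out := make([]Out, 0, len(items))
		for _, item := range items {
			v, err := fn(item)
			if err != nil {
				return nil, err
			}
			out = append(out, v)
		}
		return out, nil
	}
}

func main() {
	numbered := StatefulMap(func() func(string) (string, error) {
		n := 0 // per-execution counter
		return func(line string) (string, error) {
			n++
			return fmt.Sprintf("%d: %s", n, line), nil
		}
	})
	first, _ := numbered([]string{"a", "b"})
	second, _ := numbered([]string{"c"})
	fmt.Println(first)  // [1: a 2: b]
	fmt.Println(second) // [1: c]
}
```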
StatefulFilter
func StatefulFilter[T any](factory func() func(T) (bool, error)) Command[T, T]
Filters items with state that persists across items. The factory creates fresh state per execution. Typical state includes counters, previous values, or positions.
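Dropping adjacent duplicates needs only the previous value as state. As with the other sketches, `Command` is a hypothetical slice-based stand-in, not gloo's real type.

```go
package main

import "fmt"

// Command is a hypothetical stand-in for gloo's Command type.
type Command[In, Out any] func([]In) ([]Out, error)

// StatefulFilter builds a fresh predicate per execution, then filters with it.
func StatefulFilter[T any](factory func() func(T) (bool, error)) Command[T, T] {
	return func(items []T) ([]T, error) {
		fn := factory() // fresh state for this execution
		var out []T
		for _, item := range items {
			keep, err := fn(item)
			if err != nil {
				return nil, err
			}
			if keep {
				out = append(out, item)
			}
		}
		return out, nil
	}
}

func main() {
	uniqAdjacent := StatefulFilter(func() func(string) (bool, error) {
		first := true
		var prev string
		return func(line string) (bool, error) {
			keep := first || line != prev
			first, prev = false, line
			return keep, nil
		}
	})
	out, err := uniqAdjacent([]string{"a", "a", "b", "a"})
	fmt.Println(out, err) // [a b a] <nil>
}
```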