Patterns

Every streaming operation maps to one of seven composable building blocks.

Which pattern do I need?

I want to…PatternCardinality
Transform each item independentlyMap1:1
Transform each item, tracking stateStatefulMap1:1
Keep or drop items by a conditionFilter1:0-1
Keep or drop items, tracking stateStatefulFilter1:0-1
Collect all input, reorder or trimAccumulateN:N
Reduce all input to a single valueAggregateN:1
Split each item into multiple outputsExpand1:N

Map

1:1

Transforms each input item independently. Stateless — receives one item, returns one item. No memory of previous items.

go — signature
func Map[In, Out any](fn func(In) (Out, error)) Command[In, Out]
go — example
// Reverse each line — one in, one out
patterns.Map(func(line []byte) ([]byte, error) {
    runes := []rune(string(line))
    for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
        runes[i], runes[j] = runes[j], runes[i]
    }
    return []byte(string(runes)), nil
})
shell equivalents: tr sed s/// cut basename dirname

Filter

1:0-1

Keeps or drops items based on a predicate. Stateless — decides per item with no memory of previous items.

go — signature
func Filter[T any](fn func(T) (bool, error)) Command[T, T]
go — example
// Keep lines matching a pattern
re := regexp.MustCompile(pattern)
patterns.Filter(func(line []byte) (bool, error) {
    return re.Match(line), nil
})
shell equivalents: grep grep -v

Accumulate

N:N

Collects all input, processes it, then emits output. Must see all input before producing any output. Same input and output types.

go — signature
func Accumulate[T any](fn func([]T) ([]T, error)) Command[T, T]
go — example
// Sort all lines — collect everything, emit sorted
patterns.Accumulate(func(lines [][]byte) ([][]byte, error) {
    sorted := make([][]byte, len(lines))
    copy(sorted, lines)
    slices.SortFunc(sorted, bytes.Compare)
    return sorted, nil
})
shell equivalents: sort tac tail -n shuf

Aggregate

N:1

Reduces all input items to a single output value. Consumes everything and produces a summary. Output type can differ from input type.

go — signature
func Aggregate[In, Out any](fn func([]In) (Out, error)) Command[In, Out]
go — example
// Count lines, words, bytes — many in, one out
patterns.Aggregate(func(lines [][]byte) ([]byte, error) {
    var lc, wc, bc int
    for _, line := range lines {
        lc++
        bc += len(line)
        wc += len(bytes.Fields(line))
    }
    return []byte(fmt.Sprintf("%d %d %d", lc, wc, bc)), nil
})
shell equivalents: wc -l wc -w sha256sum cksum

Expand

1:N

Maps each input item to zero or more output items. Splits, unfolds, or fans out each input. Output order follows input order.

go — signature
func Expand[In, Out any](fn func(In) ([]Out, error)) Command[In, Out]
go — example
// Split each line — one in, many out
patterns.Expand(func(line []byte) ([][]byte, error) {
    return bytes.Split(line, delim), nil
})
shell equivalents: split xargs -n1 fold

StatefulMap

1:1 stateful

Transforms each item with state that persists across items. Factory creates fresh state per execution — the command is reusable as a value.

go — signature
func StatefulMap[In, Out any](factory func() func(In) (Out, error)) Command[In, Out]
go — example
// Number each line — 1:1 with state
patterns.StatefulMap(func() func([]byte) ([]byte, error) {
    n := 0
    return func(line []byte) ([]byte, error) {
        n++
        return []byte(fmt.Sprintf("%6d\t%s", n, line)), nil
    }
})
shell equivalents: nl paste

StatefulFilter

1:0-1 stateful

Filters items with state that persists across items. Factory creates fresh state per execution. Tracks counters, previous values, or positions.

go — signature
func StatefulFilter[T any](factory func() func(T) (bool, error)) Command[T, T]
go — example
// Take first n lines — filter with counter
patterns.StatefulFilter(func() func([]byte) (bool, error) {
    count := 0
    return func(_ []byte) (bool, error) {
        count++
        return count <= n, nil
    }
})
shell equivalents: head -n uniq