Patterns
Every streaming operation maps to one of seven composable building blocks.
Which pattern do I need?
| I want to… | Pattern | Cardinality |
|---|---|---|
| Transform each item independently | Map | 1:1 |
| Transform each item, tracking state | StatefulMap | 1:1 |
| Keep or drop items by a condition | Filter | 1:0-1 |
| Keep or drop items, tracking state | StatefulFilter | 1:0-1 |
| Collect all input, reorder or trim | Accumulate | N:N |
| Reduce all input to a single value | Aggregate | N:1 |
| Split each item into zero or more outputs | Expand | 1:N |
Map
1:1
Transforms each input item independently. Stateless: the function receives one item and returns one item, with no memory of previous items.
func Map[In, Out any](fn func(In) (Out, error)) Command[In, Out]
// Reverse each line: one in, one out
patterns.Map(func(line []byte) ([]byte, error) {
	runes := []rune(string(line))
	for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
		runes[i], runes[j] = runes[j], runes[i]
	}
	return []byte(string(runes)), nil
})
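To make the 1:1 contract concrete, here is a minimal sketch that applies a Map-shaped function to a slice of lines without the library. `applyMap` is a hypothetical stand-in, not part of `patterns`, and stopping at the first error is an assumption rather than documented behavior.

```go
package main

import "fmt"

// applyMap is a hypothetical stand-in for running a Map command over a
// finite input: it calls fn once per item and emits exactly one output per
// input. Stopping at the first error is an assumption.
func applyMap[In, Out any](items []In, fn func(In) (Out, error)) ([]Out, error) {
	out := make([]Out, 0, len(items))
	for _, item := range items {
		v, err := fn(item)
		if err != nil {
			return nil, err
		}
		out = append(out, v)
	}
	return out, nil
}

// reverse returns the runes of line in reverse order, so multi-byte
// characters stay intact.
func reverse(line []byte) ([]byte, error) {
	runes := []rune(string(line))
	for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
		runes[i], runes[j] = runes[j], runes[i]
	}
	return []byte(string(runes)), nil
}

func main() {
	lines := [][]byte{[]byte("abc"), []byte("héllo")}
	out, err := applyMap(lines, reverse)
	if err != nil {
		panic(err)
	}
	for _, l := range out {
		fmt.Println(string(l))
	}
}
```

Reversing runes rather than bytes is what keeps `héllo` valid UTF-8 after the transform.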
Filter
1:0-1
Keeps or drops items based on a predicate. Stateless: each decision is made per item, with no memory of previous items.
func Filter[T any](fn func(T) (bool, error)) Command[T, T]
// Keep lines matching a pattern
re := regexp.MustCompile(pattern)
patterns.Filter(func(line []byte) (bool, error) {
	return re.Match(line), nil
})
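`regexp.MustCompile` panics on an invalid pattern, which may be unwelcome when the pattern comes from user input. The sketch below builds the same predicate with `regexp.Compile` and returns the compile error instead. `matcher` and `applyFilter` are hypothetical helpers written for this illustration, not library functions.

```go
package main

import (
	"fmt"
	"regexp"
)

// matcher compiles pattern once and returns a Filter-shaped predicate.
// An invalid pattern is reported as an error rather than a panic.
func matcher(pattern string) (func([]byte) (bool, error), error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, fmt.Errorf("compile %q: %w", pattern, err)
	}
	return func(line []byte) (bool, error) {
		return re.Match(line), nil
	}, nil
}

// applyFilter is a hypothetical stand-in for running a Filter command:
// each input yields zero or one output, and kept items are unchanged.
func applyFilter[T any](items []T, keep func(T) (bool, error)) ([]T, error) {
	var out []T
	for _, item := range items {
		ok, err := keep(item)
		if err != nil {
			return nil, err
		}
		if ok {
			out = append(out, item)
		}
	}
	return out, nil
}

func main() {
	keep, err := matcher("^a")
	if err != nil {
		panic(err)
	}
	lines := [][]byte{[]byte("apple"), []byte("banana"), []byte("avocado")}
	out, err := applyFilter(lines, keep)
	if err != nil {
		panic(err)
	}
	for _, l := range out {
		fmt.Println(string(l))
	}
}
```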
Accumulate
N:N
Collects all input, processes it as a batch, then emits the result. Nothing is emitted until the input has been fully read. Input and output share the same type.
func Accumulate[T any](fn func([]T) ([]T, error)) Command[T, T]
// Sort all lines: collect everything, emit sorted
patterns.Accumulate(func(lines [][]byte) ([][]byte, error) {
	sorted := make([][]byte, len(lines))
	copy(sorted, lines)
	slices.SortFunc(sorted, bytes.Compare)
	return sorted, nil
})
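The `func([]T) ([]T, error)` shape covers trimming as well as reordering, since the returned slice may be shorter than the input. As a sketch, here are two Accumulate-shaped functions in plain Go: a sort that leaves its input untouched, and a hypothetical `lastN` that keeps only the final n items. Neither name comes from the library.

```go
package main

import (
	"bytes"
	"fmt"
	"slices"
)

// sortLines sorts a copy of lines so the caller's slice is not reordered.
func sortLines(lines [][]byte) ([][]byte, error) {
	sorted := slices.Clone(lines)
	slices.SortFunc(sorted, bytes.Compare)
	return sorted, nil
}

// lastN returns an Accumulate-shaped function that keeps the final n items.
// It is a hypothetical example of trimming; n is fixed when lastN is called.
func lastN(n int) func([][]byte) ([][]byte, error) {
	return func(lines [][]byte) ([][]byte, error) {
		if n < 0 {
			return nil, fmt.Errorf("lastN: negative count %d", n)
		}
		start := max(len(lines)-n, 0)
		return slices.Clone(lines[start:]), nil
	}
}

func main() {
	in := [][]byte{[]byte("b"), []byte("c"), []byte("a")}
	sorted, err := sortLines(in)
	if err != nil {
		panic(err)
	}
	tail, err := lastN(2)(sorted)
	if err != nil {
		panic(err)
	}
	fmt.Printf("input=%s sorted=%s tail=%s\n", in, sorted, tail)
}
```

Both functions need the whole input before they can answer, which is why they belong under Accumulate rather than Filter.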
Aggregate
N:1
Reduces all input items to a single output value. Consumes everything and produces one summary. The output type can differ from the input type.
func Aggregate[In, Out any](fn func([]In) (Out, error)) Command[In, Out]
// Count lines, words, bytes: many in, one out
patterns.Aggregate(func(lines [][]byte) ([]byte, error) {
	var lc, wc, bc int
	for _, line := range lines {
		lc++
		bc += len(line)
		wc += len(bytes.Fields(line))
	}
	return []byte(fmt.Sprintf("%d %d %d", lc, wc, bc)), nil
})
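Because `Out` is independent of `In`, an Aggregate-shaped function can return a structured value instead of formatted bytes. The sketch below uses a hypothetical `counts` struct for that purpose. It assumes each line arrives without its trailing newline, so the byte total excludes line terminators.

```go
package main

import (
	"bytes"
	"fmt"
)

// counts is a hypothetical result type; it is not defined by the library.
type counts struct {
	Lines, Words, Bytes int
}

// countAll has the func([]In) (Out, error) shape with In = []byte and
// Out = counts. Bytes sums len(line), so newlines are counted only if the
// lines still contain them (assumed here: they do not).
func countAll(lines [][]byte) (counts, error) {
	var c counts
	for _, line := range lines {
		c.Lines++
		c.Words += len(bytes.Fields(line))
		c.Bytes += len(line)
	}
	return c, nil
}

func main() {
	lines := [][]byte{[]byte("hello world"), []byte("foo")}
	c, err := countAll(lines)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d %d %d\n", c.Lines, c.Words, c.Bytes) // prints "2 3 14"
}
```

Returning a struct leaves formatting to whatever consumes the value.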
Expand
1:N
Maps each input item to zero or more output items. Splits, unfolds, or fans out each input. Output order follows input order.
func Expand[In, Out any](fn func(In) ([]Out, error)) Command[In, Out]
// Split each line: one in, many out
patterns.Expand(func(line []byte) ([][]byte, error) {
	return bytes.Split(line, delim), nil
})
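One detail worth knowing: `bytes.Split` on an empty line with a non-empty delimiter returns a single empty element, so the example above emits one empty item for a blank line rather than zero. The sketch below drops empty fields so that a blank line expands to nothing. `splitFields` and `applyExpand` are hypothetical helpers, and flattening results in input order is an assumption based on the description above.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitFields returns an Expand-shaped function that splits on delim and
// discards empty fields, so an empty line yields zero outputs.
func splitFields(delim []byte) func([]byte) ([][]byte, error) {
	return func(line []byte) ([][]byte, error) {
		var fields [][]byte
		for _, f := range bytes.Split(line, delim) {
			if len(f) > 0 {
				fields = append(fields, f)
			}
		}
		return fields, nil
	}
}

// applyExpand is a hypothetical stand-in for running an Expand command:
// the outputs of each call are appended in input order.
func applyExpand[In, Out any](items []In, fn func(In) ([]Out, error)) ([]Out, error) {
	var out []Out
	for _, item := range items {
		vs, err := fn(item)
		if err != nil {
			return nil, err
		}
		out = append(out, vs...)
	}
	return out, nil
}

func main() {
	lines := [][]byte{[]byte("a,b"), []byte(""), []byte("c,,d")}
	out, err := applyExpand(lines, splitFields([]byte(",")))
	if err != nil {
		panic(err)
	}
	for _, f := range out {
		fmt.Println(string(f))
	}
}
```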
StatefulMap
1:1 stateful
Transforms each item using state that persists across items. The factory creates fresh state for each execution, so the command can be reused as a value.
func StatefulMap[In, Out any](factory func() func(In) (Out, error)) Command[In, Out]
// Number each line: 1:1 with state
patterns.StatefulMap(func() func([]byte) ([]byte, error) {
	n := 0
	return func(line []byte) ([]byte, error) {
		n++
		return []byte(fmt.Sprintf("%6d\t%s", n, line)), nil
	}
})
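The factory indirection is what keeps executions from sharing a counter. A minimal sketch in plain Go, calling the factory by hand the way a runner presumably would once per execution (that calling convention is an assumption, and `numberLines` is a name chosen for this example):

```go
package main

import "fmt"

// numberLines is a StatefulMap-shaped factory. Each call allocates a new
// counter, so closures returned by separate calls do not share state.
func numberLines() func([]byte) ([]byte, error) {
	n := 0
	return func(line []byte) ([]byte, error) {
		n++
		return fmt.Appendf(nil, "%6d\t%s", n, line), nil
	}
}

func main() {
	first := numberLines() // state for one execution
	a, _ := first([]byte("x"))
	b, _ := first([]byte("y"))

	second := numberLines() // a later execution starts again from 1
	c, _ := second([]byte("z"))

	fmt.Printf("%q %q %q\n", a, b, c)
}
```

If the counter were declared outside the factory, the second execution would continue from 3 instead of restarting at 1.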
StatefulFilter
1:0-1 stateful
Filters items using state that persists across items. The factory creates fresh state for each execution. The state can track counters, previous values, or positions.
func StatefulFilter[T any](factory func() func(T) (bool, error)) Command[T, T]
// Take first n lines: filter with a counter (n comes from the enclosing scope)
patterns.StatefulFilter(func() func([]byte) (bool, error) {
	count := 0
	return func(_ []byte) (bool, error) {
		count++
		return count <= n, nil
	}
})
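State can also hold a previous value rather than a counter. The sketch below drops a line when it equals the one immediately before it. `dropRepeats` and `run` are hypothetical helpers, not library functions. The predicate copies each line before storing it, on the assumption that the caller may reuse the underlying buffer.

```go
package main

import (
	"bytes"
	"fmt"
)

// dropRepeats is a StatefulFilter-shaped factory that rejects a line equal
// to its immediate predecessor. seen distinguishes "no previous line" from
// "previous line was empty".
func dropRepeats() func([]byte) (bool, error) {
	var prev []byte
	seen := false
	return func(line []byte) (bool, error) {
		if seen && bytes.Equal(line, prev) {
			return false, nil
		}
		prev = append(prev[:0], line...) // copy; the caller may reuse line
		seen = true
		return true, nil
	}
}

// run is a hypothetical stand-in for one execution: it calls the factory
// once, then applies the resulting predicate to every item in order.
func run(items [][]byte, factory func() func([]byte) (bool, error)) ([][]byte, error) {
	keep := factory()
	var out [][]byte
	for _, item := range items {
		ok, err := keep(item)
		if err != nil {
			return nil, err
		}
		if ok {
			out = append(out, item)
		}
	}
	return out, nil
}

func main() {
	lines := [][]byte{[]byte("a"), []byte("a"), []byte("b"), []byte("a")}
	out, err := run(lines, dropRepeats)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", out)
}
```

Only adjacent repeats are removed, so the final `a` survives; removing every repeat would require remembering all earlier lines.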