API Reference
Core types, sources, composition, and wiring.
Pipeline API
The user-facing API. This is all you need to build and run pipelines.
Run
One-shot pipeline execution. Source in, commands in the middle, sink at the end.
// Run a pipeline
gloo.Run(source, sink, cmd1, cmd2, cmd3)
// With explicit context
gloo.RunContext(ctx, source, sink, cmd1, cmd2, cmd3)
Chain
Fluent pipeline builder. Start from a source, chain commands, terminate with a sink.
gloo.Chain(source).
	To(Grep("error", IgnoreCase)).
	To(Sort(Reverse)).
	To(Head(Lines(10))).
	Sink(gloo.ByteWriteTo(os.Stdout))
// With explicit context
gloo.ChainContext(ctx, source).
	To(Grep("error")).
	To(Sort()).
	Collect()
Terminal operations:
| Method | Returns | Description |
|---|---|---|
| .Sink(sink) | (any, error) | Consume stream via a Sink |
| .Collect() | (any, error) | Gather all items into a slice |
| .ForEach(fn) | error | Call fn(T) error for each item |
Sources
Where data comes from.
Files
gloo.ByteFileSource(fs, files) // []byte lines from files
gloo.FileSource(fs, files) // string lines from files
Uses afero.Fs — swap in afero.NewMemMapFs() for tests.
Readers
gloo.ByteReaderSource(readers) // []byte lines from io.Reader
gloo.ReaderSource(readers) // string lines from io.Reader
Slices
gloo.SliceSource([][]byte{[]byte("hello"), []byte("world")})
In-memory. No I/O. Perfect for tests.
Sinks
Where data goes.
gloo.ByteWriteTo(os.Stdout) // write []byte lines to io.Writer
gloo.WriteTo(os.Stdout) // write string lines to io.Writer
Core Types
The interfaces everything is built on.
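// Command transforms a stream of In items into a stream of Out items.
type Command[In, Out any] interface {
	Execute(ctx context.Context, input Stream[In]) Stream[Out]
}
// Source produces a stream of T items.
type Source[T any] interface {
	Stream(ctx context.Context) Stream[T]
}
// Sink consumes a stream of In items and returns a result of type Res.
type Sink[In, Res any] interface {
	Consume(ctx context.Context, input Stream[In]) (Res, error)
}
// Stream is a receive-only channel of rill.Try values, each carrying an item or an error.
type Stream[T any] <-chan rill.Try[T]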
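To make the three interfaces concrete, here is a minimal, self-contained sketch that wires a source, a command, and a sink by hand. It does not import gloo or rill: `Try` is a local stand-in for `rill.Try`, and `sliceSource`, `upper`, `joinSink`, and `run` are hypothetical names invented for illustration, not part of the library.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Try is a local stand-in for rill.Try[T]: an item or an error.
type Try[T any] struct {
	Value T
	Error error
}

// The types below mirror the interfaces in this section.
type Stream[T any] <-chan Try[T]

type Source[T any] interface {
	Stream(ctx context.Context) Stream[T]
}

type Command[In, Out any] interface {
	Execute(ctx context.Context, input Stream[In]) Stream[Out]
}

type Sink[In, Res any] interface {
	Consume(ctx context.Context, input Stream[In]) (Res, error)
}

// sliceSource (hypothetical) emits each element of a slice.
type sliceSource[T any] []T

func (s sliceSource[T]) Stream(ctx context.Context) Stream[T] {
	out := make(chan Try[T])
	go func() {
		defer close(out)
		for _, v := range s {
			select {
			case out <- Try[T]{Value: v}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// upper (hypothetical) upper-cases every string and forwards errors unchanged.
type upper struct{}

func (upper) Execute(ctx context.Context, in Stream[string]) Stream[string] {
	out := make(chan Try[string])
	go func() {
		defer close(out)
		for item := range in {
			if item.Error == nil {
				item.Value = strings.ToUpper(item.Value)
			}
			select {
			case out <- item:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// joinSink (hypothetical) joins all items, stopping at the first error.
type joinSink struct{ sep string }

func (j joinSink) Consume(ctx context.Context, in Stream[string]) (string, error) {
	var parts []string
	for item := range in {
		if item.Error != nil {
			return "", item.Error
		}
		parts = append(parts, item.Value)
	}
	return strings.Join(parts, j.sep), nil
}

// run wires source -> command -> sink; cancelling the context on return
// lets the upstream goroutines exit even if the sink stops early.
func run(words []string) (string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var src Source[string] = sliceSource[string](words)
	var cmd Command[string, string] = upper{}
	var sink Sink[string, string] = joinSink{sep: " "}
	return sink.Consume(ctx, cmd.Execute(ctx, src.Stream(ctx)))
}

func main() {
	out, err := run([]string{"hello", "world"})
	fmt.Println(out, err) // HELLO WORLD <nil>
}
```

Because errors travel inside the stream as `Try` values, each stage decides for itself whether to forward, replace, or stop on an error.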
Command Author API
Everything below is for people creating new commands, not using existing ones.
Patterns
Nine building blocks for creating commands. Each returns a Command.
patterns.Map(fn func(In) (Out, error)) // 1:1
patterns.Filter(fn func(T) (bool, error)) // 1:0-1
patterns.Accumulate(fn func([]T) ([]T, error)) // N:N
patterns.Aggregate(fn func([]In) (Out, error)) // N:1
patterns.Expand(fn func(In) ([]Out, error)) // 1:N
patterns.StatefulMap(factory func() func(In) (Out, error)) // 1:1 + state
patterns.StatefulFilter(factory func() func(T) (bool, error)) // 1:0-1 + state
patterns.Tap(fn func(T) error) // passthrough + side effect
patterns.Subprocess(name string, args ...string) // fork external process
See Patterns for details and examples.
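As a rough illustration of what the 1:1 and stateful 1:0-1 shapes mean, the sketch below implements them directly over channels. It is not the `patterns` package: `mapStream`, `statefulFilter`, `firstN`, `fromSlice`, and `toSlice` are hypothetical helpers, `Try` is a local stand-in for `rill.Try`, and context handling is omitted for brevity.

```go
package main

import (
	"fmt"
	"strings"
)

// Try is a local stand-in for rill.Try[T].
type Try[T any] struct {
	Value T
	Error error
}

type Stream[T any] <-chan Try[T]

// mapStream sketches the 1:1 shape: every input yields exactly one output.
func mapStream[In, Out any](in Stream[In], fn func(In) (Out, error)) Stream[Out] {
	out := make(chan Try[Out])
	go func() {
		defer close(out)
		for item := range in {
			if item.Error != nil {
				out <- Try[Out]{Error: item.Error}
				continue
			}
			v, err := fn(item.Value)
			out <- Try[Out]{Value: v, Error: err}
		}
	}()
	return out
}

// statefulFilter sketches the 1:0-1 shape with state. The factory is called
// once per stream, so every run starts with fresh state.
func statefulFilter[T any](in Stream[T], factory func() func(T) (bool, error)) Stream[T] {
	out := make(chan Try[T])
	go func() {
		defer close(out)
		keep := factory()
		for item := range in {
			if item.Error != nil {
				out <- item
				continue
			}
			ok, err := keep(item.Value)
			switch {
			case err != nil:
				out <- Try[T]{Error: err}
			case ok:
				out <- item
			}
		}
	}()
	return out
}

// firstN returns a factory whose filter keeps only the first n items.
func firstN[T any](n int) func() func(T) (bool, error) {
	return func() func(T) (bool, error) {
		seen := 0
		return func(T) (bool, error) {
			seen++
			return seen <= n, nil
		}
	}
}

// fromSlice builds an already-closed stream from a slice.
func fromSlice[T any](items []T) Stream[T] {
	out := make(chan Try[T], len(items))
	for _, v := range items {
		out <- Try[T]{Value: v}
	}
	close(out)
	return out
}

// toSlice drains the whole stream and reports the first error seen.
func toSlice[T any](in Stream[T]) ([]T, error) {
	var res []T
	var firstErr error
	for item := range in {
		if item.Error != nil {
			if firstErr == nil {
				firstErr = item.Error
			}
			continue
		}
		res = append(res, item.Value)
	}
	return res, firstErr
}

func demo() ([]string, error) {
	lines := fromSlice([]string{"a", "b", "c"})
	upper := mapStream(lines, func(s string) (string, error) {
		return strings.ToUpper(s), nil
	})
	return toSlice(statefulFilter(upper, firstN[string](2)))
}

func main() {
	fmt.Println(demo()) // [A B] <nil>
}
```

The factory indirection is what makes the stateful variants safe to reuse: the counter lives inside one stream's closure instead of being shared across runs.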
Composition
Combine commands into new commands.
// Same-type chain: Command[T,T] + Command[T,T] → Command[T,T]
pipeline := gloo.Compose(cmd1).To(cmd2).To(cmd3)
// Type-changing: Command[A,B] + Command[B,C] → Command[A,C]
pipeline := gloo.Pipe(cmd1, cmd2)
// Adapt a function to Command
cmd := gloo.FuncCommand[In, Out](fn)
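The type-changing case can be pictured as plain function composition over streams. The sketch below is an assumption about the general shape, not gloo's implementation: `funcCommand`, `pipe`, `mapper`, and `demo` are hypothetical names, `Try` stands in for `rill.Try`, and the real `gloo.FuncCommand` may accept a different function signature.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// Try is a local stand-in for rill.Try[T].
type Try[T any] struct {
	Value T
	Error error
}

type Stream[T any] <-chan Try[T]

type Command[In, Out any] interface {
	Execute(ctx context.Context, input Stream[In]) Stream[Out]
}

// funcCommand (hypothetical) adapts a stream function to Command.
type funcCommand[In, Out any] func(ctx context.Context, in Stream[In]) Stream[Out]

func (f funcCommand[In, Out]) Execute(ctx context.Context, in Stream[In]) Stream[Out] {
	return f(ctx, in)
}

// pipe joins Command[A,B] and Command[B,C] into Command[A,C] by feeding the
// first command's output stream into the second.
func pipe[A, B, C any](first Command[A, B], second Command[B, C]) Command[A, C] {
	return funcCommand[A, C](func(ctx context.Context, in Stream[A]) Stream[C] {
		return second.Execute(ctx, first.Execute(ctx, in))
	})
}

// mapper (hypothetical) builds a 1:1 command from a per-item function.
func mapper[In, Out any](fn func(In) (Out, error)) Command[In, Out] {
	return funcCommand[In, Out](func(ctx context.Context, in Stream[In]) Stream[Out] {
		out := make(chan Try[Out])
		go func() {
			defer close(out)
			for item := range in {
				res := Try[Out]{Error: item.Error}
				if item.Error == nil {
					res.Value, res.Error = fn(item.Value)
				}
				select {
				case out <- res:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	})
}

// demo parses strings to ints, then formats the doubled value.
func demo(inputs []string) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	parse := mapper(strconv.Atoi) // Command[string, int]
	label := mapper(func(n int) (string, error) {
		return fmt.Sprintf("n=%d", n*2), nil
	}) // Command[int, string]
	cmd := pipe(parse, label) // Command[string, string]

	src := make(chan Try[string], len(inputs))
	for _, s := range inputs {
		src <- Try[string]{Value: s}
	}
	close(src)

	var out []string
	for item := range cmd.Execute(ctx, src) {
		if item.Error != nil {
			return nil, item.Error
		}
		out = append(out, item.Value)
	}
	return out, nil
}

func main() {
	fmt.Println(demo([]string{"1", "2", "3"})) // [n=2 n=4 n=6] <nil>
}
```

The compiler checks that the middle type `B` lines up, so a mismatched pair of commands fails at build time instead of at run time.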
Wiring
Low-level source/command/sink wiring.
gloo.From(ctx, source, cmd) // Source → Command → Stream
gloo.Into(ctx, source, cmd, sink) // Source → Command → Sink → Result
gloo.Collect(ctx, stream) // Stream → []T
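For intuition about the last helper, a stream can be drained into a slice with a simple receive loop. The sketch below is one plausible reading of "Stream → []T", assuming the first error aborts collection; whether `gloo.Collect` behaves this way is not stated here. `collect`, `collectAll`, and `errBoom` are hypothetical names, and `Try` stands in for `rill.Try`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Try is a local stand-in for rill.Try[T].
type Try[T any] struct {
	Value T
	Error error
}

type Stream[T any] <-chan Try[T]

// collect drains a stream into a slice. It stops at the first error in the
// stream, or when the context is cancelled (an assumption of this sketch).
func collect[T any](ctx context.Context, in Stream[T]) ([]T, error) {
	var items []T
	for {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case item, ok := <-in:
			if !ok {
				return items, nil // stream closed: done
			}
			if item.Error != nil {
				return nil, item.Error
			}
			items = append(items, item.Value)
		}
	}
}

// collectAll builds a closed, buffered stream from the given items and
// collects it with a background context.
func collectAll[T any](items ...Try[T]) ([]T, error) {
	ch := make(chan Try[T], len(items))
	for _, it := range items {
		ch <- it
	}
	close(ch)
	return collect[T](context.Background(), ch)
}

var errBoom = errors.New("boom")

func main() {
	got, err := collectAll(Try[int]{Value: 1}, Try[int]{Value: 2})
	fmt.Println(got, err) // [1 2] <nil>

	_, err = collectAll(Try[int]{Value: 1}, Try[int]{Error: errBoom})
	fmt.Println(err) // boom
}
```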
Parameters
Type-safe option parsing for command constructors.
// Define option types
type Lines int
func (l Lines) Configure(f *flags) { f.lines = l }
// Parse in constructor
func Head(opts ...any) gloo.Command[[]byte, []byte] {
	p := gloo.NewParameters[gloo.File, flags](opts...)
	n := int(p.Flags.lines)
	if n <= 0 {
		n = 10
	}
	return patterns.StatefulFilter(/* ... */)
}
// Users call cleanly via alias imports
Head(Lines(10))
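One way such option parsing could work is a type switch over the variadic arguments: values with a matching `Configure` method set flags, and values of the positional type are collected. The sketch below is an assumption about the mechanism, not the `gloo.NewParameters` source. `newParameters`, `parameters`, `option`, `Quiet`, `headCount`, and the `Positional` field are hypothetical; only `Lines`, `flags`, `Configure`, and `p.Flags` come from the example above.

```go
package main

import "fmt"

// flags holds the parsed options for a head-like command.
type flags struct {
	lines Lines
	quiet bool
}

// Lines and Quiet are option types: each knows how to set its own field.
type Lines int

func (l Lines) Configure(f *flags) { f.lines = l }

type Quiet bool

func (q Quiet) Configure(f *flags) { f.quiet = bool(q) }

// option (hypothetical) is satisfied by any value that can configure F.
type option[F any] interface {
	Configure(*F)
}

// parameters (hypothetical) separates positional arguments from flags.
type parameters[P, F any] struct {
	Positional []P
	Flags      F
}

// newParameters sorts each argument by type. Arguments that are neither an
// option for F nor a P are silently ignored in this sketch.
func newParameters[P, F any](opts ...any) parameters[P, F] {
	var p parameters[P, F]
	for _, o := range opts {
		switch v := o.(type) {
		case option[F]:
			v.Configure(&p.Flags)
		case P:
			p.Positional = append(p.Positional, v)
		}
	}
	return p
}

// headCount resolves the line count the same way the Head constructor above
// does, falling back to 10 when no positive Lines option is given.
func headCount(opts ...any) int {
	p := newParameters[string, flags](opts...)
	n := int(p.Flags.lines)
	if n <= 0 {
		n = 10
	}
	return n
}

func main() {
	fmt.Println(headCount(Lines(3))) // 3
	fmt.Println(headCount())         // 10

	p := newParameters[string, flags]("a.txt", Quiet(true))
	fmt.Println(p.Positional, p.Flags.quiet) // [a.txt] true
}
```

Giving each option its own named type means two integer options cannot be swapped by accident, and the order of arguments does not matter.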