API Reference

Core types, sources, composition, and wiring.

Pipeline API

The user-facing API. This is all you need to build and run pipelines.

Run

One-shot pipeline execution. Source in, commands in the middle, sink at the end.

go

// Run a pipeline
gloo.Run(source, sink, cmd1, cmd2, cmd3)

// With explicit context
gloo.RunContext(ctx, source, sink, cmd1, cmd2, cmd3)

Chain

Fluent pipeline builder. Start from a source, chain commands, terminate with a sink.

go

gloo.Chain(source).
    To(Grep("error", IgnoreCase)).
    To(Sort(Reverse)).
    To(Head(Lines(10))).
    Sink(gloo.ByteWriteTo(os.Stdout))

// With explicit context
gloo.ChainContext(ctx, source).
    To(Grep("error")).
    To(Sort()).
    Collect()

Terminal operations:

MethodReturnsDescription
.Sink(sink)(any, error)Consume stream via a Sink
.Collect()(any, error)Gather all items into a slice
.ForEach(fn)errorCall fn(T) error for each item

Sources

Where data comes from.

Files

go

gloo.ByteFileSource(fs, files)   // []byte lines from files
gloo.FileSource(fs, files)       // string lines from files

Uses afero.Fs — swap in afero.NewMemMapFs() for tests.

Readers

go

gloo.ByteReaderSource(readers)   // []byte lines from io.Reader
gloo.ReaderSource(readers)       // string lines from io.Reader

Slices

go

gloo.SliceSource([][]byte{[]byte("hello"), []byte("world")})

In-memory. No I/O. Perfect for tests.

Sinks

Where data goes.

go

gloo.ByteWriteTo(os.Stdout)     // write []byte lines to io.Writer
gloo.WriteTo(os.Stdout)         // write string lines to io.Writer

Core Types

The interfaces everything is built on.

go

type Command[In, Out any] interface {
    Execute(ctx context.Context, input Stream[In]) Stream[Out]
}

type Source[T any] interface {
    Stream(ctx context.Context) Stream[T]
}

type Sink[In, Res any] interface {
    Consume(ctx context.Context, input Stream[In]) (Res, error)
}

type Stream[T any] <-chan rill.Try[T]

Command Author API

Everything below is for people creating new commands, not using existing ones.

Patterns

Seven building blocks for creating commands. Each returns a Command.

go

patterns.Map(fn func(In) (Out, error))                       // 1:1
patterns.Filter(fn func(T) (bool, error))                    // 1:0-1
patterns.Accumulate(fn func([]T) ([]T, error))               // N:N
patterns.Aggregate(fn func([]In) (Out, error))                // N:1
patterns.Expand(fn func(In) ([]Out, error))                   // 1:N
patterns.StatefulMap(factory func() func(In) (Out, error))    // 1:1 + state
patterns.StatefulFilter(factory func() func(T) (bool, error)) // 1:0-1 + state
patterns.Tap(fn func(T) error)                                // passthrough + side effect
patterns.Subprocess(name string, args ...string)              // fork external process

See Patterns → for details and examples.

Composition

Combine commands into new commands.

go

// Same-type chain: Command[T,T] + Command[T,T] → Command[T,T]
pipeline := gloo.Compose(cmd1).To(cmd2).To(cmd3)

// Type-changing: Command[A,B] + Command[B,C] → Command[A,C]
pipeline := gloo.Pipe(cmd1, cmd2)

// Adapt a function to Command
cmd := gloo.FuncCommand[In, Out](fn)

Wiring

Low-level source/command/sink wiring.

go

gloo.From(ctx, source, cmd)          // Source → Command → Stream
gloo.Into(ctx, source, cmd, sink)    // Source → Command → Sink → Result
gloo.Collect(ctx, stream)            // Stream → []T

Parameters

Type-safe option parsing for command constructors.

go

// Define option types
type Lines int
func (l Lines) Configure(f *flags) { f.lines = l }

// Parse in constructor
func Head(opts ...any) gloo.Command[[]byte, []byte] {
    p := gloo.NewParameters[gloo.File, flags](opts...)
    n := int(p.Flags.lines)
    if n <= 0 { n = 10 }
    return patterns.StatefulFilter(/* ... */)
}

// Users call cleanly via alias imports
Head(Lines(10))