Build an application that routes inference events to multiple handlers and produces structured JSON/YAML logs.
This tutorial teaches you how to build an application that routes inference events to multiple handlers simultaneously — one for real-time console output and another for structured JSON logging suitable for log aggregation systems.
A CLI application that:
┌─────────────────────┐
│ Engine │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ WatermillSink │
└──────────┬──────────┘
│
▼
┌─────────────────────┐
│ EventRouter │
└──────────┬──────────┘
│
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Text Printer│ │ JSON Logger │ │ Aggregator │
│ (stdout) │ │ (file) │ │ (metrics) │
└─────────────┘ └─────────────┘ └─────────────┘
package main
import (
"context"
"encoding/json"
"fmt"
"os"
"sync/atomic"
"time"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-go-golems/geppetto/pkg/events"
"github.com/go-go-golems/geppetto/pkg/inference/engine"
"github.com/go-go-golems/geppetto/pkg/inference/engine/factory"
"github.com/go-go-golems/geppetto/pkg/inference/middleware"
"github.com/go-go-golems/geppetto/pkg/turns"
"golang.org/x/sync/errgroup"
)
func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
os.Exit(1)
}
}
func run() error {
ctx := context.Background()
// ... implementation follows
}
func run() error {
ctx := context.Background()
// Create the event router
router, err := events.NewEventRouter()
if err != nil {
return fmt.Errorf("failed to create router: %w", err)
}
defer router.Close()
// ... add handlers next
}
Important: The default in-memory router can block streaming if a handler is slow or not draining yet (see glaze help geppetto-events-streaming-watermill). For production or UI streaming, consider an external transport (Redis Streams) or explicitly configure gochannel buffering and disable publish→ACK blocking.
The built-in StepPrinterFunc streams text deltas to stdout:
// Handler 1: Real-time text streaming to console
router.AddHandler(
"console-printer", // Unique handler name
"chat", // Topic to subscribe to
events.StepPrinterFunc("", os.Stdout),
)
What it does:
partial event's delta directly to stdoutfinal eventsUse NewStructuredPrinter for machine-readable output:
// Handler 2: Structured JSON logging to file
logFile, err := os.Create("inference.log")
if err != nil {
return fmt.Errorf("failed to create log file: %w", err)
}
defer logFile.Close()
jsonPrinter := events.NewStructuredPrinter(logFile, events.PrinterOptions{
Format: events.FormatJSON, // JSON lines format
IncludeMetadata: true, // Include timing, tokens, etc.
Full: true, // Include all event types
})
router.AddHandler(
"json-logger",
"chat",
jsonPrinter,
)
Printer formats:
events.FormatJSON — One JSON object per line (JSONL)events.FormatYAML — YAML documents separated by ---events.FormatText — Human-readable textOptions:
IncludeMetadata: true — Add metadata field with timing, usage, etc.Full: true — Log all events, not just textPrefix: ">>> " — Prepend prefix to each lineBuild a handler that collects statistics:
// Handler 3: Metrics aggregator
var stats struct {
EventCount int64
InputTokens int64
OutputTokens int64
ToolCalls int64
Errors int64
StartTime time.Time
}
stats.StartTime = time.Now()
router.AddHandler("aggregator", "chat", func(msg *message.Message) error {
defer msg.Ack()
atomic.AddInt64(&stats.EventCount, 1)
ev, err := events.NewEventFromJson(msg.Payload)
if err != nil {
return nil // Skip malformed events
}
switch e := ev.(type) {
case *events.EventFinal:
atomic.AddInt64(&stats.InputTokens, int64(e.Metadata().Usage.InputTokens))
atomic.AddInt64(&stats.OutputTokens, int64(e.Metadata().Usage.OutputTokens))
case *events.EventToolCall:
atomic.AddInt64(&stats.ToolCalls, 1)
case *events.EventError:
atomic.AddInt64(&stats.Errors, 1)
}
return nil
})
// Create sink that publishes to the "chat" topic
sink := middleware.NewWatermillSink(router.Publisher, "chat")
// Create engine (sinks are attached at runtime via context)
eng, err := factory.NewEngineFromParsedValues(parsedValues)
if err != nil {
return fmt.Errorf("failed to create engine: %w", err)
}
// Build the Turn
turn := &turns.Turn{}
turns.AppendBlock(turn, turns.NewSystemTextBlock("You are a helpful assistant."))
turns.AppendBlock(turn, turns.NewUserTextBlock("Explain event-driven architecture in 3 sentences."))
// Run router and inference concurrently
eg, groupCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
return router.Run(groupCtx)
})
eg.Go(func() error {
<-router.Running() // Wait for router to be ready
// Attach sink to context for any helpers that publish
runCtx := events.WithEventSinks(groupCtx, sink)
_, err := eng.RunInference(runCtx, turn)
return err
})
if err := eg.Wait(); err != nil {
return fmt.Errorf("inference failed: %w", err)
}
// Print aggregated stats
duration := time.Since(stats.StartTime)
fmt.Printf("\n\n--- Statistics ---\n")
fmt.Printf("Duration: %v\n", duration)
fmt.Printf("Events: %d\n", stats.EventCount)
fmt.Printf("Tokens: %d in, %d out\n", stats.InputTokens, stats.OutputTokens)
fmt.Printf("Tool calls: %d\n", stats.ToolCalls)
fmt.Printf("Errors: %d\n", stats.Errors)
return nil
}
package main
import (
"context"
"fmt"
"os"
"sync/atomic"
"time"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-go-golems/geppetto/pkg/events"
"github.com/go-go-golems/geppetto/pkg/inference/engine"
"github.com/go-go-golems/geppetto/pkg/inference/engine/factory"
"github.com/go-go-golems/geppetto/pkg/inference/middleware"
"github.com/go-go-golems/geppetto/pkg/turns"
"golang.org/x/sync/errgroup"
)
func main() {
ctx := context.Background()
// 1. Create router
router, _ := events.NewEventRouter()
defer router.Close()
// 2. Console printer (real-time streaming)
router.AddHandler("console", "chat", events.StepPrinterFunc("", os.Stdout))
// 3. JSON logger (structured logs)
logFile, _ := os.Create("inference.log")
defer logFile.Close()
jsonPrinter := events.NewStructuredPrinter(logFile, events.PrinterOptions{
Format: events.FormatJSON,
IncludeMetadata: true,
Full: true,
})
router.AddHandler("json-logger", "chat", jsonPrinter)
// 4. Metrics aggregator
var totalTokens int64
router.AddHandler("metrics", "chat", func(msg *message.Message) error {
defer msg.Ack()
ev, _ := events.NewEventFromJson(msg.Payload)
if final, ok := ev.(*events.EventFinal); ok {
atomic.AddInt64(&totalTokens, int64(
final.Metadata().Usage.InputTokens+final.Metadata().Usage.OutputTokens,
))
}
return nil
})
// 5. Create sink and engine
sink := middleware.NewWatermillSink(router.Publisher, "chat")
eng, _ := factory.NewEngineFromParsedValues(parsedValues)
// 6. Build Turn
turn := &turns.Turn{}
turns.AppendBlock(turn, turns.NewUserTextBlock("Hello!"))
// 7. Run
eg, groupCtx := errgroup.WithContext(ctx)
eg.Go(func() error { return router.Run(groupCtx) })
eg.Go(func() error {
<-router.Running()
_, err := eng.RunInference(events.WithEventSinks(groupCtx, sink), turn)
return err
})
_ = eg.Wait()
fmt.Printf("\n\nTotal tokens: %d\n", totalTokens)
}
Console (stdout):
Hello! I'm here to help. What would you like to know?
Log file (inference.log):
{"type":"start","metadata":{"id":"abc123","model":"gpt-4"}}
{"type":"partial","delta":"Hello","completion":"Hello","metadata":{...}}
{"type":"partial","delta":"!","completion":"Hello!","metadata":{...}}
{"type":"final","text":"Hello! I'm here to help...","metadata":{"usage":{"input_tokens":12,"output_tokens":15}}}
Route different event types to different handlers:
// Tool events go to a separate topic
toolSink := middleware.NewWatermillSink(router.Publisher, "tools")
// Add tool-specific handler
router.AddHandler("tool-logger", "tools", func(msg *message.Message) error {
defer msg.Ack()
// Log tool calls to separate file
return nil
})
For distributed systems, use external Watermill transports:
import "github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream"
// Create Redis publisher/subscriber
pub, _ := redisstream.NewPublisher(redisstream.PublisherConfig{...}, logger)
sub, _ := redisstream.NewSubscriber(redisstream.SubscriberConfig{...}, logger)
// Pass to router
router, _ := events.NewEventRouter(
events.WithPublisher(pub),
events.WithSubscriber(sub),
)
Now events flow through Redis and can be consumed by any connected service.
| Problem | Cause | Solution |
|---|---|---|
| No console output | Handler not registered | Check AddHandler called before Run |
| Missing log entries | File not flushed | Ensure defer logFile.Close() |
| Duplicate events | Same handler name | Use unique names per handler |
| Events out of order | Async handlers | Events arrive in order per handler |
geppetto/cmd/examples/streaming-inference/main.go