Events, Streaming, and Watermill in Geppetto

Engine-first event publishing with EventSink, Watermill routing, and context-carried sinks for streaming AI.

Command: glaze help geppetto-events-streaming-watermill

Topic · Tags: geppetto, architecture, events, watermill, pubsub, ai

Why Events?

When you run AI inference, you want to see results as they stream in — not wait for the entire response. This enables:

  • Responsive UIs — show tokens as they arrive, not after 10 seconds of silence
  • Progress feedback — show "Thinking..." or tool execution status
  • Debugging — trace exactly what the model is doing and when
  • Real-time metrics — display token counts as they accumulate

Geppetto's event system publishes structured events as inference progresses. Every token, tool call, and error becomes an event that flows through a unified pipeline.

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                         Your Application                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│   ┌──────────┐     ┌──────────────┐     ┌───────────────────┐  │
│   │  Engine  │────▶│  Event Sink  │────▶│  Watermill Router │  │
│   └──────────┘     └──────────────┘     └─────────┬─────────┘  │
│        │                                          │             │
│        ▼                                          ▼             │
│   ┌──────────┐                            ┌─────────────┐       │
│   │ Helpers/ │─────────────────────────▶ │  Handlers   │       │
│   │  Tools   │   (via context sinks)      │ (printers,  │       │
│   └──────────┘                            │  custom)    │       │
│                                           └─────────────┘       │
└─────────────────────────────────────────────────────────────────┘

Key components:

| Component | Role |
|---|---|
| Engine | Makes provider API calls, emits lifecycle events (start, partial, final) |
| Event Sink | Receives events and publishes to a topic |
| Watermill Router | Routes events to registered handlers |
| Handlers | Process events (print to console, aggregate, forward) |
| Context Sinks | Let helpers/tools emit events without explicit plumbing |

Event Types

Core Lifecycle Events

| EventType | Constant | When Emitted | Key Fields |
|---|---|---|---|
| start | EventTypeStart | Inference begins | Metadata |
| partial | EventTypePartialCompletion | Each streamed chunk | Delta, Completion |
| partial-thinking | EventTypePartialThinking | Reasoning/thinking text partial (OpenAI Chat/Responses) | Delta, Completion |
| final | EventTypeFinal | Inference completes | Text, Metadata.Usage |
| interrupt | EventTypeInterrupt | Context cancelled | Text (partial) |
| error | EventTypeError | Error occurs | ErrorString |

Tool Events

| EventType | Constant | When Emitted | Key Fields |
|---|---|---|---|
| tool-call | EventTypeToolCall | Model requests tool | ToolCall.Name, ToolCall.Input, ToolCall.ID |
| tool-result | EventTypeToolResult | Tool returns result | ToolResult.ID, ToolResult.Result |
| tool-call-execute | EventTypeToolCallExecute | Execution starts | ToolCall |
| tool-call-execution-result | EventTypeToolCallExecutionResult | Execution finishes | ToolResult |
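Because tool-call and tool-result events both carry an ID, a consumer can pair them to show which tool calls are still running. A minimal sketch, assuming ToolResult.ID echoes the ToolCall.ID it answers; ToolCall, ToolResult, and toolTracker below are simplified, hypothetical stand-ins rather than Geppetto's types:

```go
package main

import "sort"

// ToolCall and ToolResult are hypothetical stand-ins for the payloads of
// tool-call and tool-result events.
// Assumption: ToolResult.ID echoes the ToolCall.ID it answers.
type ToolCall struct {
	ID, Name, Input string
}

type ToolResult struct {
	ID, Result string
}

// toolTracker is a hypothetical helper that pairs calls with results by ID.
type toolTracker struct {
	calls   map[string]ToolCall
	results map[string]ToolResult
}

func newToolTracker() *toolTracker {
	return &toolTracker{
		calls:   map[string]ToolCall{},
		results: map[string]ToolResult{},
	}
}

// OnCall records a tool-call payload.
func (t *toolTracker) OnCall(c ToolCall) { t.calls[c.ID] = c }

// OnResult records a tool-result payload.
func (t *toolTracker) OnResult(r ToolResult) { t.results[r.ID] = r }

// Pending returns the IDs of calls with no result yet, sorted for stable output.
func (t *toolTracker) Pending() []string {
	var ids []string
	for id := range t.calls {
		if _, done := t.results[id]; !done {
			ids = append(ids, id)
		}
	}
	sort.Strings(ids)
	return ids
}
```

In a real handler, OnCall and OnResult would be driven from the *events.EventToolCall and *events.EventToolResult cases of a type switch.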

Extended Events

| Category | Events | Purpose |
|---|---|---|
| Reasoning | partial-thinking plus info boundary events such as thinking-started and thinking-ended | Provider thinking/reasoning traces |
| Web Search | web-search-started, web-search-searching, web-search-done | Built-in web search progress |
| File Search | file-search-started, file-search-done | Built-in file search progress |
| Code Interpreter | code-interpreter-* | Code execution progress |
| MCP | mcp-*, mcp-list-tools-* | MCP tool progress |
| Image Gen | image-generation-* | Image generation progress |
| Status | log, info, status, agent-mode-switch | UI/debug status |

Event Type Cheat Sheet

Common concrete Go types when parsing with events.NewEventFromJson:

  • *events.EventPartialCompletionStart → stream start
  • *events.EventPartialCompletion → normal answer Delta, accumulated answer Completion
  • *events.EventThinkingPartial → reasoning/thinking Delta, accumulated reasoning Completion
  • *events.EventFinal → Text
  • *events.EventToolCall → ToolCall with Name, Input, ID
  • *events.EventToolResult → ToolResult with ID, Result
  • *events.EventError → ErrorString
  • *events.EventInterrupt → Text

Reasoning text uses EventThinkingPartial. Consumers that render thinking should read Completion for the accumulated text and Delta only for the latest increment. The older EventReasoningTextDelta and EventReasoningTextDone event types were removed so there is a single reasoning partial stream with the same delta/completion semantics as normal assistant text.
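The delta/completion contract means a renderer can replace its buffer with Completion instead of concatenating every Delta, which also keeps the display correct if an intermediate event is missed. A minimal sketch, assuming Completion always holds the full text accumulated so far; partial and view are hypothetical stand-ins for EventPartialCompletion/EventThinkingPartial and a UI model:

```go
package main

// partial is a hypothetical stand-in for EventPartialCompletion (Thinking=false)
// and EventThinkingPartial (Thinking=true). Delta is the newest increment,
// Completion the text accumulated so far.
type partial struct {
	Thinking   bool
	Delta      string
	Completion string
}

// view holds what a UI would render, with answer and reasoning kept apart.
type view struct {
	Answer    string
	Reasoning string
}

// apply replaces the relevant buffer with the accumulated Completion.
// Using Completion rather than appending Delta means a dropped event does
// not leave a permanent gap in the rendered text.
func (v *view) apply(p partial) {
	if p.Thinking {
		v.Reasoning = p.Completion
		return
	}
	v.Answer = p.Completion
}
```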

See full catalog: geppetto/pkg/events/chat-events.go

Event Metadata

Every event carries EventMetadata:

type EventMetadata struct {
    ID          uuid.UUID // Stable per stream
    SessionID   string    // Correlation ID for the session
    InferenceID string    // Correlation ID for the inference call
    TurnID      string    // Correlation ID for the turn
    Model       string    // Model identifier (e.g., "gpt-4")
    Duration    time.Duration

    Usage Usage          // Token counts (updated continuously)
    Extra map[string]any // Provider-specific context
}

type Usage struct {
    InputTokens              int
    OutputTokens             int
    CachedTokens             int // OpenAI prompt cache
    CacheCreationInputTokens int // Claude cache
    CacheReadInputTokens     int // Claude cache
}

Note: Usage is updated as chunks arrive, so UIs can display evolving token counts in real-time.

Event-to-Turn Correlation

Every event carries SessionID, InferenceID, and TurnID in its metadata. These are the same IDs stored on Turn.Metadata (see Turns and Blocks), which means you can link any event back to the specific Turn snapshot it belongs to.

This correlation is the bridge between the two parallel tracks in the system:

  • State track: Turn snapshots captured at named phases (pre_inference, post_inference, etc.)
  • Event track: streaming telemetry emitted during those phases

A debugging UI can use these shared IDs to cross-highlight: clicking a Turn snapshot highlights all events emitted during that phase, and clicking an event highlights the Turn it belongs to.
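The cross-highlighting idea reduces to indexing events by their correlation IDs. A minimal sketch that groups events by TurnID; meta, event, and turnIndex are hypothetical stand-ins, and a real consumer would read these fields from events.EventMetadata:

```go
package main

// meta mirrors the correlation fields carried by events.EventMetadata.
type meta struct {
	SessionID, InferenceID, TurnID string
}

// event is a hypothetical stand-in for a decoded Geppetto event.
type event struct {
	Type string
	Meta meta
}

// turnIndex groups events by TurnID, preserving arrival order.
type turnIndex map[string][]event

func (ix turnIndex) add(e event) {
	ix[e.Meta.TurnID] = append(ix[e.Meta.TurnID], e)
}

// eventsForTurn returns the events to highlight when a Turn snapshot is selected.
func (ix turnIndex) eventsForTurn(turnID string) []event {
	return ix[turnID]
}
```

Going the other way (event to Turn) needs no index: the event's own TurnID names its Turn snapshot.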

Publishing Events

Geppetto uses context-carried sinks:

  • Provider engines publish via events.PublishEventToContext(ctx, ...).
  • Helper layers (tool loops, middleware) also publish via context.

Attach sinks to context.Context at runtime:

watermillSink := middleware.NewWatermillSink(router.Publisher, "chat")
runCtx := events.WithEventSinks(ctx, watermillSink)

// Anywhere downstream can publish
events.PublishEventToContext(runCtx, events.NewToolResultEvent(meta, toolResult))

Provider Event Flow

| Provider | Streaming Behavior |
|---|---|
| OpenAI (Chat) | start → multiple partial → final. Tool calls are merged and emitted as tool-call when complete. |
| OpenAI (Responses) | Adds info events for reasoning boundaries and partial-thinking for reasoning/summary deltas with accumulated Completion. Function arguments are streamed via SSE. |
| Claude | Content-block merger emits start → partial → tool-call (when complete) → final. |

All providers publish via context sinks.
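When testing a provider or a custom engine, it can help to assert the lifecycle order in this table. A minimal sketch of such a check, assuming a simplified grammar (start, then any number of partial, partial-thinking, or tool-call events, then one final, interrupt, or error as the last event); checkLifecycle is hypothetical and skips info and other extended events:

```go
package main

import (
	"errors"
	"fmt"
)

// checkLifecycle is a hypothetical test helper that validates a sequence of
// event type strings against the simplified grammar described above.
func checkLifecycle(types []string) error {
	if len(types) == 0 || types[0] != "start" {
		return errors.New("stream must begin with start")
	}
	for i := 1; i < len(types); i++ {
		switch t := types[i]; t {
		case "partial", "partial-thinking", "tool-call":
			// streamed content: allowed any number of times
		case "final", "interrupt", "error":
			if i != len(types)-1 {
				return fmt.Errorf("terminal event %q at position %d is not last", t, i)
			}
			return nil
		case "start":
			return fmt.Errorf("duplicate start at position %d", i)
		default:
			// info, log, web-search-*, and other extended events are ignored here
		}
	}
	return errors.New("stream ended without final, interrupt, or error")
}
```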

Running the Event Router

Use a Watermill-backed EventRouter to route events to handlers:

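// Inside a function that returns error.
router, err := events.NewEventRouter()
if err != nil {
    return err
}
defer router.Close()

// Add a simple printer
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))

// Create sink and engine
sink := middleware.NewWatermillSink(router.Publisher, "chat")
eng, err := factory.NewEngineFromParsedValues(parsed)
if err != nil {
    return err
}

// errgroup cancels groupCtx only when a goroutine returns an error, so cancel
// explicitly after inference; otherwise router.Run keeps running and Wait blocks.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
eg, groupCtx := errgroup.WithContext(ctx)

eg.Go(func() error { return router.Run(groupCtx) })
eg.Go(func() error {
    defer cancel() // stop the router once inference is done
    <-router.Running()
    runCtx := events.WithEventSinks(groupCtx, sink)
    turn := &turns.Turn{}
    turns.AppendBlock(turn, turns.NewUserTextBlock("Say hello."))
    _, err := eng.RunInference(runCtx, turn)
    return err
})

if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
    return err
}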

Important: in-memory router defaults can block streaming

events.NewEventRouter() defaults to Watermill’s in-memory gochannel pub/sub with:

  • BlockPublishUntilSubscriberAck: true
  • an unbuffered output channel (Watermill default)

This is a reasonable default for simple “single handler prints to stdout” demos, but it can deadlock or stall streaming inference when:

  • any handler is slow (UI rendering, DB writes, network I/O),
  • a handler is registered but not actively draining yet,
  • you publish many partial events quickly (token streaming).

For streaming UIs (or any high-rate event stream), explicitly configure the pub/sub and pass it into the router:

import (
    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
    "github.com/go-go-golems/geppetto/pkg/events"
)

goPubSub := gochannel.NewGoChannel(gochannel.Config{
    OutputChannelBuffer:            256,
    BlockPublishUntilSubscriberAck: false,
}, watermill.NopLogger{})

router, _ := events.NewEventRouter(
    events.WithPublisher(goPubSub),
    events.WithSubscriber(goPubSub),
)
defer router.Close()

If you are using a durable transport (Redis Streams, NATS, Kafka), the details differ, but the rule of thumb stays the same: do not let slow consumers block the inference publisher.
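The effect of OutputChannelBuffer can be illustrated with plain Go channels. This is an analogy rather than gochannel's actual code: tryPublish is a hypothetical helper that counts how many events a publisher could hand off while no handler is reading. An unbuffered channel accepts none, while a buffer of N absorbs a burst of N partial events before the publisher would block:

```go
package main

// tryPublish attempts n non-blocking sends into ch while nothing reads from it
// and returns how many succeeded. The first send that would block ends the run.
func tryPublish(ch chan string, n int) (sent int) {
	for i := 0; i < n; i++ {
		select {
		case ch <- "partial":
			sent++
		default:
			// a blocking publisher would be stuck here until a handler reads
			return sent
		}
	}
	return sent
}
```

A buffer only absorbs bursts. If handlers are consistently slower than the token rate, the buffer fills and the publisher stalls again, which is why handlers should stay fast.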

Client-side Consumption Patterns

1) Console Streaming Printer

// Text streaming (writes deltas and final newline)
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))

// Or structured output (text/json/yaml)
printer := events.NewStructuredPrinter(os.Stdout, events.PrinterOptions{
    Format:          events.FormatText, // or FormatJSON / FormatYAML
    IncludeMetadata: false,
    Full:            false,
})
router.AddHandler("chat", "chat", printer)

2) Custom Handler

router.AddHandler("collector", "chat", func(msg *message.Message) error {
    defer msg.Ack()
    e, err := events.NewEventFromJson(msg.Payload)
    if err != nil { return err }

    switch ev := e.(type) {
    case *events.EventPartialCompletionStart:
        // initialize buffer / UI
    case *events.EventPartialCompletion:
        // append ev.Delta
    case *events.EventFinal:
        // flush buffer or update UI with ev.Text
    case *events.EventToolCall:
        // access via ev.ToolCall.Name and ev.ToolCall.Input (string)
    case *events.EventToolResult:
        // access via ev.ToolResult.ID and ev.ToolResult.Result (string)
    case *events.EventError:
        return fmt.Errorf("inference error: %s", ev.ErrorString)
    }
    return nil
})

Pretty-printing tool payloads (JSON-aware)

ToolCall.Input and ToolResult.Result are strings. If they contain JSON, pretty-print them:

func prettyJSONIfPossible(s string) string {
    t := strings.TrimSpace(s)
    if strings.HasPrefix(t, "{") || strings.HasPrefix(t, "[") {
        var v interface{}
        if err := json.Unmarshal([]byte(s), &v); err == nil {
            if b, err := json.MarshalIndent(v, "", "  "); err == nil {
                return string(b)
            }
        }
    }
    return s
}

3) Cross-process Consumption

For distributed systems, swap the in-process transport for NATS, Kafka, etc.:

pub := /* e.g., NATS/Kafka publisher */
sub := /* e.g., NATS/Kafka subscriber */
router, _ := events.NewEventRouter(events.WithPublisher(pub), events.WithSubscriber(sub))

// Your handlers remain the same
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))

Custom Event Types

Geppetto provides an event registry for custom event types that flow through the same infrastructure.

Registration Methods

1. RegisterEventCodec (full control)

import (
    "encoding/json"

    "github.com/go-go-golems/geppetto/pkg/events"
)

type CustomProgressEvent struct {
    events.EventImpl
    Progress float64 `json:"progress"`
    Status   string  `json:"status"`
}

func init() {
    decoder := func(b []byte) (events.Event, error) {
        var ev CustomProgressEvent
        if err := json.Unmarshal(b, &ev); err != nil {
            return nil, err
        }
        ev.SetPayload(b)
        return &ev, nil
    }

    // Duplicate registrations fail; surface that at startup instead of ignoring it.
    if err := events.RegisterEventCodec("custom-progress", decoder); err != nil {
        panic(err)
    }
}

2. RegisterEventFactory (convenience)

type CustomStatusEvent struct {
    events.EventImpl
    Phase string `json:"phase"`
}

func init() {
    factory := func() events.Event {
        return &CustomStatusEvent{
            EventImpl: events.EventImpl{Type_: "custom-status"},
        }
    }

    if err := events.RegisterEventFactory("custom-status", factory); err != nil {
        panic(err)
    }
}

3. RegisterEventEncoder (outbound serialization)

func init() {
    encoder := func(ev events.Event) ([]byte, error) {
        return json.Marshal(ev)
    }

    if err := events.RegisterEventEncoder("custom-progress", encoder); err != nil {
        panic(err)
    }
}

Publishing Custom Events

meta := events.EventMetadata{
    ID:          uuid.New(),
    SessionID:   "session-123",
    InferenceID: "inference-456",
    TurnID:      "turn-456",
}

customEvent := &CustomProgressEvent{
    EventImpl: events.EventImpl{
        Type_:     "custom-progress",
        Metadata_: meta,
    },
    Progress: 0.75,
    Status:   "processing",
}

// Publish via context sinks
events.PublishEventToContext(ctx, customEvent)

// Or publish directly to a configured sink
_ = sink.PublishEvent(customEvent)

Consuming Custom Events

router.AddHandler("custom-collector", "chat", func(msg *message.Message) error {
    defer msg.Ack()
    
    e, err := events.NewEventFromJson(msg.Payload)
    if err != nil { return err }
    
    if progressEv, ok := e.(*CustomProgressEvent); ok {
        fmt.Printf("Progress: %.0f%% - %s\n", 
            progressEv.Progress*100, progressEv.Status)
    }
    
    return nil
})

Best Practices for Custom Events

  • Embed EventImpl: All custom events should embed events.EventImpl to satisfy the Event interface.
  • Register in init(): Use init() functions to register at package initialization.
  • Unique type names: Choose distinctive strings (e.g., myapp-progress not progress).
  • Metadata consistency: Always populate EventMetadata with ID, optionally SessionID/InferenceID/TurnID.
  • Handle registration errors: Duplicate registrations will fail.

Use Cases for Custom Events

  • Tool-specific progress: Database query progress, file upload status
  • Domain events: "user-action", "workflow-step-completed"
  • Integration events: Events from external systems
  • Debugging events: Instrumentation during development

Full Example: Router + Engine + Helpers

The router.Run(ctx) method blocks until cancelled. Use errgroup to coordinate:

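ctx, cancel := context.WithCancel(context.Background())
// errgroup cancels groupCtx only when a goroutine returns an error or Wait returns,
// so the main task below calls cancel() itself to stop the router when it is done.
defer cancel() // calling cancel twice is harmless

eg, groupCtx := errgroup.WithContext(ctx)
router, _ := events.NewEventRouter()
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))
watermillSink := middleware.NewWatermillSink(router.Publisher, "chat")
// eng is an engine created as in the earlier example.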

// Goroutine for the router
eg.Go(func() error {
    defer func() {
        log.Info().Msg("Closing event router")
        _ = router.Close()
        log.Info().Msg("Event router closed")
    }()

    log.Info().Msg("Starting event router")
    runErr := router.Run(groupCtx)
    log.Info().Err(runErr).Msg("Event router stopped")
    // Don't return context.Canceled as a fatal error
    if runErr != nil && !errors.Is(runErr, context.Canceled) {
        return runErr
    }
    return nil
})

// Goroutine for the main task
eg.Go(func() error {
    defer cancel() // Signal router to stop when done

    <-router.Running()
    log.Info().Msg("Event router is running, proceeding with main task")

    turn := &turns.Turn{}
    turns.AppendBlock(turn, turns.NewUserTextBlock("Say hello."))
    _, err := eng.RunInference(events.WithEventSinks(groupCtx, watermillSink), turn)
    if err != nil {
        return err
    }
    
    log.Info().Msg("Main task finished")
    return nil 
})

log.Info().Msg("Waiting for goroutines to finish")
if err := eg.Wait(); err != nil {
    log.Error().Err(err).Msg("Application finished with error")
} else {
    log.Info().Msg("Application finished successfully")
}

Why this pattern?

  • Router is confirmed running before publishing starts
  • Router stays alive while the task runs
  • If task finishes/fails, router shuts down via context cancellation
  • If router fails, task stops via context cancellation
  • router.Close() is called reliably

Structured Data Extraction with Filtering Sinks

Geppetto ships a filtering sink that extracts structured payloads from streaming text and emits typed events.

Tag Format

Blocks are delimited with XML-like tags:

<geppetto:citations:v1>
```yaml
citations:
  - title: GPT-4 Technical Report
    authors: [OpenAI]
```
</geppetto:citations:v1>
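The main difficulty for a filtering sink is that an opening or closing tag can be split across streamed chunks. A simplified sketch of tag-aware scanning, assuming one fixed tag pair and no nesting; tagScanner is hypothetical and is not FilteringSink's implementation, which also dispatches on the (package, type, version) tuple, drives extractor sessions, and applies malformed-block policies:

```go
package main

import "strings"

// tagScanner is a hypothetical, simplified scanner: one fixed tag pair, no nesting.
type tagScanner struct {
	openTag, closeTag string
	inBlock           bool
	buf               string          // unprocessed text, may end with a partial tag
	payload           strings.Builder // payload of the block currently being read
	Blocks            []string        // completed payloads
}

// Feed consumes one chunk and returns the text that can be forwarded downstream now.
func (s *tagScanner) Feed(chunk string) string {
	s.buf += chunk
	var out strings.Builder
	for {
		tag := s.openTag
		if s.inBlock {
			tag = s.closeTag
		}
		if i := strings.Index(s.buf, tag); i >= 0 {
			if s.inBlock {
				s.payload.WriteString(s.buf[:i])
				s.Blocks = append(s.Blocks, s.payload.String())
				s.payload.Reset()
			} else {
				out.WriteString(s.buf[:i])
			}
			s.buf = s.buf[i+len(tag):]
			s.inBlock = !s.inBlock
			continue
		}
		// No complete tag: release everything except a suffix that could start one.
		keep := partialSuffix(s.buf, tag)
		safe := s.buf[:len(s.buf)-keep]
		if s.inBlock {
			s.payload.WriteString(safe)
		} else {
			out.WriteString(safe)
		}
		s.buf = s.buf[len(s.buf)-keep:]
		return out.String()
	}
}

// partialSuffix returns the length of the longest suffix of s that is a
// proper prefix of tag (0 if none).
func partialSuffix(s, tag string) int {
	for n := len(tag) - 1; n > 0; n-- {
		if strings.HasSuffix(s, tag[:n]) {
			return n
		}
	}
	return 0
}
```

Holding back only the longest suffix that could begin a tag keeps latency low: ordinary text is forwarded as soon as it cannot be part of a tag.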

Extractor Interface

Each extractor declares the (package, type, version) tuple it handles:

type Extractor interface {
    TagPackage() string
    TagType() string
    TagVersion() string
    NewSession(ctx context.Context, meta events.EventMetadata, itemID string) ExtractorSession
}

type ExtractorSession interface {
    OnStart(ctx context.Context) []events.Event
    OnRaw(ctx context.Context, chunk []byte) []events.Event
    OnCompleted(ctx context.Context, raw []byte, success bool, err error) []events.Event
}

Sink Setup

next := /* your downstream events.EventSink */
sink := structuredsink.NewFilteringSink(next, structuredsink.Options{
    Malformed: structuredsink.MalformedErrorEvents,
}, &citationsExtractor{})

Parsing Helpers

The parsehelpers package includes a debounced YAML parser for progressive updates:

ctrl := parsehelpers.NewDebouncedYAML[citationsPayload](parsehelpers.DebounceConfig{
    SnapshotEveryBytes: 512,
    SnapshotOnNewline:  true,
    MaxBytes:           64 << 10,
})

Use OnRaw with FeedBytes(...) to emit "best-so-far" events, then OnCompleted with FinalBytes(...) to parse the final payload. DebounceConfig.SanitizeYAML is optional and defaults to true, so mildly malformed LLM YAML is sanitized before yaml.Unmarshal. If a block is malformed, the sink applies the configured policy (MalformedErrorEvents, MalformedReconstructText, or MalformedIgnore).

Practical Tips

  • Always wait for router.Running() before invoking inference to avoid dropped events.
  • Attach sinks to the run context with events.WithEventSinks so engines, helpers, and tools all publish to them.
  • Prefer events.NewStructuredPrinter for machine-readable output during tests.
  • Register custom event types in init() functions.
  • Enable debug logging (zerolog.DebugLevel) to see event publishing.

Troubleshooting

| Problem | Cause | Solution |
|---|---|---|
| No events received | Router not running | Wait on <-router.Running() before inference |
| Missing tool events | Sink not on context | Use events.WithEventSinks(ctx, sink) |
| Dropped events | Wrong topic | Match topic in NewWatermillSink and AddHandler |
| Events stop mid-stream | Context cancelled | Check for deadline or explicit cancellation |
| Streaming stalls / hangs | In-memory gochannel publish blocks on ACK or has no buffering | Configure pub/sub (OutputChannelBuffer, BlockPublishUntilSubscriberAck=false) and keep handlers fast |

Packages

import (
    "github.com/go-go-golems/geppetto/pkg/events"               // Core events, router, printers
    "github.com/go-go-golems/geppetto/pkg/inference/middleware" // WatermillSink
    "github.com/go-go-golems/geppetto/pkg/events/structuredsink" // Filtering + extraction
)

Where Events Go: The SEM Translation Layer

When geppetto events are used in pinocchio's webchat system, they pass through a SEM (Structured Event Message) translation layer that converts raw geppetto events into frontend-consumable frames. This section bridges geppetto's event system with pinocchio's webchat pipeline.

The Translation Pipeline

Geppetto Event                    Pinocchio Webchat
─────────────────                 ─────────────────
Engine emits event
    │
    ▼
PublishEventToContext(ctx, ev)
    │
    ▼
Watermill Router / Go channel
    │
    ▼
StreamCoordinator.consume()
    │
    ├─── SEM Translator ──────▶ SEM JSON frame ──▶ WebSocket ──▶ Browser
    │    (sem_translator.go)     {"sem":true,        (broadcast)   (SEM registry
    │                             "event":{...}}                    routes to Redux)
    │
    └─── Timeline Projector ──▶ TimelineStore (SQLite)
         (timeline_projector.go)  (durable snapshots for hydration)

How Events Map to SEM Types

The SEM translator uses a type-safe registry (semregistry.RegisterByType[T]) that dispatches on the Go event type. Each geppetto event maps to one or more SEM frame types:

| Geppetto Event | SEM Frame Type(s) | Notes |
|---|---|---|
| EventPartialCompletionStart | llm.start | Opens a streaming message entity |
| EventPartialCompletion | llm.delta | Cumulative content (not just the delta) |
| EventFinal | llm.final | Closes the message, final text |
| EventToolCall | tool.start | Tool name, input, status=running |
| EventToolResult | tool.result, tool.done | Result data + completion signal |
| EventToolCallExecute | tool.start | Alternative entry for tool execution |
| EventError | (logged, not translated) | Errors surface via other mechanisms |
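Type-based dispatch like semregistry.RegisterByType[T] can be sketched with generics and reflect.Type map keys. The names below (registerByType, translate, frame, and the event structs) are hypothetical stand-ins; the sketch shows how one event type can fan out to several frames, as EventToolResult does in the table:

```go
package main

import "reflect"

// Illustrative event types standing in for Geppetto's.
type EventPartialCompletion struct{ Completion string }
type EventToolResult struct{ ID, Result string }

// frame is a hypothetical SEM frame.
type frame struct {
	Type string
	Data any
}

var handlers = map[reflect.Type]func(any) []frame{}

// registerByType stores a handler keyed by the static type T.
// reflect.TypeOf on a typed nil pointer yields the pointer type, so pointer
// event types work; an interface T would produce a nil key (not guarded here).
func registerByType[T any](h func(T) []frame) {
	var zero T
	handlers[reflect.TypeOf(zero)] = func(ev any) []frame { return h(ev.(T)) }
}

// translate looks up the handler for the event's dynamic type.
// Unregistered types (EventError in the table) produce no frames.
func translate(ev any) []frame {
	if h, ok := handlers[reflect.TypeOf(ev)]; ok {
		return h(ev)
	}
	return nil
}

func init() {
	registerByType(func(e *EventPartialCompletion) []frame {
		// llm.delta carries the cumulative text, not just the increment
		return []frame{{Type: "llm.delta", Data: e.Completion}}
	})
	registerByType(func(e *EventToolResult) []frame {
		return []frame{
			{Type: "tool.result", Data: e.Result},
			{Type: "tool.done", Data: e.ID},
		}
	})
}
```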

Stable ID Resolution

SEM frames need stable IDs so that streaming updates (llm.start → llm.delta → llm.final) reference the same entity. The translator resolves IDs using a three-tier fallback:

  1. Metadata ID: if event.Metadata().ID is set, use it directly
  2. Cached correlation: look up by InferenceID → TurnID → SessionID (first match wins)
  3. Generated fallback: "llm-" + uuid.New()

IDs are cached per-translator instance so all streaming events for the same logical message share one ID.
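The three-tier fallback and per-instance cache can be sketched as follows. This is a hypothetical resolver, not the code in sem_translator.go: Meta mirrors the correlation fields with a string ID (empty meaning unset), the generated fallback uses random hex instead of uuid.New() to stay stdlib-only, and caching the resolved ID under every correlation key present is an assumption about how tier 2 gets populated:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
)

// Meta mirrors the correlation fields of events.EventMetadata ("" = unset).
type Meta struct {
	ID, SessionID, InferenceID, TurnID string
}

// idResolver is a hypothetical stand-in for the translator's per-instance cache.
type idResolver struct {
	cache map[string]string // correlation key -> stable entity ID
	newID func() string     // tier-3 generator
}

func newIDResolver() *idResolver {
	return &idResolver{
		cache: map[string]string{},
		newID: func() string {
			b := make([]byte, 16)
			_, _ = rand.Read(b)
			return "llm-" + hex.EncodeToString(b)
		},
	}
}

// keys lists correlation keys in lookup priority order, prefixed to avoid
// collisions between different kinds of IDs.
func keys(m Meta) []string {
	var ks []string
	if m.InferenceID != "" {
		ks = append(ks, "inference:"+m.InferenceID)
	}
	if m.TurnID != "" {
		ks = append(ks, "turn:"+m.TurnID)
	}
	if m.SessionID != "" {
		ks = append(ks, "session:"+m.SessionID)
	}
	return ks
}

// Resolve applies tier 1 (metadata ID), tier 2 (cache, first match wins),
// then tier 3 (generate), and remembers the result for later events.
func (r *idResolver) Resolve(m Meta) string {
	id := m.ID
	if id == "" {
		for _, k := range keys(m) {
			if cached, ok := r.cache[k]; ok {
				id = cached
				break
			}
		}
	}
	if id == "" {
		id = r.newID()
	}
	for _, k := range keys(m) {
		if _, ok := r.cache[k]; !ok {
			r.cache[k] = id
		}
	}
	return id
}
```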

Correlation IDs Bridge the Gap

The EventMetadata fields (SessionID, InferenceID, TurnID) serve double duty:

  • Within geppetto: They link events to Turn snapshots (see Turns and Blocks)
  • Within pinocchio: They enable the SEM translator to resolve stable entity IDs and the timeline projector to group related events

This means the metadata you set when publishing a custom event determines how it gets identified downstream in the webchat UI.

Adding Custom Events to Webchat

If you define a custom event type (see Custom Event Types above), you can make it appear in the webchat UI by:

  1. Registering a SEM handler in pinocchio/pkg/webchat/sem_translator.go
  2. Optionally adding a timeline projector case for persistence
  3. Creating a frontend SEM handler and widget

For a complete walkthrough, see Adding a New Event Type End-to-End in the pinocchio docs.

See Also

  • Inference Engines — How engines emit events; see "Complete Runtime Flow" for how events relate to Turn snapshots
  • Turns and Blocks — The Turn data model; events carry Turn correlation IDs
  • Middlewares — Middlewares can emit events (e.g., agent-mode-switch)
  • Tools — Tool events and execution
  • Streaming Tutorial — Complete example
  • Adding a New Event Type (pinocchio) — End-to-end tutorial for custom event types in webchat
  • Implementation: geppetto/pkg/events/structuredsink/filtering_sink.go — The FilteringSink that extracts structured payloads from text streams
  • Example: geppetto/cmd/examples/streaming-inference/main.go