Engine-first event publishing with EventSink, Watermill routing, and context-carried sinks for streaming AI.
When you run AI inference, you want to see results as they stream in rather than wait for the entire response.
Geppetto's event system publishes structured events as inference progresses. Every token, tool call, and error becomes an event that flows through a unified pipeline.
┌─────────────────────────────────────────────────────────────────┐
│                        Your Application                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────┐     ┌──────────────┐     ┌───────────────────┐    │
│  │  Engine  │────▶│  Event Sink  │────▶│ Watermill Router  │    │
│  └──────────┘     └──────────────┘     └─────────┬─────────┘    │
│       │                                          │              │
│       ▼                                          ▼              │
│  ┌──────────┐                             ┌─────────────┐       │
│  │ Helpers/ │───────────────────────────▶ │  Handlers   │       │
│  │  Tools   │     (via context sinks)     │ (printers,  │       │
│  └──────────┘                             │  custom)    │       │
│                                           └─────────────┘       │
└─────────────────────────────────────────────────────────────────┘
Key components:
| Component | Role |
|---|---|
| Engine | Makes provider API calls, emits lifecycle events (start, partial, final) |
| Event Sink | Receives events and publishes to a topic |
| Watermill Router | Routes events to registered handlers |
| Handlers | Process events (print to console, aggregate, forward) |
| Context Sinks | Let helpers/tools emit events without explicit plumbing |
Core inference events:
| Event | Type Constant | When Emitted | Key Fields |
|---|---|---|---|
| start | EventTypeStart | Inference begins | Metadata |
| partial | EventTypePartialCompletion | Each streamed chunk | Delta, Completion |
| partial-thinking | EventTypePartialThinking | Reasoning/thinking text partial (OpenAI Chat/Responses) | Delta, Completion |
| final | EventTypeFinal | Inference completes | Text, Metadata.Usage |
| interrupt | EventTypeInterrupt | Context cancelled | Text (partial) |
| error | EventTypeError | Error occurs | ErrorString |
Tool events:
| Event | Type Constant | When Emitted | Key Fields |
|---|---|---|---|
| tool-call | EventTypeToolCall | Model requests tool | ToolCall.Name, ToolCall.Input, ToolCall.ID |
| tool-result | EventTypeToolResult | Tool returns result | ToolResult.ID, ToolResult.Result |
| tool-call-execute | EventTypeToolCallExecute | Execution starts | ToolCall |
| tool-call-execution-result | EventTypeToolCallExecutionResult | Execution finishes | ToolResult |
Additional event categories:
| Category | Events | Purpose |
|---|---|---|
| Reasoning | partial-thinking plus info boundary events such as thinking-started and thinking-ended | Provider thinking/reasoning traces |
| Web Search | web-search-started, web-search-searching, web-search-done | Built-in web search progress |
| File Search | file-search-started, file-search-done | Built-in file search progress |
| Code Interpreter | code-interpreter-* | Code execution progress |
| MCP | mcp-*, mcp-list-tools-* | MCP tool progress |
| Image Gen | image-generation-* | Image generation progress |
| Status | log, info, status, agent-mode-switch | UI/debug status |
Common concrete Go types when parsing with events.NewEventFromJson:
- *events.EventPartialCompletionStart → stream start
- *events.EventPartialCompletion → normal answer Delta, accumulated answer Completion
- *events.EventThinkingPartial → reasoning/thinking Delta, accumulated reasoning Completion
- *events.EventFinal → Text
- *events.EventToolCall → ToolCall with Name, Input, ID
- *events.EventToolResult → ToolResult with ID, Result
- *events.EventError → ErrorString
- *events.EventInterrupt → Text

Reasoning text uses EventThinkingPartial. Consumers that render thinking should read Completion for the accumulated text and Delta only for the latest increment. The older EventReasoningTextDelta and EventReasoningTextDone event types were removed, so there is a single reasoning partial stream with the same delta/completion semantics as normal assistant text.
See full catalog: geppetto/pkg/events/chat-events.go
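The Delta/Completion contract can be illustrated without Geppetto itself. The sketch below is a minimal model, assuming a hypothetical `partial` struct as a stand-in for the real partial event types: each event carries the latest increment plus the text accumulated so far, so a renderer only needs the most recent Completion.

```go
package main

import (
	"fmt"
	"strings"
)

// partial is a hypothetical stand-in for a streamed partial event:
// Delta holds the latest increment, Completion the text accumulated so far.
type partial struct {
	Delta      string
	Completion string
}

// streamPartials turns raw chunks into partial events, accumulating
// Completion across chunks.
func streamPartials(chunks []string) []partial {
	var acc strings.Builder
	out := make([]partial, 0, len(chunks))
	for _, c := range chunks {
		acc.WriteString(c)
		out = append(out, partial{Delta: c, Completion: acc.String()})
	}
	return out
}

// render returns what a UI should display after the given events: the
// Completion of the most recent event, or "" when nothing has arrived.
func render(evs []partial) string {
	if len(evs) == 0 {
		return ""
	}
	return evs[len(evs)-1].Completion
}

func main() {
	evs := streamPartials([]string{"Hel", "lo, ", "world"})
	for _, ev := range evs {
		fmt.Printf("delta=%q completion=%q\n", ev.Delta, ev.Completion)
	}
	fmt.Println(render(evs))
}
```

Reading Completion rather than re-joining deltas means a consumer that attaches late, or misses an event, still renders the correct text on the next partial.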
Every event carries EventMetadata:
type EventMetadata struct {
ID uuid.UUID // Stable per stream
SessionID string // Correlation ID for the session
InferenceID string // Correlation ID for the inference call
TurnID string // Correlation ID for the turn
Model string // Model identifier (e.g., "gpt-4")
Duration time.Duration
Usage Usage // Token counts (updated continuously)
Extra map[string]any // Provider-specific context
}
type Usage struct {
InputTokens int
OutputTokens int
CachedTokens int // OpenAI prompt cache
CacheCreationInputTokens int // Claude cache
CacheReadInputTokens int // Claude cache
}
Note: Usage is updated as chunks arrive, so UIs can display evolving token counts in real time.
Every event carries SessionID, InferenceID, and TurnID in its metadata. These are the same IDs stored on Turn.Metadata (see Turns and Blocks), which means you can link any event back to the specific Turn snapshot it belongs to.
This correlation is the bridge between the two parallel tracks in the system: the event stream and the Turn snapshots captured at each phase (pre_inference, post_inference, etc.).

A debugging UI can use these shared IDs to cross-highlight: clicking a Turn snapshot highlights all events emitted during that phase, and clicking an event highlights the Turn it belongs to.
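As a rough illustration of that cross-highlighting, events can be indexed by TurnID. The `eventRef` type and `groupByTurn` helper below are hypothetical names for this sketch, not part of Geppetto; a real consumer would read the ID from the event's metadata.

```go
package main

import "fmt"

// eventRef is a hypothetical, trimmed-down view of an event: only its type
// and the TurnID taken from its metadata.
type eventRef struct {
	Type   string
	TurnID string
}

// groupByTurn indexes events by TurnID, preserving emission order per turn.
func groupByTurn(evs []eventRef) map[string][]eventRef {
	idx := make(map[string][]eventRef)
	for _, ev := range evs {
		idx[ev.TurnID] = append(idx[ev.TurnID], ev)
	}
	return idx
}

func main() {
	evs := []eventRef{
		{Type: "start", TurnID: "turn-1"},
		{Type: "partial", TurnID: "turn-1"},
		{Type: "start", TurnID: "turn-2"},
		{Type: "final", TurnID: "turn-1"},
	}
	idx := groupByTurn(evs)
	// Selecting a Turn snapshot would highlight exactly these events.
	for _, ev := range idx["turn-1"] {
		fmt.Println(ev.Type)
	}
}
```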
Geppetto uses context-carried sinks:
- Helpers and tools publish with events.PublishEventToContext(ctx, ...).

Attach sinks to context.Context at runtime:
watermillSink := middleware.NewWatermillSink(router.Publisher, "chat")
runCtx := events.WithEventSinks(ctx, watermillSink)
// Anywhere downstream can publish
events.PublishEventToContext(runCtx, events.NewToolResultEvent(meta, toolResult))
| Provider | Streaming Behavior |
|---|---|
| OpenAI (Chat) | start → multiple partial → final. Tool calls merged and emitted as tool-call when complete. |
| OpenAI (Responses) | Adds info events for reasoning boundaries, partial-thinking for reasoning/summary deltas with accumulated Completion. Function args streamed via SSE. |
| Claude | Content-block merger emits start → partial → tool-call (when complete) → final. |
All providers publish via context sinks.
Use a Watermill-backed EventRouter to route events to handlers:
router, _ := events.NewEventRouter()
defer router.Close()
// Add a simple printer
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))
// Create sink and engine
sink := middleware.NewWatermillSink(router.Publisher, "chat")
eng, _ := factory.NewEngineFromParsedValues(parsed)
eg, groupCtx := errgroup.WithContext(ctx)
eg.Go(func() error { return router.Run(groupCtx) })
eg.Go(func() error {
<-router.Running()
runCtx := events.WithEventSinks(groupCtx, sink)
turn := &turns.Turn{}
turns.AppendBlock(turn, turns.NewUserTextBlock("Say hello."))
_, err := eng.RunInference(runCtx, turn)
return err
})
_ = eg.Wait()
events.NewEventRouter() defaults to Watermill’s in-memory gochannel pub/sub with:
BlockPublishUntilSubscriberAck: true

This is a reasonable default for simple “single handler prints to stdout” demos, but it can deadlock or stall streaming inference when a handler is slow or events arrive faster than they are acknowledged.
For streaming UIs (or any high-rate event stream), explicitly configure the pub/sub and pass it into the router:
import (
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/go-go-golems/geppetto/pkg/events"
)
goPubSub := gochannel.NewGoChannel(gochannel.Config{
OutputChannelBuffer: 256,
BlockPublishUntilSubscriberAck: false,
}, watermill.NopLogger{})
router, _ := events.NewEventRouter(
events.WithPublisher(goPubSub),
events.WithSubscriber(goPubSub),
)
defer router.Close()
If you are using a durable transport (Redis Streams, NATS, Kafka), the details differ, but the rule of thumb stays the same: do not let slow consumers block the inference publisher.
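The rule of thumb can be sketched with a plain buffered channel. This is one possible policy, not what Watermill does: the hypothetical `bufferedPublisher` below never blocks the caller and counts an event as dropped when the buffer is full. Whether dropping, buffering more, or applying backpressure is right depends on the application.

```go
package main

import "fmt"

// bufferedPublisher is a hypothetical publisher that never blocks the
// caller: events go into a buffered channel and are counted as dropped
// when the buffer is full.
type bufferedPublisher struct {
	ch      chan string
	dropped int
}

func newBufferedPublisher(size int) *bufferedPublisher {
	return &bufferedPublisher{ch: make(chan string, size)}
}

// Publish enqueues ev if there is room and returns false otherwise.
// It is not safe for concurrent use; a real version would guard dropped.
func (p *bufferedPublisher) Publish(ev string) bool {
	select {
	case p.ch <- ev:
		return true
	default:
		p.dropped++
		return false
	}
}

func main() {
	p := newBufferedPublisher(2)
	// No consumer is reading yet, so only the first two events fit.
	for _, ev := range []string{"a", "b", "c"} {
		fmt.Println(ev, p.Publish(ev))
	}
	fmt.Println("dropped:", p.dropped)
}
```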
// Text streaming (writes deltas and final newline)
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))
// Or structured output (text/json/yaml)
printer := events.NewStructuredPrinter(os.Stdout, events.PrinterOptions{
Format: events.FormatText, // or FormatJSON / FormatYAML
IncludeMetadata: false,
Full: false,
})
router.AddHandler("chat", "chat", printer)
router.AddHandler("collector", "chat", func(msg *message.Message) error {
defer msg.Ack()
e, err := events.NewEventFromJson(msg.Payload)
if err != nil { return err }
switch ev := e.(type) {
case *events.EventPartialCompletionStart:
// initialize buffer / UI
case *events.EventPartialCompletion:
// append ev.Delta
case *events.EventFinal:
// flush buffer or update UI with ev.Text
case *events.EventToolCall:
// access via ev.ToolCall.Name and ev.ToolCall.Input (string)
case *events.EventToolResult:
// access via ev.ToolResult.ID and ev.ToolResult.Result (string)
case *events.EventError:
return errors.New(ev.ErrorString) // errors.New avoids treating the message as a format string
}
return nil
})
ToolCall.Input and ToolResult.Result are strings. If they contain JSON, pretty-print them:
func prettyJSONIfPossible(s string) string {
t := strings.TrimSpace(s)
if strings.HasPrefix(t, "{") || strings.HasPrefix(t, "[") {
var v interface{}
if err := json.Unmarshal([]byte(s), &v); err == nil {
if b, err := json.MarshalIndent(v, "", " "); err == nil {
return string(b)
}
}
}
return s
}
For distributed systems, swap the in-process transport for NATS, Kafka, etc.:
pub := /* e.g., NATS/Kafka publisher */
sub := /* e.g., NATS/Kafka subscriber */
router, _ := events.NewEventRouter(events.WithPublisher(pub), events.WithSubscriber(sub))
// Your handlers remain the same
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))
Geppetto provides an event registry for custom event types that flow through the same infrastructure.
import "github.com/go-go-golems/geppetto/pkg/events"
type CustomProgressEvent struct {
events.EventImpl
Progress float64 `json:"progress"`
Status string `json:"status"`
}
func init() {
decoder := func(b []byte) (events.Event, error) {
var ev CustomProgressEvent
if err := json.Unmarshal(b, &ev); err != nil {
return nil, err
}
ev.SetPayload(b)
return &ev, nil
}
_ = events.RegisterEventCodec("custom-progress", decoder)
}
type CustomStatusEvent struct {
events.EventImpl
Phase string `json:"phase"`
}
func init() {
factory := func() events.Event {
return &CustomStatusEvent{
EventImpl: events.EventImpl{Type_: "custom-status"},
}
}
_ = events.RegisterEventFactory("custom-status", factory)
}
func init() {
encoder := func(ev events.Event) ([]byte, error) {
return json.Marshal(ev)
}
_ = events.RegisterEventEncoder("custom-progress", encoder)
}
meta := events.EventMetadata{
ID: uuid.New(),
SessionID: "session-123",
InferenceID: "inference-456",
TurnID: "turn-456",
}
customEvent := &CustomProgressEvent{
EventImpl: events.EventImpl{
Type_: "custom-progress",
Metadata_: meta,
},
Progress: 0.75,
Status: "processing",
}
// Publish via context sinks
events.PublishEventToContext(ctx, customEvent)
// Or publish directly to a configured sink
_ = sink.PublishEvent(customEvent)
router.AddHandler("custom-collector", "chat", func(msg *message.Message) error {
defer msg.Ack()
e, err := events.NewEventFromJson(msg.Payload)
if err != nil { return err }
if progressEv, ok := e.(*CustomProgressEvent); ok {
fmt.Printf("Progress: %.0f%% - %s\n",
progressEv.Progress*100, progressEv.Status)
}
return nil
})
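The registration calls above imply a lookup from a type tag to a decoder. The following stdlib-only sketch shows one way such dispatch can work. It is an assumption-laden illustration: `registry`, `register`, `decode`, and `progressEvent` are hypothetical, and the `"type"` JSON field name is assumed rather than taken from Geppetto.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decoder turns a raw JSON payload into a decoded event value.
type decoder func(b []byte) (any, error)

// registry maps a type tag to its decoder (hypothetical stand-in).
type registry struct{ decoders map[string]decoder }

func newRegistry() *registry { return &registry{decoders: map[string]decoder{}} }

// register adds a decoder and rejects duplicate tags.
func (r *registry) register(tag string, d decoder) error {
	if _, ok := r.decoders[tag]; ok {
		return fmt.Errorf("type %q already registered", tag)
	}
	r.decoders[tag] = d
	return nil
}

// decode peeks at the "type" field (an assumed field name) and dispatches
// to the matching decoder.
func (r *registry) decode(b []byte) (any, error) {
	var head struct {
		Type string `json:"type"`
	}
	if err := json.Unmarshal(b, &head); err != nil {
		return nil, err
	}
	d, ok := r.decoders[head.Type]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", head.Type)
	}
	return d(b)
}

type progressEvent struct {
	Type     string  `json:"type"`
	Progress float64 `json:"progress"`
	Status   string  `json:"status"`
}

func decodeProgress(b []byte) (any, error) {
	var ev progressEvent
	if err := json.Unmarshal(b, &ev); err != nil {
		return nil, err
	}
	return &ev, nil
}

func main() {
	r := newRegistry()
	if err := r.register("myapp-progress", decodeProgress); err != nil {
		panic(err)
	}
	v, err := r.decode([]byte(`{"type":"myapp-progress","progress":0.75,"status":"processing"}`))
	if err != nil {
		panic(err)
	}
	ev := v.(*progressEvent)
	fmt.Printf("%.0f%% %s\n", ev.Progress*100, ev.Status)
}
```

Rejecting duplicate tags is also why namespaced type names matter: two packages registering a bare name such as progress would collide.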
- Embed events.EventImpl to satisfy the Event interface.
- Use init() functions to register at package initialization.
- Use namespaced type names (myapp-progress not progress).
- Populate EventMetadata with ID, optionally SessionID/InferenceID/TurnID.

The router.Run(ctx) method blocks until cancelled. Use errgroup to coordinate:
ctx, cancel := context.WithCancel(context.Background())
// NOTE: No defer cancel() here, errgroup handles it via groupCtx
eg, groupCtx := errgroup.WithContext(ctx)
router, _ := events.NewEventRouter()
// Goroutine for the router
eg.Go(func() error {
defer func() {
log.Info().Msg("Closing event router")
_ = router.Close()
log.Info().Msg("Event router closed")
}()
log.Info().Msg("Starting event router")
runErr := router.Run(groupCtx)
log.Info().Err(runErr).Msg("Event router stopped")
// Don't return context.Canceled as a fatal error
if runErr != nil && !errors.Is(runErr, context.Canceled) {
return runErr
}
return nil
})
// Goroutine for the main task
eg.Go(func() error {
defer cancel() // Signal router to stop when done
<-router.Running()
log.Info().Msg("Event router is running, proceeding with main task")
turn := &turns.Turn{}
turns.AppendBlock(turn, turns.NewUserTextBlock("Say hello."))
_, err := eng.RunInference(events.WithEventSinks(groupCtx, watermillSink), turn)
if err != nil {
return err
}
log.Info().Msg("Main task finished")
return nil
})
log.Info().Msg("Waiting for goroutines to finish")
if err := eg.Wait(); err != nil {
log.Error().Err(err).Msg("Application finished with error")
} else {
log.Info().Msg("Application finished successfully")
}
Why this pattern?
- The deferred cleanup ensures router.Close() is called reliably.

Geppetto ships a filtering sink that extracts structured payloads from streaming text and emits typed events.
Blocks are delimited with XML-like tags:
<geppetto:citations:v1>
```yaml
citations:
- title: GPT-4 Technical Report
authors: [OpenAI]
```
</geppetto:citations:v1>
Each extractor declares the (package, type, version) tuple it handles:
type Extractor interface {
TagPackage() string
TagType() string
TagVersion() string
NewSession(ctx context.Context, meta events.EventMetadata, itemID string) ExtractorSession
}
type ExtractorSession interface {
OnStart(ctx context.Context) []events.Event
OnRaw(ctx context.Context, chunk []byte) []events.Event
OnCompleted(ctx context.Context, raw []byte, success bool, err error) []events.Event
}
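To show how an opening tag maps onto the (package, type, version) tuple, here is a small parsing sketch. The `tagID` type, `parseOpenTag` helper, and the permitted character set are assumptions for illustration; the real sink parses tags incrementally from a stream rather than from a complete string.

```go
package main

import (
	"fmt"
	"regexp"
)

// tagID is the (package, type, version) tuple carried by an opening tag.
type tagID struct {
	Package, Type, Version string
}

// openTag matches a complete opening tag such as <geppetto:citations:v1>.
// The allowed character set is an assumption made for this sketch.
var openTag = regexp.MustCompile(`^<([A-Za-z0-9_-]+):([A-Za-z0-9_-]+):([A-Za-z0-9_.-]+)>$`)

// parseOpenTag extracts the tuple, or returns false when s is not an
// opening tag (closing tags start with "</" and do not match).
func parseOpenTag(s string) (tagID, bool) {
	m := openTag.FindStringSubmatch(s)
	if m == nil {
		return tagID{}, false
	}
	return tagID{Package: m[1], Type: m[2], Version: m[3]}, true
}

func main() {
	id, ok := parseOpenTag("<geppetto:citations:v1>")
	fmt.Println(id.Package, id.Type, id.Version, ok)
}
```

An extractor whose TagPackage, TagType, and TagVersion return the same three strings would be the one selected for this block.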
next := /* your downstream events.EventSink */
sink := structuredsink.NewFilteringSink(next, structuredsink.Options{
Malformed: structuredsink.MalformedErrorEvents,
}, &citationsExtractor{})
The parsehelpers package includes a debounced YAML parser for progressive updates:
ctrl := parsehelpers.NewDebouncedYAML[citationsPayload](parsehelpers.DebounceConfig{
SnapshotEveryBytes: 512,
SnapshotOnNewline: true,
MaxBytes: 64 << 10,
})
Use OnRaw with FeedBytes(...) to emit "best-so-far" events, then OnCompleted with FinalBytes(...) to parse the final payload. DebounceConfig.SanitizeYAML is optional and defaults to true, so mildly malformed LLM YAML is sanitized before yaml.Unmarshal. If a block is malformed, the sink applies the configured policy (MalformedErrorEvents, MalformedReconstructText, or MalformedIgnore).
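The debounce idea can be sketched as a small decision function: accumulate bytes and attempt a snapshot parse only when enough new data has arrived or a newline is seen. The logic below is an assumption about how the two knobs might combine, not the parsehelpers implementation; `debouncer` and `feed` are hypothetical names, and the YAML parse itself is left out.

```go
package main

import (
	"bytes"
	"fmt"
)

// debounceConfig mirrors the two snapshot knobs shown above.
type debounceConfig struct {
	SnapshotEveryBytes int
	SnapshotOnNewline  bool
}

// debouncer accumulates chunks and decides when a snapshot is due.
type debouncer struct {
	cfg       debounceConfig
	buf       bytes.Buffer
	sinceSnap int // bytes received since the last snapshot
}

// feed appends chunk and reports whether a snapshot parse should be
// attempted now (assumed policy: byte threshold OR newline in the chunk).
func (d *debouncer) feed(chunk []byte) bool {
	d.buf.Write(chunk)
	d.sinceSnap += len(chunk)
	due := d.cfg.SnapshotEveryBytes > 0 && d.sinceSnap >= d.cfg.SnapshotEveryBytes
	if d.cfg.SnapshotOnNewline && bytes.IndexByte(chunk, '\n') >= 0 {
		due = true
	}
	if due {
		d.sinceSnap = 0
	}
	return due
}

// snapshot returns everything buffered so far, ready for a parse attempt.
func (d *debouncer) snapshot() string { return d.buf.String() }

func main() {
	d := &debouncer{cfg: debounceConfig{SnapshotEveryBytes: 8, SnapshotOnNewline: true}}
	for _, chunk := range []string{"abc", "def\n", "1234", "5678"} {
		if d.feed([]byte(chunk)) {
			fmt.Printf("snapshot at %d bytes\n", len(d.snapshot()))
		}
	}
}
```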
- Wait for router.Running() before invoking inference to avoid dropped events.
- Use events.NewStructuredPrinter for machine-readable output during tests.
- Register custom event types in init() functions.
- Enable debug logging (zerolog.DebugLevel) to see event publishing.

| Problem | Cause | Solution |
|---|---|---|
| No events received | Router not running | Call <-router.Running() before inference |
| Missing tool events | Sink not on context | Use events.WithEventSinks(ctx, sink) |
| Dropped events | Wrong topic | Match topic in NewWatermillSink and AddHandler |
| Events stop mid-stream | Context cancelled | Check for deadline or explicit cancellation |
| Streaming stalls / hangs | In-memory gochannel publish blocks on ACK or no buffering | Configure pub/sub (OutputChannelBuffer, BlockPublishUntilSubscriberAck=false) and keep handlers fast |
import (
"github.com/go-go-golems/geppetto/pkg/events" // Core events, router, printers
"github.com/go-go-golems/geppetto/pkg/inference/middleware" // WatermillSink
"github.com/go-go-golems/geppetto/pkg/events/structuredsink" // Filtering + extraction
)
When geppetto events are used in pinocchio's webchat system, they pass through a SEM (Structured Event Message) translation layer that converts raw geppetto events into frontend-consumable frames. This section bridges geppetto's event system with pinocchio's webchat pipeline.
Geppetto Event                          Pinocchio Webchat
─────────────────                       ─────────────────
Engine emits event
    │
    ▼
PublishEventToContext(ctx, ev)
    │
    ▼
Watermill Router / Go channel
    │
    ▼
StreamCoordinator.consume()
    │
    ├─── SEM Translator ──────▶ SEM JSON frame ──▶ WebSocket ──▶ Browser
    │    (sem_translator.go)    {"sem":true,       (broadcast)   (SEM registry
    │                            "event":{...}}                   routes to Redux)
    │
    └─── Timeline Projector ──▶ TimelineStore (SQLite)
         (timeline_projector.go) (durable snapshots for hydration)
The SEM translator uses a type-safe registry (semregistry.RegisterByType[T]) that dispatches on the Go event type. Each geppetto event maps to one or more SEM frame types:
| Geppetto Event | SEM Frame Type(s) | Notes |
|---|---|---|
| EventPartialCompletionStart | llm.start | Opens a streaming message entity |
| EventPartialCompletion | llm.delta | Cumulative content (not just the delta) |
| EventFinal | llm.final | Closes the message, final text |
| EventToolCall | tool.start | Tool name, input, status=running |
| EventToolResult | tool.result, tool.done | Result data + completion signal |
| EventToolCallExecute | tool.start | Alternative entry for tool execution |
| EventError | (logged, not translated) | Errors surface via other mechanisms |
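Dispatching on the Go event type, as the registry described above does, can be sketched with generics and reflect. This is an illustration only: `translator`, `registerByType`, `frame`, and the two event structs are hypothetical stand-ins, and the real semregistry API may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// frame is a hypothetical SEM frame reduced to its type name.
type frame struct{ Type string }

// translator maps a concrete Go event type to a handler producing frames.
type translator struct {
	handlers map[reflect.Type]func(any) []frame
}

func newTranslator() *translator {
	return &translator{handlers: map[reflect.Type]func(any) []frame{}}
}

// registerByType binds fn to the concrete type T. The generic signature
// lets the handler receive a typed value instead of asserting by hand.
func registerByType[T any](t *translator, fn func(T) []frame) {
	key := reflect.TypeOf((*T)(nil)).Elem()
	t.handlers[key] = func(ev any) []frame { return fn(ev.(T)) }
}

// translate returns the frames for ev, or nil when no handler is registered.
func (t *translator) translate(ev any) []frame {
	h, ok := t.handlers[reflect.TypeOf(ev)]
	if !ok {
		return nil
	}
	return h(ev)
}

// Stand-in event types for the sketch.
type eventPartialStart struct{}
type eventToolResult struct{ Result string }

func main() {
	tr := newTranslator()
	registerByType(tr, func(_ *eventPartialStart) []frame {
		return []frame{{Type: "llm.start"}}
	})
	registerByType(tr, func(_ *eventToolResult) []frame {
		return []frame{{Type: "tool.result"}, {Type: "tool.done"}}
	})
	for _, f := range tr.translate(&eventToolResult{Result: "42"}) {
		fmt.Println(f.Type)
	}
}
```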
SEM frames need stable IDs so that streaming updates (llm.start → llm.delta → llm.final) reference the same entity. The translator resolves IDs using a three-tier fallback:
- If event.Metadata().ID is set, use it directly
- As a last resort, generate "llm-" + uuid.New()

IDs are cached per-translator instance so all streaming events for the same logical message share one ID.
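The first and last tiers of that fallback, plus the per-instance cache, can be sketched as follows. The intermediate tier is not modeled here. `idResolver`, `resolve`, and the `streamKey` argument are hypothetical, IDs are plain strings instead of uuid.UUID, and random hex stands in for uuid.New() to keep the example stdlib-only.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// idResolver caches generated IDs per stream key (hypothetical).
type idResolver struct {
	cache map[string]string
}

func newIDResolver() *idResolver { return &idResolver{cache: map[string]string{}} }

// randomID returns "llm-" plus 16 random bytes in hex, a stand-in for
// "llm-" + uuid.New().
func randomID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return "llm-" + hex.EncodeToString(b)
}

// resolve prefers the metadata ID when set; otherwise it returns a
// generated ID that stays stable for the same streamKey.
func (r *idResolver) resolve(streamKey, metadataID string) string {
	if metadataID != "" {
		return metadataID
	}
	if id, ok := r.cache[streamKey]; ok {
		return id
	}
	id := randomID()
	r.cache[streamKey] = id
	return id
}

func main() {
	r := newIDResolver()
	start := r.resolve("stream-1", "")
	delta := r.resolve("stream-1", "")
	// Both calls yield the same ID, so llm.start and llm.delta frames
	// would reference one entity.
	fmt.Println(start == delta)
}
```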
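The EventMetadata fields (SessionID, InferenceID, TurnID) serve double duty: they correlate events with Turn snapshots, and they inform how the SEM translator identifies the resulting frames.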
This means the metadata you set when publishing a custom event determines how it gets identified downstream in the webchat UI.
If you define a custom event type (see Custom Event Types above), you can make it appear in the webchat UI by:
- Registering a translation for it in pinocchio/pkg/webchat/sem_translator.go

For a complete walkthrough, see Adding a New Event Type End-to-End in the pinocchio docs.
- geppetto/pkg/events/structuredsink/filtering_sink.go: the FilteringSink that extracts structured payloads from text streams
- geppetto/cmd/examples/streaming-inference/main.go