Add a New Event Handler

Step-by-step guide to implement a custom event handler, subscribe it to the router, and parse incoming events.

This playbook walks through adding a custom event handler to your Geppetto application. By the end, your handler will receive streaming events from inference and process them as needed (logging, aggregation, forwarding, etc.).

Prerequisites

Steps

Step 1: Create the Event Router

The router transports events via Watermill. Create it early in your application:

import "github.com/go-go-golems/geppetto/pkg/events"

router, err := events.NewEventRouter()
if err != nil {
    return fmt.Errorf("failed to create router: %w", err)
}
defer router.Close()

Options:

  • events.WithVerbose(true) — Enable Watermill debug logging
  • events.WithPublisher(pub) / events.WithSubscriber(sub) — Use external transport (NATS, Redis, Kafka)

Step 2: Implement Your Handler Function

A handler receives a *message.Message from Watermill and must call msg.Ack() when done:

import (
    "fmt"

    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/go-go-golems/geppetto/pkg/events"
)

func myCustomHandler(msg *message.Message) error {
    defer msg.Ack() // Always acknowledge the message
    
    // Parse the raw JSON into a typed event
    ev, err := events.NewEventFromJson(msg.Payload)
    if err != nil {
        return fmt.Errorf("failed to parse event: %w", err)
    }
    
    // Handle specific event types
    switch e := ev.(type) {
    case *events.EventPartialCompletionStart:
        fmt.Println("Stream started")
        
    case *events.EventPartialCompletion:
        fmt.Printf("Delta: %s\n", e.Delta)
        
    case *events.EventFinal:
        fmt.Printf("Final text: %s\n", e.Text)
        fmt.Printf("Tokens: %d input, %d output\n", 
            e.Metadata().Usage.InputTokens, 
            e.Metadata().Usage.OutputTokens)
        
    case *events.EventToolCall:
        fmt.Printf("Tool call: %s(%s)\n", e.ToolCall.Name, e.ToolCall.Input)
        
    case *events.EventToolResult:
        fmt.Printf("Tool result [%s]: %s\n", e.ToolResult.ID, e.ToolResult.Result)
        
    case *events.EventError:
        return fmt.Errorf("inference error: %s", e.ErrorString)
    }
    
    return nil
}

Context and blocking work (important)

Watermill handlers run on the router's goroutines. Blocking inside a handler (DB writes, HTTP calls, heavy JSON processing) can stall the entire event pipeline, which is especially noticeable during streaming inference, when many partial events arrive in quick succession.

Rules of thumb:

  • Keep handlers fast and best-effort when they are driving UI updates or telemetry.
  • Always call Ack() on every message. If you forget, some transports stall or redeliver.
  • Avoid using msg.Context() as the context for durable side effects such as DB writes. It is scoped to message delivery and can be canceled before your write completes.
  • If a handler must do I/O, use a bounded derived context and consider offloading work to another goroutine/queue:

func persistSomething(ctx context.Context, payload []byte) error {
    // do DB work / network I/O
    return nil
}

func myHandler(msg *message.Message) error {
    defer msg.Ack()

    ioCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    _ = persistSomething(ioCtx, msg.Payload) // best-effort
    return nil
}

If you are using the default in-memory router (events.NewEventRouter()), also see the warning in Events about gochannel configuration for streaming apps.

Step 3: Register the Handler

Add your handler to the router with a unique name and topic:

router.AddHandler(
    "my-custom-handler",  // Handler name (unique within router)
    "chat",               // Topic to subscribe to
    myCustomHandler,      // Your handler function
)

Important: The topic must match the one used when creating the sink (Step 4).
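To see why a topic mismatch silently loses events, here is a toy in-memory bus. It is a deliberately simplified stand-in, not Watermill or the Geppetto router, and toyBus and chatTopic are hypothetical names. Delivery is keyed on the exact topic string, so publishing to a topic with no subscriber reaches nobody and raises no error.

```go
package main

// toyBus is a deliberately tiny in-memory stand-in for a topic-based router.
type toyBus struct {
    handlers map[string][]func(payload []byte)
}

func newToyBus() *toyBus {
    return &toyBus{handlers: make(map[string][]func(payload []byte))}
}

// Subscribe registers a handler for exactly one topic string.
func (b *toyBus) Subscribe(topic string, h func(payload []byte)) {
    b.handlers[topic] = append(b.handlers[topic], h)
}

// Publish delivers to handlers on the exact same topic and returns how many
// handlers were reached. Zero means the event was silently dropped.
func (b *toyBus) Publish(topic string, payload []byte) int {
    hs := b.handlers[topic]
    for _, h := range hs {
        h(payload)
    }
    return len(hs)
}

// chatTopic is shared by subscriber and publisher so the strings cannot drift.
const chatTopic = "chat"
```

Applied to the real code, the same idea is to define the topic once as a constant and pass that constant to both router.AddHandler and the sink constructor instead of repeating the string literal.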

Step 4: Create the Event Sink

The sink connects your engine to the router:

import "github.com/go-go-golems/geppetto/pkg/inference/middleware"

sink := middleware.NewWatermillSink(router.Publisher, "chat")

Step 5: Wire the Engine with the Sink

Create the engine as usual. No sink or option is passed at engine construction time; the sink is attached to the context when you run inference (Step 6):

eng, err := factory.NewEngineFromParsedValues(parsedValues)
if err != nil {
    return err
}

Step 6: Run Router and Inference Concurrently

Use errgroup to coordinate the router and inference:

import "golang.org/x/sync/errgroup"

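// Derive a cancelable context so the router can be stopped once inference finishes
ctx, cancel := context.WithCancel(ctx)
defer cancel()

eg, groupCtx := errgroup.WithContext(ctx)

// Start the router
eg.Go(func() error {
    return router.Run(groupCtx)
})

// Run inference after router is ready
eg.Go(func() error {
    // Stop the router when inference returns. A nil return does not cancel
    // groupCtx, so without this eg.Wait() would block on router.Run forever.
    defer cancel()

    <-router.Running() // Wait for router to be ready

    // Attach sink to context so provider engines and helpers publish to the router
    runCtx := events.WithEventSinks(groupCtx, sink)

    // Run your inference
    _, err := eng.RunInference(runCtx, turn)
    return err
})

if err := eg.Wait(); err != nil {
    return err
}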

Complete Example

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/go-go-golems/geppetto/pkg/events"
    "github.com/go-go-golems/geppetto/pkg/inference/engine/factory"
    "github.com/go-go-golems/geppetto/pkg/inference/middleware"
    "github.com/go-go-golems/geppetto/pkg/turns"
    "golang.org/x/sync/errgroup"
)

func main() {
    if err := run(); err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
}

func run() error {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // 1. Create router
    router, err := events.NewEventRouter()
    if err != nil {
        return fmt.Errorf("failed to create router: %w", err)
    }
    defer router.Close()

    // 2. Add built-in text printer
    router.AddHandler("printer", "chat", events.StepPrinterFunc("", os.Stdout))

    // 3. Add custom aggregator
    var tokenCount int
    router.AddHandler("aggregator", "chat", func(msg *message.Message) error {
        defer msg.Ack()
        ev, err := events.NewEventFromJson(msg.Payload)
        if err != nil {
            return fmt.Errorf("failed to parse event: %w", err)
        }
        if final, ok := ev.(*events.EventFinal); ok {
            tokenCount = final.Metadata().Usage.InputTokens + final.Metadata().Usage.OutputTokens
        }
        return nil
    })

    // 4. Create sink and engine
    // parsedValues comes from your application's parsed configuration (setup not shown here).
    sink := middleware.NewWatermillSink(router.Publisher, "chat")
    eng, err := factory.NewEngineFromParsedValues(parsedValues)
    if err != nil {
        return fmt.Errorf("failed to create engine: %w", err)
    }

    // 5. Build Turn
    turn := &turns.Turn{}
    turns.AppendBlock(turn, turns.NewUserTextBlock("Hello!"))

    // 6. Run concurrently
    eg, groupCtx := errgroup.WithContext(ctx)
    eg.Go(func() error { return router.Run(groupCtx) })
    eg.Go(func() error {
        defer cancel() // stop the router once inference returns, so eg.Wait() can finish
        <-router.Running()
        _, err := eng.RunInference(events.WithEventSinks(groupCtx, sink), turn)
        return err
    })

    if err := eg.Wait(); err != nil {
        return err
    }
    fmt.Printf("\nTotal tokens: %d\n", tokenCount)
    return nil
}

Advanced: Custom Event Types

Register custom event types to flow through the same infrastructure:

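import (
    "context"
    "fmt"

    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/go-go-golems/geppetto/pkg/events"
    "github.com/google/uuid" // assumed source of uuid.New() used below
)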

// Define custom event
type MyProgressEvent struct {
    events.EventImpl
    Progress float64 `json:"progress"`
    Message  string  `json:"message"`
}

// Register decoder in init()
func init() {
    _ = events.RegisterEventFactory("my-progress", func() events.Event {
        return &MyProgressEvent{EventImpl: events.EventImpl{Type_: "my-progress"}}
    })
}

// Publish from your code
func publishProgress(ctx context.Context, progress float64, msg string) {
    ev := &MyProgressEvent{
        EventImpl: events.EventImpl{
            Type_: "my-progress",
            Metadata_: events.EventMetadata{ID: uuid.New()},
        },
        Progress: progress,
        Message:  msg,
    }
    events.PublishEventToContext(ctx, ev)
}

// Handle in your handler
func myHandler(msg *message.Message) error {
    defer msg.Ack()
    ev, _ := events.NewEventFromJson(msg.Payload)
    if progress, ok := ev.(*MyProgressEvent); ok {
        fmt.Printf("Progress: %.0f%% - %s\n", progress.Progress*100, progress.Message)
    }
    return nil
}
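For intuition about what registering a factory buys you, here is a stdlib-only sketch of a type-keyed decoder registry. It illustrates the general pattern only and makes no claim about how Geppetto implements RegisterEventFactory or NewEventFromJson. The names toyEvent, progressEvent, and registry are hypothetical, and the sketch assumes the payload carries its type name in a "type" field.

```go
package main

import (
    "encoding/json"
    "fmt"
    "sync"
)

// toyEvent is a hypothetical minimal event interface for this sketch.
type toyEvent interface {
    Kind() string
}

// progressEvent mirrors the shape of the custom event shown above.
type progressEvent struct {
    Type     string  `json:"type"`
    Progress float64 `json:"progress"`
    Message  string  `json:"message"`
}

func (e *progressEvent) Kind() string { return e.Type }

// registry maps a type name to a constructor for the matching event struct.
type registry struct {
    mu        sync.RWMutex
    factories map[string]func() toyEvent
}

func newRegistry() *registry {
    return &registry{factories: make(map[string]func() toyEvent)}
}

// Register adds a factory and rejects duplicate type names.
func (r *registry) Register(kind string, f func() toyEvent) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    if _, exists := r.factories[kind]; exists {
        return fmt.Errorf("event type %q already registered", kind)
    }
    r.factories[kind] = f
    return nil
}

// Decode reads the "type" field, looks up the factory, and unmarshals the
// full payload into a fresh value of the registered concrete type.
func (r *registry) Decode(payload []byte) (toyEvent, error) {
    var head struct {
        Type string `json:"type"`
    }
    if err := json.Unmarshal(payload, &head); err != nil {
        return nil, fmt.Errorf("read event type: %w", err)
    }
    r.mu.RLock()
    f, ok := r.factories[head.Type]
    r.mu.RUnlock()
    if !ok {
        return nil, fmt.Errorf("unknown event type %q", head.Type)
    }
    ev := f()
    if err := json.Unmarshal(payload, ev); err != nil {
        return nil, fmt.Errorf("decode %q event: %w", head.Type, err)
    }
    return ev, nil
}
```

The practical consequence for handlers is the same as in the real API: a type assertion such as ev.(*MyProgressEvent) can only succeed if the factory for that type name was registered before the payload was decoded.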

Troubleshooting

Problem            | Cause                       | Solution
No events received | Router not running          | Wait for <-router.Running() before inference
Events dropped     | Topic mismatch              | Ensure sink and handler use same topic
Duplicate events   | Multiple handlers same name | Use unique handler names
Handler not called | Handler added after Run     | Add handlers before calling router.Run()
Context cancelled  | Router stopped early        | Check errgroup errors

Built-in Handlers

Geppetto provides ready-to-use handlers:

// Text streaming to stdout
router.AddHandler("printer", "chat", events.StepPrinterFunc("", os.Stdout))

// Structured output (JSON/YAML/text)
printer := events.NewStructuredPrinter(os.Stdout, events.PrinterOptions{
    Format:          events.FormatJSON,
    IncludeMetadata: true,
})
router.AddHandler("structured", "chat", printer)

// Raw event dumping (debugging)
router.AddHandler("debug", "chat", router.DumpRawEvents)

See Also

  • Events — Full events reference
  • Streaming Tutorial — Complete streaming example
  • Example: geppetto/cmd/examples/streaming-inference/main.go