Step-by-step guide to implement a custom event handler, subscribe it to the router, and parse incoming events.
This playbook walks through adding a custom event handler to your Geppetto application. By the end, your handler will receive streaming events from inference and process them as needed (logging, aggregation, forwarding, etc.).
The router transports events via Watermill. Create it early in your application:
import "github.com/go-go-golems/geppetto/pkg/events"
// Construct the event router. A constructor failure is wrapped with %w and
// returned from the enclosing function.
router, err := events.NewEventRouter()
if err != nil {
return fmt.Errorf("failed to create router: %w", err)
}
// Close the router when the enclosing function returns.
defer router.Close()
Options:
- events.WithVerbose(true) — Enable Watermill debug logging
- events.WithPublisher(pub) / events.WithSubscriber(sub) — Use external transport (NATS, Redis, Kafka)

A handler receives a *message.Message from Watermill and must call msg.Ack() when done:
import (
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-go-golems/geppetto/pkg/events"
)
// myCustomHandler decodes a Watermill message payload into a typed event and
// prints a short summary for the event kinds listed in the switch below.
//
// The message is acknowledged on every return path. A payload that cannot be
// decoded, or an EventError, is reported through the returned error. Event
// types without a matching case are ignored and nil is returned.
func myCustomHandler(msg *message.Message) error {
	// Deferred so the acknowledgement also happens on the error returns.
	defer msg.Ack()

	decoded, err := events.NewEventFromJson(msg.Payload)
	if err != nil {
		return fmt.Errorf("failed to parse event: %w", err)
	}

	switch typed := decoded.(type) {
	case *events.EventError:
		// The only event kind that turns into a handler error.
		return fmt.Errorf("inference error: %s", typed.ErrorString)

	case *events.EventPartialCompletionStart:
		fmt.Println("Stream started")

	case *events.EventPartialCompletion:
		fmt.Printf("Delta: %s\n", typed.Delta)

	case *events.EventFinal:
		fmt.Printf("Final text: %s\n", typed.Text)
		usage := typed.Metadata().Usage
		fmt.Printf("Tokens: %d input, %d output\n", usage.InputTokens, usage.OutputTokens)

	case *events.EventToolCall:
		call := typed.ToolCall
		fmt.Printf("Tool call: %s(%s)\n", call.Name, call.Input)

	case *events.EventToolResult:
		result := typed.ToolResult
		fmt.Printf("Tool result [%s]: %s\n", result.ID, result.Result)
	}

	return nil
}
Watermill handlers run on the router’s goroutines. If you block a handler (DB writes, HTTP calls, heavy JSON processing), you can unintentionally stall the entire event pipeline — which is especially noticeable during streaming inference (many partial events).
Rules of thumb:
- Always Ack() messages. If you forget, some transports will stall and/or re-deliver.
- Do not use msg.Context() as the context for durable side-effects (DB writes). It is scoped to message delivery and can be canceled unexpectedly relative to your persistence needs.

func persistSomething(ctx context.Context, payload []byte) error {
// do DB work / network I/O
return nil
}
// myHandler makes a best-effort attempt to persist the message payload and
// always returns nil. The message is acknowledged when the handler returns.
func myHandler(msg *message.Message) error {
	defer msg.Ack()

	// The persistence context is rooted in context.Background() rather than
	// msg.Context(), and bounded by its own deadline.
	const persistTimeout = 2 * time.Second
	persistCtx, release := context.WithTimeout(context.Background(), persistTimeout)
	defer release()

	// Best-effort: a persistence failure is intentionally not reported.
	_ = persistSomething(persistCtx, msg.Payload)

	return nil
}
If you are using the default in-memory router (events.NewEventRouter()), also see the warning in Events about gochannel configuration for streaming apps.
Add your handler to the router with a unique name and topic:
// Register myCustomHandler under the name "my-custom-handler" for the "chat" topic.
router.AddHandler(
"my-custom-handler", // Handler name (unique within router)
"chat", // Topic to subscribe to
myCustomHandler, // Your handler function
)
Important: The topic must match the one used when creating the sink (Step 5).
The sink connects your engine to the router:
import "github.com/go-go-golems/geppetto/pkg/inference/middleware"
// Build a Watermill sink from the router's Publisher and the "chat" topic name
// (the same topic string the handler was registered on).
sink := middleware.NewWatermillSink(router.Publisher, "chat")
Create the engine normally (no sink/options are passed at engine construction time):
// Build the engine from the parsed values; a construction error is returned
// to the caller unchanged.
eng, err := factory.NewEngineFromParsedValues(parsedValues)
if err != nil {
return err
}
Use errgroup to coordinate the router and inference:
import (
	"context"

	"golang.org/x/sync/errgroup"
)
eg, groupCtx := errgroup.WithContext(ctx)
// Start the router
eg.Go(func() error {
return router.Run(groupCtx)
})
// Run inference after router is ready
eg.Go(func() error {
<-router.Running() // Wait for router to be ready
// Attach sink to context so provider engines and helpers publish to the router
runCtx := events.WithEventSinks(groupCtx, sink)
// Run your inference
_, err := eng.RunInference(runCtx, turn)
return err
})
if err := eg.Wait(); err != nil {
return err
}
package main
import (
"context"
"fmt"
"os"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/go-go-golems/geppetto/pkg/events"
"github.com/go-go-golems/geppetto/pkg/inference/engine"
"github.com/go-go-golems/geppetto/pkg/inference/engine/factory"
"github.com/go-go-golems/geppetto/pkg/inference/middleware"
"github.com/go-go-golems/geppetto/pkg/turns"
"golang.org/x/sync/errgroup"
)
// main runs the example and exits with a non-zero status if any step fails.
func main() {
	if err := run(context.Background()); err != nil {
		fmt.Fprintln(os.Stderr, "error:", err)
		os.Exit(1)
	}
}

// run wires a router, two handlers, a sink and an engine together, performs a
// single inference and prints the token total taken from the final event.
//
// It lives in its own function so the deferred cleanup executes before main
// decides on the exit status.
func run(ctx context.Context) error {
	// Cancelling this context is what stops the router after inference ends.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// 1. Create router
	router, err := events.NewEventRouter()
	if err != nil {
		return fmt.Errorf("creating router: %w", err)
	}
	defer router.Close()

	// 2. Add built-in text printer
	router.AddHandler("printer", "chat", events.StepPrinterFunc("", os.Stdout))

	// 3. Add custom aggregator
	// tokenCount is written by the handler and only read after eg.Wait() returns.
	var tokenCount int
	router.AddHandler("aggregator", "chat", func(msg *message.Message) error {
		defer msg.Ack()

		ev, err := events.NewEventFromJson(msg.Payload)
		if err != nil {
			return fmt.Errorf("parsing event: %w", err)
		}
		if final, ok := ev.(*events.EventFinal); ok {
			usage := final.Metadata().Usage
			tokenCount = usage.InputTokens + usage.OutputTokens
		}
		return nil
	})

	// 4. Create sink and engine
	sink := middleware.NewWatermillSink(router.Publisher, "chat")
	eng, err := factory.NewEngineFromParsedValues(parsedValues)
	if err != nil {
		return fmt.Errorf("creating engine: %w", err)
	}

	// 5. Build Turn
	turn := &turns.Turn{}
	turns.AppendBlock(turn, turns.NewUserTextBlock("Hello!"))

	// 6. Run concurrently
	eg, groupCtx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		defer cancel()
		return router.Run(groupCtx)
	})
	eg.Go(func() error {
		// Stop the router once inference is finished; otherwise router.Run
		// keeps running and eg.Wait() never returns.
		defer cancel()

		// Do not wait forever if the router stops before it reports running.
		select {
		case <-router.Running():
		case <-groupCtx.Done():
			return groupCtx.Err()
		}

		_, err := eng.RunInference(events.WithEventSinks(groupCtx, sink), turn)
		return err
	})
	if err := eg.Wait(); err != nil {
		return err
	}

	fmt.Printf("\nTotal tokens: %d\n", tokenCount)
	return nil
}
Register custom event types to flow through the same infrastructure:
import "github.com/go-go-golems/geppetto/pkg/events"
// MyProgressEvent is a custom event type. It embeds events.EventImpl and adds
// two fields that are serialized under the JSON keys "progress" and "message".
type MyProgressEvent struct {
events.EventImpl
// Progress is multiplied by 100 when printed by the handler in this guide,
// so it is presumably a fraction in the range 0–1.
Progress float64 `json:"progress"`
// Message is free-form text shown next to the progress value.
Message string `json:"message"`
}
// init registers the factory for the "my-progress" event type. The factory
// returns an empty MyProgressEvent whose type tag is already set.
//
// A registration error is a programming mistake, so it aborts start-up
// instead of being discarded.
func init() {
	err := events.RegisterEventFactory("my-progress", func() events.Event {
		return &MyProgressEvent{EventImpl: events.EventImpl{Type_: "my-progress"}}
	})
	if err != nil {
		panic(err)
	}
}
// Publish from your code
func publishProgress(ctx context.Context, progress float64, msg string) {
ev := &MyProgressEvent{
EventImpl: events.EventImpl{
Type_: "my-progress",
Metadata_: events.EventMetadata{ID: uuid.New()},
},
Progress: progress,
Message: msg,
}
events.PublishEventToContext(ctx, ev)
}
// myHandler prints a progress line for every MyProgressEvent it receives and
// ignores all other event types.
//
// The message is acknowledged on every return path. A payload that cannot be
// decoded is reported through the returned error instead of being dropped.
func myHandler(msg *message.Message) error {
	defer msg.Ack()

	ev, err := events.NewEventFromJson(msg.Payload)
	if err != nil {
		return fmt.Errorf("failed to parse event: %w", err)
	}

	if progress, ok := ev.(*MyProgressEvent); ok {
		// Progress is scaled by 100 for display as a percentage.
		fmt.Printf("Progress: %.0f%% - %s\n", progress.Progress*100, progress.Message)
	}
	return nil
}
| Problem | Cause | Solution |
|---|---|---|
| No events received | Router not running | Wait for <-router.Running() before inference |
| Events dropped | Topic mismatch | Ensure sink and handler use same topic |
| Duplicate events | Multiple handlers same name | Use unique handler names |
| Handler not called | Handler added after Run | Add handlers before calling router.Run() |
| Context cancelled | Router stopped early | Check errgroup errors |
Geppetto provides ready-to-use handlers:
// All three handlers below subscribe to the "chat" topic; each is registered
// under a different handler name ("chat", "structured", "debug").

// Text streaming to stdout
router.AddHandler("chat", "chat", events.StepPrinterFunc("", os.Stdout))
// Structured output (JSON/YAML/text)
// Configured here for JSON output on stdout with metadata included.
printer := events.NewStructuredPrinter(os.Stdout, events.PrinterOptions{
Format: events.FormatJSON,
IncludeMetadata: true,
})
router.AddHandler("structured", "chat", printer)
// Raw event dumping (debugging)
router.AddHandler("debug", "chat", router.DumpRawEvents)
geppetto/cmd/examples/streaming-inference/main.go