A comprehensive guide on how to build custom processors using the provided middleware and types system in Glazed.
This guide explains how to build custom processors using the provided middleware and types system. It covers the core concepts and type definitions, and walks through examples of implementing various processors.
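The processing system is built around three main concepts: rows (individual data records stored as ordered maps), tables (collections of rows together with their columns), and middlewares (processors that transform data at the object, row, or table level). The core type definitions are: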
// Core aliases shared by the processing types.
type (
	// TableName is the name of a table.
	TableName = string

	// FieldName is the key of a single field within a Row.
	FieldName = string

	// GenericCellValue is the value held by a field; any Go value is allowed.
	GenericCellValue = any

	// Row is a pointer to an ordered map from field name to cell value.
	// The ordered map preserves field order.
	Row = *orderedmap.OrderedMap[FieldName, GenericCellValue]
)
// Table groups a list of rows with the list of field names used as columns.
type Table struct {
// Columns lists the field names that make up the table's columns.
Columns []FieldName
// Rows holds the table's rows.
Rows []Row
// finalized is unexported state.
// NOTE(review): presumably set once the table is complete — confirm against the implementation.
finalized bool
}
// Processor is the entry point of a processing pipeline: rows are submitted
// one at a time with AddRow, and Close is called after the last row.
type Processor interface {
// AddRow submits a single row for processing and reports any error.
AddRow(ctx context.Context, obj Row) error
// Close ends processing and reports any error.
// NOTE(review): in the TableProcessor usage example, Close is what applies the table middlewares.
Close(ctx context.Context) error
}
// ObjectMiddleware transforms one row at a time. Its method set is identical
// to RowMiddleware; the two are registered separately on a processor
// (WithObjectMiddleware vs. WithRowMiddleware).
type ObjectMiddleware interface {
// Process receives one row and returns the rows that replace it: a single
// row, several rows, or an empty slice to drop the input.
Process(ctx context.Context, row Row) ([]Row, error)
// Close is the middleware's shutdown hook.
// NOTE(review): presumably called once when the owning processor is closed — confirm.
Close(ctx context.Context) error
}
// RowMiddleware transforms one row at a time. Its method set is identical to
// ObjectMiddleware; the two are registered separately on a processor
// (WithRowMiddleware vs. WithObjectMiddleware).
type RowMiddleware interface {
// Process receives one row and returns the rows that replace it: a single
// row, several rows, or an empty slice to drop the input.
Process(ctx context.Context, row Row) ([]Row, error)
// Close is the middleware's shutdown hook.
// NOTE(review): presumably called once when the owning processor is closed — confirm.
Close(ctx context.Context) error
}
// TableMiddleware transforms a whole table at once, which makes it the place
// for operations that need every row (for example sorting).
type TableMiddleware interface {
// Process receives a table and returns the table to use from then on; the
// returned pointer may be the same table, modified in place.
Process(ctx context.Context, table *Table) (*Table, error)
// Close is the middleware's shutdown hook.
// NOTE(review): presumably called once when the owning processor is closed — confirm.
Close(ctx context.Context) error
}
Rows are implemented using OrderedMap, which preserves field order. Here's how to work with them:
// NOTE: the `row :=` statements below are independent alternatives for
// building a row; they are not meant to be compiled as one sequence.

// Create an empty row
row := types.NewRow()
// Create a row with initial data; each types.MRP call supplies one field
// name and its value
row := types.NewRow(
types.MRP("name", "John"),
types.MRP("age", 30),
)
// Create from a map (fields will be sorted alphabetically)
data := map[string]interface{}{
"age": 30,
"name": "John",
}
row := types.NewRowFromMap(data)
// Create from a struct
type Person struct {
Name string
Age int
}
person := Person{Name: "John", Age: 30}
row := types.NewRowFromStruct(person, true) // true = lowercase keys
// Set a value on an existing row
row.Set("email", "john@example.com")
// Get a value; exists reports whether the field is present
value, exists := row.Get("name")
// Iterate over fields in order, starting from the oldest pair
for pair := row.Oldest(); pair != nil; pair = pair.Next() {
fmt.Printf("%s: %v\n", pair.Key, pair.Value)
}
Here's an example of a middleware that adds a timestamp to each row:
// TimestampMiddleware stamps every row that passes through it with the
// current Unix time, stored under the "timestamp" field.
type TimestampMiddleware struct{}

// Process sets "timestamp" on the given row in place and returns that same
// row as the only element of the result.
func (m *TimestampMiddleware) Process(ctx context.Context, row Row) ([]Row, error) {
	now := time.Now().Unix()
	row.Set("timestamp", now)

	out := []Row{row}
	return out, nil
}

// Close has nothing to release and always returns nil.
func (m *TimestampMiddleware) Close(ctx context.Context) error {
	return nil
}
This middleware filters rows based on a condition:
// AgeFilterMiddleware keeps only the rows whose "age" field is a number
// greater than or equal to minAge.
type AgeFilterMiddleware struct {
	minAge int // inclusive lower bound for the "age" field
}

// Process returns the row unchanged when its "age" field is numeric and at
// least minAge. In every other case (no "age" field, a non-numeric value, or
// an age below the bound) it returns an empty slice, which drops the row.
//
// Any built-in integer or floating-point type is accepted, so ages that are
// not plain int values (for example float64 values from decoded JSON, or
// int64 values) are compared instead of being silently filtered out.
func (m *AgeFilterMiddleware) Process(ctx context.Context, row Row) ([]Row, error) {
	raw, exists := row.Get("age")
	if !exists {
		return []Row{}, nil
	}
	// Written as a positive ">=" test so that a NaN age is dropped.
	if age, ok := numericAge(raw); ok && age >= float64(m.minAge) {
		return []Row{row}, nil
	}
	return []Row{}, nil
}

// Close has nothing to release and always returns nil.
func (m *AgeFilterMiddleware) Close(ctx context.Context) error {
	return nil
}

// numericAge converts a cell value of any built-in numeric type to float64.
// The boolean result is false when the value is not a number.
func numericAge(v interface{}) (float64, bool) {
	switch n := v.(type) {
	case int:
		return float64(n), true
	case int8:
		return float64(n), true
	case int16:
		return float64(n), true
	case int32:
		return float64(n), true
	case int64:
		return float64(n), true
	case uint:
		return float64(n), true
	case uint8:
		return float64(n), true
	case uint16:
		return float64(n), true
	case uint32:
		return float64(n), true
	case uint64:
		return float64(n), true
	case float32:
		return float64(n), true
	case float64:
		return n, true
	default:
		return 0, false
	}
}
This middleware sorts rows by a specific column:
// SortTableMiddleware orders a table's rows by the value of a single column.
type SortTableMiddleware struct {
	sortColumn string // name of the field to sort by
}

// Process sorts table.Rows in place in ascending order and returns the same
// table.
//
// Values are compared by their fmt "%v" string form, so the order is lexical:
// numbers sort as text ("10" comes before "9"). A row without the column is
// compared using the formatted form of whatever Get returns for a missing key
// (presumably a nil value, i.e. "<nil>"). The sort is stable, so rows with
// equal keys keep their original relative order and the output is
// deterministic.
func (m *SortTableMiddleware) Process(ctx context.Context, table *Table) (*Table, error) {
	// key renders a row's sort-column value as the string used for comparison.
	key := func(r Row) string {
		v, _ := r.Get(m.sortColumn) // the "found" flag is not needed here
		return fmt.Sprintf("%v", v)
	}

	sort.SliceStable(table.Rows, func(i, j int) bool {
		return key(table.Rows[i]) < key(table.Rows[j])
	})
	return table, nil
}

// Close has nothing to release and always returns nil.
func (m *SortTableMiddleware) Close(ctx context.Context) error {
	return nil
}
The TableProcessor combines multiple middlewares:
// Create a new processor; each option passed to NewTableProcessor registers
// one middleware of the corresponding kind
processor := NewTableProcessor(
WithObjectMiddleware(&TimestampMiddleware{}),
WithRowMiddleware(&AgeFilterMiddleware{minAge: 18}),
WithTableMiddleware(&SortTableMiddleware{sortColumn: "name"}),
)
// Feed rows into the processor one at a time
row := types.NewRow(
types.MRP("name", "John"),
types.MRP("age", 25),
)
err := processor.AddRow(context.Background(), row)
if err != nil {
// Handle error
}
// Close the processor once all rows have been added; this applies the
// table middlewares
err = processor.Close(context.Background())
if err != nil {
// Handle error
}
// Retrieve the processed table
table := processor.GetTable()
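The processing pipeline follows this order: object middlewares process each added row, row middlewares then process their output, the resulting rows are collected into a table, and table middlewares run on the complete table when `Close` is called. The snippets below show three common `Process` patterns: copying a row before modifying it, expanding one row into several, and filtering rows.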
// Process works on a copy of the incoming row, so the row passed in by the
// caller is left untouched.
func (m *MyMiddleware) Process(ctx context.Context, row Row) ([]Row, error) {
	modified := types.NewRowFromRow(row) // copy of the input row

	// Apply changes to the copy here

	result := []Row{modified}
	return result, nil
}
// Process turns one input row into any number of output rows. As written it
// returns an empty, non-nil slice.
func (m *MyMiddleware) Process(ctx context.Context, row Row) ([]Row, error) {
	expanded := make([]Row, 0)

	// Append the rows derived from the input here

	return expanded, nil
}
// Process forwards the row unchanged when shouldKeep accepts it and returns
// an empty slice otherwise, which drops the row.
func (m *MyMiddleware) Process(ctx context.Context, row Row) ([]Row, error) {
	// Guard clause: rejected rows produce no output.
	if !shouldKeep(row) {
		return []Row{}, nil
	}
	return []Row{row}, nil
}