Skip to content

weka/go-steps-engine

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

20 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Go Steps Engine

A flexible, state-aware step execution engine for Go applications. The engine provides a framework for executing sequences of operations with persistent state tracking, error handling, and support for different execution patterns.

Features

  • State Persistence: Pluggable state backends (Kubernetes conditions, databases, etc.)
  • Flexible Execution: Support for sequential, parallel, and grouped step execution
  • Error Recovery: Built-in retry logic, graceful error handling, and step continuation
  • Throttling: Built-in rate limiting and throttling capabilities
  • Observability: OpenTelemetry integration for tracing and logging
  • Kubernetes Native: First-class support for Kubernetes controller patterns

Quick Start

package main

import (
    "context"
    
    "github.com/weka/go-steps-engine/lifecycle"
)

func main() {
    ctx := context.Background()
    
    // Create steps
    steps := []lifecycle.Step{
        &lifecycle.SimpleStep{
            Name: "initialize",
            Run: func(ctx context.Context) error {
                // Your initialization logic here
                return nil
            },
        },
        &lifecycle.SimpleStep{
            Name: "process",
            Run: func(ctx context.Context) error {
                // Your processing logic here
                return nil
            },
        },
    }
    
    // Create and run engine
    engine := &lifecycle.StepsEngine{
        Steps: steps,
    }
    
    if err := engine.Run(ctx); err != nil {
        panic(err)
    }
}

Architecture

Core Components

StepsEngine

The main execution engine that runs a sequence of steps. It handles:

  • Step execution order and dependencies
  • State persistence through StateKeeper
  • Error handling and recovery
  • Throttling and rate limiting

StateKeeper Interface

Provides an abstraction for persisting step states across different backends:

type StateKeeper interface {
    GetSummaryAttributes() map[string]string
    GetSummary() string
    GetStepState(ctx context.Context, stepName string) (*StepState, error)
    SetStepState(ctx context.Context, state *StepState) error
    SupportsRunningState() bool
}

Step Types

  • SimpleStep: Basic sequential step execution
  • ParallelSteps: Execute multiple steps concurrently
  • GroupedSteps: Execute steps in groups with dependencies
  • DynamicStep: Steps that generate other steps dynamically

State Backends

Kubernetes StateKeeper (K8sObject)

Persists step states as Kubernetes conditions on custom resources:

// Create a StateKeeper for a Kubernetes object
stateKeeper := &lifecycle.K8sObject{
    Client:     kubeClient,
    Object:     myCustomResource,
    Conditions: &myCustomResource.Status.Conditions,
}

engine := &lifecycle.StepsEngine{
    StateKeeper: stateKeeper,
    Steps:       steps,
}

State Mapping:

  • StepStatusSucceededConditionTrue
  • StepStatusFailedConditionFalse
  • StepStatusPendingConditionUnknown or missing condition
  • StepStatusRunning → Skipped (K8s conditions don't support running state)

Database StateKeeper (FSDB)

Persists step states in a database with full lifecycle tracking:

// Example implementation for database state keeper
type FsdbStateKeeper struct {
    stateManager *StateManager
    executionID  string
}

// Supports all step states including running state
func (o *FsdbStateKeeper) SupportsRunningState() bool {
    return true
}

Usage Patterns

Basic Sequential Execution

steps := []lifecycle.Step{
    &lifecycle.SimpleStep{
        Name:  "validate-input",
        State: &lifecycle.State{Name: "validate-input"},
        Run:   validateInput,
    },
    &lifecycle.SimpleStep{
        Name:  "process-data",
        State: &lifecycle.State{Name: "process-data"},
        Run:   processData,
        // This step depends on validate-input succeeding
    },
    &lifecycle.SimpleStep{
        Name:  "save-results",
        State: &lifecycle.State{Name: "save-results"},
        Run:   saveResults,
    },
}

Parallel Execution

parallelStep := &lifecycle.ParallelSteps{
    Name: "parallel-processing",
    Steps: []lifecycle.ParallelSubStep{
        {Name: "process-batch-1", Run: processBatch1},
        {Name: "process-batch-2", Run: processBatch2},
        {Name: "process-batch-3", Run: processBatch3},
    },
}

State-Aware Steps

Enable state tracking by providing a State struct. If you want state tracking, set State to a non-nil value with an explicit Name field. If you don't want state tracking, omit State or set it to nil.

// Basic state tracking with explicit name
step := &lifecycle.SimpleStep{
    Name:  "deploy-app",
    State: &lifecycle.State{Name: "deploy-app"}, // Explicit state name required
    Run:   deployApplication,
}

// Advanced state tracking with custom condition name and messages
step := &lifecycle.SimpleStep{
    Name: "deploy-app",
    State: &lifecycle.State{
        Name:    "AppDeployed",                    // Required: Custom condition name in K8s
        Reason:  "DeploymentReady",               // Optional: Custom reason for success
        Message: "Application deployed successfully", // Optional: Custom success message
    },
    Run: deployApplication,
    // Skip this step if AppDeployed condition is already True
}

// No state tracking
step := &lifecycle.SimpleStep{
    Name: "temporary-operation",
    // State: nil (default) - no state tracking
    Run: temporaryOperation,
}

Error Handling

step := &lifecycle.SimpleStep{
    Name: "risky-operation",
    Run: riskyOperation,
    ContinueOnError: true,  // Continue even if this step fails
    OnFail: func(ctx context.Context, stepName string, err error) error {
        // Custom cleanup or logging
        log.Printf("Step %s failed: %v", stepName, err)
        return nil
    },
}

Predicates and Conditional Execution

step := &lifecycle.SimpleStep{
    Name: "conditional-step",
    Run: conditionalOperation,
    Predicates: []lifecycle.PredicateFunc{
        func() bool {
            return someCondition() // Only run if this returns true
        },
    },
    AbortOnPredicatesFalse: false, // Skip instead of aborting
}

Throttling

step := &lifecycle.SimpleStep{
    Name: "rate-limited-operation",
    Run: expensiveOperation,
    Throttling: &throttling.ThrottlingSettings{
        Interval:          time.Minute * 5,
        EnsureStepSuccess: true,
    },
}

Advanced Features

Custom StateKeeper Implementation

type CustomStateKeeper struct {
    // Your custom state storage
}

func (c *CustomStateKeeper) GetSummaryAttributes() map[string]string {
    return map[string]string{"backend": "custom", "version": "1.0"}
}

func (c *CustomStateKeeper) GetSummary() string {
    return "CustomStateKeeper:v1.0"
}

func (c *CustomStateKeeper) GetStepState(ctx context.Context, stepName string) (*lifecycle.StepState, error) {
    // Retrieve state from your backend
}

func (c *CustomStateKeeper) SetStepState(ctx context.Context, state *lifecycle.StepState) error {
    // Persist state to your backend
}

func (c *CustomStateKeeper) SupportsRunningState() bool {
    return true // or false, depending on your backend capabilities
}

Kubernetes Controller Integration

func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // Get your custom resource
    obj := &v1.MyCustomResource{}
    if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }
    
    // Create StateKeeper
    stateKeeper := &lifecycle.K8sObject{
        Client:     r.Client,
        Object:     obj,
        Conditions: &obj.Status.Conditions,
    }
    
    // Define reconciliation steps
    steps := []lifecycle.Step{
        &lifecycle.SimpleStep{
            Name: "validate-spec",
            State: &lifecycle.State{
                Name: "SpecValid",  // Required explicit name
            },
            Run: r.validateSpec,
        },
        &lifecycle.SimpleStep{
            Name: "deploy-resources",
            State: &lifecycle.State{
                Name: "ResourcesDeployed",  // Required explicit name
            },
            Run: r.deployResources,
        },
        &lifecycle.SimpleStep{
            Name: "wait-ready",
            State: &lifecycle.State{
                Name: "Ready",  // Required explicit name
            },
            Run: r.waitForReady,
        },
    }
    
    // Run steps engine
    engine := &lifecycle.StepsEngine{
        StateKeeper: stateKeeper,
        Steps:       steps,
    }
    
    return engine.RunAsReconcilerResponse(ctx)
}

Error Types

The engine defines several error types for different scenarios:

// Expected errors that don't cause step failure
err := lifecycle.NewExpectedError(someError)

// Errors that should cause a wait/retry
err := lifecycle.NewWaitError(someError) 
err := lifecycle.NewWaitErrorWithDuration(someError, 30*time.Second)

// Retryable errors with custom retry timing
err := &lifecycle.RetryableError{
    Err:        someError,
    RetryAfter: time.Minute * 5,
}

Testing

Unit Testing Steps

func TestMyStep(t *testing.T) {
    ctx := context.Background()
    
    // Create mock StateKeeper if needed
    stateKeeper := &MockStateKeeper{}
    
    step := &lifecycle.SimpleStep{
        Name: "test-step",
        Run: func(ctx context.Context) error {
            // Your step logic
            return nil
        },
    }
    
    engine := &lifecycle.StepsEngine{
        StateKeeper: stateKeeper,
        Steps:       []lifecycle.Step{step},
    }
    
    err := engine.Run(ctx)
    assert.NoError(t, err)
}

Integration Testing with Real StateKeeper

func TestWithRealStateKeeper(t *testing.T) {
    // Set up real Kubernetes client or database connection
    // Run full integration test
}

Best Practices

  1. State Management

    • Always provide explicit State.Name when using state tracking
    • Use meaningful state names that map well to condition names
    • Provide clear reasons and messages for debugging
    • Consider whether your StateKeeper needs to track running states
  2. Error Handling

    • Use appropriate error types (ExpectedError, WaitError, etc.)
    • Implement proper cleanup in failure callbacks
    • Consider using ContinueOnError for non-critical steps
  3. Performance

    • Use throttling for expensive operations
    • Consider parallel execution for independent steps
    • Monitor state persistence overhead
  4. Observability

    • Leverage the built-in OpenTelemetry integration
    • Use meaningful step names for tracing
    • Include context in state messages
  5. Testing

    • Test steps individually with mock StateKeepers
    • Include integration tests with real backends
    • Test error scenarios and recovery paths

Contributing

  1. Follow Go best practices and conventions
  2. Add tests for new features
  3. Update documentation for interface changes
  4. Consider backward compatibility for public APIs

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages