A flexible, state-aware step execution engine for Go applications. The engine provides a framework for executing sequences of operations with persistent state tracking, error handling, and support for different execution patterns.
- State Persistence: Pluggable state backends (Kubernetes conditions, databases, etc.)
- Flexible Execution: Support for sequential, parallel, and grouped step execution
- Error Recovery: Built-in retry logic, graceful error handling, and step continuation
- Throttling: Built-in rate limiting and throttling capabilities
- Observability: OpenTelemetry integration for tracing and logging
- Kubernetes Native: First-class support for Kubernetes controller patterns
package main
import (
"context"
"github.com/weka/go-steps-engine/lifecycle"
)
func main() {
ctx := context.Background()
// Create steps
steps := []lifecycle.Step{
&lifecycle.SimpleStep{
Name: "initialize",
Run: func(ctx context.Context) error {
// Your initialization logic here
return nil
},
},
&lifecycle.SimpleStep{
Name: "process",
Run: func(ctx context.Context) error {
// Your processing logic here
return nil
},
},
}
// Create and run engine
engine := &lifecycle.StepsEngine{
Steps: steps,
}
if err := engine.Run(ctx); err != nil {
panic(err)
}
}

StepsEngine is the main execution engine that runs a sequence of steps. It handles:
- Step execution order and dependencies
- State persistence through StateKeeper
- Error handling and recovery
- Throttling and rate limiting
StateKeeper provides an abstraction for persisting step states across different backends:
type StateKeeper interface {
GetSummaryAttributes() map[string]string
GetSummary() string
GetStepState(ctx context.Context, stepName string) (*StepState, error)
SetStepState(ctx context.Context, state *StepState) error
SupportsRunningState() bool
}

Step types:
- SimpleStep: Basic sequential step execution
- ParallelSteps: Execute multiple steps concurrently
- GroupedSteps: Execute steps in groups with dependencies
- DynamicStep: Steps that generate other steps dynamically
The Kubernetes StateKeeper persists step states as Kubernetes conditions on custom resources:
// Create a StateKeeper for a Kubernetes object
stateKeeper := &lifecycle.K8sObject{
Client: kubeClient,
Object: myCustomResource,
Conditions: &myCustomResource.Status.Conditions,
}
engine := &lifecycle.StepsEngine{
StateKeeper: stateKeeper,
Steps: steps,
}

State Mapping:
- StepStatusSucceeded → ConditionTrue
- StepStatusFailed → ConditionFalse
- StepStatusPending → ConditionUnknown or missing condition
- StepStatusRunning → Skipped (K8s conditions don't support running state)
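
The mapping above can be sketched as a pair of small conversion functions. The types and constants here are declared locally so the example is self-contained; their string values, and the function names `toCondition` and `fromCondition`, are assumptions for illustration and may not match the library's definitions.

```go
package main

import "fmt"

// StepStatus and ConditionStatus are local stand-ins; the string values are assumed.
type StepStatus string

const (
	StepStatusPending   StepStatus = "Pending"
	StepStatusRunning   StepStatus = "Running"
	StepStatusSucceeded StepStatus = "Succeeded"
	StepStatusFailed    StepStatus = "Failed"
)

type ConditionStatus string

const (
	ConditionTrue    ConditionStatus = "True"
	ConditionFalse   ConditionStatus = "False"
	ConditionUnknown ConditionStatus = "Unknown"
)

// toCondition returns the condition status for a step status and reports
// whether a condition should be written at all. Running is skipped because
// conditions have no way to express it.
func toCondition(s StepStatus) (ConditionStatus, bool) {
	switch s {
	case StepStatusSucceeded:
		return ConditionTrue, true
	case StepStatusFailed:
		return ConditionFalse, true
	case StepStatusPending:
		return ConditionUnknown, true
	default:
		return "", false
	}
}

// fromCondition maps a condition back to a step status. A missing condition
// (found == false) is treated the same as ConditionUnknown: pending.
func fromCondition(c ConditionStatus, found bool) StepStatus {
	if !found {
		return StepStatusPending
	}
	switch c {
	case ConditionTrue:
		return StepStatusSucceeded
	case ConditionFalse:
		return StepStatusFailed
	default:
		return StepStatusPending
	}
}

func main() {
	for _, s := range []StepStatus{StepStatusSucceeded, StepStatusFailed, StepStatusPending, StepStatusRunning} {
		c, write := toCondition(s)
		fmt.Printf("%s: condition=%q write=%v\n", s, c, write)
	}
}
```

This is also why SupportsRunningState matters: a backend that cannot represent a running step has to drop that state rather than store something misleading.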
A database-backed StateKeeper persists step states in a database with full lifecycle tracking:
// Example implementation for database state keeper
type FsdbStateKeeper struct {
stateManager *StateManager
executionID string
}
// Supports all step states including running state
func (o *FsdbStateKeeper) SupportsRunningState() bool {
return true
}

Sequential steps:
steps := []lifecycle.Step{
&lifecycle.SimpleStep{
Name: "validate-input",
State: &lifecycle.State{Name: "validate-input"},
Run: validateInput,
},
&lifecycle.SimpleStep{
Name: "process-data",
State: &lifecycle.State{Name: "process-data"},
Run: processData,
// This step depends on validate-input succeeding
},
&lifecycle.SimpleStep{
Name: "save-results",
State: &lifecycle.State{Name: "save-results"},
Run: saveResults,
},
}

Parallel steps:
parallelStep := &lifecycle.ParallelSteps{
Name: "parallel-processing",
Steps: []lifecycle.ParallelSubStep{
{Name: "process-batch-1", Run: processBatch1},
{Name: "process-batch-2", Run: processBatch2},
{Name: "process-batch-3", Run: processBatch3},
},
}

State tracking is controlled by the State field. To enable it, set State to a non-nil value with an explicit Name. To disable it, omit State or set it to nil.
// Basic state tracking with explicit name
step := &lifecycle.SimpleStep{
Name: "deploy-app",
State: &lifecycle.State{Name: "deploy-app"}, // Explicit state name required
Run: deployApplication,
}
// Advanced state tracking with custom condition name and messages
step := &lifecycle.SimpleStep{
Name: "deploy-app",
State: &lifecycle.State{
Name: "AppDeployed", // Required: Custom condition name in K8s
Reason: "DeploymentReady", // Optional: Custom reason for success
Message: "Application deployed successfully", // Optional: Custom success message
},
Run: deployApplication,
// Skip this step if AppDeployed condition is already True
}
// No state tracking
step := &lifecycle.SimpleStep{
Name: "temporary-operation",
// State: nil (default) - no state tracking
Run: temporaryOperation,
}

Error handling:
step := &lifecycle.SimpleStep{
Name: "risky-operation",
Run: riskyOperation,
ContinueOnError: true, // Continue even if this step fails
OnFail: func(ctx context.Context, stepName string, err error) error {
// Custom cleanup or logging
log.Printf("Step %s failed: %v", stepName, err)
return nil
},
}

Predicates:
step := &lifecycle.SimpleStep{
Name: "conditional-step",
Run: conditionalOperation,
Predicates: []lifecycle.PredicateFunc{
func() bool {
return someCondition() // Only run if this returns true
},
},
AbortOnPredicatesFalse: false, // Skip instead of aborting
}

Throttling:
step := &lifecycle.SimpleStep{
Name: "rate-limited-operation",
Run: expensiveOperation,
Throttling: &throttling.ThrottlingSettings{
Interval: time.Minute * 5,
EnsureStepSuccess: true,
},
}

Custom StateKeeper:
type CustomStateKeeper struct {
// Your custom state storage
}
func (c *CustomStateKeeper) GetSummaryAttributes() map[string]string {
return map[string]string{"backend": "custom", "version": "1.0"}
}
func (c *CustomStateKeeper) GetSummary() string {
return "CustomStateKeeper:v1.0"
}
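func (c *CustomStateKeeper) GetStepState(ctx context.Context, stepName string) (*lifecycle.StepState, error) {
// Retrieve state from your backend
return nil, nil // placeholder so the stub compiles; replace with a real lookup
}
func (c *CustomStateKeeper) SetStepState(ctx context.Context, state *lifecycle.StepState) error {
// Persist state to your backend
return nil // placeholder so the stub compiles; replace with a real write
}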
func (c *CustomStateKeeper) SupportsRunningState() bool {
return true // or false, depending on your backend capabilities
}

Kubernetes controller:
func (r *MyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// Get your custom resource
obj := &v1.MyCustomResource{}
if err := r.Get(ctx, req.NamespacedName, obj); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// Create StateKeeper
stateKeeper := &lifecycle.K8sObject{
Client: r.Client,
Object: obj,
Conditions: &obj.Status.Conditions,
}
// Define reconciliation steps
steps := []lifecycle.Step{
&lifecycle.SimpleStep{
Name: "validate-spec",
State: &lifecycle.State{
Name: "SpecValid", // Required explicit name
},
Run: r.validateSpec,
},
&lifecycle.SimpleStep{
Name: "deploy-resources",
State: &lifecycle.State{
Name: "ResourcesDeployed", // Required explicit name
},
Run: r.deployResources,
},
&lifecycle.SimpleStep{
Name: "wait-ready",
State: &lifecycle.State{
Name: "Ready", // Required explicit name
},
Run: r.waitForReady,
},
}
// Run steps engine
engine := &lifecycle.StepsEngine{
StateKeeper: stateKeeper,
Steps: steps,
}
return engine.RunAsReconcilerResponse(ctx)
}

The engine defines several error types for different scenarios:
// Expected errors that don't cause step failure
err := lifecycle.NewExpectedError(someError)
// Errors that should cause a wait/retry
err := lifecycle.NewWaitError(someError)
err := lifecycle.NewWaitErrorWithDuration(someError, 30*time.Second)
// Retryable errors with custom retry timing
err := &lifecycle.RetryableError{
Err: someError,
RetryAfter: time.Minute * 5,
}

Unit test:
func TestMyStep(t *testing.T) {
ctx := context.Background()
// Create mock StateKeeper if needed
stateKeeper := &MockStateKeeper{}
step := &lifecycle.SimpleStep{
Name: "test-step",
Run: func(ctx context.Context) error {
// Your step logic
return nil
},
}
engine := &lifecycle.StepsEngine{
StateKeeper: stateKeeper,
Steps: []lifecycle.Step{step},
}
err := engine.Run(ctx)
assert.NoError(t, err)
}

Integration test:
func TestWithRealStateKeeper(t *testing.T) {
// Set up real Kubernetes client or database connection
// Run full integration test
}

- State Management
  - Always provide an explicit State.Name when using state tracking
  - Use meaningful state names that map well to condition names
  - Provide clear reasons and messages for debugging
  - Consider whether your StateKeeper needs to track running states
- Error Handling
  - Use appropriate error types (ExpectedError, WaitError, etc.)
  - Implement proper cleanup in failure callbacks
  - Consider using ContinueOnError for non-critical steps
- Performance
  - Use throttling for expensive operations
  - Consider parallel execution for independent steps
  - Monitor state persistence overhead
- Observability
  - Leverage the built-in OpenTelemetry integration
  - Use meaningful step names for tracing
  - Include context in state messages
- Testing
  - Test steps individually with mock StateKeepers
  - Include integration tests with real backends
  - Test error scenarios and recovery paths

Contributing:
- Follow Go best practices and conventions
- Add tests for new features
- Update documentation for interface changes
- Consider backward compatibility for public APIs