Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ construction time — this is the recommended pattern. The embedded
flows in automatically. The legacy `flow.SubWorkflow` type is deprecated
and will be removed in the next major version.

## Passing values through `context.Context`

Cross-cutting capabilities — a logger, an Azure identity, a Kubernetes
client — belong in `ctx`, not in every step's constructor. `flow.ContextKey[T]`
is the type-safe key helper for putting them there; `flow.Logger`
(`ContextKey[*slog.Logger]`) is the canonical key for structured logging
and `flow.LogStepFields` / `flow.LogAttemptField` are ready-to-use
interceptors that tag every log line with `step=<name>` and `attempt=N`.

See `example/04_context_values_test.go` and the godoc on `flow.ContextKey`
/ `flow.Logger` / `flow.LogStepFields` for runnable examples.

## Learn more

- **[`example/`](./example)** — runnable, narrated examples for every feature, in increasing
Expand Down
50 changes: 50 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package flow

import "context"

// ContextKey is the typed-key helper for flowing values through a
// context.Context across go-workflow and its users. Declare a package-level
// variable of ContextKey[T] per value type and use With / From / FromOr at
// the call site:
//
// type Identity struct{ TenantID, SubID string }
// var IdentityKey = flow.ContextKey[Identity]{}
//
// // Caller injects:
// ctx = IdentityKey.With(ctx, Identity{TenantID: "t", SubID: "s"})
//
// // Steper reads:
// func (s *MyStep) Do(ctx context.Context) error {
// id, ok := IdentityKey.From(ctx)
// if !ok {
// return errors.New("identity required")
// }
// // ... use id ...
// return nil
// }
//
// Uniqueness is by T alone (ContextKey[T] is a zero-size struct used as
// the underlying context key), so two ContextKey[Identity]{} variables
// declared in different packages will collide. Each value type should
// have exactly one canonical key variable, exported by the package that
// owns the type.
type ContextKey[T any] struct{}

// With returns a new context carrying v under k.
func (k ContextKey[T]) With(ctx context.Context, v T) context.Context {
return context.WithValue(ctx, k, v)
}

// From returns the value stored under k and whether it was present.
func (k ContextKey[T]) From(ctx context.Context) (T, bool) {
v, ok := ctx.Value(k).(T)
return v, ok
}

// FromOr returns the value stored under k, or def if no value is present.
func (k ContextKey[T]) FromOr(ctx context.Context, def T) T {
if v, ok := k.From(ctx); ok {
return v
}
return def
}
66 changes: 66 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package flow

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestContextKey(t *testing.T) {
type creds struct{ Token string }
var key = ContextKey[creds]{}

t.Run("From returns ok=false when unset", func(t *testing.T) {
_, ok := key.From(context.Background())
assert.False(t, ok)
})

t.Run("FromOr returns default when unset", func(t *testing.T) {
def := creds{Token: "default"}
got := key.FromOr(context.Background(), def)
assert.Equal(t, def, got)
})

t.Run("With + From round-trips the value", func(t *testing.T) {
want := creds{Token: "abc"}
ctx := key.With(context.Background(), want)
got, ok := key.From(ctx)
assert.True(t, ok)
assert.Equal(t, want, got)
})

t.Run("FromOr returns stored value over default", func(t *testing.T) {
want := creds{Token: "abc"}
ctx := key.With(context.Background(), want)
got := key.FromOr(ctx, creds{Token: "fallback"})
assert.Equal(t, want, got)
})

t.Run("keys with different T do not collide", func(t *testing.T) {
var keyA = ContextKey[string]{}
var keyB = ContextKey[int]{}
ctx := keyA.With(context.Background(), "hello")
ctx = keyB.With(ctx, 42)

gotA, okA := keyA.From(ctx)
gotB, okB := keyB.From(ctx)
assert.True(t, okA)
assert.Equal(t, "hello", gotA)
assert.True(t, okB)
assert.Equal(t, 42, gotB)
})

t.Run("two distinct ContextKey[T] vars share the same key", func(t *testing.T) {
// ContextKey[T]{} is a zero-size struct keyed only by T, so two
// separate package-level vars of the same T will collide. This
// documents the contract: each value type should have ONE canonical
// key var (typically exported by the package that owns the type).
var k1 = ContextKey[string]{}
var k2 = ContextKey[string]{}
ctx := k1.With(context.Background(), "x")
got, ok := k2.From(ctx)
assert.True(t, ok)
assert.Equal(t, "x", got)
})
}
77 changes: 77 additions & 0 deletions example/04_context_values_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package flow_test

import (
"context"
"fmt"
"log/slog"
"os"

flow "github.com/Azure/go-workflow"
)

// # Passing values through ctx
//
// **What you'll learn**
// - Use `flow.ContextKey[T]` to put a typed value in `ctx` once and read
// it in any Steper — the everyday pattern for sharing things like a
// request ID, tenant, logger or client across a Workflow.
// - Reach for the prebuilt `flow.Logger` key (`ContextKey[*slog.Logger]`)
// when the value is a structured logger; combine with
// `flow.LogStepFields` / `flow.LogAttemptField` to tag every log line
// with the current step name and attempt number for free.

// UserKey is the canonical key for a request-scoped user. Declare one
// `ContextKey[T]` per value type — uniqueness is by T, so two such vars
// of the same T deliberately share the same key.
var UserKey = flow.ContextKey[string]{}

// ExampleContextKey shows the bare-bones pattern: caller injects, step reads.
func ExampleContextKey() {
greet := flow.Func("Greet", func(ctx context.Context) error {
fmt.Println("hello", UserKey.FromOr(ctx, "anonymous"))
return nil
})

w := new(flow.Workflow)
w.Add(flow.Step(greet))

ctx := UserKey.With(context.Background(), "ada")
_ = w.Do(ctx)
// Output:
// hello ada
}

// ExampleLogger shows the prebuilt flow.Logger key together with the two
// log interceptors. The step's Do() only logs business attributes;
// step=Greet and attempt=0 come along for free.
func ExampleLogger() {
// Plain text logger to stdout, with time/level stripped so // Output:
// stays deterministic. Real callers just pass slog.Default() or their
// own *slog.Logger.
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
ReplaceAttr: func(_ []string, a slog.Attr) slog.Attr {
if a.Key == slog.TimeKey || a.Key == slog.LevelKey {
return slog.Attr{}
}
return a
},
}))

greet := flow.Func("Greet", func(ctx context.Context) error {
flow.Logger.FromOr(ctx, slog.Default()).Info("hi", "name", "ada")
return nil
})

w := &flow.Workflow{
Option: flow.WorkflowOption{
StepInterceptors: []flow.StepInterceptor{flow.LogStepFields()},
AttemptInterceptors: []flow.AttemptInterceptor{flow.LogAttemptField()},
},
}
w.Add(flow.Step(greet))

ctx := flow.Logger.With(context.Background(), logger)
_ = w.Do(ctx)
// Output:
// msg=hi step=Greet attempt=0 name=ada
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
//
// **Where they fit**
//
// StepInterceptor (workflow-level, see 10_observability_test.go)
// StepInterceptor (workflow-level, see 11_observability_test.go)
// └── retry loop (one iteration per attempt)
// └── AttemptInterceptor (workflow-level)
// └── BeforeStep callbacks ← runs once PER ATTEMPT
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
// **When to reach for which mechanism**
//
// Need to log/trace every Step? → Interceptor (this file).
// Need to react to upstream's terminal status → Condition (05_conditions).
// Need to react to upstream's terminal status → Condition (06_conditions).
// Need behaviour for one specific Step? → BeforeStep / AfterStep
// (04_callbacks).
// (05_callbacks).
//
// **Caveats**
// - Steps settled inline as Skipped/Canceled by their Condition bypass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// keyed by failed Step. Iterate to print them all, or use `errors.As`.
// - Use `Workflow.StateOf(step).GetStatus()` to inspect any Step's
// terminal status post-run.
// - For per-Step structured logging, prefer an interceptor (10_observability)
// - For per-Step structured logging, prefer an interceptor (11_observability)
// over `AfterStep` so the same logger applies to every Step.
//
// **The two questions you'll typically ask**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
// You wrote the Workflow in your test ─► substitute Steps directly,
// no need for Mock.
// Production code built the Workflow ─► flow.Mock to swap one Step.
// You want to assert on per-Step error/status ─► see 11_debugging.
// You want to assert on per-Step error/status ─► see 12_debugging.
// You want to assert on Begin/End ordering ─► add a StepInterceptor in
// the test (10_observability).
// the test (11_observability).

// ExampleMock shows the typical use: a production workflow assembled
// elsewhere, with one Step substituted in the test.
Expand Down
8 changes: 4 additions & 4 deletions example/13_mutators_test.go → example/14_mutators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// `flow.Builder` to register `Input` / `BeforeStep` / `AfterStep` /
// `Retry` / `Timeout` / ..., or both.
// - Parent-Workflow Mutators reach into sub-Workflows automatically — the
// same WorkflowOptionReceiver mechanism described in 10_observability for
// same WorkflowOptionReceiver mechanism described in 11_observability for
// interceptors.
//
// **Where they fit**
Expand All @@ -30,9 +30,9 @@ import (
// **When to reach for which mechanism**
//
// Need behaviour for one specific Step? → BeforeStep / AfterStep
// (04_callbacks).
// (05_callbacks).
// Need behaviour for every Step in the Workflow? → Interceptor
// (10_observability).
// (11_observability).
// Need behaviour for every Step of a given type,
// even Steps added later or inside sub-workflows? → Mutator (this file).
//
Expand Down Expand Up @@ -161,7 +161,7 @@ func ExampleMutator_ctxValue() {
// time.
//
// The propagation mechanism is the same `WorkflowOptionReceiver` interface
// used by interceptors in 10_observability: any Step that contains a
// used by interceptors in 11_observability: any Step that contains a
// sub-Workflow (a `*Workflow` used as a Step, or a struct embedding the
// deprecated `flow.SubWorkflow`) automatically receives the parent's Option
// (Mutators included) before the inner Workflow starts scheduling.
Expand Down
40 changes: 26 additions & 14 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@

This directory is the **`go-workflow` learning path** in code form. Each
file is a runnable [Go example test](https://pkg.go.dev/testing#hdr-Examples)
focused on one question. Read top to bottom on a first pass; jump around
once you know what you need.
focused on one question.

`go test ./example/...` runs everything and verifies the output blocks
stay in sync with the library.

## How this path is ordered

These files are sorted by **how often you'll reach for the feature**, not
by API surface area or implementation depth. The earlier the file, the
more likely you'll touch it on day one. Read top-to-bottom on a first
pass and you'll build the right mental model in the right order; jump
around once you know what you need.

When adding a new example, place it where its frequency-of-use lands —
even if that pushes everything below it down by one. The numbers are
ordering hints, not stable identifiers.

## Path

### Get the mental model (read first)
Expand All @@ -17,36 +28,37 @@ stay in sync with the library.
| [01_quickstart](01_quickstart_test.go) | Any struct with a `Do` method is a Step. End-to-end 3-minute tour: parallel fetch + merge into a profile, with data flowing through `Input` callbacks. |
| [02_steps_and_deps](02_steps_and_deps_test.go) | `Step` / `Steps` / `DependsOn` / `Pipe` / `BatchPipe` / `Name`. |

### Move data through the graph
### Move data into a Step

| File | What you'll learn |
|---------------------------------|---|
| [03_data_flow](03_data_flow_test.go) | The standard `Input` callback pattern (with your structs and with `Func`/`FuncIO`/`FuncI`/`FuncO`). |
| [04_callbacks](04_callbacks_test.go) | `BeforeStep` / `AfterStep` and how they relate to `Input`. |
| [03_data_flow](03_data_flow_test.go) | The standard `Input` callback pattern (with your structs and with `Func`/`FuncIO`/`FuncI`/`FuncO`) — typed, point-to-point. |
| [04_context_values](04_context_values_test.go) | `flow.ContextKey[T]` for graph-wide values like a logger, request ID or client; `flow.Logger` and the log interceptors as the canonical case. |
| [05_callbacks](05_callbacks_test.go) | `BeforeStep` / `AfterStep` and how they relate to `Input`. |

### Decide what runs (and what doesn't)

| File | What you'll learn |
|---------------------------------|---|
| [05_conditions](05_conditions_test.go) | `Condition`, `When`, `flow.Skip` / `flow.Cancel` from inside `Do`. |
| [06_branching](06_branching_test.go) | `If` / `Switch` for runtime-data-driven branches. |
| [07_retry_and_timeout](07_retry_and_timeout_test.go) | `Retry`, per-attempt timeout, step timeout, deterministic-clock testing. |
| [06_conditions](06_conditions_test.go) | `Condition`, `When`, `flow.Skip` / `flow.Cancel` from inside `Do`. |
| [07_branching](07_branching_test.go) | `If` / `Switch` for runtime-data-driven branches. |
| [08_retry_and_timeout](08_retry_and_timeout_test.go) | `Retry`, per-attempt timeout, step timeout, deterministic-clock testing. |

### Build bigger workflows

| File | What you'll learn |
|---------------------------------|---|
| [08_workflow_in_workflow](08_workflow_in_workflow_test.go) | Use a `*Workflow` as a Step. Why a "composite Step" struct is an antipattern. |
| [09_workflow_options](09_workflow_options_test.go) | `MaxConcurrency`, `DontPanic`. |
| [09_workflow_in_workflow](09_workflow_in_workflow_test.go) | Use a `*Workflow` as a Step. Why a "composite Step" struct is an antipattern. |
| [10_workflow_options](10_workflow_options_test.go) | `MaxConcurrency`, `DontPanic`. |

### Operate, debug, test

| File | What you'll learn |
|---------------------------------|---|
| [10_observability](10_observability_test.go) | `StepInterceptor` / `AttemptInterceptor` for cross-cutting logging, tracing, metrics. |
| [11_debugging](11_debugging_test.go) | `ErrWorkflow` and `Workflow.StateOf` for post-run inspection. |
| [12_testing_workflows](12_testing_workflows_test.go) | `flow.Mock` to substitute Step behaviour in tests. |
| [13_mutators](13_mutators_test.go) | `Mutate[T]` to contribute config (defaults, Retry, callbacks) to every Step of a type — even inside sub-Workflows. |
| [11_observability](11_observability_test.go) | `StepInterceptor` / `AttemptInterceptor` for cross-cutting logging, tracing, metrics. |
| [12_debugging](12_debugging_test.go) | `ErrWorkflow` and `Workflow.StateOf` for post-run inspection. |
| [13_testing_workflows](13_testing_workflows_test.go) | `flow.Mock` to substitute Step behaviour in tests. |
| [14_mutators](14_mutators_test.go) | `Mutate[T]` to contribute config (defaults, Retry, callbacks) to every Step of a type — even inside sub-Workflows. |

## Conventions used in these examples

Expand Down
Loading
Loading