diff --git a/README.md b/README.md index 7f9fd84..dad2c14 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,16 @@ and will be removed in the next major version. conditions, retry/timeout, composite steps, interceptors and workflow options. - **DeepWiki:** +## Contrib + +Optional, independently-versioned modules under `contrib/`: + +- **[`contrib/otel`](./contrib/otel)** — OpenTelemetry tracing integration + via the existing `StepInterceptor` / `AttemptInterceptor` extension + points. Released as a separate Go module + (`github.com/Azure/go-workflow/contrib/otel`) so its OpenTelemetry + dependency does not enter core's transitive graph. + ## Contributing This project welcomes contributions. Most contributions require you to agree to a Contributor diff --git a/contrib/otel/README.md b/contrib/otel/README.md new file mode 100644 index 0000000..b30ea5a --- /dev/null +++ b/contrib/otel/README.md @@ -0,0 +1,66 @@ +# contrib/otel + +OpenTelemetry tracing integration for [go-workflow](../..) — implemented as +two interceptor factories that plug into the existing `StepInterceptor` and +`AttemptInterceptor` extension points. + +```go +import ( + flow "github.com/Azure/go-workflow" + "github.com/Azure/go-workflow/contrib/otel" +) + +w := &flow.Workflow{ + Option: flow.WorkflowOption{ + StepInterceptors: []flow.StepInterceptor{ + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), + }, + AttemptInterceptors: []flow.AttemptInterceptor{ + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), + }, + }, +} +``` + +See the package godoc and the runnable `Example` for the full wiring. + +## What you get by default + +- One **step span** per Step lifetime (covering all retries) — name + `flow.String(step)`, attributes `workflow.step.name`, `workflow.step.status` + (`"success"` or `"error"`). +- One **attempt span** per individual attempt — name + `"<step> (attempt N)"`, attributes `workflow.step.name`, + `workflow.step.attempt` (int64). 
+- Errors recorded with `RecordError` + `SetStatus(codes.Error)`; this + includes `context.Canceled`. +- Steps that are `Skipped` or `Canceled` by their `Condition` produce no + spans (they bypass the interceptor chain in core). + +Every default can be overridden via the `With*` options. See the godoc. + +## Dependency policy + +`contrib/otel` is an **independent Go module** (`github.com/Azure/go-workflow/contrib/otel`) +so the OpenTelemetry dependency does not enter the core module's transitive +graph. Runtime requires are limited to the OpenTelemetry **API** +(`go.opentelemetry.io/otel`, `…/otel/trace`); the SDK and exporters are +test-only dependencies. + +## Working on the module + +Because the contrib module is separate, run its tests from inside the module: + + cd contrib/otel && go test ./... + +The module's `go.mod` carries `replace github.com/Azure/go-workflow => ../..` +so it builds against in-tree core during development. + +## Open follow-ups + +- **CI**: add a job that runs `go test ./...` (and `-race`) inside + `contrib/otel/` in addition to the root-module job. +- **Release**: before tagging `contrib/otel/v0.1.0`, drop or pin the + `replace github.com/Azure/go-workflow => ../..` directive in + `contrib/otel/go.mod` so `go get + github.com/Azure/go-workflow/contrib/otel@v0.1.0` resolves cleanly. diff --git a/contrib/otel/attempt.go b/contrib/otel/attempt.go new file mode 100644 index 0000000..9476981 --- /dev/null +++ b/contrib/otel/attempt.go @@ -0,0 +1,59 @@ +package flowotel + +import ( + "context" + "fmt" + + flow "github.com/Azure/go-workflow" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// NewAttemptInterceptor returns a flow.AttemptInterceptor that emits one +// OpenTelemetry span per attempt (one per call into the retry loop). +// +// The span name defaults to fmt.Sprintf("%s (attempt %d)", flow.String(step), +// attempt) and may be overridden via WithAttemptSpanNamer. 
The default +// attribute set always includes workflow.step.name = flow.String(step) and +// workflow.step.attempt = int64(attempt). Extra attributes can be supplied +// via WithAttemptAttributes; they are appended to (not in place of) the +// defaults at span-start time. Canonical attributes (workflow.step.name, +// workflow.step.attempt) always win over user-supplied attributes — i.e., +// WithAttemptAttributes cannot override them. +// +// On a non-nil error from next() the span records the error via +// span.RecordError and sets its status to codes.Error. context.Canceled +// is treated like any other error (no special-case). +// +// Steps that the scheduler settles inline (Skipped or Canceled by their +// Condition) bypass the interceptor chain entirely and produce no span. +func NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor { + cfg := newConfig(opts) + tracer := cfg.resolveTracer() + return flow.AttemptInterceptorFunc(func(ctx context.Context, step flow.Steper, attempt uint64, next func(context.Context) error) error { + spanName := fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt) + if cfg.attemptSpanNamer != nil { + spanName = cfg.attemptSpanNamer(step, attempt) + } + // User attributes first, canonical defaults last so OTel's + // last-write-wins semantics keep canonical attrs authoritative. + attrs := make([]attribute.KeyValue, 0, 4) + if cfg.attemptAttributes != nil { + attrs = append(attrs, cfg.attemptAttributes(step, attempt)...) 
+ } + attrs = append(attrs, + attribute.String(attrStepName, flow.String(step)), + attribute.Int64(attrStepAttempt, int64(attempt)), + ) + ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(attrs...)) + defer span.End() + + err := next(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err + }) +} diff --git a/contrib/otel/attempt_test.go b/contrib/otel/attempt_test.go new file mode 100644 index 0000000..f5c7df1 --- /dev/null +++ b/contrib/otel/attempt_test.go @@ -0,0 +1,176 @@ +package flowotel_test + +import ( + "context" + "fmt" + "testing" + + flow "github.com/Azure/go-workflow" + "github.com/Azure/go-workflow/contrib/otel" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" +) + +func TestAttemptInterceptor_OneSpanPerAttempt(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "S", NeedAttempts: 4} // succeeds on the 4th try + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) + w.Add(flow.Step(step).Retry(noBackoff(5))) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 4, "expected one span per attempt (0..3)") + for i, s := range spans { + a, ok := findAttr(s.Attributes(), "workflow.step.attempt") + require.True(t, ok, "span %d missing workflow.step.attempt", i) + assert.Equal(t, int64(i), a.Value.AsInt64(), "span %d attempt index mismatch", i) + assertAttr(t, s.Attributes(), "workflow.step.name", "S") + } +} + +func TestAttemptInterceptor_DefaultName(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("MyStep") + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 
1) + assert.Equal(t, "MyStep (attempt 0)", spans[0].Name()) +} + +func TestAttemptInterceptor_FailingAttemptRecorded(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "Flaky", NeedAttempts: 2} // fails once, then succeeds + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) + w.Add(flow.Step(step).Retry(noBackoff(2))) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 2, "expected one span per attempt (failure + success)") + + // First attempt span: failure with RecordError + codes.Error. + first := spans[0] + assert.Equal(t, codes.Error, first.Status().Code, "first attempt span should be Error") + var sawException bool + for _, ev := range first.Events() { + if ev.Name == exceptionEventName { + sawException = true + break + } + } + assert.True(t, sawException, "first attempt should record an exception event") + + // Second attempt span: success leaves status Unset. + second := spans[1] + assert.Equal(t, codes.Unset, second.Status().Code, "successful attempt span should be Unset") +} + +func TestAttemptInterceptor_ChildOfCallerSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + tracer := tp.Tracer("test") + ctx, outer := tracer.Start(context.Background(), "OUTER") + defer outer.End() + + step := flow.NoOp("S") + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(ctx)) + + // OUTER is still open (it ends via defer); the recorder has exactly the + // attempt span. A regression that emits more than one span will surface here. 
+ spans := rec.Ended() + require.Len(t, spans, 1) + attempt := spans[0] + assert.Equal(t, outer.SpanContext().SpanID(), attempt.Parent().SpanID(), + "attempt span must be a child of the caller-supplied OUTER span") + assert.Equal(t, outer.SpanContext().TraceID(), attempt.SpanContext().TraceID()) +} + +func TestAttemptInterceptor_CustomNamer(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Original") + namer := func(s flow.Steper, n uint64) string { + return fmt.Sprintf("X-%s-%d", flow.String(s), n) + } + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithAttemptSpanNamer(namer), + )) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, "X-Original-0", s.Name(), "custom attempt namer should win") + // Canonical attributes still present despite the custom name. + assertAttr(t, s.Attributes(), "workflow.step.name", "Original") + a, ok := findAttr(s.Attributes(), "workflow.step.attempt") + require.True(t, ok) + assert.Equal(t, int64(0), a.Value.AsInt64()) +} + +func TestAttemptInterceptor_CustomAttributes(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Hello") + extras := func(flow.Steper, uint64) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("env", "test"), + attribute.Int("answer", 42), + // Regression: a malicious user attribute trying to override the + // canonical workflow.step.attempt key MUST be superseded by the + // interceptor's own value. 
+ attribute.Int64("workflow.step.attempt", 999), + } + } + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithAttemptAttributes(extras), + )) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + attrs := spans[0].Attributes() + + // Defaults still present. + assertAttr(t, attrs, "workflow.step.name", "Hello") + canonical, ok := findAttr(attrs, "workflow.step.attempt") + require.True(t, ok) + assert.Equal(t, int64(0), canonical.Value.AsInt64(), + "canonical workflow.step.attempt must win over user-supplied override") + + // User attributes appended. + assertAttr(t, attrs, "env", "test") + a, ok := findAttr(attrs, "answer") + require.True(t, ok, "custom int attribute missing") + assert.Equal(t, int64(42), a.Value.AsInt64()) +} + +func TestAttemptInterceptor_ContextCanceled(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &alwaysFail{Name: "S", Err: context.Canceled} + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + _ = w.Do(context.Background()) // workflow itself errors + + spans := rec.Ended() + require.NotEmpty(t, spans) + s := spans[0] + assert.Equal(t, codes.Error, s.Status().Code) + require.NotEmpty(t, s.Events()) + assert.Equal(t, exceptionEventName, s.Events()[0].Name) +} diff --git a/contrib/otel/consts.go b/contrib/otel/consts.go new file mode 100644 index 0000000..98de746 --- /dev/null +++ b/contrib/otel/consts.go @@ -0,0 +1,11 @@ +package flowotel + +// Attribute keys and status values emitted by the contrib/otel interceptors. 
+const ( + attrStepName = "workflow.step.name" + attrStepStatus = "workflow.step.status" + attrStepAttempt = "workflow.step.attempt" + + statusSuccess = "success" + statusError = "error" +) diff --git a/contrib/otel/doc.go b/contrib/otel/doc.go new file mode 100644 index 0000000..a610430 --- /dev/null +++ b/contrib/otel/doc.go @@ -0,0 +1,70 @@ +// Package flowotel provides OpenTelemetry tracing integration for go-workflow. +// +// It plugs into the two interceptor extension points exposed by go-workflow: +// StepInterceptor (one span per Step lifetime, across all retries) and +// AttemptInterceptor (one span per individual attempt). Use both together +// for a parent/child structure, or just one if you want a flatter trace. +// +// # Usage +// +// import ( +// flow "github.com/Azure/go-workflow" +// "github.com/Azure/go-workflow/contrib/otel" +// ) +// +// w := &flow.Workflow{ +// Option: flow.WorkflowOption{ +// StepInterceptors: []flow.StepInterceptor{ +// flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), +// }, +// AttemptInterceptors: []flow.AttemptInterceptor{ +// flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), +// }, +// }, +// } +// +// See the runnable Example for a complete wiring with a stdout exporter. +// +// # Span conventions +// +// Step spans are named flow.String(step) and carry attributes +// workflow.step.name and workflow.step.status ("success" or "error"). +// Attempt spans are named "<step> (attempt N)" and carry workflow.step.name +// and workflow.step.attempt (int64). +// +// All defaults can be overridden via Options (WithStepSpanNamer, +// WithAttemptSpanNamer, WithStepAttributes, WithAttemptAttributes). +// Canonical attributes (workflow.step.name, workflow.step.status, +// workflow.step.attempt) always win — user-supplied keys with the same +// names are silently superseded. 
+// +// # Parent/child relation +// +// When both interceptors are registered, attempt spans are children of the +// step span (same TraceID, Parent.SpanID equals the step span's SpanID). +// When only one is registered, that span is a child of whatever span (if +// any) is on the caller-provided context. +// +// # Provider resolution +// +// WithTracerProvider sets the OpenTelemetry TracerProvider explicitly. +// If unset (or nil), each factory falls back to otel.GetTracerProvider() +// at the moment NewStepInterceptor / NewAttemptInterceptor is called — +// not lazily on every interception. Swapping the global provider after +// construction does not affect already-built interceptors. +// +// # Skipped and Canceled-by-Condition steps +// +// Steps whose Condition resolves to Skipped or Canceled are settled inline +// by the workflow scheduler and bypass the interceptor chain entirely. As +// a result, they produce zero spans through this package. If you need to +// observe terminal-by-condition statuses, watch the StepResult instead. +// +// # context.Canceled +// +// When the error returned by next() satisfies errors.Is(err, context.Canceled), +// the resulting span is ended with RecordError + SetStatus(codes.Error) — +// the same as any other error. There is no special-case suppression. Users +// running graceful shutdowns who want different semantics can wrap the +// returned interceptor. 
+package flowotel diff --git a/contrib/otel/example_test.go b/contrib/otel/example_test.go new file mode 100644 index 0000000..a24abcc --- /dev/null +++ b/contrib/otel/example_test.go @@ -0,0 +1,52 @@ +package flowotel_test + +import ( + "context" + "fmt" + + flow "github.com/Azure/go-workflow" + "github.com/Azure/go-workflow/contrib/otel" + + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// Example demonstrates registering the contrib/otel step and attempt +// interceptors on a Workflow, with spans exported to stdout for inspection. +// +// We intentionally omit the `// Output:` directive: TraceIDs, SpanIDs and +// other span fields are non-deterministic, so verifying string output +// would force the example to mock the SDK and obscure the integration +// pattern. The Example still compiles and runs as part of `go test`. +func Example() { + // 1. Build a TracerProvider with a stdout exporter (any exporter works: + // OTLP, Jaeger, Zipkin, etc.). + exporter, err := stdouttrace.New(stdouttrace.WithoutTimestamps()) + if err != nil { + panic(err) + } + tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter)) + // Shutdown errors are intentionally ignored in this example; + // production code should log or surface them. + defer func() { _ = tp.Shutdown(context.Background()) }() + + // 2. Register both interceptors on a Workflow. + w := &flow.Workflow{} + w.Option.StepInterceptors = []flow.StepInterceptor{ + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), + } + w.Option.AttemptInterceptors = []flow.AttemptInterceptor{ + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), + } + + // 3. Build a tiny 2-step pipeline: A → B. flow.Step(b).DependsOn(a) + // registers BOTH steps and the dependency in one call. + a := flow.NoOp("A") + b := flow.NoOp("B") + w.Add(flow.Step(b).DependsOn(a)) + + // 4. Run. 
+ if err := w.Do(context.Background()); err != nil { + fmt.Println("workflow error:", err) + } +} diff --git a/contrib/otel/go.mod b/contrib/otel/go.mod new file mode 100644 index 0000000..c881dd9 --- /dev/null +++ b/contrib/otel/go.mod @@ -0,0 +1,28 @@ +module github.com/Azure/go-workflow/contrib/otel + +go 1.23.0 + +require ( + github.com/Azure/go-workflow v0.0.0-00010101000000-000000000000 + github.com/cenkalti/backoff/v4 v4.3.0 + github.com/stretchr/testify v1.11.1 + go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 + go.opentelemetry.io/otel/sdk v1.37.0 + go.opentelemetry.io/otel/trace v1.37.0 +) + +require ( + github.com/benbjohnson/clock v1.3.5 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + golang.org/x/sys v0.34.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/Azure/go-workflow => ../.. 
diff --git a/contrib/otel/go.sum b/contrib/otel/go.sum new file mode 100644 index 0000000..3256ded --- /dev/null +++ b/contrib/otel/go.sum @@ -0,0 +1,48 @@ +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 
h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 h1:SNhVp/9q4Go/XHBkQ1/d5u9P/U+L1yaGPoi0x+mStaI= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0/go.mod h1:tx8OOlGH6R4kLV67YaYO44GFXloEjGPZuMjEkaaqIp4= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/contrib/otel/helpers_test.go b/contrib/otel/helpers_test.go new file mode 100644 index 0000000..4a3211d --- /dev/null +++ b/contrib/otel/helpers_test.go @@ -0,0 +1,30 @@ +package flowotel_test + +import ( + flow "github.com/Azure/go-workflow" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +// newRecorderTracerProvider returns a TracerProvider wired to a synchronous +// in-memory SpanRecorder, suitable for asserting the spans emitted by the +// step / attempt interceptors. SpanRecorder implements SpanProcessor, so no +// extra batching or simple processor is needed. +func newRecorderTracerProvider() (*sdktrace.TracerProvider, *tracetest.SpanRecorder) { + rec := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)) + return tp, rec +} + +// newTestWorkflow builds an empty Workflow registered with the given step +// interceptor and/or attempt interceptor; nil arguments are not registered. 
+func newTestWorkflow(stepIC flow.StepInterceptor, attemptIC flow.AttemptInterceptor) *flow.Workflow { + w := &flow.Workflow{} + if stepIC != nil { + w.Option.StepInterceptors = []flow.StepInterceptor{stepIC} + } + if attemptIC != nil { + w.Option.AttemptInterceptors = []flow.AttemptInterceptor{attemptIC} + } + return w +} diff --git a/contrib/otel/integration_test.go b/contrib/otel/integration_test.go new file mode 100644 index 0000000..608f237 --- /dev/null +++ b/contrib/otel/integration_test.go @@ -0,0 +1,123 @@ +package flowotel_test + +import ( + "context" + "testing" + + flow "github.com/Azure/go-workflow" + "github.com/Azure/go-workflow/contrib/otel" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// isAttemptSpan identifies attempt spans by the canonical attribute that +// NewAttemptInterceptor always writes (workflow.step.attempt). Using the +// attribute rather than the default span name keeps the discriminator +// robust against WithAttemptSpanNamer overrides and default-name changes. 
+func isAttemptSpan(sp sdktrace.ReadOnlySpan) bool { + for _, kv := range sp.Attributes() { + if string(kv.Key) == "workflow.step.attempt" { + return true + } + } + return false +} + +func TestBothLayers_AttemptIsChildOfStep(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + s := flow.NoOp("OnlyStep") + w := newTestWorkflow( + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), + ) + w.Add(flow.Step(s)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 2) + + var stepSpan, attemptSpan sdktrace.ReadOnlySpan + for _, sp := range spans { + if isAttemptSpan(sp) { + attemptSpan = sp + } else { + stepSpan = sp + } + } + require.NotNil(t, stepSpan, "step span must be present") + require.NotNil(t, attemptSpan, "attempt span must be present") + + assert.Equal(t, stepSpan.SpanContext().TraceID(), attemptSpan.SpanContext().TraceID(), + "step and attempt spans must share TraceID") + assert.Equal(t, stepSpan.SpanContext().SpanID(), attemptSpan.Parent().SpanID(), + "attempt span must be child of step span") +} + +func TestBothLayers_RetryAttemptCount(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "Flaky", NeedAttempts: 3} // succeeds on 3rd attempt + w := newTestWorkflow( + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), + ) + w.Add(flow.Step(step).Retry(noBackoff(5))) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + // 3 attempt spans + 1 step span = 4 total + require.Len(t, spans, 4) + + var stepSpan sdktrace.ReadOnlySpan + var attemptSpans []sdktrace.ReadOnlySpan + for _, sp := range spans { + if isAttemptSpan(sp) { + attemptSpans = append(attemptSpans, sp) + } else { + stepSpan = sp + } + } + require.NotNil(t, stepSpan) + require.Len(t, attemptSpans, 3) + + for _, a := 
range attemptSpans { + assert.Equal(t, stepSpan.SpanContext().TraceID(), a.SpanContext().TraceID(), + "every attempt span must share the step's TraceID") + assert.Equal(t, stepSpan.SpanContext().SpanID(), a.Parent().SpanID(), + "every attempt span must be a child of the step span") + } +} + +func TestProviderResolutionAtFactoryTime(t *testing.T) { + // CANNOT t.Parallel(): mutates global TracerProvider via otel.SetTracerProvider. + // All other tests in this package inject TracerProvider explicitly via + // WithTracerProvider, so flipping the global here does not affect them. + original := otel.GetTracerProvider() + t.Cleanup(func() { otel.SetTracerProvider(original) }) + + // Provider A is what's global at factory call time. + tpA, recA := newRecorderTracerProvider() + otel.SetTracerProvider(tpA) + + // Construct interceptors WITHOUT WithTracerProvider — they should snapshot + // the current global (tpA) once. + stepIC := flowotel.NewStepInterceptor() + attemptIC := flowotel.NewAttemptInterceptor() + + // Now swap the global to provider B. + tpB, recB := newRecorderTracerProvider() + otel.SetTracerProvider(tpB) + + // Run the workflow. Spans MUST land on provider A, not B. 
+ s := flow.NoOp("ProviderTest") + w := newTestWorkflow(stepIC, attemptIC) + w.Add(flow.Step(s)) + require.NoError(t, w.Do(context.Background())) + + assert.Len(t, recA.Ended(), 2, "interceptors should still write to original provider (snapshot at factory time)") + assert.Len(t, recB.Ended(), 0, "swapped-in provider should NOT receive spans") +} diff --git a/contrib/otel/options.go b/contrib/otel/options.go new file mode 100644 index 0000000..bd77389 --- /dev/null +++ b/contrib/otel/options.go @@ -0,0 +1,142 @@ +package flowotel + +import ( + flow "github.com/Azure/go-workflow" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// defaultTracerName is the default instrumentation name passed to +// TracerProvider.Tracer when WithTracerName is not used. +const defaultTracerName = "github.com/Azure/go-workflow/contrib/otel" + +// config is the resolved configuration shared by both interceptor factories +// (NewStepInterceptor and NewAttemptInterceptor). The same Option values are +// accepted by both factories; options that target one layer are no-ops on the +// other. +type config struct { + tracerProvider trace.TracerProvider + tracerName string + stepSpanNamer func(flow.Steper) string + attemptSpanNamer func(flow.Steper, uint64) string + stepAttributes func(flow.Steper) []attribute.KeyValue + attemptAttributes func(flow.Steper, uint64) []attribute.KeyValue +} + +// Option configures a step or attempt interceptor produced by +// NewStepInterceptor / NewAttemptInterceptor. The same Option type is accepted +// by both factories; each option's documentation states which interceptor it +// affects (the other factory ignores it). +type Option func(*config) + +// newConfig applies opts to a fresh config and returns it. nil options are +// skipped so callers can build slices conditionally. 
+func newConfig(opts []Option) *config { + c := &config{} + for _, o := range opts { + if o != nil { + o(c) + } + } + return c +} + +// WithTracerProvider sets the OpenTelemetry TracerProvider used to obtain the +// Tracer that emits step and attempt spans. When unset (or set to nil), each +// factory falls back to otel.GetTracerProvider() at the moment +// NewStepInterceptor / NewAttemptInterceptor is called (not lazily on every +// interception). +// +// Affects: NewStepInterceptor, NewAttemptInterceptor. +func WithTracerProvider(tp trace.TracerProvider) Option { + return func(c *config) { c.tracerProvider = tp } +} + +// WithTracerName overrides the instrumentation name passed to +// TracerProvider.Tracer. Default: +// "github.com/Azure/go-workflow/contrib/otel". +// +// Affects: NewStepInterceptor, NewAttemptInterceptor. +func WithTracerName(name string) Option { + return func(c *config) { c.tracerName = name } +} + +// WithStepSpanNamer overrides the default step span name (flow.String(step)) +// with a caller-supplied function. Passing a nil fn is a no-op and leaves the +// previously configured (or default) namer in place. +// +// Affects: NewStepInterceptor only. NewAttemptInterceptor ignores this option. +func WithStepSpanNamer(fn func(flow.Steper) string) Option { + return func(c *config) { + if fn != nil { + c.stepSpanNamer = fn + } + } +} + +// WithAttemptSpanNamer overrides the default attempt span name +// ("<step> (attempt N)") with a caller-supplied function. Passing a nil fn is +// a no-op and leaves the previously configured (or default) namer in place. +// +// Affects: NewAttemptInterceptor only. NewStepInterceptor ignores this option. +func WithAttemptSpanNamer(fn func(flow.Steper, uint64) string) Option { + return func(c *config) { + if fn != nil { + c.attemptSpanNamer = fn + } + } +} + +// WithStepAttributes registers a function that returns extra attributes to +// attach to step spans at span-start time. 
The returned attributes are added +// in addition to the defaults (e.g. workflow.step.name). Passing a nil fn is +// a no-op. +// +// Note: canonical attributes set by the interceptor (workflow.step.name, +// workflow.step.status) cannot be overridden by this option; passing those +// keys is silently superseded. +// +// Affects: NewStepInterceptor only. NewAttemptInterceptor ignores this option. +func WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue) Option { + return func(c *config) { + if fn != nil { + c.stepAttributes = fn + } + } +} + +// WithAttemptAttributes registers a function that returns extra attributes to +// attach to attempt spans at span-start time. The returned attributes are +// added in addition to the defaults (e.g. workflow.step.name, +// workflow.step.attempt). Passing a nil fn is a no-op. +// +// Note: canonical attributes set by the interceptor (workflow.step.name, +// workflow.step.attempt) cannot be overridden by this option; passing those +// keys is silently superseded. +// +// Affects: NewAttemptInterceptor only. NewStepInterceptor ignores this option. +func WithAttemptAttributes(fn func(flow.Steper, uint64) []attribute.KeyValue) Option { + return func(c *config) { + if fn != nil { + c.attemptAttributes = fn + } + } +} + +// resolveTracer picks the configured TracerProvider (falling back to the +// global provider via otel.GetTracerProvider when nil) and returns a Tracer +// with the configured (or default) instrumentation name. It is intended to be +// called once per factory invocation at construction time; the resulting +// Tracer is captured by the returned interceptor closure. 
+func (c *config) resolveTracer() trace.Tracer { + tp := c.tracerProvider + if tp == nil { + tp = otel.GetTracerProvider() + } + name := c.tracerName + if name == "" { + name = defaultTracerName + } + return tp.Tracer(name) +} diff --git a/contrib/otel/step.go b/contrib/otel/step.go new file mode 100644 index 0000000..4937881 --- /dev/null +++ b/contrib/otel/step.go @@ -0,0 +1,58 @@ +package flowotel + +import ( + "context" + + flow "github.com/Azure/go-workflow" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// NewStepInterceptor returns a flow.StepInterceptor that emits one +// OpenTelemetry span per Step lifetime (covering all retry attempts). +// +// The span name defaults to flow.String(step) and may be overridden via +// WithStepSpanNamer. The default attribute set always includes +// workflow.step.name = flow.String(step) and, after next() returns, +// workflow.step.status ∈ {"success", "error"}. Extra attributes can be +// supplied via WithStepAttributes; they are appended to (not in place of) +// the defaults at span-start time. Canonical attributes (workflow.step.name, +// workflow.step.status) always win over user-supplied attributes — i.e., +// WithStepAttributes cannot override them. +// +// On a non-nil error from next() the span records the error via +// span.RecordError and sets its status to codes.Error. context.Canceled +// is treated like any other error (no special-case). +// +// Steps that the scheduler settles inline (Skipped or Canceled by their +// Condition) bypass the interceptor chain entirely and produce no span. 
+func NewStepInterceptor(opts ...Option) flow.StepInterceptor { + cfg := newConfig(opts) + tracer := cfg.resolveTracer() + return flow.StepInterceptorFunc(func(ctx context.Context, step flow.Steper, next func(context.Context) error) error { + spanName := flow.String(step) + if cfg.stepSpanNamer != nil { + spanName = cfg.stepSpanNamer(step) + } + // User attributes first, canonical defaults last so OTel's + // last-write-wins semantics keep canonical attrs authoritative. + attrs := make([]attribute.KeyValue, 0, 4) + if cfg.stepAttributes != nil { + attrs = append(attrs, cfg.stepAttributes(step)...) + } + attrs = append(attrs, attribute.String(attrStepName, flow.String(step))) + ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(attrs...)) + defer span.End() + + err := next(ctx) + if err != nil { + span.SetAttributes(attribute.String(attrStepStatus, statusError)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetAttributes(attribute.String(attrStepStatus, statusSuccess)) + } + return err + }) +} diff --git a/contrib/otel/step_test.go b/contrib/otel/step_test.go new file mode 100644 index 0000000..14b7e9a --- /dev/null +++ b/contrib/otel/step_test.go @@ -0,0 +1,238 @@ +package flowotel_test + +import ( + "context" + "errors" + "testing" + + flow "github.com/Azure/go-workflow" + "github.com/Azure/go-workflow/contrib/otel" + + "github.com/cenkalti/backoff/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" +) + +// exceptionEventName is the OTel SDK convention for span events emitted by +// RecordError. +const exceptionEventName = "exception" + +// retryStep counts attempts and only succeeds on the Nth try. 
+type retryStep struct { + Name string + NeedAttempts int + Attempts int +} + +func (s *retryStep) String() string { return s.Name } +func (s *retryStep) Do(_ context.Context) error { + s.Attempts++ + if s.Attempts < s.NeedAttempts { + return errors.New("transient") + } + return nil +} + +// alwaysFail is a Step whose Do always returns the configured error. +type alwaysFail struct { + Name string + Err error +} + +func (s *alwaysFail) String() string { return s.Name } +func (s *alwaysFail) Do(_ context.Context) error { return s.Err } + +// findAttr looks up the named attribute key in the slice. Returns the +// attribute and whether it was found. +func findAttr(attrs []attribute.KeyValue, key string) (attribute.KeyValue, bool) { + for _, a := range attrs { + if string(a.Key) == key { + return a, true + } + } + return attribute.KeyValue{}, false +} + +func assertAttr(t *testing.T, attrs []attribute.KeyValue, key, want string) { + t.Helper() + a, ok := findAttr(attrs, key) + if !assert.True(t, ok, "attribute %q not found", key) { + return + } + assert.Equal(t, want, a.Value.AsString(), "attribute %q value mismatch", key) +} + +// noBackoff returns a RetryOption mutator that disables real backoff sleeps +// so retry tests run instantly. 
+func noBackoff(attempts uint64) func(*flow.RetryOption) { + return func(ro *flow.RetryOption) { + ro.Attempts = attempts + ro.Backoff = &backoff.ZeroBackOff{} + } +} + +func TestStepInterceptor_SuccessOneSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("MyStep") + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, "MyStep", s.Name()) + assertAttr(t, s.Attributes(), "workflow.step.name", "MyStep") + assertAttr(t, s.Attributes(), "workflow.step.status", "success") + assert.Equal(t, codes.Unset, s.Status().Code, "no SetStatus on success") +} + +func TestStepInterceptor_RetriesStillOneSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "Flaky", NeedAttempts: 3} + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) + w.Add(flow.Step(step).Retry(noBackoff(5))) + require.NoError(t, w.Do(context.Background())) + + assert.Equal(t, 3, step.Attempts, "step should have been attempted 3 times") + spans := rec.Ended() + require.Len(t, spans, 1, "step interceptor must emit exactly one span across retries") + s := spans[0] + assertAttr(t, s.Attributes(), "workflow.step.status", "success") +} + +func TestStepInterceptor_FinalErrorRecorded(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + boom := errors.New("boom") + step := &alwaysFail{Name: "Fail", Err: boom} + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) + w.Add(flow.Step(step).Retry(noBackoff(2))) + err := w.Do(context.Background()) + require.Error(t, err) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assertAttr(t, s.Attributes(), "workflow.step.status", "error") + assert.Equal(t, codes.Error, s.Status().Code) + 
+ events := s.Events() + var sawException bool + for _, ev := range events { + if ev.Name == exceptionEventName { + sawException = true + break + } + } + assert.True(t, sawException, "expected RecordError to add an 'exception' event; got %+v", events) +} + +func TestStepInterceptor_ContextCanceled(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &alwaysFail{Name: "Cancel", Err: context.Canceled} + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) + w.Add(flow.Step(step)) + _ = w.Do(context.Background()) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, codes.Error, s.Status().Code) + var sawException bool + for _, ev := range s.Events() { + if ev.Name == exceptionEventName { + sawException = true + break + } + } + assert.True(t, sawException, "context.Canceled should record an exception event") +} + +func TestStepInterceptor_SkippedStepNoSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + skipMe := flow.NoOp("Skipped") + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) + w.Add(flow.Step(skipMe).When(func(context.Context, map[flow.Steper]flow.StepResult) flow.StepStatus { + return flow.Skipped + })) + require.NoError(t, w.Do(context.Background())) + + assert.Empty(t, rec.Ended(), "Skipped steps must bypass the interceptor chain") + // neither Started() nor Ended() should fire for a Skipped step. 
+ assert.Empty(t, rec.Started()) +} + +func TestStepInterceptor_CustomNamer(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Original") + namer := func(s flow.Steper) string { return "custom:" + flow.String(s) } + w := newTestWorkflow(flowotel.NewStepInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithStepSpanNamer(namer), + ), nil) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, "custom:Original", s.Name(), "custom span namer should win") + // workflow.step.name attribute still uses flow.String(step) per spec. + assertAttr(t, s.Attributes(), "workflow.step.name", "Original") +} + +func TestStepInterceptor_CustomAttributesAppend(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Hello") + extras := func(flow.Steper) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("env", "test"), + attribute.Int("answer", 42), + } + } + w := newTestWorkflow(flowotel.NewStepInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithStepAttributes(extras), + ), nil) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + attrs := spans[0].Attributes() + assertAttr(t, attrs, "workflow.step.name", "Hello") // default still present + assertAttr(t, attrs, "workflow.step.status", "success") // default still present + assertAttr(t, attrs, "env", "test") // user-supplied + a, ok := findAttr(attrs, "answer") + require.True(t, ok, "custom int attribute missing") + assert.Equal(t, int64(42), a.Value.AsInt64()) +} + +func TestStepInterceptor_UserAttributeCannotOverrideCanonicalName(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Real") + hijack := func(flow.Steper) []attribute.KeyValue { + return 
[]attribute.KeyValue{attribute.String("workflow.step.name", "HACKED")} + } + w := newTestWorkflow(flowotel.NewStepInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithStepAttributes(hijack), + ), nil) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + // Canonical attribute must win over user-supplied override. + assertAttr(t, spans[0].Attributes(), "workflow.step.name", flow.String(step)) +} diff --git a/openspec/changes/archive/2026-05-15-contrib-otel-tracing/.openspec.yaml b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/.openspec.yaml new file mode 100644 index 0000000..66dd08a --- /dev/null +++ b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-14 diff --git a/openspec/changes/archive/2026-05-15-contrib-otel-tracing/design.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/design.md new file mode 100644 index 0000000..4656cc2 --- /dev/null +++ b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/design.md @@ -0,0 +1,143 @@ +## Context + +go-workflow exposes structured observability through two interceptor interfaces declared in `interceptor.go`: + +- `StepInterceptor.InterceptStep(ctx, step, next)` — wraps the **full lifetime** of a step (across all retries), invoked exactly once per step. +- `AttemptInterceptor.InterceptAttempt(ctx, step, attempt, next)` — wraps a **single attempt**, invoked once per attempt with a 0-based index. + +The Workflow stores them in two slice fields and chains the lowest-index entry on the outside. Steps that resolve to `Skipped` or `Canceled` via their `Condition` are settled inline in `tick()` and never enter either chain (see capability `step-interceptor`). 
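The ordering rule above (lowest-index entry on the outside) can be illustrated with a minimal, stdlib-only sketch. The `Interceptor` type, `chain`, and `trace` below are hypothetical stand-ins written for this illustration; they are not the go-workflow API, whose interfaces also receive the step (and the attempt index).

```go
package main

import (
	"context"
	"fmt"
)

// Interceptor is a hypothetical stand-in for an interceptor: it wraps next.
type Interceptor func(ctx context.Context, next func(context.Context) error) error

// chain composes interceptors so that ics[0] becomes the outermost wrapper.
func chain(ics []Interceptor, final func(context.Context) error) func(context.Context) error {
	next := final
	// Wrap from the last interceptor inward so index 0 ends up outermost.
	for i := len(ics) - 1; i >= 0; i-- {
		ic, inner := ics[i], next
		next = func(ctx context.Context) error { return ic(ctx, inner) }
	}
	return next
}

// trace returns an interceptor that logs when it is entered and left.
func trace(name string, log *[]string) Interceptor {
	return func(ctx context.Context, next func(context.Context) error) error {
		*log = append(*log, "enter "+name)
		err := next(ctx)
		*log = append(*log, "leave "+name)
		return err
	}
}

// runOrder runs a two-interceptor chain and returns the observed call order.
func runOrder() []string {
	var log []string
	run := chain([]Interceptor{trace("A", &log), trace("B", &log)}, func(context.Context) error {
		log = append(log, "step")
		return nil
	})
	_ = run(context.Background())
	return log
}

func main() {
	fmt.Println(runOrder())
}
```

In this model, interceptor `A` (index 0) is entered first and left last, which is the nesting a step span needs in order to enclose everything registered after it.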
+ +That extension point is exactly the right shape for OpenTelemetry tracing: a **step span** maps to the outer chain, a **per-attempt span** maps to the inner chain, and the existing `Steper` identity gives a stable key for span naming and attributes via `flow.String(step)`. + +The repository today is a single Go module (`github.com/Azure/go-workflow`) with no contrib subdirectory and no OpenTelemetry dependency. This change introduces the first contrib submodule, sets the precedent for all future contrib packages (e.g. logging, metrics, slog), and converts the repo into a multi-module workspace. + +Stakeholders: library users who need traces today (presently hand-rolling interceptors), library maintainers (must keep core dependency-light and avoid OTel API churn bleeding into core users). + +## Goals / Non-Goals + +**Goals:** + +- Provide a one-call API to wire OpenTelemetry traces into any `*flow.Workflow` via the existing interceptor extension points — no `BeforeStep` / `AfterStep` hooks, no fork of the core types. +- Ship as an **independent Go module** so the OpenTelemetry dependency does not enter the core module's transitive graph. +- Ship sane defaults (span names from `flow.String(step)`, retry-aware naming, automatic error recording) while letting users override every default through functional options. +- Keep the two interceptor factories **independently usable**: a user may register only the step layer, only the attempt layer, or both, and the result is always coherent. +- Demonstrate the pattern with a runnable godoc `Example` so other contrib modules (e.g. metrics) have a template. + +**Non-Goals:** + +- OTel **metrics** and **logs** signals. Out of scope for v0.1; addressed in a follow-up change if needed. +- A **workflow-level** span. The interceptor API is step-scoped; a workflow span is the caller's responsibility (`ctx, span := tracer.Start(ctx, "my-workflow"); defer span.End(); w.Do(ctx)`). Documented in package godoc, not implemented. 
+- Changing core behavior. No edits to any file under the root module. The Skipped/Canceled bypass is documented as inherited behavior, not redesigned. +- Auto-instrumentation discovery (e.g. `otel.GetTracerProvider()` magic in tests). Tests inject a `TracerProvider` explicitly via `WithTracerProvider`. +- Propagation across process boundaries. Step `ctx` carries whatever the caller put in; we do not inject/extract carriers. + +## Decisions + +### D1. Independent submodule, not a subpackage + +`contrib/otel/` gets its own `go.mod` declaring `github.com/Azure/go-workflow/contrib/otel` at module path. During development the contrib `go.mod` carries: + +``` +require github.com/Azure/go-workflow v0.0.0 +replace github.com/Azure/go-workflow => ../.. +``` + +so `go test ./...` in the contrib dir builds against the in-tree core. Released tags (`contrib/otel/v0.1.0`) drop or pin the replace. + +**Rationale:** OpenTelemetry's `go.opentelemetry.io/otel` pulls in a non-trivial dep tree. Core users who don't want traces should not pay for them in their `go.sum`. This is the pattern used by `gin-contrib`, `otelhttp`, etc. + +**Alternative considered:** Subpackage under core (`github.com/Azure/go-workflow/otel`). Rejected — would make OTel a hard transitive dep of every core user. + +### D2. Two factories, no glue helper + +Public API is exactly: + +```go +func NewStepInterceptor(opts ...Option) flow.StepInterceptor +func NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor +``` + +Users wire them with the existing `flow.WithStepInterceptor` / `flow.WithAttemptInterceptor` workflow options. + +**Rationale:** Per user decision in brainstorming. Mirrors the underlying interceptor model 1:1, lets users register only one layer if they want, and avoids inventing a new "OTel option" abstraction. Cost: caller writes two lines instead of one. + +**Alternative considered:** A single `otel.Tracing(opts...)` returning a `flow.WorkflowOption` that registers both. 
Rejected for v0.1 — hides the orthogonality, harder to register only the attempt layer (a common pattern for low-overhead deployments). + +### D3. Functional options, shared `config` + +Both factories take the same `Option` type and the same internal `config` struct. Options that only make sense for one layer (e.g. `WithAttemptSpanNamer`) are still accepted by the other; they are simply ignored. + +**Rationale:** One option type means users don't have to remember two namespaces (`StepOption` vs `AttemptOption`). The shared config is internal — package surface stays small. + +**Alternative considered:** Separate `StepOption` / `AttemptOption`. Rejected — doubles the API surface for almost no safety benefit; the only "wrong" combo is silently ignored, never an error. + +### D4. Span naming and attribute conventions + +| Concern | Default | +|---|---| +| Tracer name | `"github.com/Azure/go-workflow/contrib/otel"` (overridable via `WithTracerName`) | +| Step span name | `flow.String(step)` | +| Attempt span name | `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)` | +| Step span attrs | `workflow.step.name = flow.String(step)`; `workflow.step.status` = `"success"` \| `"error"` (set on End from the `next` error) | +| Attempt span attrs | `workflow.step.name`; `workflow.step.attempt = attempt` | +| Error path | `next` returns non-nil → `span.RecordError(err)` + `span.SetStatus(codes.Error, err.Error())`. `context.Canceled` is recorded the same way as any other error. | + +Custom namers / attribute providers override only what they replace; defaults still apply to non-overridden fields. The `workflow.step.*` namespace is chosen to match the `workflow-options` and `step-configuration` capabilities' terminology. + +**Rationale:** Aligned with user decision. `context.Canceled = Error` keeps the rule one-line and avoids the "is this user-initiated cancel or upstream timeout?" classification problem we have no signal to answer. + +### D5. 
Tracer acquisition + +`config` resolves a `trace.Tracer` exactly once at factory-call time: + +```go +tp := cfg.tracerProvider +if tp == nil { tp = otel.GetTracerProvider() } +tracer := tp.Tracer(cfg.tracerName) +``` + +This means swapping the global provider after `NewStepInterceptor` returns will not be observed. Documented. + +**Rationale:** Avoids per-step `otel.GetTracerProvider()` lookups (each call reads synchronized global state). Matches `otelhttp.NewHandler` behavior. + +### D6. Test strategy + +Tests live in `contrib/otel/*_test.go` and use: + +- `sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder))` to produce real spans synchronously (`tracetest.SpanRecorder` is a span processor, not an exporter, so it is registered with `WithSpanProcessor` rather than `WithSyncer`). +- `tracetest.NewSpanRecorder` to capture them. +- `flow.NoOpStep` / minimal handcrafted `Steper` impls for the workflow. +- An exported `(*Workflow).Do(ctx)` to drive end-to-end runs. + +The example (`example_test.go`) uses `stdouttrace` to demonstrate a real exporter; it is a build-checked godoc Example, not asserted on output. + +### D7. Version & release + +- Initial tag `contrib/otel/v0.1.0`. Pre-1.0 because OTel semconv naming for "workflow" is not standardized; we may rename `workflow.step.*` attributes once a convention exists upstream. +- CI must add a job that runs `go test ./...` inside `contrib/otel/`. Tracked as a follow-up infra task referenced from `tasks.md`. + +## Risks / Trade-offs + +- **OTel API breakage** → contrib module pinned to a single major; core unaffected. Mitigation: only depend on stable `go.opentelemetry.io/otel/trace` interfaces (Tracer, Span, SpanStartOption); no SDK types in the public API. +- **Multi-module repo cognitive overhead** → developers must `cd contrib/otel && go test ./...` separately from root. Mitigation: documented in `contrib/otel/README.md` and reinforced by CI splitting jobs by module. +- **`workflow.step.*` attribute names may clash with future OTel semconv** → could force a v1 rename. Accepted: pre-1.0 lets us rename without breaking SemVer promises.
+- **`context.Canceled` always being `Error`** → users running graceful shutdowns will see error-status spans. Mitigation: documented loud and clear; we accept this for v0.1 simplicity. Users who need different semantics can wrap the interceptor (composability is preserved). +- **Replace directive in `contrib/otel/go.mod`** → breaks `go install github.com/Azure/go-workflow/contrib/otel@latest` if not removed before tagging a release. Mitigation: release checklist item + mention in `tasks.md`. +- **Single shared `config` for both layers** → user passes `WithAttemptSpanNamer` to `NewStepInterceptor`; it silently no-ops. Mitigation: godoc on each option says which layer it affects. + +## Migration Plan + +This is a new submodule with no prior version — no migration of existing users. For repository operators: + +1. Land the change on the development branch. +2. CI gains a `go-test-contrib-otel` job (`cd contrib/otel && go test ./...`). +3. Tag `contrib/otel/v0.1.0` after first release. Before tagging, remove the `replace` directive (or pin it to the matching core version) so `go get github.com/Azure/go-workflow/contrib/otel@v0.1.0` resolves cleanly. + +Rollback: revert the change branch. No core files are modified, so rollback is purely deletion of `contrib/otel/`. + +## Open Questions + +None blocking. Future follow-ups (out of scope here): + +- Should `contrib/otel` provide a metrics interceptor in v0.2? +- Should we adopt OTel semconv attribute names if/when a "task/workflow" namespace is standardized? 
diff --git a/openspec/changes/archive/2026-05-15-contrib-otel-tracing/proposal.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/proposal.md new file mode 100644 index 0000000..9612d60 --- /dev/null +++ b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/proposal.md @@ -0,0 +1,41 @@ +## Why + +go-workflow already has a clean two-layer interceptor model (`StepInterceptor` + `AttemptInterceptor`) that is the natural extension point for cross-cutting observability — but the repo ships no batteries-included integration with any tracing system. Users who want OpenTelemetry traces today have to hand-roll a pair of interceptors, get span parenting / retry semantics right, and re-derive sensible span names and attributes. That is repetitive boilerplate every project copies. + +A first-party `contrib/otel` module turns the interceptor extension point into a one-liner, ships sensible defaults, and demonstrates the recommended way to build other observability integrations against the interceptor API. + +## What Changes + +- Add `contrib/otel/` as a **new, independent Go module** at module path `github.com/Azure/go-workflow/contrib/otel`. Released and tagged separately from the core module so its OpenTelemetry dependency stays out of core users' transitive graph. +- New public API in package `flowotel`: + - `NewStepInterceptor(opts ...Option) flow.StepInterceptor` + - `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` + - `Option` functional options: `WithTracerProvider`, `WithTracerName`, `WithStepSpanNamer`, `WithAttemptSpanNamer`, `WithStepAttributes`, `WithAttemptAttributes`. +- Default span behavior: + - Step span name = `flow.String(step)`; attempt span name = `flow.String(step) + " (attempt N)"`. + - Step span attributes: `workflow.step.name`, `workflow.step.status`. Attempt span attributes: `workflow.step.name`, `workflow.step.attempt`. + - Errors (including `context.Canceled`) → `RecordError` + `SetStatus(codes.Error, …)`. 
+ - The two interceptors are independent — when both registered, the attempt span is a child of the step span; when only one is registered, it works standalone against the caller's `ctx`. +- Document the existing core behavior that `Skipped` / `Canceled by Condition` steps bypass the interceptor chain, so they will not produce spans (no change to core). +- Add a runnable godoc `Example` (`contrib/otel/example_test.go`) that wires the SDK + a stdout exporter to a minimal workflow. +- Dependency policy: contrib/otel depends only on the OpenTelemetry **API** packages (`go.opentelemetry.io/otel`, `…/otel/trace`); the SDK and `tracetest` are test-only dependencies. + +## Capabilities + +### New Capabilities + +- `contrib-otel`: OpenTelemetry traces integration for go-workflow, delivered as an independent submodule that exposes two interceptor factories (step-level and attempt-level) wrapping the core `StepInterceptor` / `AttemptInterceptor` extension points. + +### Modified Capabilities + + + +## Impact + +- **New module**: `contrib/otel/` with its own `go.mod`, `go.sum`, source files, tests, and example. Replaces the placeholder change directory `contrib-common-steps/`. +- **No source changes** to the existing core module (`github.com/Azure/go-workflow`). Its `go.mod` and existing files are untouched. +- **New external dependencies** (scoped to the contrib module only): + - Runtime: `go.opentelemetry.io/otel`, `go.opentelemetry.io/otel/trace`. + - Test-only: `go.opentelemetry.io/otel/sdk`, `go.opentelemetry.io/otel/sdk/trace`, `go.opentelemetry.io/otel/sdk/trace/tracetest`, plus `stdouttrace` for the example. +- **Repository becomes multi-module**. CI must build/test `./contrib/otel` in addition to the root module. The contrib module uses a `replace github.com/Azure/go-workflow => ../..` directive so it builds against in-tree core during development; releases drop or pin the replace. 
+- **Versioning**: contrib/otel will be tagged independently as `contrib/otel/v0.x.y` per Go submodule conventions. Initial release is `v0.1.0` (pre-1.0 surface). diff --git a/openspec/changes/archive/2026-05-15-contrib-otel-tracing/specs/contrib-otel/spec.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/specs/contrib-otel/spec.md new file mode 100644 index 0000000..9fef99a --- /dev/null +++ b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/specs/contrib-otel/spec.md @@ -0,0 +1,197 @@ +## ADDED Requirements + +### Requirement: Independent submodule + +The `contrib/otel` package SHALL be delivered as an independent Go module at module path `github.com/Azure/go-workflow/contrib/otel`, located at the repository directory `contrib/otel/`. The module SHALL declare Go 1.23 (matching the core module) and SHALL NOT cause the core module `github.com/Azure/go-workflow` to acquire any new direct or transitive dependency on OpenTelemetry packages. + +#### Scenario: Core go.mod is unchanged +- **GIVEN** the change has been applied +- **WHEN** the root `go.mod` of `github.com/Azure/go-workflow` is inspected +- **THEN** it contains no `require`, `replace`, or `exclude` line referencing any `go.opentelemetry.io/...` module +- **AND** `go mod tidy` at the repository root produces no diff caused by this change + +#### Scenario: Contrib module builds standalone +- **GIVEN** a checkout of the repository +- **WHEN** the developer runs `go build ./...` and `go test ./...` from `contrib/otel/` +- **THEN** the commands succeed using only the contrib module's `go.mod` +- **AND** the contrib `go.mod` declares the module path `github.com/Azure/go-workflow/contrib/otel` +- **AND** the contrib `go.mod` carries `go 1.23` + +#### Scenario: Contrib module resolves core via replace during development +- **GIVEN** the repository is checked out at the change branch +- **WHEN** `contrib/otel/go.mod` is inspected +- **THEN** it contains a `replace github.com/Azure/go-workflow => 
../..` directive (or an equivalent local replace) so the contrib module builds against in-tree core sources + +--- + +### Requirement: Dependency policy + +The `contrib/otel` module's runtime (non-test) dependencies SHALL be limited to the OpenTelemetry **API** packages — `go.opentelemetry.io/otel` and `go.opentelemetry.io/otel/trace` (and their transitive requirements) — and `github.com/Azure/go-workflow`. The OpenTelemetry **SDK**, exporter packages, and `tracetest` SHALL appear only as test dependencies. + +#### Scenario: SDK is test-only +- **GIVEN** the contrib module +- **WHEN** `go list -deps -test=false ./...` is run inside `contrib/otel/` +- **THEN** the output contains no package under `go.opentelemetry.io/otel/sdk/...` +- **AND** the output contains no `stdouttrace` or other exporter packages + +#### Scenario: API packages available at runtime +- **WHEN** a consumer imports `github.com/Azure/go-workflow/contrib/otel` +- **THEN** the import graph includes `go.opentelemetry.io/otel/trace` and `go.opentelemetry.io/otel/attribute` + +--- + +### Requirement: Step interceptor factory + +`contrib/otel` SHALL export a function `NewStepInterceptor(opts ...Option) flow.StepInterceptor` that returns a value satisfying the `flow.StepInterceptor` interface. Each invocation of the returned `InterceptStep` method SHALL start exactly one span at the beginning of `next` and end it after `next` returns, regardless of how many retry attempts occur inside `next`. 
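A minimal sketch of the "one span per step, regardless of retries" requirement, using only the standard library. The `recorder`, `withStepSpan`, and `retry` names are hypothetical stand-ins invented for this illustration; the real module uses an OpenTelemetry tracer and core's retry loop instead.

```go
package main

import (
	"errors"
	"fmt"
)

// recorder is a hypothetical stand-in for a span recorder.
type recorder struct{ ended []string }

// withStepSpan models the step layer: one "span" wraps the whole call to
// next, however many attempts happen inside it.
func withStepSpan(rec *recorder, name string, next func() error) error {
	err := next()
	status := "success"
	if err != nil {
		status = "error"
	}
	rec.ended = append(rec.ended, name+" status="+status)
	return err
}

// retry calls do until it succeeds or maxAttempts is exhausted. It returns
// the number of attempts made and the last error.
func retry(maxAttempts int, do func(attempt int) error) (attempts int, err error) {
	for attempts < maxAttempts {
		err = do(attempts)
		attempts++
		if err == nil {
			return attempts, nil
		}
	}
	return attempts, err
}

// runFlaky runs a step that fails on attempts 0 and 1 and succeeds on 2.
func runFlaky() (spans []string, attempts int) {
	rec := &recorder{}
	_ = withStepSpan(rec, "Flaky", func() error {
		var err error
		attempts, err = retry(5, func(attempt int) error {
			if attempt < 2 {
				return errors.New("transient")
			}
			return nil
		})
		return err
	})
	return rec.ended, attempts
}

func main() {
	spans, attempts := runFlaky()
	fmt.Println(spans, attempts)
}
```

Because the span wraps `next` rather than the individual calls inside it, the step is attempted three times in this model while the recorder sees a single ended span whose status reflects the final outcome.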
+ +#### Scenario: One span per step on success +- **GIVEN** a Workflow with a single step `s` registered with `flow.WithStepInterceptor(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)))` where `tp` records spans +- **WHEN** the Workflow runs and `s` succeeds on the first attempt +- **THEN** exactly one span is recorded with the step span name +- **AND** the span has status `OK` (or unset, the OTel default for success) + +#### Scenario: One span per step despite retries +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and finally succeeds on attempt 2 +- **AND** only `NewStepInterceptor` is registered (no AttemptInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly one span is recorded for `s` (not three) + +#### Scenario: Step span records terminal failure +- **GIVEN** a step `s` whose final attempt returns a non-nil error `err` +- **WHEN** the step interceptor's outer `next` returns +- **THEN** the recorded span calls `RecordError(err)` and `SetStatus(codes.Error, err.Error())` +- **AND** the span attribute `workflow.step.status` equals `"error"` + +--- + +### Requirement: Attempt interceptor factory + +`contrib/otel` SHALL export a function `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` that returns a value satisfying the `flow.AttemptInterceptor` interface. Each invocation of `InterceptAttempt` SHALL start one span before calling `next` and end it after `next` returns. Each per-attempt span SHALL carry the attempt index as the OTel attribute `workflow.step.attempt`. 
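The attempt layer can be sketched the same way: one record per attempt, named with the default `"%s (attempt %d)"` format and carrying the 0-based index. The `attemptSpan` struct and `runAttempts` loop are hypothetical stand-ins for illustration only; they assume nothing about core's retry implementation beyond what is stated above.

```go
package main

import (
	"errors"
	"fmt"
)

// attemptSpan is a hypothetical record of one finished attempt span.
type attemptSpan struct {
	Name    string
	Attempt int64  // models the workflow.step.attempt attribute
	Err     string // empty when the attempt succeeded
}

// attemptSpanName mirrors the default attempt span name format.
func attemptSpanName(step string, attempt uint64) string {
	return fmt.Sprintf("%s (attempt %d)", step, attempt)
}

// runAttempts models a retry loop with an attempt-level wrapper: every
// attempt, failed or not, yields exactly one span record.
func runAttempts(step string, maxAttempts uint64, do func(attempt uint64) error) []attemptSpan {
	var spans []attemptSpan
	for attempt := uint64(0); attempt < maxAttempts; attempt++ {
		err := do(attempt)
		s := attemptSpan{Name: attemptSpanName(step, attempt), Attempt: int64(attempt)}
		if err != nil {
			s.Err = err.Error()
		}
		spans = append(spans, s)
		if err == nil {
			break
		}
	}
	return spans
}

// succeedOnThird fails on attempts 0 and 1 and succeeds on attempt 2.
func succeedOnThird() []attemptSpan {
	return runAttempts("MyStep", 5, func(attempt uint64) error {
		if attempt < 2 {
			return errors.New("transient")
		}
		return nil
	})
}

func main() {
	for _, s := range succeedOnThird() {
		fmt.Printf("%+v\n", s)
	}
}
```

Here the failing attempts keep their own error while the final attempt stays clean, which is what lets a trace viewer show where retries were spent.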
+ +#### Scenario: One attempt span per attempt +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and ultimately succeeds on attempt 2 +- **AND** only `NewAttemptInterceptor` is registered (no StepInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly three spans are recorded (attempt 0, attempt 1, attempt 2) +- **AND** each span carries attribute `workflow.step.attempt` with the matching 0-based index + +#### Scenario: Failing attempt records its error +- **GIVEN** an attempt that returns error `err` from `next` +- **WHEN** that attempt's span is ended +- **THEN** the span calls `RecordError(err)` and `SetStatus(codes.Error, err.Error())` + +--- + +### Requirement: Independent registration of the two layers + +The two interceptor factories SHALL be usable independently or together. When both are registered on the same Workflow, the attempt span SHALL be a child of the step span (i.e. the attempt's `Span.Parent.SpanID` equals the step span's `SpanID`). When only one is registered, that span SHALL be a child of whatever span (if any) is on the caller-provided context. 
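The parent/child rule follows from context propagation: each layer starts its span from the context it receives and hands the derived context to `next`. The sketch below models that with a hypothetical `tracer` that stores the current span in the context; it is an assumption-laden illustration, not the OpenTelemetry API.

```go
package main

import (
	"context"
	"fmt"
)

// span is a hypothetical span record; ParentID is 0 for a root span.
type span struct{ ID, ParentID int }

type spanKey struct{}

// tracer hands out increasing span IDs.
type tracer struct{ lastID int }

// start creates a span whose parent is whatever span ctx already carries,
// and returns a derived context carrying the new span.
func (t *tracer) start(ctx context.Context) (context.Context, *span) {
	t.lastID++
	s := &span{ID: t.lastID}
	if parent, ok := ctx.Value(spanKey{}).(*span); ok {
		s.ParentID = parent.ID
	}
	return context.WithValue(ctx, spanKey{}, s), s
}

// stepLayer models the step interceptor: start a span, pass its ctx to next.
func stepLayer(ctx context.Context, t *tracer, next func(context.Context)) *span {
	ctx, s := t.start(ctx)
	next(ctx)
	return s
}

// attemptLayer models the attempt interceptor: start a span from ctx.
func attemptLayer(ctx context.Context, t *tracer) *span {
	_, s := t.start(ctx)
	return s
}

// bothLayers registers both layers; the attempt runs inside the step.
func bothLayers() (*span, *span) {
	t := &tracer{}
	var attempt *span
	step := stepLayer(context.Background(), t, func(ctx context.Context) {
		attempt = attemptLayer(ctx, t)
	})
	return step, attempt
}

// attemptOnly registers only the attempt layer under a caller-made span.
func attemptOnly() (*span, *span) {
	t := &tracer{}
	ctx, outer := t.start(context.Background())
	return outer, attemptLayer(ctx, t)
}

func main() {
	step, attempt := bothLayers()
	fmt.Println(attempt.ParentID == step.ID)
}
```

Neither layer needs to know whether the other is registered: the attempt span simply adopts whichever span the incoming context carries.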
+ +#### Scenario: Both layers registered → parent/child relation +- **GIVEN** a Workflow registered with both `NewStepInterceptor` and `NewAttemptInterceptor` +- **WHEN** a single step runs once +- **THEN** the step span and the attempt span share the same `TraceID` +- **AND** the attempt span's `Parent.SpanID` equals the step span's `SpanID` + +#### Scenario: Only attempt layer registered → no orphan +- **GIVEN** a Workflow registered with only `NewAttemptInterceptor` and a caller-supplied context that already contains an outer span `outer` +- **WHEN** a step runs +- **THEN** the attempt span's `Parent.SpanID` equals `outer`'s `SpanID` + +--- + +### Requirement: Default span naming + +When no custom namer is supplied, the default step span name SHALL equal `flow.String(step)`, and the default attempt span name SHALL equal `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)`. + +#### Scenario: Default step span name +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` +- **WHEN** the step interceptor produces a span without a custom `WithStepSpanNamer` +- **THEN** the span's name is `"MyStep"` + +#### Scenario: Default attempt span name carries attempt index +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` running its second attempt (`attempt == 1`) +- **WHEN** the attempt interceptor produces a span without a custom `WithAttemptSpanNamer` +- **THEN** the span's name is `"MyStep (attempt 1)"` + +--- + +### Requirement: Default span attributes + +The step span SHALL be created with attribute `workflow.step.name = flow.String(step)`, and on `End` it SHALL receive attribute `workflow.step.status` equal to `"success"` when `next` returned nil and `"error"` otherwise. The attempt span SHALL be created with attributes `workflow.step.name = flow.String(step)` and `workflow.step.attempt = attempt` (as an `Int64` attribute). 
+ +#### Scenario: Step span carries name and status +- **GIVEN** a step `s` that succeeds +- **WHEN** its span is ended +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.status` equals `"success"` + +#### Scenario: Attempt span carries name and attempt index +- **GIVEN** an attempt at index 2 of step `s` +- **WHEN** its span is created +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.attempt` equals the int64 `2` + +--- + +### Requirement: Functional options + +`contrib/otel` SHALL expose a single exported `Option` type and the following functional option constructors. Both factories SHALL accept the same `Option` values. Options that target one layer SHALL be no-ops on the other layer (no error, no panic). + +| Option | Effect | +|---|---| +| `WithTracerProvider(tp trace.TracerProvider)` | Sets the provider. If `nil` or unset, defaults to `otel.GetTracerProvider()` resolved at factory-call time. | +| `WithTracerName(name string)` | Sets the tracer instrumentation name. Default `"github.com/Azure/go-workflow/contrib/otel"`. | +| `WithStepSpanNamer(fn func(flow.Steper) string)` | Overrides default step span naming. | +| `WithAttemptSpanNamer(fn func(flow.Steper, uint64) string)` | Overrides default attempt span naming. | +| `WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue)` | Adds extra attributes to the step span at start. Defaults still apply. | +| `WithAttemptAttributes(fn func(flow.Steper, uint64) []attribute.KeyValue)` | Adds extra attributes to the attempt span at start. Defaults still apply. 
| + +#### Scenario: Custom step namer overrides default +- **GIVEN** a `NewStepInterceptor(WithStepSpanNamer(func(flow.Steper) string { return "custom-name" }))` +- **WHEN** any step runs through the interceptor +- **THEN** its step span name is `"custom-name"` + +#### Scenario: Custom attempt attributes are appended, not replaced +- **GIVEN** a `NewAttemptInterceptor(WithAttemptAttributes(func(flow.Steper, uint64) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("custom.k", "v")} }))` +- **WHEN** an attempt span is created +- **THEN** it carries the default attributes (`workflow.step.name`, `workflow.step.attempt`) AND the custom attribute `custom.k = "v"` + +#### Scenario: Provider defaults to global +- **GIVEN** no `WithTracerProvider` option +- **WHEN** `NewStepInterceptor()` is called +- **THEN** the constructed interceptor obtains its tracer from `otel.GetTracerProvider()` at the moment of the call (not lazily on every interception) + +--- + +### Requirement: Cancellation is recorded as error + +When the error returned by `next` satisfies `errors.Is(err, context.Canceled)`, the span produced by either interceptor SHALL be ended with `RecordError(err)` and `SetStatus(codes.Error, err.Error())` — the same as any other non-nil error. No special-case suppression SHALL be applied in v0.1. + +#### Scenario: Cancelled step is Error +- **GIVEN** a step that observes context cancellation and returns `context.Canceled` +- **WHEN** the span is ended +- **THEN** the span has status code `codes.Error` +- **AND** the span's events include the recorded `context.Canceled` error + +--- + +### Requirement: Skipped and Canceled-by-Condition steps produce no spans + +The contrib package SHALL document, and tests SHALL verify, that a step whose `Condition` resolves to `Skipped` or `Canceled` produces zero spans through either interceptor — by virtue of the core capability `step-interceptor` bypassing the chain entirely for such steps. 
`contrib/otel` SHALL NOT implement any compensating behavior. + +#### Scenario: Skipped step contributes no span +- **GIVEN** a Workflow with a step `s` whose `Condition` returns `Skipped` +- **AND** both `NewStepInterceptor` and `NewAttemptInterceptor` are registered +- **WHEN** the Workflow runs +- **THEN** the span recorder contains zero spans referencing `s` + +--- + +### Requirement: Runnable godoc Example + +The `contrib/otel` module SHALL include a runnable godoc Example (`Example*` function in `example_test.go`) that constructs a `TracerProvider` with a stdout exporter, registers both interceptors on a minimal Workflow, and runs the Workflow. The Example SHALL compile and execute under `go test ./...` inside the contrib module without external network access. + +#### Scenario: Example builds and runs +- **WHEN** `go test ./... -run Example` is executed inside `contrib/otel/` +- **THEN** the command exits with status 0 +- **AND** the Example function appears in `go doc -all github.com/Azure/go-workflow/contrib/otel` diff --git a/openspec/changes/archive/2026-05-15-contrib-otel-tracing/tasks.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/tasks.md new file mode 100644 index 0000000..c8a362a --- /dev/null +++ b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/tasks.md @@ -0,0 +1,77 @@ +## 1. Module bootstrap + +- [x] 1.1 Create directory `contrib/otel/` and run `go mod init github.com/Azure/go-workflow/contrib/otel` inside it. +- [x] 1.2 Set `go 1.23` in the new `contrib/otel/go.mod`; add `replace github.com/Azure/go-workflow => ../..` and a `require github.com/Azure/go-workflow v0.0.0` directive so the module builds against in-tree core. +- [x] 1.3 Add runtime dependencies `go.opentelemetry.io/otel` and `go.opentelemetry.io/otel/trace`; run `go mod tidy` and commit `contrib/otel/go.sum`. +- [x] 1.4 Verify that the root `go.mod` and `go.sum` show no diff after the contrib module is created. + +## 2. 
Public API surface + +- [x] 2.1 Create `contrib/otel/options.go` with the unexported `config` struct and the exported `Option` function type. +- [x] 2.2 Implement `WithTracerProvider`, `WithTracerName`, `WithStepSpanNamer`, `WithAttemptSpanNamer`, `WithStepAttributes`, `WithAttemptAttributes`. Document on each which interceptor layer it affects (and that it is a no-op on the other). +- [x] 2.3 Implement an internal `(*config).resolveTracer()` helper that picks `cfg.tracerProvider` (or `otel.GetTracerProvider()` when unset) and returns `tp.Tracer(cfg.tracerName)` with default name `"github.com/Azure/go-workflow/contrib/otel"`. + +## 3. Step interceptor + +- [x] 3.1 Create `contrib/otel/step.go` exporting `NewStepInterceptor(opts ...Option) flow.StepInterceptor` returning a `flow.StepInterceptorFunc`. +- [x] 3.2 In the closure: resolve span name (default `flow.String(step)`, overridable), build start attributes (`workflow.step.name` plus user-supplied), call `tracer.Start(ctx, name, trace.WithAttributes(...))`, defer `span.End()`. +- [x] 3.3 After `next(ctx)` returns: set `workflow.step.status` to `"success"` or `"error"`, and on non-nil error call `span.RecordError(err)` + `span.SetStatus(codes.Error, err.Error())`. Treat `context.Canceled` like any other error. +- [x] 3.4 Return the error unchanged. + +## 4. Attempt interceptor + +- [x] 4.1 Create `contrib/otel/attempt.go` exporting `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` returning a `flow.AttemptInterceptorFunc`. +- [x] 4.2 Resolve span name (default `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)`, overridable), build start attributes (`workflow.step.name`, `workflow.step.attempt = int64(attempt)`, plus user-supplied), call `tracer.Start(ctx, name, trace.WithAttributes(...))`, defer `span.End()`. +- [x] 4.3 After `next(ctx)` returns, on non-nil error call `RecordError` + `SetStatus(codes.Error, …)`. Return the error unchanged. + +## 5. 
Test scaffolding + +- [x] 5.1 Add test-only dependencies in `contrib/otel/`: `go.opentelemetry.io/otel/sdk/trace` and `go.opentelemetry.io/otel/sdk/trace/tracetest` (and the SDK's transitive `go.opentelemetry.io/otel/sdk`). Confirm with `go list -deps -test=false ./...` that they are not pulled into the runtime graph. +- [x] 5.2 Create `contrib/otel/internal_test.go` (or `helpers_test.go`) with a small helper that builds a `(tp *sdktrace.TracerProvider, recorder *tracetest.SpanRecorder)` pair. + +## 6. Step interceptor tests (`step_test.go`) + +- [x] 6.1 Test `TestStepInterceptor_SuccessOneSpan`: single step, succeeds first try → exactly one span, name = `flow.String(step)`, attribute `workflow.step.status == "success"`, status code OK/Unset. +- [x] 6.2 Test `TestStepInterceptor_RetriesStillOneSpan`: step retries N times then succeeds → still exactly one step span. +- [x] 6.3 Test `TestStepInterceptor_FinalErrorRecorded`: step fails terminally → `workflow.step.status == "error"`, `Status.Code == codes.Error`, span events include the `RecordError` event. +- [x] 6.4 Test `TestStepInterceptor_ContextCanceled`: step returns `context.Canceled` → status = Error, error event recorded. +- [x] 6.5 Test `TestStepInterceptor_SkippedStepNoSpan`: step with Condition resolving to `Skipped` → recorder yields zero spans. +- [x] 6.6 Test `TestStepInterceptor_CustomNamer`: `WithStepSpanNamer` overrides the default. +- [x] 6.7 Test `TestStepInterceptor_CustomAttributesAppend`: `WithStepAttributes` adds extras while defaults remain. + +## 7. Attempt interceptor tests (`attempt_test.go`) + +- [x] 7.1 Test `TestAttemptInterceptor_OneSpanPerAttempt`: step with N+1 attempts → exactly N+1 spans, attempt indices 0..N. +- [x] 7.2 Test `TestAttemptInterceptor_DefaultName`: name format matches `"%s (attempt %d)"`. +- [x] 7.3 Test `TestAttemptInterceptor_FailingAttemptRecorded`: failed attempt span has Error status + RecordError event. 
+- [x] 7.4 Test `TestAttemptInterceptor_ChildOfCallerSpan`: when only attempt interceptor is registered, attempt span's `Parent.SpanID` matches an outer caller span. +- [x] 7.5 Test `TestAttemptInterceptor_CustomNamer` and `TestAttemptInterceptor_CustomAttributes`. + +## 8. Combined / integration tests (`integration_test.go`) + +- [x] 8.1 Test `TestBothLayers_AttemptIsChildOfStep`: register both, run a step, assert attempt span's `Parent.SpanID` equals step span's `SpanID` and `TraceID` matches. +- [x] 8.2 Test `TestBothLayers_RetryAttemptCount`: step retries M times, assert one step span and M+1 attempt spans, all sharing the trace. +- [x] 8.3 Test `TestProviderResolutionAtFactoryTime`: change global `otel.SetTracerProvider` after constructing the interceptor → spans still flow to the original provider. + +## 9. Godoc Example (`example_test.go`) + +- [x] 9.1 Add test dep `go.opentelemetry.io/otel/exporters/stdout/stdouttrace`. +- [x] 9.2 Write `func Example()` that builds a `TracerProvider` with `stdouttrace.New(stdouttrace.WithoutTimestamps())`, registers both interceptors on a 2-step workflow, and runs it. Use `// Output:` only if deterministic; otherwise document why output is omitted. +- [x] 9.3 Verify `go test ./... -run Example` passes inside `contrib/otel/`. + +## 10. Package documentation + +- [x] 10.1 Add a package-level doc comment in `contrib/otel/doc.go` covering: usage snippet, default span/attribute conventions, parent/child relation between layers, the `Skipped`/`Canceled-by-Condition` zero-span behavior, and the `context.Canceled = Error` policy. +- [x] 10.2 Add `contrib/otel/README.md` with the usage snippet, dependency policy notice, and a "must `cd contrib/otel && go test ./...`" note for contributors. + +## 11. Repository-level docs and follow-ups + +- [x] 11.1 Add a top-level mention of the new contrib module in the root `README.md` (one line under a "contrib" heading or a dedicated section). 
+- [x] 11.2 Open a tracking issue (or add a TODO note in `contrib/otel/README.md`) for: (a) adding a CI job that runs `go test ./...` inside `contrib/otel/`, and (b) the release-time checklist to drop or pin the `replace` directive before tagging `contrib/otel/v0.1.0`. + +## 12. Verification before completion + +- [x] 12.1 `go test ./...` from repository root passes (core unchanged). +- [x] 12.2 `go test ./...` from `contrib/otel/` passes; race detector also passes (`-race`). +- [x] 12.3 `openspec validate contrib-otel-tracing --strict` reports no errors. +- [x] 12.4 `go list -deps -test=false ./...` inside `contrib/otel/` confirms no SDK / exporter / tracetest packages in the runtime graph. diff --git a/openspec/specs/contrib-otel/spec.md b/openspec/specs/contrib-otel/spec.md new file mode 100644 index 0000000..9fef99a --- /dev/null +++ b/openspec/specs/contrib-otel/spec.md @@ -0,0 +1,197 @@ +## ADDED Requirements + +### Requirement: Independent submodule + +The `contrib/otel` package SHALL be delivered as an independent Go module at module path `github.com/Azure/go-workflow/contrib/otel`, located at the repository directory `contrib/otel/`. The module SHALL declare Go 1.23 (matching the core module) and SHALL NOT cause the core module `github.com/Azure/go-workflow` to acquire any new direct or transitive dependency on OpenTelemetry packages. 
+ +#### Scenario: Core go.mod is unchanged +- **GIVEN** the change has been applied +- **WHEN** the root `go.mod` of `github.com/Azure/go-workflow` is inspected +- **THEN** it contains no `require`, `replace`, or `exclude` line referencing any `go.opentelemetry.io/...` module +- **AND** `go mod tidy` at the repository root produces no diff caused by this change + +#### Scenario: Contrib module builds standalone +- **GIVEN** a checkout of the repository +- **WHEN** the developer runs `go build ./...` and `go test ./...` from `contrib/otel/` +- **THEN** the commands succeed using only the contrib module's `go.mod` +- **AND** the contrib `go.mod` declares the module path `github.com/Azure/go-workflow/contrib/otel` +- **AND** the contrib `go.mod` carries `go 1.23` + +#### Scenario: Contrib module resolves core via replace during development +- **GIVEN** the repository is checked out at the change branch +- **WHEN** `contrib/otel/go.mod` is inspected +- **THEN** it contains a `replace github.com/Azure/go-workflow => ../..` directive (or an equivalent local replace) so the contrib module builds against in-tree core sources + +--- + +### Requirement: Dependency policy + +The `contrib/otel` module's runtime (non-test) dependencies SHALL be limited to the OpenTelemetry **API** packages — `go.opentelemetry.io/otel` and `go.opentelemetry.io/otel/trace` (and their transitive requirements) — and `github.com/Azure/go-workflow`. The OpenTelemetry **SDK**, exporter packages, and `tracetest` SHALL appear only as test dependencies. 
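The dependency policy above could be checked mechanically. The sketch below is one possible approach, not part of the change itself: it scans text shaped like the output of `go list -deps ./...` (one import path per line) and reports any package under the SDK or exporter trees. The function name, the prefix list, and the sample input are assumptions made for illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// forbiddenPrefixes lists import-path roots that should stay test-only.
// tracetest lives under the SDK tree and stdouttrace under exporters.
var forbiddenPrefixes = []string{
	"go.opentelemetry.io/otel/sdk",
	"go.opentelemetry.io/otel/exporters",
}

// findForbidden returns every import path in goListOutput that equals a
// forbidden root or sits beneath one.
func findForbidden(goListOutput string) []string {
	var bad []string
	sc := bufio.NewScanner(strings.NewReader(goListOutput))
	for sc.Scan() {
		pkg := strings.TrimSpace(sc.Text())
		for _, p := range forbiddenPrefixes {
			if pkg == p || strings.HasPrefix(pkg, p+"/") {
				bad = append(bad, pkg)
				break
			}
		}
	}
	return bad
}

func main() {
	// Hand-written sample input, not captured from a real run.
	sample := `context
go.opentelemetry.io/otel/attribute
go.opentelemetry.io/otel/trace
go.opentelemetry.io/otel/sdk/trace
github.com/Azure/go-workflow`
	fmt.Println(findForbidden(sample))
}
```

A CI job could feed the real command output into such a check and fail when the returned slice is non-empty.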
+ +#### Scenario: SDK is test-only +- **GIVEN** the contrib module +- **WHEN** `go list -deps -test=false ./...` is run inside `contrib/otel/` +- **THEN** the output contains no package under `go.opentelemetry.io/otel/sdk/...` +- **AND** the output contains no `stdouttrace` or other exporter packages + +#### Scenario: API packages available at runtime +- **WHEN** a consumer imports `github.com/Azure/go-workflow/contrib/otel` +- **THEN** the import graph includes `go.opentelemetry.io/otel/trace` and `go.opentelemetry.io/otel/attribute` + +--- + +### Requirement: Step interceptor factory + +`contrib/otel` SHALL export a function `NewStepInterceptor(opts ...Option) flow.StepInterceptor` that returns a value satisfying the `flow.StepInterceptor` interface. Each invocation of the returned `InterceptStep` method SHALL start exactly one span at the beginning of `next` and end it after `next` returns, regardless of how many retry attempts occur inside `next`. + +#### Scenario: One span per step on success +- **GIVEN** a Workflow with a single step `s` registered with `flow.WithStepInterceptor(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)))` where `tp` records spans +- **WHEN** the Workflow runs and `s` succeeds on the first attempt +- **THEN** exactly one span is recorded with the step span name +- **AND** the span has status `OK` (or unset, the OTel default for success) + +#### Scenario: One span per step despite retries +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and finally succeeds on attempt 2 +- **AND** only `NewStepInterceptor` is registered (no AttemptInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly one span is recorded for `s` (not three) + +#### Scenario: Step span records terminal failure +- **GIVEN** a step `s` whose final attempt returns a non-nil error `err` +- **WHEN** the step interceptor's outer `next` returns +- **THEN** the recorded span calls `RecordError(err)` and `SetStatus(codes.Error, 
err.Error())` +- **AND** the span attribute `workflow.step.status` equals `"error"` + +--- + +### Requirement: Attempt interceptor factory + +`contrib/otel` SHALL export a function `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` that returns a value satisfying the `flow.AttemptInterceptor` interface. Each invocation of `InterceptAttempt` SHALL start one span before calling `next` and end it after `next` returns. Each per-attempt span SHALL carry the attempt index as the OTel attribute `workflow.step.attempt`. + +#### Scenario: One attempt span per attempt +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and ultimately succeeds on attempt 2 +- **AND** only `NewAttemptInterceptor` is registered (no StepInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly three spans are recorded (attempt 0, attempt 1, attempt 2) +- **AND** each span carries attribute `workflow.step.attempt` with the matching 0-based index + +#### Scenario: Failing attempt records its error +- **GIVEN** an attempt that returns error `err` from `next` +- **WHEN** that attempt's span is ended +- **THEN** the span calls `RecordError(err)` and `SetStatus(codes.Error, err.Error())` + +--- + +### Requirement: Independent registration of the two layers + +The two interceptor factories SHALL be usable independently or together. When both are registered on the same Workflow, the attempt span SHALL be a child of the step span (i.e. the attempt's `Span.Parent.SpanID` equals the step span's `SpanID`). When only one is registered, that span SHALL be a child of whatever span (if any) is on the caller-provided context. 
+ +#### Scenario: Both layers registered → parent/child relation +- **GIVEN** a Workflow registered with both `NewStepInterceptor` and `NewAttemptInterceptor` +- **WHEN** a single step runs once +- **THEN** the step span and the attempt span share the same `TraceID` +- **AND** the attempt span's `Parent.SpanID` equals the step span's `SpanID` + +#### Scenario: Only attempt layer registered → no orphan +- **GIVEN** a Workflow registered with only `NewAttemptInterceptor` and a caller-supplied context that already contains an outer span `outer` +- **WHEN** a step runs +- **THEN** the attempt span's `Parent.SpanID` equals `outer`'s `SpanID` + +--- + +### Requirement: Default span naming + +When no custom namer is supplied, the default step span name SHALL equal `flow.String(step)`, and the default attempt span name SHALL equal `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)`. + +#### Scenario: Default step span name +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` +- **WHEN** the step interceptor produces a span without a custom `WithStepSpanNamer` +- **THEN** the span's name is `"MyStep"` + +#### Scenario: Default attempt span name carries attempt index +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` running its second attempt (`attempt == 1`) +- **WHEN** the attempt interceptor produces a span without a custom `WithAttemptSpanNamer` +- **THEN** the span's name is `"MyStep (attempt 1)"` + +--- + +### Requirement: Default span attributes + +The step span SHALL be created with attribute `workflow.step.name = flow.String(step)`, and on `End` it SHALL receive attribute `workflow.step.status` equal to `"success"` when `next` returned nil and `"error"` otherwise. The attempt span SHALL be created with attributes `workflow.step.name = flow.String(step)` and `workflow.step.attempt = attempt` (as an `Int64` attribute). 
+ +#### Scenario: Step span carries name and status +- **GIVEN** a step `s` that succeeds +- **WHEN** its span is ended +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.status` equals `"success"` + +#### Scenario: Attempt span carries name and attempt index +- **GIVEN** an attempt at index 2 of step `s` +- **WHEN** its span is created +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.attempt` equals the int64 `2` + +--- + +### Requirement: Functional options + +`contrib/otel` SHALL expose a single exported `Option` type and the following functional option constructors. Both factories SHALL accept the same `Option` values. Options that target one layer SHALL be no-ops on the other layer (no error, no panic). + +| Option | Effect | +|---|---| +| `WithTracerProvider(tp trace.TracerProvider)` | Sets the provider. If `nil` or unset, defaults to `otel.GetTracerProvider()` resolved at factory-call time. | +| `WithTracerName(name string)` | Sets the tracer instrumentation name. Default `"github.com/Azure/go-workflow/contrib/otel"`. | +| `WithStepSpanNamer(fn func(flow.Steper) string)` | Overrides default step span naming. | +| `WithAttemptSpanNamer(fn func(flow.Steper, uint64) string)` | Overrides default attempt span naming. | +| `WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue)` | Adds extra attributes to the step span at start. Defaults still apply. | +| `WithAttemptAttributes(fn func(flow.Steper, uint64) []attribute.KeyValue)` | Adds extra attributes to the attempt span at start. Defaults still apply. 
| + +#### Scenario: Custom step namer overrides default +- **GIVEN** a `NewStepInterceptor(WithStepSpanNamer(func(flow.Steper) string { return "custom-name" }))` +- **WHEN** any step runs through the interceptor +- **THEN** its step span name is `"custom-name"` + +#### Scenario: Custom attempt attributes are appended, not replaced +- **GIVEN** a `NewAttemptInterceptor(WithAttemptAttributes(func(flow.Steper, uint64) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("custom.k", "v")} }))` +- **WHEN** an attempt span is created +- **THEN** it carries the default attributes (`workflow.step.name`, `workflow.step.attempt`) AND the custom attribute `custom.k = "v"` + +#### Scenario: Provider defaults to global +- **GIVEN** no `WithTracerProvider` option +- **WHEN** `NewStepInterceptor()` is called +- **THEN** the constructed interceptor obtains its tracer from `otel.GetTracerProvider()` at the moment of the call (not lazily on every interception) + +--- + +### Requirement: Cancellation is recorded as error + +When the error returned by `next` satisfies `errors.Is(err, context.Canceled)`, the span produced by either interceptor SHALL be ended with `RecordError(err)` and `SetStatus(codes.Error, err.Error())` — the same as any other non-nil error. No special-case suppression SHALL be applied in v0.1. + +#### Scenario: Cancelled step is Error +- **GIVEN** a step that observes context cancellation and returns `context.Canceled` +- **WHEN** the span is ended +- **THEN** the span has status code `codes.Error` +- **AND** the span's events include the recorded `context.Canceled` error + +--- + +### Requirement: Skipped and Canceled-by-Condition steps produce no spans + +The contrib package SHALL document, and tests SHALL verify, that a step whose `Condition` resolves to `Skipped` or `Canceled` produces zero spans through either interceptor — by virtue of the core capability `step-interceptor` bypassing the chain entirely for such steps. 
`contrib/otel` SHALL NOT implement any compensating behavior. + +#### Scenario: Skipped step contributes no span +- **GIVEN** a Workflow with a step `s` whose `Condition` returns `Skipped` +- **AND** both `NewStepInterceptor` and `NewAttemptInterceptor` are registered +- **WHEN** the Workflow runs +- **THEN** the span recorder contains zero spans referencing `s` + +--- + +### Requirement: Runnable godoc Example + +The `contrib/otel` module SHALL include a runnable godoc Example (`Example*` function in `example_test.go`) that constructs a `TracerProvider` with a stdout exporter, registers both interceptors on a minimal Workflow, and runs the Workflow. The Example SHALL compile and execute under `go test ./...` inside the contrib module without external network access. + +#### Scenario: Example builds and runs +- **WHEN** `go test ./... -run Example` is executed inside `contrib/otel/` +- **THEN** the command exits with status 0 +- **AND** the Example function appears in `go doc -all github.com/Azure/go-workflow/contrib/otel`