From a3b0614cd218b138520837d09749de1e2804e9c0 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Thu, 14 May 2026 11:41:54 +0000 Subject: [PATCH 01/13] feat(contrib/otel): bootstrap submodule --- contrib/otel/deps_test.go | 6 ++++++ contrib/otel/doc.go | 1 + contrib/otel/go.mod | 18 ++++++++++++++++++ contrib/otel/go.sum | 31 +++++++++++++++++++++++++++++++ 4 files changed, 56 insertions(+) create mode 100644 contrib/otel/deps_test.go create mode 100644 contrib/otel/doc.go create mode 100644 contrib/otel/go.mod create mode 100644 contrib/otel/go.sum diff --git a/contrib/otel/deps_test.go b/contrib/otel/deps_test.go new file mode 100644 index 0000000..e456613 --- /dev/null +++ b/contrib/otel/deps_test.go @@ -0,0 +1,6 @@ +package otel_test + +import ( + _ "go.opentelemetry.io/otel/sdk/trace" + _ "go.opentelemetry.io/otel/sdk/trace/tracetest" +) diff --git a/contrib/otel/doc.go b/contrib/otel/doc.go new file mode 100644 index 0000000..9bee48e --- /dev/null +++ b/contrib/otel/doc.go @@ -0,0 +1 @@ +package otel diff --git a/contrib/otel/go.mod b/contrib/otel/go.mod new file mode 100644 index 0000000..55b60d2 --- /dev/null +++ b/contrib/otel/go.mod @@ -0,0 +1,18 @@ +module github.com/Azure/go-workflow/contrib/otel + +go 1.23.0 + +require go.opentelemetry.io/otel/sdk v1.37.0 + +require ( + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/trace v1.37.0 // indirect + golang.org/x/sys v0.34.0 // indirect +) + +replace github.com/Azure/go-workflow => ../.. 
diff --git a/contrib/otel/go.sum b/contrib/otel/go.sum new file mode 100644 index 0000000..0a82940 --- /dev/null +++ b/contrib/otel/go.sum @@ -0,0 +1,31 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= 
+go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= +golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 6f5fd52f4a67954df9f77bb846ed8d6d8277527c Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Thu, 14 May 2026 12:07:25 +0000 Subject: [PATCH 02/13] docs(contrib/otel): explain deps_test.go as bootstrap anchor --- contrib/otel/deps_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/contrib/otel/deps_test.go b/contrib/otel/deps_test.go index e456613..64c66fc 100644 --- a/contrib/otel/deps_test.go +++ b/contrib/otel/deps_test.go @@ -1,3 +1,7 @@ +// Anchors test-only OpenTelemetry SDK dependencies in go.mod / go.sum so the +// dependency policy for the contrib/otel module (SDK is test-only) is observable +// from the bootstrap commit alone. Real tests in later tasks import these +// packages directly; remove this file once they do. 
package otel_test import ( From 8e9301bdf5efc097fd3cadcb4db3140294d7a7e7 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Thu, 14 May 2026 12:10:52 +0000 Subject: [PATCH 03/13] feat(contrib/otel): add Option API and config helpers Co-Authored-By: Claude Opus 4.7 (1M context) --- contrib/otel/go.mod | 11 +++- contrib/otel/go.sum | 10 ++- contrib/otel/options.go | 134 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+), 5 deletions(-) create mode 100644 contrib/otel/options.go diff --git a/contrib/otel/go.mod b/contrib/otel/go.mod index 55b60d2..775615b 100644 --- a/contrib/otel/go.mod +++ b/contrib/otel/go.mod @@ -2,16 +2,21 @@ module github.com/Azure/go-workflow/contrib/otel go 1.23.0 -require go.opentelemetry.io/otel/sdk v1.37.0 +require ( + github.com/Azure/go-workflow v0.0.0-00010101000000-000000000000 + go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/sdk v1.37.0 + go.opentelemetry.io/otel/trace v1.37.0 +) require ( + github.com/benbjohnson/clock v1.3.5 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel v1.37.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect - go.opentelemetry.io/otel/trace v1.37.0 // indirect golang.org/x/sys v0.34.0 // indirect ) diff --git a/contrib/otel/go.sum b/contrib/otel/go.sum index 0a82940..6eb6954 100644 --- a/contrib/otel/go.sum +++ b/contrib/otel/go.sum @@ -1,3 +1,7 @@ +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -11,8 +15,10 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= diff --git a/contrib/otel/options.go b/contrib/otel/options.go new file mode 100644 index 0000000..07ea535 --- /dev/null +++ b/contrib/otel/options.go @@ -0,0 +1,134 @@ +package otel + +import ( + flow "github.com/Azure/go-workflow" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// defaultTracerName is the default instrumentation name passed to +// TracerProvider.Tracer when WithTracerName is not used. 
+const defaultTracerName = "github.com/Azure/go-workflow/contrib/otel" + +// config is the resolved configuration shared by both interceptor factories +// (NewStepInterceptor and NewAttemptInterceptor). The same Option values are +// accepted by both factories; options that target one layer are no-ops on the +// other. +type config struct { + tracerProvider trace.TracerProvider + tracerName string + stepSpanNamer func(flow.Steper) string + attemptSpanNamer func(flow.Steper, uint64) string + stepAttributes func(flow.Steper) []attribute.KeyValue + attemptAttributes func(flow.Steper, uint64) []attribute.KeyValue +} + +// Option configures a step or attempt interceptor produced by +// NewStepInterceptor / NewAttemptInterceptor. The same Option type is accepted +// by both factories; each option's documentation states which interceptor it +// affects (the other factory ignores it). +type Option func(*config) + +// newConfig applies opts to a fresh config and returns it. nil options are +// skipped so callers can build slices conditionally. +func newConfig(opts []Option) *config { + c := &config{} + for _, o := range opts { + if o != nil { + o(c) + } + } + return c +} + +// WithTracerProvider sets the OpenTelemetry TracerProvider used to obtain the +// Tracer that emits step and attempt spans. When unset (or set to nil), each +// factory falls back to otel.GetTracerProvider() at the moment +// NewStepInterceptor / NewAttemptInterceptor is called (not lazily on every +// interception). +// +// Affects: NewStepInterceptor, NewAttemptInterceptor. +func WithTracerProvider(tp trace.TracerProvider) Option { + return func(c *config) { c.tracerProvider = tp } +} + +// WithTracerName overrides the instrumentation name passed to +// TracerProvider.Tracer. Default: +// "github.com/Azure/go-workflow/contrib/otel". +// +// Affects: NewStepInterceptor, NewAttemptInterceptor. 
+func WithTracerName(name string) Option { + return func(c *config) { c.tracerName = name } +} + +// WithStepSpanNamer overrides the default step span name (flow.String(step)) +// with a caller-supplied function. Passing a nil fn is a no-op and leaves the +// previously configured (or default) namer in place. +// +// Affects: NewStepInterceptor only. NewAttemptInterceptor ignores this option. +func WithStepSpanNamer(fn func(flow.Steper) string) Option { + return func(c *config) { + if fn != nil { + c.stepSpanNamer = fn + } + } +} + +// WithAttemptSpanNamer overrides the default attempt span name +// ("<step> (attempt N)") with a caller-supplied function. Passing a nil fn is +// a no-op and leaves the previously configured (or default) namer in place. +// +// Affects: NewAttemptInterceptor only. NewStepInterceptor ignores this option. +func WithAttemptSpanNamer(fn func(flow.Steper, uint64) string) Option { + return func(c *config) { + if fn != nil { + c.attemptSpanNamer = fn + } + } +} + +// WithStepAttributes registers a function that returns extra attributes to +// attach to step spans at span-start time. The returned attributes are added +// in addition to the defaults (e.g. workflow.step.name). Passing a nil fn is +// a no-op. +// +// Affects: NewStepInterceptor only. NewAttemptInterceptor ignores this option. +func WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue) Option { + return func(c *config) { + if fn != nil { + c.stepAttributes = fn + } + } +} + +// WithAttemptAttributes registers a function that returns extra attributes to +// attach to attempt spans at span-start time. The returned attributes are +// added in addition to the defaults (e.g. workflow.step.name, +// workflow.step.attempt). Passing a nil fn is a no-op. +// +// Affects: NewAttemptInterceptor only. NewStepInterceptor ignores this option. 
+func WithAttemptAttributes(fn func(flow.Steper, uint64) []attribute.KeyValue) Option { + return func(c *config) { + if fn != nil { + c.attemptAttributes = fn + } + } +} + +// resolveTracer picks the configured TracerProvider (falling back to the +// global provider via otel.GetTracerProvider when nil) and returns a Tracer +// with the configured (or default) instrumentation name. It is intended to be +// called once per factory invocation at construction time; the resulting +// Tracer is captured by the returned interceptor closure. +func (c *config) resolveTracer() trace.Tracer { + tp := c.tracerProvider + if tp == nil { + tp = otel.GetTracerProvider() + } + name := c.tracerName + if name == "" { + name = defaultTracerName + } + return tp.Tracer(name) +} From 17cb9a8cde8d9f1a92f64c2225b688c08478f988 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Thu, 14 May 2026 12:35:21 +0000 Subject: [PATCH 04/13] feat(contrib/otel): add step interceptor and tests Implements Task 3 of the contrib-otel-tracing change: NewStepInterceptor emits exactly one OpenTelemetry span per Step lifetime (covering all retry attempts), with workflow.step.name + workflow.step.status attributes and codes.Error / RecordError on failure (context.Canceled included). Skipped/Canceled-by-Condition steps bypass the chain and produce no span. Replaces deps_test.go: helpers_test.go now imports the SDK + tracetest packages directly, anchoring them to the test graph. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- contrib/otel/deps_test.go | 10 -- contrib/otel/go.mod | 6 +- contrib/otel/go.sum | 9 ++ contrib/otel/helpers_test.go | 16 +++ contrib/otel/step.go | 61 ++++++++++ contrib/otel/step_test.go | 222 +++++++++++++++++++++++++++++++++++ 6 files changed, 313 insertions(+), 11 deletions(-) delete mode 100644 contrib/otel/deps_test.go create mode 100644 contrib/otel/helpers_test.go create mode 100644 contrib/otel/step.go create mode 100644 contrib/otel/step_test.go diff --git a/contrib/otel/deps_test.go b/contrib/otel/deps_test.go deleted file mode 100644 index 64c66fc..0000000 --- a/contrib/otel/deps_test.go +++ /dev/null @@ -1,10 +0,0 @@ -// Anchors test-only OpenTelemetry SDK dependencies in go.mod / go.sum so the -// dependency policy for the contrib/otel module (SDK is test-only) is observable -// from the bootstrap commit alone. Real tests in later tasks import these -// packages directly; remove this file once they do. -package otel_test - -import ( - _ "go.opentelemetry.io/otel/sdk/trace" - _ "go.opentelemetry.io/otel/sdk/trace/tracetest" -) diff --git a/contrib/otel/go.mod b/contrib/otel/go.mod index 775615b..901d989 100644 --- a/contrib/otel/go.mod +++ b/contrib/otel/go.mod @@ -4,6 +4,8 @@ go 1.23.0 require ( github.com/Azure/go-workflow v0.0.0-00010101000000-000000000000 + github.com/cenkalti/backoff/v4 v4.3.0 + github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.37.0 go.opentelemetry.io/otel/sdk v1.37.0 go.opentelemetry.io/otel/trace v1.37.0 @@ -11,13 +13,15 @@ require ( require ( github.com/benbjohnson/clock v1.3.5 // indirect - github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect 
golang.org/x/sys v0.34.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/Azure/go-workflow => ../.. diff --git a/contrib/otel/go.sum b/contrib/otel/go.sum index 6eb6954..38982d4 100644 --- a/contrib/otel/go.sum +++ b/contrib/otel/go.sum @@ -13,8 +13,14 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= @@ -33,5 +39,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 
v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/contrib/otel/helpers_test.go b/contrib/otel/helpers_test.go new file mode 100644 index 0000000..0d53308 --- /dev/null +++ b/contrib/otel/helpers_test.go @@ -0,0 +1,16 @@ +package otel_test + +import ( + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +// newRecorderTracerProvider returns a TracerProvider wired to a synchronous +// in-memory SpanRecorder, suitable for asserting the spans emitted by the +// step / attempt interceptors. SpanRecorder implements SpanProcessor, so no +// extra batching or simple processor is needed. +func newRecorderTracerProvider() (*sdktrace.TracerProvider, *tracetest.SpanRecorder) { + rec := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)) + return tp, rec +} diff --git a/contrib/otel/step.go b/contrib/otel/step.go new file mode 100644 index 0000000..98122d0 --- /dev/null +++ b/contrib/otel/step.go @@ -0,0 +1,61 @@ +package otel + +import ( + "context" + + flow "github.com/Azure/go-workflow" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// Attribute keys and status values emitted on step spans. +const ( + attrStepName = "workflow.step.name" + attrStepStatus = "workflow.step.status" + statusSuccess = "success" + statusError = "error" +) + +// NewStepInterceptor returns a flow.StepInterceptor that emits one +// OpenTelemetry span per Step lifetime (covering all retry attempts). +// +// The span name defaults to flow.String(step) and may be overridden via +// WithStepSpanNamer. 
The default attribute set always includes +// workflow.step.name = flow.String(step) and, after next() returns, +// workflow.step.status ∈ {"success", "error"}. Extra attributes can be +// supplied via WithStepAttributes; they are appended to (not in place of) +// the defaults at span-start time. +// +// On a non-nil error from next() the span records the error via +// span.RecordError and sets its status to codes.Error. context.Canceled +// is treated like any other error (no special-case). +// +// Steps that the scheduler settles inline (Skipped or Canceled by their +// Condition) bypass the interceptor chain entirely and produce no span. +func NewStepInterceptor(opts ...Option) flow.StepInterceptor { + cfg := newConfig(opts) + tracer := cfg.resolveTracer() + return flow.StepInterceptorFunc(func(ctx context.Context, step flow.Steper, next func(context.Context) error) error { + spanName := flow.String(step) + if cfg.stepSpanNamer != nil { + spanName = cfg.stepSpanNamer(step) + } + attrs := []attribute.KeyValue{attribute.String(attrStepName, flow.String(step))} + if cfg.stepAttributes != nil { + attrs = append(attrs, cfg.stepAttributes(step)...) 
+ } + ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(attrs...)) + defer span.End() + + err := next(ctx) + if err != nil { + span.SetAttributes(attribute.String(attrStepStatus, statusError)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetAttributes(attribute.String(attrStepStatus, statusSuccess)) + } + return err + }) +} diff --git a/contrib/otel/step_test.go b/contrib/otel/step_test.go new file mode 100644 index 0000000..fd226c1 --- /dev/null +++ b/contrib/otel/step_test.go @@ -0,0 +1,222 @@ +package otel_test + +import ( + "context" + "errors" + "testing" + + flow "github.com/Azure/go-workflow" + otelflow "github.com/Azure/go-workflow/contrib/otel" + + "github.com/cenkalti/backoff/v4" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// retryStep counts attempts and only succeeds on the Nth try. +type retryStep struct { + Name string + NeedAttempts int + Attempts int +} + +func (s *retryStep) String() string { return s.Name } +func (s *retryStep) Do(_ context.Context) error { + s.Attempts++ + if s.Attempts < s.NeedAttempts { + return errors.New("transient") + } + return nil +} + +// alwaysFail is a Step whose Do always returns the configured error. +type alwaysFail struct { + Name string + Err error +} + +func (s *alwaysFail) String() string { return s.Name } +func (s *alwaysFail) Do(_ context.Context) error { return s.Err } + +// findAttr looks up the named attribute key in the slice. Returns the +// attribute and whether it was found. 
+func findAttr(attrs []attribute.KeyValue, key string) (attribute.KeyValue, bool) { + for _, a := range attrs { + if string(a.Key) == key { + return a, true + } + } + return attribute.KeyValue{}, false +} + +func assertAttr(t *testing.T, attrs []attribute.KeyValue, key, want string) { + t.Helper() + a, ok := findAttr(attrs, key) + if !assert.True(t, ok, "attribute %q not found", key) { + return + } + assert.Equal(t, want, a.Value.AsString(), "attribute %q value mismatch", key) +} + +// noBackoff returns a RetryOption mutator that disables real backoff sleeps +// so retry tests run instantly. +func noBackoff(attempts uint64) func(*flow.RetryOption) { + return func(ro *flow.RetryOption) { + ro.Attempts = attempts + ro.Backoff = &backoff.ZeroBackOff{} + } +} + +func newWorkflow(ic flow.StepInterceptor) *flow.Workflow { + return &flow.Workflow{Option: flow.WorkflowOption{ + StepInterceptors: []flow.StepInterceptor{ic}, + }} +} + +func TestStepInterceptor_SuccessOneSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("MyStep") + w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, "MyStep", s.Name()) + assertAttr(t, s.Attributes(), "workflow.step.name", "MyStep") + assertAttr(t, s.Attributes(), "workflow.step.status", "success") + assert.Equal(t, codes.Unset, s.Status().Code, "no SetStatus on success") +} + +func TestStepInterceptor_RetriesStillOneSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "Flaky", NeedAttempts: 3} + w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step).Retry(noBackoff(5))) + require.NoError(t, w.Do(context.Background())) + + assert.Equal(t, 3, step.Attempts, "step should have been attempted 3 times") + spans := 
rec.Ended() + require.Len(t, spans, 1, "step interceptor must emit exactly one span across retries") + s := spans[0] + assertAttr(t, s.Attributes(), "workflow.step.status", "success") +} + +func TestStepInterceptor_FinalErrorRecorded(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + boom := errors.New("boom") + step := &alwaysFail{Name: "Fail", Err: boom} + w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step).Retry(noBackoff(2))) + err := w.Do(context.Background()) + require.Error(t, err) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assertAttr(t, s.Attributes(), "workflow.step.status", "error") + assert.Equal(t, codes.Error, s.Status().Code) + + events := s.Events() + var sawException bool + for _, ev := range events { + if ev.Name == "exception" { + sawException = true + break + } + } + assert.True(t, sawException, "expected RecordError to add an 'exception' event; got %+v", events) +} + +func TestStepInterceptor_ContextCanceled(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &alwaysFail{Name: "Cancel", Err: context.Canceled} + w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + _ = w.Do(context.Background()) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, codes.Error, s.Status().Code) + var sawException bool + for _, ev := range s.Events() { + if ev.Name == "exception" { + sawException = true + break + } + } + assert.True(t, sawException, "context.Canceled should record an exception event") +} + +func TestStepInterceptor_SkippedStepNoSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + skipMe := flow.NoOp("Skipped") + w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(skipMe).When(func(context.Context, map[flow.Steper]flow.StepResult) flow.StepStatus { + return 
flow.Skipped + })) + require.NoError(t, w.Do(context.Background())) + + assert.Empty(t, rec.Ended(), "Skipped steps must bypass the interceptor chain") + // also assert nothing was started. + var _ []sdktrace.ReadWriteSpan = rec.Started() + assert.Empty(t, rec.Started()) +} + +func TestStepInterceptor_CustomNamer(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Original") + namer := func(s flow.Steper) string { return "custom:" + flow.String(s) } + w := newWorkflow(otelflow.NewStepInterceptor( + otelflow.WithTracerProvider(tp), + otelflow.WithStepSpanNamer(namer), + )) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, "custom:Original", s.Name(), "custom span namer should win") + // workflow.step.name attribute still uses flow.String(step) per spec. + assertAttr(t, s.Attributes(), "workflow.step.name", "Original") +} + +func TestStepInterceptor_CustomAttributesAppend(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Hello") + extras := func(flow.Steper) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("env", "test"), + attribute.Int("answer", 42), + } + } + w := newWorkflow(otelflow.NewStepInterceptor( + otelflow.WithTracerProvider(tp), + otelflow.WithStepAttributes(extras), + )) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + attrs := spans[0].Attributes() + assertAttr(t, attrs, "workflow.step.name", "Hello") // default still present + assertAttr(t, attrs, "workflow.step.status", "success") // default still present + assertAttr(t, attrs, "env", "test") // user-supplied + a, ok := findAttr(attrs, "answer") + require.True(t, ok, "custom int attribute missing") + assert.Equal(t, int64(42), a.Value.AsInt64()) +} From 1d5014ef2b1ac7f55cfbacf45486909fbc8835f4 Mon Sep 17 
00:00:00 2001 From: Xingfei Xu Date: Thu, 14 May 2026 13:13:29 +0000 Subject: [PATCH 05/13] refactor(contrib/otel): canonical attrs win, extract consts, polish tests Co-Authored-By: Claude Opus 4.7 (1M context) --- contrib/otel/consts.go | 11 +++++++++++ contrib/otel/options.go | 4 ++++ contrib/otel/step.go | 17 +++++++---------- contrib/otel/step_test.go | 32 +++++++++++++++++++++++++++----- 4 files changed, 49 insertions(+), 15 deletions(-) create mode 100644 contrib/otel/consts.go diff --git a/contrib/otel/consts.go b/contrib/otel/consts.go new file mode 100644 index 0000000..f13748b --- /dev/null +++ b/contrib/otel/consts.go @@ -0,0 +1,11 @@ +package otel + +// Attribute keys and status values emitted by the contrib/otel interceptors. +const ( + attrStepName = "workflow.step.name" + attrStepStatus = "workflow.step.status" + attrStepAttempt = "workflow.step.attempt" + + statusSuccess = "success" + statusError = "error" +) diff --git a/contrib/otel/options.go b/contrib/otel/options.go index 07ea535..5e03c5d 100644 --- a/contrib/otel/options.go +++ b/contrib/otel/options.go @@ -93,6 +93,10 @@ func WithAttemptSpanNamer(fn func(flow.Steper, uint64) string) Option { // in addition to the defaults (e.g. workflow.step.name). Passing a nil fn is // a no-op. // +// Note: canonical attributes set by the interceptor (workflow.step.name, +// workflow.step.status) cannot be overridden by this option; passing those +// keys is silently superseded. +// // Affects: NewStepInterceptor only. NewAttemptInterceptor ignores this option. func WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue) Option { return func(c *config) { diff --git a/contrib/otel/step.go b/contrib/otel/step.go index 98122d0..cf0d0bf 100644 --- a/contrib/otel/step.go +++ b/contrib/otel/step.go @@ -9,14 +9,6 @@ import ( "go.opentelemetry.io/otel/trace" ) -// Attribute keys and status values emitted on step spans. 
-const ( - attrStepName = "workflow.step.name" - attrStepStatus = "workflow.step.status" - statusSuccess = "success" - statusError = "error" -) - // NewStepInterceptor returns a flow.StepInterceptor that emits one // OpenTelemetry span per Step lifetime (covering all retry attempts). // @@ -25,7 +17,9 @@ const ( // workflow.step.name = flow.String(step) and, after next() returns, // workflow.step.status ∈ {"success", "error"}. Extra attributes can be // supplied via WithStepAttributes; they are appended to (not in place of) -// the defaults at span-start time. +// the defaults at span-start time. Canonical attributes (workflow.step.name, +// workflow.step.status) always win over user-supplied attributes — i.e., +// WithStepAttributes cannot override them. // // On a non-nil error from next() the span records the error via // span.RecordError and sets its status to codes.Error. context.Canceled @@ -41,10 +35,13 @@ func NewStepInterceptor(opts ...Option) flow.StepInterceptor { if cfg.stepSpanNamer != nil { spanName = cfg.stepSpanNamer(step) } - attrs := []attribute.KeyValue{attribute.String(attrStepName, flow.String(step))} + // User attributes first, canonical defaults last so OTel's + // last-write-wins semantics keep canonical attrs authoritative. + attrs := make([]attribute.KeyValue, 0, 4) if cfg.stepAttributes != nil { attrs = append(attrs, cfg.stepAttributes(step)...) 
} + attrs = append(attrs, attribute.String(attrStepName, flow.String(step))) ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(attrs...)) defer span.End() diff --git a/contrib/otel/step_test.go b/contrib/otel/step_test.go index fd226c1..46101f6 100644 --- a/contrib/otel/step_test.go +++ b/contrib/otel/step_test.go @@ -13,9 +13,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - sdktrace "go.opentelemetry.io/otel/sdk/trace" ) +// exceptionEventName is the OTel SDK convention for span events emitted by +// RecordError. +const exceptionEventName = "exception" + // retryStep counts attempts and only succeeds on the Nth try. type retryStep struct { Name string @@ -127,7 +130,7 @@ func TestStepInterceptor_FinalErrorRecorded(t *testing.T) { events := s.Events() var sawException bool for _, ev := range events { - if ev.Name == "exception" { + if ev.Name == exceptionEventName { sawException = true break } @@ -149,7 +152,7 @@ func TestStepInterceptor_ContextCanceled(t *testing.T) { assert.Equal(t, codes.Error, s.Status().Code) var sawException bool for _, ev := range s.Events() { - if ev.Name == "exception" { + if ev.Name == exceptionEventName { sawException = true break } @@ -168,8 +171,7 @@ func TestStepInterceptor_SkippedStepNoSpan(t *testing.T) { require.NoError(t, w.Do(context.Background())) assert.Empty(t, rec.Ended(), "Skipped steps must bypass the interceptor chain") - // also assert nothing was started. - var _ []sdktrace.ReadWriteSpan = rec.Started() + // neither Started() nor Ended() should fire for a Skipped step. 
assert.Empty(t, rec.Started()) } @@ -220,3 +222,23 @@ func TestStepInterceptor_CustomAttributesAppend(t *testing.T) { require.True(t, ok, "custom int attribute missing") assert.Equal(t, int64(42), a.Value.AsInt64()) } + +func TestStepInterceptor_UserAttributeCannotOverrideCanonicalName(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Real") + hijack := func(flow.Steper) []attribute.KeyValue { + return []attribute.KeyValue{attribute.String("workflow.step.name", "HACKED")} + } + w := newWorkflow(otelflow.NewStepInterceptor( + otelflow.WithTracerProvider(tp), + otelflow.WithStepAttributes(hijack), + )) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + // Canonical attribute must win over user-supplied override. + assertAttr(t, spans[0].Attributes(), "workflow.step.name", flow.String(step)) +} From 910ea3dbc1765a678999775bd4438bd23d83096c Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Thu, 14 May 2026 13:48:40 +0000 Subject: [PATCH 06/13] feat(contrib/otel): add attempt interceptor and tests Implements NewAttemptInterceptor: one OTel span per attempt, default name "<step> (attempt N)", canonical attrs workflow.step.name and workflow.step.attempt (int64), error path records via RecordError + SetStatus(codes.Error). User-supplied attributes are appended but the canonical pair always wins (last-write-wins). WithAttemptAttributes godoc in options.go updated to document this precedence symmetric with WithStepAttributes. 
Tests (package otel_test, reusing helpers from step_test.go): - TestAttemptInterceptor_OneSpanPerAttempt - TestAttemptInterceptor_DefaultName - TestAttemptInterceptor_FailingAttemptRecorded - TestAttemptInterceptor_ChildOfCallerSpan - TestAttemptInterceptor_CustomNamer - TestAttemptInterceptor_CustomAttributes (regression: user cannot override workflow.step.attempt) --- contrib/otel/attempt.go | 65 +++++++++++++ contrib/otel/attempt_test.go | 173 +++++++++++++++++++++++++++++++++++ contrib/otel/options.go | 4 + 3 files changed, 242 insertions(+) create mode 100644 contrib/otel/attempt.go create mode 100644 contrib/otel/attempt_test.go diff --git a/contrib/otel/attempt.go b/contrib/otel/attempt.go new file mode 100644 index 0000000..fbf4890 --- /dev/null +++ b/contrib/otel/attempt.go @@ -0,0 +1,65 @@ +package otel + +import ( + "context" + "fmt" + + flow "github.com/Azure/go-workflow" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// NewAttemptInterceptor returns a flow.AttemptInterceptor that emits one +// OpenTelemetry span per attempt (one per call into the retry loop). +// +// The span name defaults to fmt.Sprintf("%s (attempt %d)", flow.String(step), +// attempt) and may be overridden via WithAttemptSpanNamer. The default +// attribute set always includes workflow.step.name = flow.String(step) and +// workflow.step.attempt = int64(attempt). Extra attributes can be supplied +// via WithAttemptAttributes; they are appended to (not in place of) the +// defaults at span-start time. Canonical attributes (workflow.step.name, +// workflow.step.attempt) always win over user-supplied attributes — i.e., +// WithAttemptAttributes cannot override them. +// +// On a non-nil error from next() the span records the error via +// span.RecordError and sets its status to codes.Error. context.Canceled +// is treated like any other error (no special-case). 
+// +// Steps that the scheduler settles inline (Skipped or Canceled by their +// Condition) bypass the interceptor chain entirely and produce no span. +func NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor { + cfg := newConfig(opts) + tracer := cfg.resolveTracer() + return flow.AttemptInterceptorFunc(func(ctx context.Context, step flow.Steper, attempt uint64, next func(context.Context) error) error { + spanName := defaultAttemptSpanName(step, attempt) + if cfg.attemptSpanNamer != nil { + spanName = cfg.attemptSpanNamer(step, attempt) + } + // User attributes first, canonical defaults last so OTel's + // last-write-wins semantics keep canonical attrs authoritative. + attrs := make([]attribute.KeyValue, 0, 4) + if cfg.attemptAttributes != nil { + attrs = append(attrs, cfg.attemptAttributes(step, attempt)...) + } + attrs = append(attrs, + attribute.String(attrStepName, flow.String(step)), + attribute.Int64(attrStepAttempt, int64(attempt)), + ) + ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(attrs...)) + defer span.End() + + err := next(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + return err + }) +} + +// defaultAttemptSpanName returns the canonical attempt span name used when no +// WithAttemptSpanNamer override is supplied. 
+func defaultAttemptSpanName(step flow.Steper, attempt uint64) string { + return fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt) +} diff --git a/contrib/otel/attempt_test.go b/contrib/otel/attempt_test.go new file mode 100644 index 0000000..72b6979 --- /dev/null +++ b/contrib/otel/attempt_test.go @@ -0,0 +1,173 @@ +package otel_test + +import ( + "context" + "fmt" + "testing" + + flow "github.com/Azure/go-workflow" + otelflow "github.com/Azure/go-workflow/contrib/otel" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// newAttemptWorkflow builds a Workflow with only the given AttemptInterceptor +// registered (no StepInterceptor) so attempt-layer tests stay isolated. +func newAttemptWorkflow(ic flow.AttemptInterceptor) *flow.Workflow { + return &flow.Workflow{Option: flow.WorkflowOption{ + AttemptInterceptors: []flow.AttemptInterceptor{ic}, + }} +} + +func TestAttemptInterceptor_OneSpanPerAttempt(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "S", NeedAttempts: 4} // succeeds on the 4th try + w := newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step).Retry(noBackoff(5))) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 4, "expected one span per attempt (0..3)") + for i, s := range spans { + a, ok := findAttr(s.Attributes(), "workflow.step.attempt") + require.True(t, ok, "span %d missing workflow.step.attempt", i) + assert.Equal(t, int64(i), a.Value.AsInt64(), "span %d attempt index mismatch", i) + assertAttr(t, s.Attributes(), "workflow.step.name", "S") + } +} + +func TestAttemptInterceptor_DefaultName(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("MyStep") + w := 
newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + assert.Equal(t, "MyStep (attempt 0)", spans[0].Name()) +} + +func TestAttemptInterceptor_FailingAttemptRecorded(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "Flaky", NeedAttempts: 2} // fails once, then succeeds + w := newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step).Retry(noBackoff(2))) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 2, "expected one span per attempt (failure + success)") + + // First attempt span: failure with RecordError + codes.Error. + first := spans[0] + assert.Equal(t, codes.Error, first.Status().Code, "first attempt span should be Error") + var sawException bool + for _, ev := range first.Events() { + if ev.Name == exceptionEventName { + sawException = true + break + } + } + assert.True(t, sawException, "first attempt should record an exception event") + + // Second attempt span: success leaves status Unset. + second := spans[1] + assert.Equal(t, codes.Unset, second.Status().Code, "successful attempt span should be Unset") +} + +func TestAttemptInterceptor_ChildOfCallerSpan(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + tracer := tp.Tracer("test") + ctx, outer := tracer.Start(context.Background(), "OUTER") + defer outer.End() + + step := flow.NoOp("S") + w := newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(ctx)) + + // OUTER is still open (it ends via defer); only the attempt span should be Ended. 
+ spans := rec.Ended() + var attempt sdktrace.ReadOnlySpan + for _, s := range spans { + if s.Name() != "OUTER" { + attempt = s + break + } + } + require.NotNil(t, attempt, "expected an attempt span among %d ended spans", len(spans)) + assert.Equal(t, outer.SpanContext().SpanID(), attempt.Parent().SpanID(), + "attempt span must be a child of the caller-supplied OUTER span") +} + +func TestAttemptInterceptor_CustomNamer(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Original") + namer := func(s flow.Steper, n uint64) string { + return fmt.Sprintf("X-%s-%d", flow.String(s), n) + } + w := newAttemptWorkflow(otelflow.NewAttemptInterceptor( + otelflow.WithTracerProvider(tp), + otelflow.WithAttemptSpanNamer(namer), + )) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + s := spans[0] + assert.Equal(t, "X-Original-0", s.Name(), "custom attempt namer should win") + // Canonical attributes still present despite the custom name. + assertAttr(t, s.Attributes(), "workflow.step.name", "Original") + a, ok := findAttr(s.Attributes(), "workflow.step.attempt") + require.True(t, ok) + assert.Equal(t, int64(0), a.Value.AsInt64()) +} + +func TestAttemptInterceptor_CustomAttributes(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := flow.NoOp("Hello") + extras := func(flow.Steper, uint64) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("env", "test"), + attribute.Int("answer", 42), + // Regression: a malicious user attribute trying to override the + // canonical workflow.step.attempt key MUST be superseded by the + // interceptor's own value. 
+ attribute.Int64("workflow.step.attempt", 999), + } + } + w := newAttemptWorkflow(otelflow.NewAttemptInterceptor( + otelflow.WithTracerProvider(tp), + otelflow.WithAttemptAttributes(extras), + )) + w.Add(flow.Step(step)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 1) + attrs := spans[0].Attributes() + + // Defaults still present. + assertAttr(t, attrs, "workflow.step.name", "Hello") + canonical, ok := findAttr(attrs, "workflow.step.attempt") + require.True(t, ok) + assert.Equal(t, int64(0), canonical.Value.AsInt64(), + "canonical workflow.step.attempt must win over user-supplied override") + + // User attributes appended. + assertAttr(t, attrs, "env", "test") + a, ok := findAttr(attrs, "answer") + require.True(t, ok, "custom int attribute missing") + assert.Equal(t, int64(42), a.Value.AsInt64()) +} diff --git a/contrib/otel/options.go b/contrib/otel/options.go index 5e03c5d..9ad6980 100644 --- a/contrib/otel/options.go +++ b/contrib/otel/options.go @@ -111,6 +111,10 @@ func WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue) Option { // added in addition to the defaults (e.g. workflow.step.name, // workflow.step.attempt). Passing a nil fn is a no-op. // +// Note: canonical attributes set by the interceptor (workflow.step.name, +// workflow.step.attempt) cannot be overridden by this option; passing those +// keys is silently superseded. +// // Affects: NewAttemptInterceptor only. NewStepInterceptor ignores this option. 
func WithAttemptAttributes(fn func(flow.Steper, uint64) []attribute.KeyValue) Option { return func(c *config) { From 09ed6db27aa6a606702320d0cf987ad72386bb60 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Thu, 14 May 2026 14:00:06 +0000 Subject: [PATCH 07/13] refactor(contrib/otel): inline attempt namer, share workflow helper, simplify tests --- contrib/otel/attempt.go | 8 +----- contrib/otel/attempt_test.go | 51 +++++++++++++++++++----------------- contrib/otel/helpers_test.go | 14 ++++++++++ contrib/otel/step_test.go | 28 ++++++++------------ 4 files changed, 53 insertions(+), 48 deletions(-) diff --git a/contrib/otel/attempt.go b/contrib/otel/attempt.go index fbf4890..4749969 100644 --- a/contrib/otel/attempt.go +++ b/contrib/otel/attempt.go @@ -32,7 +32,7 @@ func NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor { cfg := newConfig(opts) tracer := cfg.resolveTracer() return flow.AttemptInterceptorFunc(func(ctx context.Context, step flow.Steper, attempt uint64, next func(context.Context) error) error { - spanName := defaultAttemptSpanName(step, attempt) + spanName := fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt) if cfg.attemptSpanNamer != nil { spanName = cfg.attemptSpanNamer(step, attempt) } @@ -57,9 +57,3 @@ func NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor { return err }) } - -// defaultAttemptSpanName returns the canonical attempt span name used when no -// WithAttemptSpanNamer override is supplied. 
-func defaultAttemptSpanName(step flow.Steper, attempt uint64) string { - return fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt) -} diff --git a/contrib/otel/attempt_test.go b/contrib/otel/attempt_test.go index 72b6979..2bdc069 100644 --- a/contrib/otel/attempt_test.go +++ b/contrib/otel/attempt_test.go @@ -12,22 +12,13 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - sdktrace "go.opentelemetry.io/otel/sdk/trace" ) -// newAttemptWorkflow builds a Workflow with only the given AttemptInterceptor -// registered (no StepInterceptor) so attempt-layer tests stay isolated. -func newAttemptWorkflow(ic flow.AttemptInterceptor) *flow.Workflow { - return &flow.Workflow{Option: flow.WorkflowOption{ - AttemptInterceptors: []flow.AttemptInterceptor{ic}, - }} -} - func TestAttemptInterceptor_OneSpanPerAttempt(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &retryStep{Name: "S", NeedAttempts: 4} // succeeds on the 4th try - w := newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) w.Add(flow.Step(step).Retry(noBackoff(5))) require.NoError(t, w.Do(context.Background())) @@ -45,7 +36,7 @@ func TestAttemptInterceptor_DefaultName(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := flow.NoOp("MyStep") - w := newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -58,7 +49,7 @@ func TestAttemptInterceptor_FailingAttemptRecorded(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &retryStep{Name: "Flaky", NeedAttempts: 2} // fails once, then succeeds - w := 
newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) w.Add(flow.Step(step).Retry(noBackoff(2))) require.NoError(t, w.Do(context.Background())) @@ -90,22 +81,18 @@ func TestAttemptInterceptor_ChildOfCallerSpan(t *testing.T) { defer outer.End() step := flow.NoOp("S") - w := newAttemptWorkflow(otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) w.Add(flow.Step(step)) require.NoError(t, w.Do(ctx)) - // OUTER is still open (it ends via defer); only the attempt span should be Ended. + // OUTER is still open (it ends via defer); the recorder has exactly the + // attempt span. A regression that emits more than one span will surface here. spans := rec.Ended() - var attempt sdktrace.ReadOnlySpan - for _, s := range spans { - if s.Name() != "OUTER" { - attempt = s - break - } - } - require.NotNil(t, attempt, "expected an attempt span among %d ended spans", len(spans)) + require.Len(t, spans, 1) + attempt := spans[0] assert.Equal(t, outer.SpanContext().SpanID(), attempt.Parent().SpanID(), "attempt span must be a child of the caller-supplied OUTER span") + assert.Equal(t, outer.SpanContext().TraceID(), attempt.SpanContext().TraceID()) } func TestAttemptInterceptor_CustomNamer(t *testing.T) { @@ -115,7 +102,7 @@ func TestAttemptInterceptor_CustomNamer(t *testing.T) { namer := func(s flow.Steper, n uint64) string { return fmt.Sprintf("X-%s-%d", flow.String(s), n) } - w := newAttemptWorkflow(otelflow.NewAttemptInterceptor( + w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor( otelflow.WithTracerProvider(tp), otelflow.WithAttemptSpanNamer(namer), )) @@ -147,7 +134,7 @@ func TestAttemptInterceptor_CustomAttributes(t *testing.T) { attribute.Int64("workflow.step.attempt", 999), } } - w := newAttemptWorkflow(otelflow.NewAttemptInterceptor( + w := 
newTestWorkflow(nil, otelflow.NewAttemptInterceptor( otelflow.WithTracerProvider(tp), otelflow.WithAttemptAttributes(extras), )) @@ -171,3 +158,19 @@ func TestAttemptInterceptor_CustomAttributes(t *testing.T) { require.True(t, ok, "custom int attribute missing") assert.Equal(t, int64(42), a.Value.AsInt64()) } + +func TestAttemptInterceptor_ContextCanceled(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &alwaysFail{Name: "S", Err: context.Canceled} + w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w.Add(flow.Step(step)) + _ = w.Do(context.Background()) // workflow itself errors + + spans := rec.Ended() + require.NotEmpty(t, spans) + s := spans[0] + assert.Equal(t, codes.Error, s.Status().Code) + require.NotEmpty(t, s.Events()) + assert.Equal(t, exceptionEventName, s.Events()[0].Name) +} diff --git a/contrib/otel/helpers_test.go b/contrib/otel/helpers_test.go index 0d53308..a8539d4 100644 --- a/contrib/otel/helpers_test.go +++ b/contrib/otel/helpers_test.go @@ -1,6 +1,7 @@ package otel_test import ( + flow "github.com/Azure/go-workflow" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" ) @@ -14,3 +15,16 @@ func newRecorderTracerProvider() (*sdktrace.TracerProvider, *tracetest.SpanRecor tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(rec)) return tp, rec } + +// newTestWorkflow builds an empty Workflow registered with the given step +// interceptor and/or attempt interceptor; nil arguments are not registered. 
+func newTestWorkflow(stepIC flow.StepInterceptor, attemptIC flow.AttemptInterceptor) *flow.Workflow { + w := &flow.Workflow{} + if stepIC != nil { + w.Option.StepInterceptors = []flow.StepInterceptor{stepIC} + } + if attemptIC != nil { + w.Option.AttemptInterceptors = []flow.AttemptInterceptor{attemptIC} + } + return w +} diff --git a/contrib/otel/step_test.go b/contrib/otel/step_test.go index 46101f6..e20e829 100644 --- a/contrib/otel/step_test.go +++ b/contrib/otel/step_test.go @@ -73,17 +73,11 @@ func noBackoff(attempts uint64) func(*flow.RetryOption) { } } -func newWorkflow(ic flow.StepInterceptor) *flow.Workflow { - return &flow.Workflow{Option: flow.WorkflowOption{ - StepInterceptors: []flow.StepInterceptor{ic}, - }} -} - func TestStepInterceptor_SuccessOneSpan(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := flow.NoOp("MyStep") - w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -100,7 +94,7 @@ func TestStepInterceptor_RetriesStillOneSpan(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &retryStep{Name: "Flaky", NeedAttempts: 3} - w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) w.Add(flow.Step(step).Retry(noBackoff(5))) require.NoError(t, w.Do(context.Background())) @@ -116,7 +110,7 @@ func TestStepInterceptor_FinalErrorRecorded(t *testing.T) { tp, rec := newRecorderTracerProvider() boom := errors.New("boom") step := &alwaysFail{Name: "Fail", Err: boom} - w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) w.Add(flow.Step(step).Retry(noBackoff(2))) err := w.Do(context.Background()) 
require.Error(t, err) @@ -142,7 +136,7 @@ func TestStepInterceptor_ContextCanceled(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &alwaysFail{Name: "Cancel", Err: context.Canceled} - w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) w.Add(flow.Step(step)) _ = w.Do(context.Background()) @@ -164,7 +158,7 @@ func TestStepInterceptor_SkippedStepNoSpan(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() skipMe := flow.NoOp("Skipped") - w := newWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) w.Add(flow.Step(skipMe).When(func(context.Context, map[flow.Steper]flow.StepResult) flow.StepStatus { return flow.Skipped })) @@ -180,10 +174,10 @@ func TestStepInterceptor_CustomNamer(t *testing.T) { tp, rec := newRecorderTracerProvider() step := flow.NoOp("Original") namer := func(s flow.Steper) string { return "custom:" + flow.String(s) } - w := newWorkflow(otelflow.NewStepInterceptor( + w := newTestWorkflow(otelflow.NewStepInterceptor( otelflow.WithTracerProvider(tp), otelflow.WithStepSpanNamer(namer), - )) + ), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -205,10 +199,10 @@ func TestStepInterceptor_CustomAttributesAppend(t *testing.T) { attribute.Int("answer", 42), } } - w := newWorkflow(otelflow.NewStepInterceptor( + w := newTestWorkflow(otelflow.NewStepInterceptor( otelflow.WithTracerProvider(tp), otelflow.WithStepAttributes(extras), - )) + ), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -230,10 +224,10 @@ func TestStepInterceptor_UserAttributeCannotOverrideCanonicalName(t *testing.T) hijack := func(flow.Steper) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("workflow.step.name", "HACKED")} } - w := 
newWorkflow(otelflow.NewStepInterceptor( + w := newTestWorkflow(otelflow.NewStepInterceptor( otelflow.WithTracerProvider(tp), otelflow.WithStepAttributes(hijack), - )) + ), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) From 8f0806a37737af7a459bf8bf1b62cb76c3e81a38 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Fri, 15 May 2026 01:43:06 +0000 Subject: [PATCH 08/13] feat(contrib/otel): add integration tests and godoc Example - TestBothLayers_AttemptIsChildOfStep: attempt span shares TraceID with step and has step span as parent. - TestBothLayers_RetryAttemptCount: one step span + N attempt spans across retries, all in the same trace. - TestProviderResolutionAtFactoryTime: locks in that NewStepInterceptor and NewAttemptInterceptor snapshot the global TracerProvider at construction time, not on every interception. - Example: runnable godoc that wires both interceptors with a stdouttrace exporter on a 2-step pipeline. // Output: omitted because span IDs and timestamps are non-deterministic. - Add stdouttrace as a test-only dependency; runtime audit confirms no production deps on sdk/stdouttrace. 
--- contrib/otel/example_test.go | 49 ++++++++++++++ contrib/otel/go.mod | 1 + contrib/otel/go.sum | 2 + contrib/otel/integration_test.go | 113 +++++++++++++++++++++++++++++++ 4 files changed, 165 insertions(+) create mode 100644 contrib/otel/example_test.go create mode 100644 contrib/otel/integration_test.go diff --git a/contrib/otel/example_test.go b/contrib/otel/example_test.go new file mode 100644 index 0000000..fb29603 --- /dev/null +++ b/contrib/otel/example_test.go @@ -0,0 +1,49 @@ +package otel_test + +import ( + "context" + "fmt" + + flow "github.com/Azure/go-workflow" + otelflow "github.com/Azure/go-workflow/contrib/otel" + + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// Example demonstrates registering the contrib/otel step and attempt +// interceptors on a Workflow, with spans exported to stdout for inspection. +// +// We intentionally omit the `// Output:` directive: TraceIDs, SpanIDs and +// other span fields are non-deterministic, so verifying string output +// would force the example to mock the SDK and obscure the integration +// pattern. The Example still compiles and runs as part of `go test`. +func Example() { + // 1. Build a TracerProvider with a stdout exporter (any exporter works: + // OTLP, Jaeger, Zipkin, etc.). + exporter, err := stdouttrace.New(stdouttrace.WithoutTimestamps()) + if err != nil { + panic(err) + } + tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter)) + defer func() { _ = tp.Shutdown(context.Background()) }() + + // 2. Register both interceptors on a Workflow. + w := &flow.Workflow{} + w.Option.StepInterceptors = []flow.StepInterceptor{ + otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), + } + w.Option.AttemptInterceptors = []flow.AttemptInterceptor{ + otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + } + + // 3. Build a tiny 2-step pipeline: A → B. 
+ a := flow.NoOp("A") + b := flow.NoOp("B") + w.Add(flow.Step(b).DependsOn(a)) + + // 4. Run. + if err := w.Do(context.Background()); err != nil { + fmt.Println("workflow error:", err) + } +} diff --git a/contrib/otel/go.mod b/contrib/otel/go.mod index 901d989..c881dd9 100644 --- a/contrib/otel/go.mod +++ b/contrib/otel/go.mod @@ -7,6 +7,7 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 go.opentelemetry.io/otel/sdk v1.37.0 go.opentelemetry.io/otel/trace v1.37.0 ) diff --git a/contrib/otel/go.sum b/contrib/otel/go.sum index 38982d4..3256ded 100644 --- a/contrib/otel/go.sum +++ b/contrib/otel/go.sum @@ -29,6 +29,8 @@ go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJyS go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0 h1:SNhVp/9q4Go/XHBkQ1/d5u9P/U+L1yaGPoi0x+mStaI= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.37.0/go.mod h1:tx8OOlGH6R4kLV67YaYO44GFXloEjGPZuMjEkaaqIp4= go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= diff --git a/contrib/otel/integration_test.go b/contrib/otel/integration_test.go new file mode 100644 index 0000000..483dc8f --- /dev/null +++ b/contrib/otel/integration_test.go @@ -0,0 +1,113 @@ +package otel_test + +import ( + "context" + "strings" + "testing" + + flow "github.com/Azure/go-workflow" + otelflow "github.com/Azure/go-workflow/contrib/otel" + + "github.com/stretchr/testify/assert" + 
"github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + sdktrace "go.opentelemetry.io/otel/sdk/trace" +) + +// stepName is the canonical step span name written by NewStepInterceptor. +// Attempt span names contain " (attempt N)" so they are distinguishable. +func isAttemptSpan(name string) bool { return strings.Contains(name, "(attempt ") } + +func TestBothLayers_AttemptIsChildOfStep(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + s := flow.NoOp("OnlyStep") + w := newTestWorkflow( + otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), + otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + ) + w.Add(flow.Step(s)) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + require.Len(t, spans, 2) + + var stepSpan, attemptSpan sdktrace.ReadOnlySpan + for _, sp := range spans { + if isAttemptSpan(sp.Name()) { + attemptSpan = sp + } else { + stepSpan = sp + } + } + require.NotNil(t, stepSpan, "step span must be present") + require.NotNil(t, attemptSpan, "attempt span must be present") + + assert.Equal(t, stepSpan.SpanContext().TraceID(), attemptSpan.SpanContext().TraceID(), + "step and attempt spans must share TraceID") + assert.Equal(t, stepSpan.SpanContext().SpanID(), attemptSpan.Parent().SpanID(), + "attempt span must be child of step span") +} + +func TestBothLayers_RetryAttemptCount(t *testing.T) { + t.Parallel() + tp, rec := newRecorderTracerProvider() + step := &retryStep{Name: "Flaky", NeedAttempts: 3} // succeeds on 3rd attempt + w := newTestWorkflow( + otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), + otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + ) + w.Add(flow.Step(step).Retry(noBackoff(5))) + require.NoError(t, w.Do(context.Background())) + + spans := rec.Ended() + // 3 attempt spans + 1 step span = 4 total + require.Len(t, spans, 4) + + var stepSpan sdktrace.ReadOnlySpan + var attemptSpans []sdktrace.ReadOnlySpan + for _, sp := range spans { + 
if isAttemptSpan(sp.Name()) { + attemptSpans = append(attemptSpans, sp) + } else { + stepSpan = sp + } + } + require.NotNil(t, stepSpan) + require.Len(t, attemptSpans, 3) + + for _, a := range attemptSpans { + assert.Equal(t, stepSpan.SpanContext().TraceID(), a.SpanContext().TraceID(), + "every attempt span must share the step's TraceID") + assert.Equal(t, stepSpan.SpanContext().SpanID(), a.Parent().SpanID(), + "every attempt span must be a child of the step span") + } +} + +func TestProviderResolutionAtFactoryTime(t *testing.T) { + // CANNOT t.Parallel(): mutates global TracerProvider via otel.SetTracerProvider. + original := otel.GetTracerProvider() + t.Cleanup(func() { otel.SetTracerProvider(original) }) + + // Provider A is what's global at factory call time. + tpA, recA := newRecorderTracerProvider() + otel.SetTracerProvider(tpA) + + // Construct interceptors WITHOUT WithTracerProvider — they should snapshot + // the current global (tpA) once. + stepIC := otelflow.NewStepInterceptor() + attemptIC := otelflow.NewAttemptInterceptor() + + // Now swap the global to provider B. + tpB, recB := newRecorderTracerProvider() + otel.SetTracerProvider(tpB) + + // Run the workflow. Spans MUST land on provider A, not B. 
+ s := flow.NoOp("ProviderTest") + w := newTestWorkflow(stepIC, attemptIC) + w.Add(flow.Step(s)) + require.NoError(t, w.Do(context.Background())) + + assert.Len(t, recA.Ended(), 2, "interceptors should still write to original provider (snapshot at factory time)") + assert.Len(t, recB.Ended(), 0, "swapped-in provider should NOT receive spans") +} From a37c5fa8d4102b6d4f0e7670a279daf22aed5f21 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Fri, 15 May 2026 02:04:17 +0000 Subject: [PATCH 09/13] refactor(contrib/otel): identify attempt spans by attribute, polish docs --- contrib/otel/example_test.go | 5 ++++- contrib/otel/integration_test.go | 22 ++++++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/contrib/otel/example_test.go b/contrib/otel/example_test.go index fb29603..267c0d5 100644 --- a/contrib/otel/example_test.go +++ b/contrib/otel/example_test.go @@ -26,6 +26,8 @@ func Example() { panic(err) } tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter)) + // Shutdown errors are intentionally ignored in this example; + // production code should log or surface them. defer func() { _ = tp.Shutdown(context.Background()) }() // 2. Register both interceptors on a Workflow. @@ -37,7 +39,8 @@ func Example() { otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), } - // 3. Build a tiny 2-step pipeline: A → B. + // 3. Build a tiny 2-step pipeline: A → B. flow.Step(b).DependsOn(a) + // registers BOTH steps and the dependency in one call. 
a := flow.NoOp("A") b := flow.NoOp("B") w.Add(flow.Step(b).DependsOn(a)) diff --git a/contrib/otel/integration_test.go b/contrib/otel/integration_test.go index 483dc8f..87454cb 100644 --- a/contrib/otel/integration_test.go +++ b/contrib/otel/integration_test.go @@ -2,7 +2,6 @@ package otel_test import ( "context" - "strings" "testing" flow "github.com/Azure/go-workflow" @@ -14,9 +13,18 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" ) -// stepName is the canonical step span name written by NewStepInterceptor. -// Attempt span names contain " (attempt N)" so they are distinguishable. -func isAttemptSpan(name string) bool { return strings.Contains(name, "(attempt ") } +// isAttemptSpan identifies attempt spans by the canonical attribute that +// NewAttemptInterceptor always writes (workflow.step.attempt). Using the +// attribute rather than the default span name keeps the discriminator +// robust against WithAttemptSpanNamer overrides and default-name changes. +func isAttemptSpan(sp sdktrace.ReadOnlySpan) bool { + for _, kv := range sp.Attributes() { + if string(kv.Key) == "workflow.step.attempt" { + return true + } + } + return false +} func TestBothLayers_AttemptIsChildOfStep(t *testing.T) { t.Parallel() @@ -34,7 +42,7 @@ func TestBothLayers_AttemptIsChildOfStep(t *testing.T) { var stepSpan, attemptSpan sdktrace.ReadOnlySpan for _, sp := range spans { - if isAttemptSpan(sp.Name()) { + if isAttemptSpan(sp) { attemptSpan = sp } else { stepSpan = sp @@ -67,7 +75,7 @@ func TestBothLayers_RetryAttemptCount(t *testing.T) { var stepSpan sdktrace.ReadOnlySpan var attemptSpans []sdktrace.ReadOnlySpan for _, sp := range spans { - if isAttemptSpan(sp.Name()) { + if isAttemptSpan(sp) { attemptSpans = append(attemptSpans, sp) } else { stepSpan = sp @@ -86,6 +94,8 @@ func TestBothLayers_RetryAttemptCount(t *testing.T) { func TestProviderResolutionAtFactoryTime(t *testing.T) { // CANNOT t.Parallel(): mutates global TracerProvider via otel.SetTracerProvider. 
+ // All other tests in this package inject TracerProvider explicitly via + // WithTracerProvider, so flipping the global here does not affect them. original := otel.GetTracerProvider() t.Cleanup(func() { otel.SetTracerProvider(original) }) From c94dc7e84e3182542234b4d1cf769582161cd0f8 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Fri, 15 May 2026 02:14:11 +0000 Subject: [PATCH 10/13] docs(contrib/otel): package godoc, README, root contrib section Co-Authored-By: Claude Opus 4.7 (1M context) --- README.md | 10 ++++++ contrib/otel/README.md | 66 ++++++++++++++++++++++++++++++++++++++++ contrib/otel/doc.go | 69 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+) create mode 100644 contrib/otel/README.md diff --git a/README.md b/README.md index 7f9fd84..dad2c14 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,16 @@ and will be removed in the next major version. conditions, retry/timeout, composite steps, interceptors and workflow options. - **DeepWiki:** +## Contrib + +Optional, independently-versioned modules under `contrib/`: + +- **[`contrib/otel`](./contrib/otel)** — OpenTelemetry tracing integration + via the existing `StepInterceptor` / `AttemptInterceptor` extension + points. Released as a separate Go module + (`github.com/Azure/go-workflow/contrib/otel`) so its OpenTelemetry + dependency does not enter core's transitive graph. + ## Contributing This project welcomes contributions. Most contributions require you to agree to a Contributor diff --git a/contrib/otel/README.md b/contrib/otel/README.md new file mode 100644 index 0000000..a7164e9 --- /dev/null +++ b/contrib/otel/README.md @@ -0,0 +1,66 @@ +# contrib/otel + +OpenTelemetry tracing integration for [go-workflow](../..) — implemented as +two interceptor factories that plug into the existing `StepInterceptor` and +`AttemptInterceptor` extension points. 
+ +```go +import ( + flow "github.com/Azure/go-workflow" + otelflow "github.com/Azure/go-workflow/contrib/otel" +) + +w := &flow.Workflow{ + Option: flow.WorkflowOption{ + StepInterceptors: []flow.StepInterceptor{ + otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), + }, + AttemptInterceptors: []flow.AttemptInterceptor{ + otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + }, + }, +} +``` + +See the package godoc and the runnable `Example` for the full wiring. + +## What you get by default + +- One **step span** per Step lifetime (covering all retries) — name + `flow.String(step)`, attributes `workflow.step.name`, `workflow.step.status` + (`"success"` or `"error"`). +- One **attempt span** per individual attempt — name + `"<step> (attempt N)"`, attributes `workflow.step.name`, + `workflow.step.attempt` (int64). +- Errors recorded with `RecordError` + `SetStatus(codes.Error)`; this + includes `context.Canceled`. +- Steps that are `Skipped` or `Canceled` by their `Condition` produce no + spans (they bypass the interceptor chain in core). + +Every default can be overridden via the `With*` options. See the godoc. + +## Dependency policy + +`contrib/otel` is an **independent Go module** (`github.com/Azure/go-workflow/contrib/otel`) +so the OpenTelemetry dependency does not enter the core module's transitive +graph. Runtime requires are limited to the OpenTelemetry **API** +(`go.opentelemetry.io/otel`, `…/otel/trace`); the SDK and exporters are +test-only dependencies. + +## Working on the module + +Because the contrib module is separate, run its tests from inside the module: + + cd contrib/otel && go test ./... + +The module's `go.mod` carries `replace github.com/Azure/go-workflow => ../..` +so it builds against in-tree core during development. + +## Open follow-ups + +- **CI**: add a job that runs `go test ./...` (and `-race`) inside + `contrib/otel/` in addition to the root-module job. 
+- **Release**: before tagging `contrib/otel/v0.1.0`, drop or pin the + `replace github.com/Azure/go-workflow => ../..` directive in + `contrib/otel/go.mod` so `go get + github.com/Azure/go-workflow/contrib/otel@v0.1.0` resolves cleanly. diff --git a/contrib/otel/doc.go b/contrib/otel/doc.go index 9bee48e..dc5dbe1 100644 --- a/contrib/otel/doc.go +++ b/contrib/otel/doc.go @@ -1 +1,70 @@ +// Package otel provides OpenTelemetry tracing integration for go-workflow. +// +// It plugs into the two interceptor extension points exposed by go-workflow: +// StepInterceptor (one span per Step lifetime, across all retries) and +// AttemptInterceptor (one span per individual attempt). Use both together +// for a parent/child structure, or just one if you want a flatter trace. +// +// # Usage +// +// import ( +// flow "github.com/Azure/go-workflow" +// otelflow "github.com/Azure/go-workflow/contrib/otel" +// ) +// +// w := &flow.Workflow{ +// Option: flow.WorkflowOption{ +// StepInterceptors: []flow.StepInterceptor{ +// otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), +// }, +// AttemptInterceptors: []flow.AttemptInterceptor{ +// otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), +// }, +// }, +// } +// +// See the runnable Example for a complete wiring with a stdout exporter. +// +// # Span conventions +// +// Step spans are named flow.String(step) and carry attributes +// workflow.step.name and workflow.step.status ("success" or "error"). +// Attempt spans are named "<step> (attempt N)" and carry workflow.step.name +// and workflow.step.attempt (int64). +// +// All defaults can be overridden via Options (WithStepSpanNamer, +// WithAttemptSpanNamer, WithStepAttributes, WithAttemptAttributes). +// Canonical attributes (workflow.step.name, workflow.step.status, +// workflow.step.attempt) always win — user-supplied keys with the same +// names are silently superseded. 
+// +// # Parent/child relation +// +// When both interceptors are registered, attempt spans are children of the +// step span (same TraceID, Parent.SpanID equals the step span's SpanID). +// When only one is registered, that span is a child of whatever span (if +// any) is on the caller-provided context. +// +// # Provider resolution +// +// WithTracerProvider sets the OpenTelemetry TracerProvider explicitly. +// If unset (or nil), each factory falls back to otel.GetTracerProvider() +// at the moment NewStepInterceptor / NewAttemptInterceptor is called — +// not lazily on every interception. Swapping the global provider after +// construction does not affect already-built interceptors. +// +// # Skipped and Canceled-by-Condition steps +// +// Steps whose Condition resolves to Skipped or Canceled are settled inline +// by the workflow scheduler and bypass the interceptor chain entirely. As +// a result, they produce zero spans through this package. If you need to +// observe terminal-by-condition statuses, watch the StepResult instead. +// +// # context.Canceled +// +// When the error returned by next() satisfies errors.Is(err, context.Canceled), +// the resulting span is ended with RecordError + SetStatus(codes.Error) — +// the same as any other error. There is no special-case suppression. Users +// running graceful shutdowns who want different semantics can wrap the +// returned interceptor. 
package otel From 8f4723a96115e97b87582e58c70b39a1df594218 Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Fri, 15 May 2026 02:14:15 +0000 Subject: [PATCH 11/13] docs(openspec): mark contrib-otel-tracing tasks complete Co-Authored-By: Claude Opus 4.7 (1M context) --- .../changes/contrib-otel-tracing/tasks.md | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 openspec/changes/contrib-otel-tracing/tasks.md diff --git a/openspec/changes/contrib-otel-tracing/tasks.md b/openspec/changes/contrib-otel-tracing/tasks.md new file mode 100644 index 0000000..c8a362a --- /dev/null +++ b/openspec/changes/contrib-otel-tracing/tasks.md @@ -0,0 +1,77 @@ +## 1. Module bootstrap + +- [x] 1.1 Create directory `contrib/otel/` and run `go mod init github.com/Azure/go-workflow/contrib/otel` inside it. +- [x] 1.2 Set `go 1.23` in the new `contrib/otel/go.mod`; add `replace github.com/Azure/go-workflow => ../..` and a `require github.com/Azure/go-workflow v0.0.0` directive so the module builds against in-tree core. +- [x] 1.3 Add runtime dependencies `go.opentelemetry.io/otel` and `go.opentelemetry.io/otel/trace`; run `go mod tidy` and commit `contrib/otel/go.sum`. +- [x] 1.4 Verify that the root `go.mod` and `go.sum` show no diff after the contrib module is created. + +## 2. Public API surface + +- [x] 2.1 Create `contrib/otel/options.go` with the unexported `config` struct and the exported `Option` function type. +- [x] 2.2 Implement `WithTracerProvider`, `WithTracerName`, `WithStepSpanNamer`, `WithAttemptSpanNamer`, `WithStepAttributes`, `WithAttemptAttributes`. Document on each which interceptor layer it affects (and that it is a no-op on the other). +- [x] 2.3 Implement an internal `(*config).resolveTracer()` helper that picks `cfg.tracerProvider` (or `otel.GetTracerProvider()` when unset) and returns `tp.Tracer(cfg.tracerName)` with default name `"github.com/Azure/go-workflow/contrib/otel"`. + +## 3. 
Step interceptor + +- [x] 3.1 Create `contrib/otel/step.go` exporting `NewStepInterceptor(opts ...Option) flow.StepInterceptor` returning a `flow.StepInterceptorFunc`. +- [x] 3.2 In the closure: resolve span name (default `flow.String(step)`, overridable), build start attributes (`workflow.step.name` plus user-supplied), call `tracer.Start(ctx, name, trace.WithAttributes(...))`, defer `span.End()`. +- [x] 3.3 After `next(ctx)` returns: set `workflow.step.status` to `"success"` or `"error"`, and on non-nil error call `span.RecordError(err)` + `span.SetStatus(codes.Error, err.Error())`. Treat `context.Canceled` like any other error. +- [x] 3.4 Return the error unchanged. + +## 4. Attempt interceptor + +- [x] 4.1 Create `contrib/otel/attempt.go` exporting `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` returning a `flow.AttemptInterceptorFunc`. +- [x] 4.2 Resolve span name (default `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)`, overridable), build start attributes (`workflow.step.name`, `workflow.step.attempt = int64(attempt)`, plus user-supplied), call `tracer.Start(ctx, name, trace.WithAttributes(...))`, defer `span.End()`. +- [x] 4.3 After `next(ctx)` returns, on non-nil error call `RecordError` + `SetStatus(codes.Error, …)`. Return the error unchanged. + +## 5. Test scaffolding + +- [x] 5.1 Add test-only dependencies in `contrib/otel/`: `go.opentelemetry.io/otel/sdk/trace` and `go.opentelemetry.io/otel/sdk/trace/tracetest` (and the SDK's transitive `go.opentelemetry.io/otel/sdk`). Confirm with `go list -deps -test=false ./...` that they are not pulled into the runtime graph. +- [x] 5.2 Create `contrib/otel/internal_test.go` (or `helpers_test.go`) with a small helper that builds a `(tp *sdktrace.TracerProvider, recorder *tracetest.SpanRecorder)` pair. + +## 6. 
Step interceptor tests (`step_test.go`) + +- [x] 6.1 Test `TestStepInterceptor_SuccessOneSpan`: single step, succeeds first try → exactly one span, name = `flow.String(step)`, attribute `workflow.step.status == "success"`, status code OK/Unset. +- [x] 6.2 Test `TestStepInterceptor_RetriesStillOneSpan`: step retries N times then succeeds → still exactly one step span. +- [x] 6.3 Test `TestStepInterceptor_FinalErrorRecorded`: step fails terminally → `workflow.step.status == "error"`, `Status.Code == codes.Error`, span events include the `RecordError` event. +- [x] 6.4 Test `TestStepInterceptor_ContextCanceled`: step returns `context.Canceled` → status = Error, error event recorded. +- [x] 6.5 Test `TestStepInterceptor_SkippedStepNoSpan`: step with Condition resolving to `Skipped` → recorder yields zero spans. +- [x] 6.6 Test `TestStepInterceptor_CustomNamer`: `WithStepSpanNamer` overrides the default. +- [x] 6.7 Test `TestStepInterceptor_CustomAttributesAppend`: `WithStepAttributes` adds extras while defaults remain. + +## 7. Attempt interceptor tests (`attempt_test.go`) + +- [x] 7.1 Test `TestAttemptInterceptor_OneSpanPerAttempt`: step with N+1 attempts → exactly N+1 spans, attempt indices 0..N. +- [x] 7.2 Test `TestAttemptInterceptor_DefaultName`: name format matches `"%s (attempt %d)"`. +- [x] 7.3 Test `TestAttemptInterceptor_FailingAttemptRecorded`: failed attempt span has Error status + RecordError event. +- [x] 7.4 Test `TestAttemptInterceptor_ChildOfCallerSpan`: when only attempt interceptor is registered, attempt span's `Parent.SpanID` matches an outer caller span. +- [x] 7.5 Test `TestAttemptInterceptor_CustomNamer` and `TestAttemptInterceptor_CustomAttributes`. + +## 8. Combined / integration tests (`integration_test.go`) + +- [x] 8.1 Test `TestBothLayers_AttemptIsChildOfStep`: register both, run a step, assert attempt span's `Parent.SpanID` equals step span's `SpanID` and `TraceID` matches. 
+- [x] 8.2 Test `TestBothLayers_RetryAttemptCount`: step retries M times, assert one step span and M+1 attempt spans, all sharing the trace. +- [x] 8.3 Test `TestProviderResolutionAtFactoryTime`: change global `otel.SetTracerProvider` after constructing the interceptor → spans still flow to the original provider. + +## 9. Godoc Example (`example_test.go`) + +- [x] 9.1 Add test dep `go.opentelemetry.io/otel/exporters/stdout/stdouttrace`. +- [x] 9.2 Write `func Example()` that builds a `TracerProvider` with `stdouttrace.New(stdouttrace.WithoutTimestamps())`, registers both interceptors on a 2-step workflow, and runs it. Use `// Output:` only if deterministic; otherwise document why output is omitted. +- [x] 9.3 Verify `go test ./... -run Example` passes inside `contrib/otel/`. + +## 10. Package documentation + +- [x] 10.1 Add a package-level doc comment in `contrib/otel/doc.go` covering: usage snippet, default span/attribute conventions, parent/child relation between layers, the `Skipped`/`Canceled-by-Condition` zero-span behavior, and the `context.Canceled = Error` policy. +- [x] 10.2 Add `contrib/otel/README.md` with the usage snippet, dependency policy notice, and a "must `cd contrib/otel && go test ./...`" note for contributors. + +## 11. Repository-level docs and follow-ups + +- [x] 11.1 Add a top-level mention of the new contrib module in the root `README.md` (one line under a "contrib" heading or a dedicated section). +- [x] 11.2 Open a tracking issue (or add a TODO note in `contrib/otel/README.md`) for: (a) adding a CI job that runs `go test ./...` inside `contrib/otel/`, and (b) the release-time checklist to drop or pin the `replace` directive before tagging `contrib/otel/v0.1.0`. + +## 12. Verification before completion + +- [x] 12.1 `go test ./...` from repository root passes (core unchanged). +- [x] 12.2 `go test ./...` from `contrib/otel/` passes; race detector also passes (`-race`). 
+- [x] 12.3 `openspec validate contrib-otel-tracing --strict` reports no errors. +- [x] 12.4 `go list -deps -test=false ./...` inside `contrib/otel/` confirms no SDK / exporter / tracetest packages in the runtime graph. From d28f8e104faa4a64d0356a3f918927135056e4fe Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Fri, 15 May 2026 02:19:42 +0000 Subject: [PATCH 12/13] refactor(contrib/otel): rename package otel -> flowotel, drop alias --- contrib/otel/README.md | 6 +- contrib/otel/attempt.go | 2 +- contrib/otel/attempt_test.go | 26 +-- contrib/otel/consts.go | 2 +- contrib/otel/doc.go | 8 +- contrib/otel/example_test.go | 8 +- contrib/otel/helpers_test.go | 2 +- contrib/otel/integration_test.go | 16 +- contrib/otel/options.go | 2 +- contrib/otel/step.go | 2 +- contrib/otel/step_test.go | 32 +-- .../contrib-otel-tracing/.openspec.yaml | 2 + .../changes/contrib-otel-tracing/design.md | 143 +++++++++++++ .../changes/contrib-otel-tracing/proposal.md | 41 ++++ .../specs/contrib-otel/spec.md | 197 ++++++++++++++++++ 15 files changed, 436 insertions(+), 53 deletions(-) create mode 100644 openspec/changes/contrib-otel-tracing/.openspec.yaml create mode 100644 openspec/changes/contrib-otel-tracing/design.md create mode 100644 openspec/changes/contrib-otel-tracing/proposal.md create mode 100644 openspec/changes/contrib-otel-tracing/specs/contrib-otel/spec.md diff --git a/contrib/otel/README.md b/contrib/otel/README.md index a7164e9..b30ea5a 100644 --- a/contrib/otel/README.md +++ b/contrib/otel/README.md @@ -7,16 +7,16 @@ two interceptor factories that plug into the existing `StepInterceptor` and ```go import ( flow "github.com/Azure/go-workflow" - otelflow "github.com/Azure/go-workflow/contrib/otel" + "github.com/Azure/go-workflow/contrib/otel" ) w := &flow.Workflow{ Option: flow.WorkflowOption{ StepInterceptors: []flow.StepInterceptor{ - otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), }, 
AttemptInterceptors: []flow.AttemptInterceptor{ - otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), }, }, } diff --git a/contrib/otel/attempt.go b/contrib/otel/attempt.go index 4749969..9476981 100644 --- a/contrib/otel/attempt.go +++ b/contrib/otel/attempt.go @@ -1,4 +1,4 @@ -package otel +package flowotel import ( "context" diff --git a/contrib/otel/attempt_test.go b/contrib/otel/attempt_test.go index 2bdc069..f5c7df1 100644 --- a/contrib/otel/attempt_test.go +++ b/contrib/otel/attempt_test.go @@ -1,4 +1,4 @@ -package otel_test +package flowotel_test import ( "context" @@ -6,7 +6,7 @@ import ( "testing" flow "github.com/Azure/go-workflow" - otelflow "github.com/Azure/go-workflow/contrib/otel" + "github.com/Azure/go-workflow/contrib/otel" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,7 +18,7 @@ func TestAttemptInterceptor_OneSpanPerAttempt(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &retryStep{Name: "S", NeedAttempts: 4} // succeeds on the 4th try - w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) w.Add(flow.Step(step).Retry(noBackoff(5))) require.NoError(t, w.Do(context.Background())) @@ -36,7 +36,7 @@ func TestAttemptInterceptor_DefaultName(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := flow.NoOp("MyStep") - w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -49,7 +49,7 @@ func TestAttemptInterceptor_FailingAttemptRecorded(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &retryStep{Name: "Flaky", NeedAttempts: 2} // fails 
once, then succeeds - w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) w.Add(flow.Step(step).Retry(noBackoff(2))) require.NoError(t, w.Do(context.Background())) @@ -81,7 +81,7 @@ func TestAttemptInterceptor_ChildOfCallerSpan(t *testing.T) { defer outer.End() step := flow.NoOp("S") - w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) w.Add(flow.Step(step)) require.NoError(t, w.Do(ctx)) @@ -102,9 +102,9 @@ func TestAttemptInterceptor_CustomNamer(t *testing.T) { namer := func(s flow.Steper, n uint64) string { return fmt.Sprintf("X-%s-%d", flow.String(s), n) } - w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor( - otelflow.WithTracerProvider(tp), - otelflow.WithAttemptSpanNamer(namer), + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithAttemptSpanNamer(namer), )) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -134,9 +134,9 @@ func TestAttemptInterceptor_CustomAttributes(t *testing.T) { attribute.Int64("workflow.step.attempt", 999), } } - w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor( - otelflow.WithTracerProvider(tp), - otelflow.WithAttemptAttributes(extras), + w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithAttemptAttributes(extras), )) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -163,7 +163,7 @@ func TestAttemptInterceptor_ContextCanceled(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &alwaysFail{Name: "S", Err: context.Canceled} - w := newTestWorkflow(nil, otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp))) + w := newTestWorkflow(nil, 
flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp))) w.Add(flow.Step(step)) _ = w.Do(context.Background()) // workflow itself errors diff --git a/contrib/otel/consts.go b/contrib/otel/consts.go index f13748b..98de746 100644 --- a/contrib/otel/consts.go +++ b/contrib/otel/consts.go @@ -1,4 +1,4 @@ -package otel +package flowotel // Attribute keys and status values emitted by the contrib/otel interceptors. const ( diff --git a/contrib/otel/doc.go b/contrib/otel/doc.go index dc5dbe1..a610430 100644 --- a/contrib/otel/doc.go +++ b/contrib/otel/doc.go @@ -9,16 +9,16 @@ // // import ( // flow "github.com/Azure/go-workflow" -// otelflow "github.com/Azure/go-workflow/contrib/otel" +// "github.com/Azure/go-workflow/contrib/otel" // ) // // w := &flow.Workflow{ // Option: flow.WorkflowOption{ // StepInterceptors: []flow.StepInterceptor{ -// otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), +// flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), // }, // AttemptInterceptors: []flow.AttemptInterceptor{ -// otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), +// flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), // }, // }, // } @@ -67,4 +67,4 @@ // the same as any other error. There is no special-case suppression. Users // running graceful shutdowns who want different semantics can wrap the // returned interceptor. -package otel +package flowotel diff --git a/contrib/otel/example_test.go b/contrib/otel/example_test.go index 267c0d5..a24abcc 100644 --- a/contrib/otel/example_test.go +++ b/contrib/otel/example_test.go @@ -1,11 +1,11 @@ -package otel_test +package flowotel_test import ( "context" "fmt" flow "github.com/Azure/go-workflow" - otelflow "github.com/Azure/go-workflow/contrib/otel" + "github.com/Azure/go-workflow/contrib/otel" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -33,10 +33,10 @@ func Example() { // 2. Register both interceptors on a Workflow. 
w := &flow.Workflow{} w.Option.StepInterceptors = []flow.StepInterceptor{ - otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), } w.Option.AttemptInterceptors = []flow.AttemptInterceptor{ - otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), } // 3. Build a tiny 2-step pipeline: A → B. flow.Step(b).DependsOn(a) diff --git a/contrib/otel/helpers_test.go b/contrib/otel/helpers_test.go index a8539d4..4a3211d 100644 --- a/contrib/otel/helpers_test.go +++ b/contrib/otel/helpers_test.go @@ -1,4 +1,4 @@ -package otel_test +package flowotel_test import ( flow "github.com/Azure/go-workflow" diff --git a/contrib/otel/integration_test.go b/contrib/otel/integration_test.go index 87454cb..608f237 100644 --- a/contrib/otel/integration_test.go +++ b/contrib/otel/integration_test.go @@ -1,11 +1,11 @@ -package otel_test +package flowotel_test import ( "context" "testing" flow "github.com/Azure/go-workflow" - otelflow "github.com/Azure/go-workflow/contrib/otel" + "github.com/Azure/go-workflow/contrib/otel" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -31,8 +31,8 @@ func TestBothLayers_AttemptIsChildOfStep(t *testing.T) { tp, rec := newRecorderTracerProvider() s := flow.NoOp("OnlyStep") w := newTestWorkflow( - otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), - otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), ) w.Add(flow.Step(s)) require.NoError(t, w.Do(context.Background())) @@ -62,8 +62,8 @@ func TestBothLayers_RetryAttemptCount(t *testing.T) { tp, rec := newRecorderTracerProvider() step := &retryStep{Name: "Flaky", NeedAttempts: 3} // succeeds on 3rd attempt w := newTestWorkflow( - 
otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), - otelflow.NewAttemptInterceptor(otelflow.WithTracerProvider(tp)), + flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), + flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)), ) w.Add(flow.Step(step).Retry(noBackoff(5))) require.NoError(t, w.Do(context.Background())) @@ -105,8 +105,8 @@ func TestProviderResolutionAtFactoryTime(t *testing.T) { // Construct interceptors WITHOUT WithTracerProvider — they should snapshot // the current global (tpA) once. - stepIC := otelflow.NewStepInterceptor() - attemptIC := otelflow.NewAttemptInterceptor() + stepIC := flowotel.NewStepInterceptor() + attemptIC := flowotel.NewAttemptInterceptor() // Now swap the global to provider B. tpB, recB := newRecorderTracerProvider() diff --git a/contrib/otel/options.go b/contrib/otel/options.go index 9ad6980..bd77389 100644 --- a/contrib/otel/options.go +++ b/contrib/otel/options.go @@ -1,4 +1,4 @@ -package otel +package flowotel import ( flow "github.com/Azure/go-workflow" diff --git a/contrib/otel/step.go b/contrib/otel/step.go index cf0d0bf..4937881 100644 --- a/contrib/otel/step.go +++ b/contrib/otel/step.go @@ -1,4 +1,4 @@ -package otel +package flowotel import ( "context" diff --git a/contrib/otel/step_test.go b/contrib/otel/step_test.go index e20e829..14b7e9a 100644 --- a/contrib/otel/step_test.go +++ b/contrib/otel/step_test.go @@ -1,4 +1,4 @@ -package otel_test +package flowotel_test import ( "context" @@ -6,7 +6,7 @@ import ( "testing" flow "github.com/Azure/go-workflow" - otelflow "github.com/Azure/go-workflow/contrib/otel" + "github.com/Azure/go-workflow/contrib/otel" "github.com/cenkalti/backoff/v4" "github.com/stretchr/testify/assert" @@ -77,7 +77,7 @@ func TestStepInterceptor_SuccessOneSpan(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := flow.NoOp("MyStep") - w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) + w := 
newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -94,7 +94,7 @@ func TestStepInterceptor_RetriesStillOneSpan(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &retryStep{Name: "Flaky", NeedAttempts: 3} - w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) w.Add(flow.Step(step).Retry(noBackoff(5))) require.NoError(t, w.Do(context.Background())) @@ -110,7 +110,7 @@ func TestStepInterceptor_FinalErrorRecorded(t *testing.T) { tp, rec := newRecorderTracerProvider() boom := errors.New("boom") step := &alwaysFail{Name: "Fail", Err: boom} - w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) w.Add(flow.Step(step).Retry(noBackoff(2))) err := w.Do(context.Background()) require.Error(t, err) @@ -136,7 +136,7 @@ func TestStepInterceptor_ContextCanceled(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() step := &alwaysFail{Name: "Cancel", Err: context.Canceled} - w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) w.Add(flow.Step(step)) _ = w.Do(context.Background()) @@ -158,7 +158,7 @@ func TestStepInterceptor_SkippedStepNoSpan(t *testing.T) { t.Parallel() tp, rec := newRecorderTracerProvider() skipMe := flow.NoOp("Skipped") - w := newTestWorkflow(otelflow.NewStepInterceptor(otelflow.WithTracerProvider(tp)), nil) + w := newTestWorkflow(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)), nil) w.Add(flow.Step(skipMe).When(func(context.Context, map[flow.Steper]flow.StepResult) flow.StepStatus { return flow.Skipped })) @@ -174,9 +174,9 @@ func 
TestStepInterceptor_CustomNamer(t *testing.T) { tp, rec := newRecorderTracerProvider() step := flow.NoOp("Original") namer := func(s flow.Steper) string { return "custom:" + flow.String(s) } - w := newTestWorkflow(otelflow.NewStepInterceptor( - otelflow.WithTracerProvider(tp), - otelflow.WithStepSpanNamer(namer), + w := newTestWorkflow(flowotel.NewStepInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithStepSpanNamer(namer), ), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -199,9 +199,9 @@ func TestStepInterceptor_CustomAttributesAppend(t *testing.T) { attribute.Int("answer", 42), } } - w := newTestWorkflow(otelflow.NewStepInterceptor( - otelflow.WithTracerProvider(tp), - otelflow.WithStepAttributes(extras), + w := newTestWorkflow(flowotel.NewStepInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithStepAttributes(extras), ), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) @@ -224,9 +224,9 @@ func TestStepInterceptor_UserAttributeCannotOverrideCanonicalName(t *testing.T) hijack := func(flow.Steper) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("workflow.step.name", "HACKED")} } - w := newTestWorkflow(otelflow.NewStepInterceptor( - otelflow.WithTracerProvider(tp), - otelflow.WithStepAttributes(hijack), + w := newTestWorkflow(flowotel.NewStepInterceptor( + flowotel.WithTracerProvider(tp), + flowotel.WithStepAttributes(hijack), ), nil) w.Add(flow.Step(step)) require.NoError(t, w.Do(context.Background())) diff --git a/openspec/changes/contrib-otel-tracing/.openspec.yaml b/openspec/changes/contrib-otel-tracing/.openspec.yaml new file mode 100644 index 0000000..66dd08a --- /dev/null +++ b/openspec/changes/contrib-otel-tracing/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-05-14 diff --git a/openspec/changes/contrib-otel-tracing/design.md b/openspec/changes/contrib-otel-tracing/design.md new file mode 100644 index 0000000..4656cc2 --- /dev/null +++ 
b/openspec/changes/contrib-otel-tracing/design.md @@ -0,0 +1,143 @@ +## Context + +go-workflow exposes structured observability through two interceptor interfaces declared in `interceptor.go`: + +- `StepInterceptor.InterceptStep(ctx, step, next)` — wraps the **full lifetime** of a step (across all retries), invoked exactly once per step. +- `AttemptInterceptor.InterceptAttempt(ctx, step, attempt, next)` — wraps a **single attempt**, invoked once per attempt with a 0-based index. + +The Workflow stores them in two slice fields and chains the lowest-index entry on the outside. Steps that resolve to `Skipped` or `Canceled` via their `Condition` are settled inline in `tick()` and never enter either chain (see capability `step-interceptor`). + +That extension point is exactly the right shape for OpenTelemetry tracing: a **step span** maps to the outer chain, a **per-attempt span** maps to the inner chain, and the existing `Steper` identity gives a stable key for span naming and attributes via `flow.String(step)`. + +The repository today is a single Go module (`github.com/Azure/go-workflow`) with no contrib subdirectory and no OpenTelemetry dependency. This change introduces the first contrib submodule, sets the precedent for all future contrib packages (e.g. logging, metrics, slog), and converts the repo into a multi-module workspace. + +Stakeholders: library users who need traces today (presently hand-rolling interceptors), library maintainers (must keep core dependency-light and avoid OTel API churn bleeding into core users). + +## Goals / Non-Goals + +**Goals:** + +- Provide a one-call API to wire OpenTelemetry traces into any `*flow.Workflow` via the existing interceptor extension points — no `BeforeStep` / `AfterStep` hooks, no fork of the core types. +- Ship as an **independent Go module** so the OpenTelemetry dependency does not enter the core module's transitive graph. 
+- Ship sane defaults (span names from `flow.String(step)`, retry-aware naming, automatic error recording) while letting users override every default through functional options. +- Keep the two interceptor factories **independently usable**: a user may register only the step layer, only the attempt layer, or both, and the result is always coherent. +- Demonstrate the pattern with a runnable godoc `Example` so other contrib modules (e.g. metrics) have a template. + +**Non-Goals:** + +- OTel **metrics** and **logs** signals. Out of scope for v0.1; addressed in a follow-up change if needed. +- A **workflow-level** span. The interceptor API is step-scoped; a workflow span is the caller's responsibility (`ctx, span := tracer.Start(ctx, "my-workflow"); defer span.End(); w.Do(ctx)`). Documented in package godoc, not implemented. +- Changing core behavior. No edits to any file under the root module. The Skipped/Canceled bypass is documented as inherited behavior, not redesigned. +- Auto-instrumentation discovery (e.g. `otel.GetTracerProvider()` magic in tests). Tests inject a `TracerProvider` explicitly via `WithTracerProvider`. +- Propagation across process boundaries. Step `ctx` carries whatever the caller put in; we do not inject/extract carriers. + +## Decisions + +### D1. Independent submodule, not a subpackage + +`contrib/otel/` gets its own `go.mod` declaring `github.com/Azure/go-workflow/contrib/otel` as its module path. During development the contrib `go.mod` carries: + +``` +require github.com/Azure/go-workflow v0.0.0 +replace github.com/Azure/go-workflow => ../.. +``` + +so `go test ./...` in the contrib dir builds against the in-tree core. Released tags (`contrib/otel/v0.1.0`) drop or pin the replace. + +**Rationale:** OpenTelemetry's `go.opentelemetry.io/otel` pulls in a non-trivial dep tree. Core users who don't want traces should not pay for them in their `go.sum`. This is the pattern used by `gin-contrib`, `otelhttp`, etc. 
+ +**Alternative considered:** Subpackage under core (`github.com/Azure/go-workflow/otel`). Rejected — would make OTel a hard transitive dep of every core user. + +### D2. Two factories, no glue helper + +Public API is exactly: + +```go +func NewStepInterceptor(opts ...Option) flow.StepInterceptor +func NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor +``` + +Users wire them with the existing `flow.WithStepInterceptor` / `flow.WithAttemptInterceptor` workflow options. + +**Rationale:** Per user decision in brainstorming. Mirrors the underlying interceptor model 1:1, lets users register only one layer if they want, and avoids inventing a new "OTel option" abstraction. Cost: caller writes two lines instead of one. + +**Alternative considered:** A single `otel.Tracing(opts...)` returning a `flow.WorkflowOption` that registers both. Rejected for v0.1 — hides the orthogonality, harder to register only the attempt layer (a common pattern for low-overhead deployments). + +### D3. Functional options, shared `config` + +Both factories take the same `Option` type and the same internal `config` struct. Options that only make sense for one layer (e.g. `WithAttemptSpanNamer`) are still accepted by the other; they are simply ignored. + +**Rationale:** One option type means users don't have to remember two namespaces (`StepOption` vs `AttemptOption`). The shared config is internal — package surface stays small. + +**Alternative considered:** Separate `StepOption` / `AttemptOption`. Rejected — doubles the API surface for almost no safety benefit; the only "wrong" combo is silently ignored, never an error. + +### D4. 
Span naming and attribute conventions + +| Concern | Default | +|---|---| +| Tracer name | `"github.com/Azure/go-workflow/contrib/otel"` (overridable via `WithTracerName`) | +| Step span name | `flow.String(step)` | +| Attempt span name | `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)` | +| Step span attrs | `workflow.step.name = flow.String(step)`; `workflow.step.status` = `"success"` \| `"error"` (set on End from the `next` error) | +| Attempt span attrs | `workflow.step.name`; `workflow.step.attempt = attempt` | +| Error path | `next` returns non-nil → `span.RecordError(err)` + `span.SetStatus(codes.Error, err.Error())`. `context.Canceled` is recorded the same way as any other error. | + +Custom namers / attribute providers override only what they replace; defaults still apply to non-overridden fields. The `workflow.step.*` namespace is chosen to match the `workflow-options` and `step-configuration` capabilities' terminology. + +**Rationale:** Aligned with user decision. `context.Canceled = Error` keeps the rule one-line and avoids the "is this user-initiated cancel or upstream timeout?" classification problem we have no signal to answer. + +### D5. Tracer acquisition + +`config` resolves a `trace.Tracer` exactly once at factory-call time: + +```go +tp := cfg.tracerProvider +if tp == nil { tp = otel.GetTracerProvider() } +tracer := tp.Tracer(cfg.tracerName) +``` + +This means swapping the global provider after `NewStepInterceptor` returns will not be observed. Documented. + +**Rationale:** Avoids per-step `otel.GetTracerProvider()` lookups (each call is a `sync.Map` access). Matches `otelhttp.NewHandler` behavior. + +### D6. Test strategy + +Tests live in `contrib/otel/*_test.go` and use: + +- `sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder))` to produce real spans synchronously. +- `tracetest.NewSpanRecorder` to capture them. +- `flow.NoOpStep` / minimal handcrafted `Steper` impls for the workflow. 
+- An exported `(*Workflow).Do(ctx)` to drive end-to-end runs. + +The example (`example_test.go`) uses `stdouttrace` to demonstrate a real exporter — it is a build-checked godoc Example, not asserted on output. + +### D7. Version & release + +- Initial tag `contrib/otel/v0.1.0`. Pre-1.0 because OTel semconv naming for "workflow" is not standardized — we may rename `workflow.step.*` attributes once a convention exists upstream. +- CI must add a job that runs `go test ./...` inside `contrib/otel/`. Tracked as a follow-up infra task referenced from `tasks.md`. + +## Risks / Trade-offs + +- **OTel API breakage** → contrib module pinned to a single major; core unaffected. Mitigation: only depend on stable `go.opentelemetry.io/otel/trace` interfaces (Tracer, Span, SpanStartOption); no SDK types in the public API. +- **Multi-module repo cognitive overhead** → developers must `cd contrib/otel && go test ./...` separately from root. Mitigation: documented in `contrib/otel/README.md` and reinforced by CI splitting jobs by module. +- **`workflow.step.*` attribute names may clash with future OTel semconv** → could force a v1 rename. Accepted: pre-1.0 lets us rename without breaking SemVer promises. +- **`context.Canceled` always being `Error`** → users running graceful shutdowns will see error-status spans. Mitigation: documented loud and clear; we accept this for v0.1 simplicity. Users who need different semantics can wrap the interceptor (composability is preserved). +- **Replace directive in `contrib/otel/go.mod`** → breaks `go install github.com/Azure/go-workflow/contrib/otel@latest` if not removed before tagging a release. Mitigation: release checklist item + mention in `tasks.md`. +- **Single shared `config` for both layers** → user passes `WithAttemptSpanNamer` to `NewStepInterceptor`; it silently no-ops. Mitigation: godoc on each option says which layer it affects. + +## Migration Plan + +This is a new submodule with no prior version — no migration of existing users. 
For repository operators: + +1. Land the change on the development branch. +2. CI gains a `go-test-contrib-otel` job (`cd contrib/otel && go test ./...`). +3. Tag `contrib/otel/v0.1.0` after first release. Before tagging, remove the `replace` directive (or pin it to the matching core version) so `go get github.com/Azure/go-workflow/contrib/otel@v0.1.0` resolves cleanly. + +Rollback: revert the change branch. No core files are modified, so rollback is purely deletion of `contrib/otel/`. + +## Open Questions + +None blocking. Future follow-ups (out of scope here): + +- Should `contrib/otel` provide a metrics interceptor in v0.2? +- Should we adopt OTel semconv attribute names if/when a "task/workflow" namespace is standardized? diff --git a/openspec/changes/contrib-otel-tracing/proposal.md b/openspec/changes/contrib-otel-tracing/proposal.md new file mode 100644 index 0000000..9612d60 --- /dev/null +++ b/openspec/changes/contrib-otel-tracing/proposal.md @@ -0,0 +1,41 @@ +## Why + +go-workflow already has a clean two-layer interceptor model (`StepInterceptor` + `AttemptInterceptor`) that is the natural extension point for cross-cutting observability — but the repo ships no batteries-included integration with any tracing system. Users who want OpenTelemetry traces today have to hand-roll a pair of interceptors, get span parenting / retry semantics right, and re-derive sensible span names and attributes. That is repetitive boilerplate every project copies. + +A first-party `contrib/otel` module turns the interceptor extension point into a one-liner, ships sensible defaults, and demonstrates the recommended way to build other observability integrations against the interceptor API. + +## What Changes + +- Add `contrib/otel/` as a **new, independent Go module** at module path `github.com/Azure/go-workflow/contrib/otel`. Released and tagged separately from the core module so its OpenTelemetry dependency stays out of core users' transitive graph. 
+- New public API in package `flowotel`: + - `NewStepInterceptor(opts ...Option) flow.StepInterceptor` + - `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` + - `Option` functional options: `WithTracerProvider`, `WithTracerName`, `WithStepSpanNamer`, `WithAttemptSpanNamer`, `WithStepAttributes`, `WithAttemptAttributes`. +- Default span behavior: + - Step span name = `flow.String(step)`; attempt span name = `flow.String(step) + " (attempt N)"`. + - Step span attributes: `workflow.step.name`, `workflow.step.status`. Attempt span attributes: `workflow.step.name`, `workflow.step.attempt`. + - Errors (including `context.Canceled`) → `RecordError` + `SetStatus(codes.Error, …)`. + - The two interceptors are independent — when both registered, the attempt span is a child of the step span; when only one is registered, it works standalone against the caller's `ctx`. +- Document the existing core behavior that `Skipped` / `Canceled by Condition` steps bypass the interceptor chain, so they will not produce spans (no change to core). +- Add a runnable godoc `Example` (`contrib/otel/example_test.go`) that wires the SDK + a stdout exporter to a minimal workflow. +- Dependency policy: contrib/otel depends only on the OpenTelemetry **API** packages (`go.opentelemetry.io/otel`, `…/otel/trace`); the SDK and `tracetest` are test-only dependencies. + +## Capabilities + +### New Capabilities + +- `contrib-otel`: OpenTelemetry traces integration for go-workflow, delivered as an independent submodule that exposes two interceptor factories (step-level and attempt-level) wrapping the core `StepInterceptor` / `AttemptInterceptor` extension points. + +### Modified Capabilities + + + +## Impact + +- **New module**: `contrib/otel/` with its own `go.mod`, `go.sum`, source files, tests, and example. Replaces the placeholder change directory `contrib-common-steps/`. +- **No source changes** to the existing core module (`github.com/Azure/go-workflow`). 
Its `go.mod` and existing files are untouched. +- **New external dependencies** (scoped to the contrib module only): + - Runtime: `go.opentelemetry.io/otel`, `go.opentelemetry.io/otel/trace`. + - Test-only: `go.opentelemetry.io/otel/sdk`, `go.opentelemetry.io/otel/sdk/trace`, `go.opentelemetry.io/otel/sdk/trace/tracetest`, plus `stdouttrace` for the example. +- **Repository becomes multi-module**. CI must build/test `./contrib/otel` in addition to the root module. The contrib module uses a `replace github.com/Azure/go-workflow => ../..` directive so it builds against in-tree core during development; releases drop or pin the replace. +- **Versioning**: contrib/otel will be tagged independently as `contrib/otel/v0.x.y` per Go submodule conventions. Initial release is `v0.1.0` (pre-1.0 surface). diff --git a/openspec/changes/contrib-otel-tracing/specs/contrib-otel/spec.md b/openspec/changes/contrib-otel-tracing/specs/contrib-otel/spec.md new file mode 100644 index 0000000..9fef99a --- /dev/null +++ b/openspec/changes/contrib-otel-tracing/specs/contrib-otel/spec.md @@ -0,0 +1,197 @@ +## ADDED Requirements + +### Requirement: Independent submodule + +The `contrib/otel` package SHALL be delivered as an independent Go module at module path `github.com/Azure/go-workflow/contrib/otel`, located at the repository directory `contrib/otel/`. The module SHALL declare Go 1.23 (matching the core module) and SHALL NOT cause the core module `github.com/Azure/go-workflow` to acquire any new direct or transitive dependency on OpenTelemetry packages. 
+ +#### Scenario: Core go.mod is unchanged +- **GIVEN** the change has been applied +- **WHEN** the root `go.mod` of `github.com/Azure/go-workflow` is inspected +- **THEN** it contains no `require`, `replace`, or `exclude` line referencing any `go.opentelemetry.io/...` module +- **AND** `go mod tidy` at the repository root produces no diff caused by this change + +#### Scenario: Contrib module builds standalone +- **GIVEN** a checkout of the repository +- **WHEN** the developer runs `go build ./...` and `go test ./...` from `contrib/otel/` +- **THEN** the commands succeed using only the contrib module's `go.mod` +- **AND** the contrib `go.mod` declares the module path `github.com/Azure/go-workflow/contrib/otel` +- **AND** the contrib `go.mod` carries `go 1.23` + +#### Scenario: Contrib module resolves core via replace during development +- **GIVEN** the repository is checked out at the change branch +- **WHEN** `contrib/otel/go.mod` is inspected +- **THEN** it contains a `replace github.com/Azure/go-workflow => ../..` directive (or an equivalent local replace) so the contrib module builds against in-tree core sources + +--- + +### Requirement: Dependency policy + +The `contrib/otel` module's runtime (non-test) dependencies SHALL be limited to the OpenTelemetry **API** packages — `go.opentelemetry.io/otel` and `go.opentelemetry.io/otel/trace` (and their transitive requirements) — and `github.com/Azure/go-workflow`. The OpenTelemetry **SDK**, exporter packages, and `tracetest` SHALL appear only as test dependencies. 
+ +#### Scenario: SDK is test-only +- **GIVEN** the contrib module +- **WHEN** `go list -deps -test=false ./...` is run inside `contrib/otel/` +- **THEN** the output contains no package under `go.opentelemetry.io/otel/sdk/...` +- **AND** the output contains no `stdouttrace` or other exporter packages + +#### Scenario: API packages available at runtime +- **WHEN** a consumer imports `github.com/Azure/go-workflow/contrib/otel` +- **THEN** the import graph includes `go.opentelemetry.io/otel/trace` and `go.opentelemetry.io/otel/attribute` + +--- + +### Requirement: Step interceptor factory + +`contrib/otel` SHALL export a function `NewStepInterceptor(opts ...Option) flow.StepInterceptor` that returns a value satisfying the `flow.StepInterceptor` interface. Each invocation of the returned `InterceptStep` method SHALL start exactly one span before calling `next` and end it after `next` returns, regardless of how many retry attempts occur inside `next`. + +#### Scenario: One span per step on success +- **GIVEN** a Workflow with a single step `s` registered with `flow.WithStepInterceptor(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)))` where `tp` records spans +- **WHEN** the Workflow runs and `s` succeeds on the first attempt +- **THEN** exactly one span is recorded with the step span name +- **AND** the span has status `OK` (or unset, the OTel default for success) + +#### Scenario: One span per step despite retries +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and finally succeeds on attempt 2 +- **AND** only `NewStepInterceptor` is registered (no AttemptInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly one span is recorded for `s` (not three) + +#### Scenario: Step span records terminal failure +- **GIVEN** a step `s` whose final attempt returns a non-nil error `err` +- **WHEN** the step interceptor's outer `next` returns +- **THEN** the recorded span calls `RecordError(err)` and `SetStatus(codes.Error, 
err.Error())` +- **AND** the span attribute `workflow.step.status` equals `"error"` + +--- + +### Requirement: Attempt interceptor factory + +`contrib/otel` SHALL export a function `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` that returns a value satisfying the `flow.AttemptInterceptor` interface. Each invocation of `InterceptAttempt` SHALL start one span before calling `next` and end it after `next` returns. Each per-attempt span SHALL carry the attempt index as the OTel attribute `workflow.step.attempt`. + +#### Scenario: One attempt span per attempt +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and ultimately succeeds on attempt 2 +- **AND** only `NewAttemptInterceptor` is registered (no StepInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly three spans are recorded (attempt 0, attempt 1, attempt 2) +- **AND** each span carries attribute `workflow.step.attempt` with the matching 0-based index + +#### Scenario: Failing attempt records its error +- **GIVEN** an attempt that returns error `err` from `next` +- **WHEN** that attempt's span is ended +- **THEN** the span calls `RecordError(err)` and `SetStatus(codes.Error, err.Error())` + +--- + +### Requirement: Independent registration of the two layers + +The two interceptor factories SHALL be usable independently or together. When both are registered on the same Workflow, the attempt span SHALL be a child of the step span (i.e. the attempt's `Span.Parent.SpanID` equals the step span's `SpanID`). When only one is registered, that span SHALL be a child of whatever span (if any) is on the caller-provided context. 
+ +#### Scenario: Both layers registered → parent/child relation +- **GIVEN** a Workflow registered with both `NewStepInterceptor` and `NewAttemptInterceptor` +- **WHEN** a single step runs once +- **THEN** the step span and the attempt span share the same `TraceID` +- **AND** the attempt span's `Parent.SpanID` equals the step span's `SpanID` + +#### Scenario: Only attempt layer registered → no orphan +- **GIVEN** a Workflow registered with only `NewAttemptInterceptor` and a caller-supplied context that already contains an outer span `outer` +- **WHEN** a step runs +- **THEN** the attempt span's `Parent.SpanID` equals `outer`'s `SpanID` + +--- + +### Requirement: Default span naming + +When no custom namer is supplied, the default step span name SHALL equal `flow.String(step)`, and the default attempt span name SHALL equal `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)`. + +#### Scenario: Default step span name +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` +- **WHEN** the step interceptor produces a span without a custom `WithStepSpanNamer` +- **THEN** the span's name is `"MyStep"` + +#### Scenario: Default attempt span name carries attempt index +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` running its second attempt (`attempt == 1`) +- **WHEN** the attempt interceptor produces a span without a custom `WithAttemptSpanNamer` +- **THEN** the span's name is `"MyStep (attempt 1)"` + +--- + +### Requirement: Default span attributes + +The step span SHALL be created with attribute `workflow.step.name = flow.String(step)`, and on `End` it SHALL receive attribute `workflow.step.status` equal to `"success"` when `next` returned nil and `"error"` otherwise. The attempt span SHALL be created with attributes `workflow.step.name = flow.String(step)` and `workflow.step.attempt = attempt` (as an `Int64` attribute). 
+ +#### Scenario: Step span carries name and status +- **GIVEN** a step `s` that succeeds +- **WHEN** its span is ended +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.status` equals `"success"` + +#### Scenario: Attempt span carries name and attempt index +- **GIVEN** an attempt at index 2 of step `s` +- **WHEN** its span is created +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.attempt` equals the int64 `2` + +--- + +### Requirement: Functional options + +`contrib/otel` SHALL expose a single exported `Option` type and the following functional option constructors. Both factories SHALL accept the same `Option` values. Options that target one layer SHALL be no-ops on the other layer (no error, no panic). + +| Option | Effect | +|---|---| +| `WithTracerProvider(tp trace.TracerProvider)` | Sets the provider. If `nil` or unset, defaults to `otel.GetTracerProvider()` resolved at factory-call time. | +| `WithTracerName(name string)` | Sets the tracer instrumentation name. Default `"github.com/Azure/go-workflow/contrib/otel"`. | +| `WithStepSpanNamer(fn func(flow.Steper) string)` | Overrides default step span naming. | +| `WithAttemptSpanNamer(fn func(flow.Steper, uint64) string)` | Overrides default attempt span naming. | +| `WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue)` | Adds extra attributes to the step span at start. Defaults still apply. | +| `WithAttemptAttributes(fn func(flow.Steper, uint64) []attribute.KeyValue)` | Adds extra attributes to the attempt span at start. Defaults still apply. 
| + +#### Scenario: Custom step namer overrides default +- **GIVEN** a `NewStepInterceptor(WithStepSpanNamer(func(flow.Steper) string { return "custom-name" }))` +- **WHEN** any step runs through the interceptor +- **THEN** its step span name is `"custom-name"` + +#### Scenario: Custom attempt attributes are appended, not replaced +- **GIVEN** a `NewAttemptInterceptor(WithAttemptAttributes(func(flow.Steper, uint64) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("custom.k", "v")} }))` +- **WHEN** an attempt span is created +- **THEN** it carries the default attributes (`workflow.step.name`, `workflow.step.attempt`) AND the custom attribute `custom.k = "v"` + +#### Scenario: Provider defaults to global +- **GIVEN** no `WithTracerProvider` option +- **WHEN** `NewStepInterceptor()` is called +- **THEN** the constructed interceptor obtains its tracer from `otel.GetTracerProvider()` at the moment of the call (not lazily on every interception) + +--- + +### Requirement: Cancellation is recorded as error + +When the error returned by `next` satisfies `errors.Is(err, context.Canceled)`, the span produced by either interceptor SHALL be ended with `RecordError(err)` and `SetStatus(codes.Error, err.Error())` — the same as any other non-nil error. No special-case suppression SHALL be applied in v0.1. + +#### Scenario: Cancelled step is Error +- **GIVEN** a step that observes context cancellation and returns `context.Canceled` +- **WHEN** the span is ended +- **THEN** the span has status code `codes.Error` +- **AND** the span's events include the recorded `context.Canceled` error + +--- + +### Requirement: Skipped and Canceled-by-Condition steps produce no spans + +The contrib package SHALL document, and tests SHALL verify, that a step whose `Condition` resolves to `Skipped` or `Canceled` produces zero spans through either interceptor — by virtue of the core capability `step-interceptor` bypassing the chain entirely for such steps. 
`contrib/otel` SHALL NOT implement any compensating behavior. + +#### Scenario: Skipped step contributes no span +- **GIVEN** a Workflow with a step `s` whose `Condition` returns `Skipped` +- **AND** both `NewStepInterceptor` and `NewAttemptInterceptor` are registered +- **WHEN** the Workflow runs +- **THEN** the span recorder contains zero spans referencing `s` + +--- + +### Requirement: Runnable godoc Example + +The `contrib/otel` module SHALL include a runnable godoc Example (`Example*` function in `example_test.go`) that constructs a `TracerProvider` with a stdout exporter, registers both interceptors on a minimal Workflow, and runs the Workflow. The Example SHALL compile and execute under `go test ./...` inside the contrib module without external network access. + +#### Scenario: Example builds and runs +- **WHEN** `go test ./... -run Example` is executed inside `contrib/otel/` +- **THEN** the command exits with status 0 +- **AND** the Example function appears in `go doc -all github.com/Azure/go-workflow/contrib/otel` From 598707277e8441072dd976439fd47984345b183e Mon Sep 17 00:00:00 2001 From: Xingfei Xu Date: Fri, 15 May 2026 02:35:14 +0000 Subject: [PATCH 13/13] chore(openspec): sync contrib-otel capability and archive change Move openspec/changes/contrib-otel-tracing/ to archive/2026-05-15-* and copy the delta spec into openspec/specs/contrib-otel/spec.md so the new capability is part of the main spec set. 
--- .../.openspec.yaml | 0 .../design.md | 0 .../proposal.md | 0 .../specs/contrib-otel/spec.md | 0 .../2026-05-15-contrib-otel-tracing}/tasks.md | 0 openspec/specs/contrib-otel/spec.md | 197 ++++++++++++++++++ 6 files changed, 197 insertions(+) rename openspec/changes/{contrib-otel-tracing => archive/2026-05-15-contrib-otel-tracing}/.openspec.yaml (100%) rename openspec/changes/{contrib-otel-tracing => archive/2026-05-15-contrib-otel-tracing}/design.md (100%) rename openspec/changes/{contrib-otel-tracing => archive/2026-05-15-contrib-otel-tracing}/proposal.md (100%) rename openspec/changes/{contrib-otel-tracing => archive/2026-05-15-contrib-otel-tracing}/specs/contrib-otel/spec.md (100%) rename openspec/changes/{contrib-otel-tracing => archive/2026-05-15-contrib-otel-tracing}/tasks.md (100%) create mode 100644 openspec/specs/contrib-otel/spec.md diff --git a/openspec/changes/contrib-otel-tracing/.openspec.yaml b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/.openspec.yaml similarity index 100% rename from openspec/changes/contrib-otel-tracing/.openspec.yaml rename to openspec/changes/archive/2026-05-15-contrib-otel-tracing/.openspec.yaml diff --git a/openspec/changes/contrib-otel-tracing/design.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/design.md similarity index 100% rename from openspec/changes/contrib-otel-tracing/design.md rename to openspec/changes/archive/2026-05-15-contrib-otel-tracing/design.md diff --git a/openspec/changes/contrib-otel-tracing/proposal.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/proposal.md similarity index 100% rename from openspec/changes/contrib-otel-tracing/proposal.md rename to openspec/changes/archive/2026-05-15-contrib-otel-tracing/proposal.md diff --git a/openspec/changes/contrib-otel-tracing/specs/contrib-otel/spec.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/specs/contrib-otel/spec.md similarity index 100% rename from 
openspec/changes/contrib-otel-tracing/specs/contrib-otel/spec.md rename to openspec/changes/archive/2026-05-15-contrib-otel-tracing/specs/contrib-otel/spec.md diff --git a/openspec/changes/contrib-otel-tracing/tasks.md b/openspec/changes/archive/2026-05-15-contrib-otel-tracing/tasks.md similarity index 100% rename from openspec/changes/contrib-otel-tracing/tasks.md rename to openspec/changes/archive/2026-05-15-contrib-otel-tracing/tasks.md diff --git a/openspec/specs/contrib-otel/spec.md b/openspec/specs/contrib-otel/spec.md new file mode 100644 index 0000000..9fef99a --- /dev/null +++ b/openspec/specs/contrib-otel/spec.md @@ -0,0 +1,197 @@ +## ADDED Requirements + +### Requirement: Independent submodule + +The `contrib/otel` package SHALL be delivered as an independent Go module at module path `github.com/Azure/go-workflow/contrib/otel`, located at the repository directory `contrib/otel/`. The module SHALL declare Go 1.23 (matching the core module) and SHALL NOT cause the core module `github.com/Azure/go-workflow` to acquire any new direct or transitive dependency on OpenTelemetry packages. 
+ +#### Scenario: Core go.mod is unchanged +- **GIVEN** the change has been applied +- **WHEN** the root `go.mod` of `github.com/Azure/go-workflow` is inspected +- **THEN** it contains no `require`, `replace`, or `exclude` line referencing any `go.opentelemetry.io/...` module +- **AND** `go mod tidy` at the repository root produces no diff caused by this change + +#### Scenario: Contrib module builds standalone +- **GIVEN** a checkout of the repository +- **WHEN** the developer runs `go build ./...` and `go test ./...` from `contrib/otel/` +- **THEN** the commands succeed using only the contrib module's `go.mod` +- **AND** the contrib `go.mod` declares the module path `github.com/Azure/go-workflow/contrib/otel` +- **AND** the contrib `go.mod` carries `go 1.23` + +#### Scenario: Contrib module resolves core via replace during development +- **GIVEN** the repository is checked out at the change branch +- **WHEN** `contrib/otel/go.mod` is inspected +- **THEN** it contains a `replace github.com/Azure/go-workflow => ../..` directive (or an equivalent local replace) so the contrib module builds against in-tree core sources + +--- + +### Requirement: Dependency policy + +The `contrib/otel` module's runtime (non-test) dependencies SHALL be limited to the OpenTelemetry **API** packages — `go.opentelemetry.io/otel` and `go.opentelemetry.io/otel/trace` (and their transitive requirements) — and `github.com/Azure/go-workflow`. The OpenTelemetry **SDK**, exporter packages, and `tracetest` SHALL appear only as test dependencies. 
+ +#### Scenario: SDK is test-only +- **GIVEN** the contrib module +- **WHEN** `go list -deps -test=false ./...` is run inside `contrib/otel/` +- **THEN** the output contains no package under `go.opentelemetry.io/otel/sdk/...` +- **AND** the output contains no `stdouttrace` or other exporter packages + +#### Scenario: API packages available at runtime +- **WHEN** a consumer imports `github.com/Azure/go-workflow/contrib/otel` +- **THEN** the import graph includes `go.opentelemetry.io/otel/trace` and `go.opentelemetry.io/otel/attribute` + +--- + +### Requirement: Step interceptor factory + +`contrib/otel` SHALL export a function `NewStepInterceptor(opts ...Option) flow.StepInterceptor` that returns a value satisfying the `flow.StepInterceptor` interface. Each invocation of the returned `InterceptStep` method SHALL start exactly one span before calling `next` and end it after `next` returns, regardless of how many retry attempts occur inside `next`. + +#### Scenario: One span per step on success +- **GIVEN** a Workflow with a single step `s` registered with `flow.WithStepInterceptor(flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)))` where `tp` records spans +- **WHEN** the Workflow runs and `s` succeeds on the first attempt +- **THEN** exactly one span is recorded with the step span name +- **AND** the span has status `OK` (or unset, the OTel default for success) + +#### Scenario: One span per step despite retries +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and finally succeeds on attempt 2 (0-based, i.e. the third attempt) +- **AND** only `NewStepInterceptor` is registered (no AttemptInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly one span is recorded for `s` (not three) + +#### Scenario: Step span records terminal failure +- **GIVEN** a step `s` whose final attempt returns a non-nil error `err` +- **WHEN** the step interceptor's outer `next` returns +- **THEN** the interceptor calls the span's `RecordError(err)` and `SetStatus(codes.Error,
err.Error())` +- **AND** the span attribute `workflow.step.status` equals `"error"` + +--- + +### Requirement: Attempt interceptor factory + +`contrib/otel` SHALL export a function `NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor` that returns a value satisfying the `flow.AttemptInterceptor` interface. Each invocation of `InterceptAttempt` SHALL start one span before calling `next` and end it after `next` returns. Each per-attempt span SHALL carry the attempt index as the OTel attribute `workflow.step.attempt`. + +#### Scenario: One attempt span per attempt +- **GIVEN** a Workflow whose step `s` is configured to retry up to 3 times and ultimately succeeds on attempt 2 +- **AND** only `NewAttemptInterceptor` is registered (no StepInterceptor) +- **WHEN** the Workflow runs +- **THEN** exactly three spans are recorded (attempt 0, attempt 1, attempt 2) +- **AND** each span carries attribute `workflow.step.attempt` with the matching 0-based index + +#### Scenario: Failing attempt records its error +- **GIVEN** an attempt that returns error `err` from `next` +- **WHEN** that attempt's span is ended +- **THEN** the interceptor has called the span's `RecordError(err)` and `SetStatus(codes.Error, err.Error())` + +--- + +### Requirement: Independent registration of the two layers + +The two interceptor factories SHALL be usable independently or together. When both are registered on the same Workflow, the attempt span SHALL be a child of the step span (i.e. the attempt's `Span.Parent.SpanID` equals the step span's `SpanID`). When only one is registered, that span SHALL be a child of whatever span (if any) is on the caller-provided context.
+ +#### Scenario: Both layers registered → parent/child relation +- **GIVEN** a Workflow registered with both `NewStepInterceptor` and `NewAttemptInterceptor` +- **WHEN** a single step runs once +- **THEN** the step span and the attempt span share the same `TraceID` +- **AND** the attempt span's `Parent.SpanID` equals the step span's `SpanID` + +#### Scenario: Only attempt layer registered → no orphan +- **GIVEN** a Workflow registered with only `NewAttemptInterceptor` and a caller-supplied context that already contains an outer span `outer` +- **WHEN** a step runs +- **THEN** the attempt span's `Parent.SpanID` equals `outer`'s `SpanID` + +--- + +### Requirement: Default span naming + +When no custom namer is supplied, the default step span name SHALL equal `flow.String(step)`, and the default attempt span name SHALL equal `fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)`. + +#### Scenario: Default step span name +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` +- **WHEN** the step interceptor produces a span without a custom `WithStepSpanNamer` +- **THEN** the span's name is `"MyStep"` + +#### Scenario: Default attempt span name carries attempt index +- **GIVEN** a step `s` for which `flow.String(s) == "MyStep"` running its second attempt (`attempt == 1`) +- **WHEN** the attempt interceptor produces a span without a custom `WithAttemptSpanNamer` +- **THEN** the span's name is `"MyStep (attempt 1)"` + +--- + +### Requirement: Default span attributes + +The step span SHALL be created with attribute `workflow.step.name = flow.String(step)`, and immediately before `End` is called it SHALL receive attribute `workflow.step.status` equal to `"success"` when `next` returned nil and `"error"` otherwise. The attempt span SHALL be created with attributes `workflow.step.name = flow.String(step)` and `workflow.step.attempt = attempt` (as an `Int64` attribute).
+ +#### Scenario: Step span carries name and status +- **GIVEN** a step `s` that succeeds +- **WHEN** its span is ended +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.status` equals `"success"` + +#### Scenario: Attempt span carries name and attempt index +- **GIVEN** an attempt at index 2 of step `s` +- **WHEN** its span is created +- **THEN** the span's attribute set contains `workflow.step.name = flow.String(s)` +- **AND** the attribute `workflow.step.attempt` equals the int64 `2` + +--- + +### Requirement: Functional options + +`contrib/otel` SHALL expose a single exported `Option` type and the following functional option constructors. Both factories SHALL accept the same `Option` values. Options that target one layer SHALL be no-ops on the other layer (no error, no panic). + +| Option | Effect | +|---|---| +| `WithTracerProvider(tp trace.TracerProvider)` | Sets the provider. If `nil` or unset, defaults to `otel.GetTracerProvider()` resolved at factory-call time. | +| `WithTracerName(name string)` | Sets the tracer instrumentation name. Default `"github.com/Azure/go-workflow/contrib/otel"`. | +| `WithStepSpanNamer(fn func(flow.Steper) string)` | Overrides default step span naming. | +| `WithAttemptSpanNamer(fn func(flow.Steper, uint64) string)` | Overrides default attempt span naming. | +| `WithStepAttributes(fn func(flow.Steper) []attribute.KeyValue)` | Adds extra attributes to the step span at start. Defaults still apply. | +| `WithAttemptAttributes(fn func(flow.Steper, uint64) []attribute.KeyValue)` | Adds extra attributes to the attempt span at start. Defaults still apply. 
| + +#### Scenario: Custom step namer overrides default +- **GIVEN** an interceptor created by `NewStepInterceptor(WithStepSpanNamer(func(flow.Steper) string { return "custom-name" }))` +- **WHEN** any step runs through the interceptor +- **THEN** its step span name is `"custom-name"` + +#### Scenario: Custom attempt attributes are appended, not replaced +- **GIVEN** an interceptor created by `NewAttemptInterceptor(WithAttemptAttributes(func(flow.Steper, uint64) []attribute.KeyValue { return []attribute.KeyValue{attribute.String("custom.k", "v")} }))` +- **WHEN** an attempt span is created +- **THEN** it carries the default attributes (`workflow.step.name`, `workflow.step.attempt`) AND the custom attribute `custom.k = "v"` + +#### Scenario: Provider defaults to global +- **GIVEN** no `WithTracerProvider` option +- **WHEN** `NewStepInterceptor()` is called +- **THEN** the constructed interceptor obtains its tracer from `otel.GetTracerProvider()` at the moment of the call (not lazily on every interception) + +--- + +### Requirement: Cancellation is recorded as error + +When the error returned by `next` satisfies `errors.Is(err, context.Canceled)`, the span produced by either interceptor SHALL have `RecordError(err)` and `SetStatus(codes.Error, err.Error())` applied before it is ended — the same as any other non-nil error. No special-case suppression SHALL be applied in v0.1. + +#### Scenario: Canceled step is recorded as Error +- **GIVEN** a step that observes context cancellation and returns `context.Canceled` +- **WHEN** the span is ended +- **THEN** the span has status code `codes.Error` +- **AND** the span's events include the recorded `context.Canceled` error + +--- + +### Requirement: Skipped and Canceled-by-Condition steps produce no spans + +The contrib package SHALL document, and tests SHALL verify, that a step whose `Condition` resolves to `Skipped` or `Canceled` produces zero spans through either interceptor — by virtue of the core capability `step-interceptor` bypassing the chain entirely for such steps.
`contrib/otel` SHALL NOT implement any compensating behavior. + +#### Scenario: Skipped step contributes no span +- **GIVEN** a Workflow with a step `s` whose `Condition` returns `Skipped` +- **AND** both `NewStepInterceptor` and `NewAttemptInterceptor` are registered +- **WHEN** the Workflow runs +- **THEN** the span recorder contains zero spans referencing `s` + +--- + +### Requirement: Runnable godoc Example + +The `contrib/otel` module SHALL include a runnable godoc Example (`Example*` function in `example_test.go`) that constructs a `TracerProvider` with a stdout exporter, registers both interceptors on a minimal Workflow, and runs the Workflow. The Example SHALL compile and execute under `go test ./...` inside the contrib module without external network access (note: `go test` only executes an Example that carries an `// Output:` comment; one without it is compiled but not run). + +#### Scenario: Example builds and runs +- **WHEN** `go test ./... -run Example` is executed inside `contrib/otel/` +- **THEN** the command exits with status 0 +- **AND** the Example is rendered in the package documentation for `github.com/Azure/go-workflow/contrib/otel` (e.g. on pkg.go.dev; `go doc` itself does not list Example functions)