Merged
10 changes: 10 additions & 0 deletions README.md
@@ -132,6 +132,16 @@ and will be removed in the next major version.
conditions, retry/timeout, composite steps, interceptors and workflow options.
- **DeepWiki:** <https://deepwiki.com/Azure/go-workflow>

## Contrib

Optional, independently-versioned modules under `contrib/`:

- **[`contrib/otel`](./contrib/otel)** — OpenTelemetry tracing integration
via the existing `StepInterceptor` / `AttemptInterceptor` extension
points. Released as a separate Go module
(`github.com/Azure/go-workflow/contrib/otel`) so its OpenTelemetry
dependency does not enter core's transitive graph.

## Contributing

This project welcomes contributions. Most contributions require you to agree to a Contributor
66 changes: 66 additions & 0 deletions contrib/otel/README.md
@@ -0,0 +1,66 @@
# contrib/otel

OpenTelemetry tracing integration for [go-workflow](../..) — implemented as
two interceptor factories that plug into the existing `StepInterceptor` and
`AttemptInterceptor` extension points.

```go
import (
	flow "github.com/Azure/go-workflow"
	flowotel "github.com/Azure/go-workflow/contrib/otel" // package name is flowotel
)

// tp is an OpenTelemetry TracerProvider that you have configured elsewhere.
w := &flow.Workflow{
	Option: flow.WorkflowOption{
		StepInterceptors: []flow.StepInterceptor{
			flowotel.NewStepInterceptor(flowotel.WithTracerProvider(tp)),
		},
		AttemptInterceptors: []flow.AttemptInterceptor{
			flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)),
		},
	},
}
```

See the package godoc and the runnable `Example` for the full wiring.

## What you get by default

- One **step span** per Step lifetime (covering all retries) — name
`flow.String(step)`, attributes `workflow.step.name`, `workflow.step.status`
(`"success"` or `"error"`).
- One **attempt span** per individual attempt — name
`"<step> (attempt N)"`, attributes `workflow.step.name`,
`workflow.step.attempt` (int64).
- Errors recorded with `RecordError` + `SetStatus(codes.Error)`; this
includes `context.Canceled`.
- Steps that are `Skipped` or `Canceled` by their `Condition` produce no
spans (they bypass the interceptor chain in core).

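Span names and extra attributes can be customized via the `With*` options. The attempt interceptor's canonical attributes (`workflow.step.name`, `workflow.step.attempt`) cannot be overridden. See the godoc.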
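
The attempt interceptor's doc comment states that user-supplied attributes cannot override the canonical ones, because user attributes are written first and canonical ones last. Below is a minimal stdlib-only sketch of that ordering. It assumes the span keeps the last value written for each key; `keyValue`, `buildAttrs`, and `dedupLastWins` are hypothetical names, not part of the module.

```go
package main

import "fmt"

// keyValue is a hypothetical stand-in for attribute.KeyValue.
type keyValue struct {
	Key   string
	Value any
}

// buildAttrs places user attributes first and canonical attributes last,
// mirroring the ordering used by the attempt interceptor.
func buildAttrs(user []keyValue, step string, attempt uint64) []keyValue {
	attrs := make([]keyValue, 0, len(user)+2)
	attrs = append(attrs, user...)
	attrs = append(attrs,
		keyValue{"workflow.step.name", step},
		keyValue{"workflow.step.attempt", int64(attempt)},
	)
	return attrs
}

// dedupLastWins models a span that keeps only the last value per key
// (an assumption of this sketch).
func dedupLastWins(attrs []keyValue) map[string]any {
	out := make(map[string]any, len(attrs))
	for _, kv := range attrs {
		out[kv.Key] = kv.Value
	}
	return out
}

func main() {
	user := []keyValue{
		{"env", "test"},
		{"workflow.step.attempt", int64(999)}, // attempted override
	}
	final := dedupLastWins(buildAttrs(user, "Hello", 0))
	fmt.Println(final["env"], final["workflow.step.attempt"])
}
```

Non-conflicting user keys such as `env` survive unchanged; only keys that collide with a canonical attribute are superseded.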

## Dependency policy

`contrib/otel` is an **independent Go module** (`github.com/Azure/go-workflow/contrib/otel`)
so the OpenTelemetry dependency does not enter the core module's transitive
graph. Runtime requirements are limited to the OpenTelemetry **API**
(`go.opentelemetry.io/otel`, `…/otel/trace`); the SDK and exporters are
test-only dependencies.

## Working on the module

Because the contrib module is separate, run its tests from inside the module:

    cd contrib/otel && go test ./...

The module's `go.mod` carries `replace github.com/Azure/go-workflow => ../..`
so it builds against in-tree core during development.

## Open follow-ups

- **CI**: add a job that runs `go test ./...` (and `-race`) inside
`contrib/otel/` in addition to the root-module job.
- **Release**: before tagging `contrib/otel/v0.1.0`, drop or pin the
`replace github.com/Azure/go-workflow => ../..` directive in
`contrib/otel/go.mod` so `go get
github.com/Azure/go-workflow/contrib/otel@v0.1.0` resolves cleanly.
59 changes: 59 additions & 0 deletions contrib/otel/attempt.go
@@ -0,0 +1,59 @@
package flowotel

import (
	"context"
	"fmt"

	flow "github.com/Azure/go-workflow"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// NewAttemptInterceptor returns a flow.AttemptInterceptor that emits one
// OpenTelemetry span per attempt (one per call into the retry loop).
//
// The span name defaults to fmt.Sprintf("%s (attempt %d)", flow.String(step),
// attempt) and may be overridden via WithAttemptSpanNamer. The default
// attribute set always includes workflow.step.name = flow.String(step) and
// workflow.step.attempt = int64(attempt). Extra attributes can be supplied
// via WithAttemptAttributes; they are added alongside (not in place of) the
// defaults at span-start time. Canonical attributes (workflow.step.name,
// workflow.step.attempt) always win over user-supplied attributes with the
// same key, so WithAttemptAttributes cannot override them.
//
// On a non-nil error from next() the span records the error via
// span.RecordError and sets its status to codes.Error. context.Canceled
// is treated like any other error (no special-case).
//
// Steps that the scheduler settles inline (Skipped or Canceled by their
// Condition) bypass the interceptor chain entirely and produce no span.
func NewAttemptInterceptor(opts ...Option) flow.AttemptInterceptor {
	cfg := newConfig(opts)
	tracer := cfg.resolveTracer()
	return flow.AttemptInterceptorFunc(func(ctx context.Context, step flow.Steper, attempt uint64, next func(context.Context) error) error {
		spanName := fmt.Sprintf("%s (attempt %d)", flow.String(step), attempt)
		if cfg.attemptSpanNamer != nil {
			spanName = cfg.attemptSpanNamer(step, attempt)
		}
		// User attributes first, canonical defaults last so OTel's
		// last-write-wins semantics keep canonical attrs authoritative.
		attrs := make([]attribute.KeyValue, 0, 4)
		if cfg.attemptAttributes != nil {
			attrs = append(attrs, cfg.attemptAttributes(step, attempt)...)
		}
		attrs = append(attrs,
			attribute.String(attrStepName, flow.String(step)),
			attribute.Int64(attrStepAttempt, int64(attempt)),
		)
		ctx, span := tracer.Start(ctx, spanName, trace.WithAttributes(attrs...))
		defer span.End()

		err := next(ctx)
		if err != nil {
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
		}
		return err
	})
}
176 changes: 176 additions & 0 deletions contrib/otel/attempt_test.go
@@ -0,0 +1,176 @@
package flowotel_test

import (
	"context"
	"fmt"
	"testing"

	flow "github.com/Azure/go-workflow"
	"github.com/Azure/go-workflow/contrib/otel"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
)

func TestAttemptInterceptor_OneSpanPerAttempt(t *testing.T) {
	t.Parallel()
	tp, rec := newRecorderTracerProvider()
	step := &retryStep{Name: "S", NeedAttempts: 4} // succeeds on the 4th try
	w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)))
	w.Add(flow.Step(step).Retry(noBackoff(5)))
	require.NoError(t, w.Do(context.Background()))

	spans := rec.Ended()
	require.Len(t, spans, 4, "expected one span per attempt (0..3)")
	for i, s := range spans {
		a, ok := findAttr(s.Attributes(), "workflow.step.attempt")
		require.True(t, ok, "span %d missing workflow.step.attempt", i)
		assert.Equal(t, int64(i), a.Value.AsInt64(), "span %d attempt index mismatch", i)
		assertAttr(t, s.Attributes(), "workflow.step.name", "S")
	}
}

func TestAttemptInterceptor_DefaultName(t *testing.T) {
	t.Parallel()
	tp, rec := newRecorderTracerProvider()
	step := flow.NoOp("MyStep")
	w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)))
	w.Add(flow.Step(step))
	require.NoError(t, w.Do(context.Background()))

	spans := rec.Ended()
	require.Len(t, spans, 1)
	assert.Equal(t, "MyStep (attempt 0)", spans[0].Name())
}

func TestAttemptInterceptor_FailingAttemptRecorded(t *testing.T) {
	t.Parallel()
	tp, rec := newRecorderTracerProvider()
	step := &retryStep{Name: "Flaky", NeedAttempts: 2} // fails once, then succeeds
	w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)))
	w.Add(flow.Step(step).Retry(noBackoff(2)))
	require.NoError(t, w.Do(context.Background()))

	spans := rec.Ended()
	require.Len(t, spans, 2, "expected one span per attempt (failure + success)")

	// First attempt span: failure with RecordError + codes.Error.
	first := spans[0]
	assert.Equal(t, codes.Error, first.Status().Code, "first attempt span should be Error")
	var sawException bool
	for _, ev := range first.Events() {
		if ev.Name == exceptionEventName {
			sawException = true
			break
		}
	}
	assert.True(t, sawException, "first attempt should record an exception event")

	// Second attempt span: success leaves status Unset.
	second := spans[1]
	assert.Equal(t, codes.Unset, second.Status().Code, "successful attempt span should be Unset")
}

func TestAttemptInterceptor_ChildOfCallerSpan(t *testing.T) {
	t.Parallel()
	tp, rec := newRecorderTracerProvider()
	tracer := tp.Tracer("test")
	ctx, outer := tracer.Start(context.Background(), "OUTER")
	defer outer.End()

	step := flow.NoOp("S")
	w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)))
	w.Add(flow.Step(step))
	require.NoError(t, w.Do(ctx))

	// OUTER is still open (it ends via defer); the recorder has exactly the
	// attempt span. A regression that emits more than one span will surface here.
	spans := rec.Ended()
	require.Len(t, spans, 1)
	attempt := spans[0]
	assert.Equal(t, outer.SpanContext().SpanID(), attempt.Parent().SpanID(),
		"attempt span must be a child of the caller-supplied OUTER span")
	assert.Equal(t, outer.SpanContext().TraceID(), attempt.SpanContext().TraceID())
}

func TestAttemptInterceptor_CustomNamer(t *testing.T) {
	t.Parallel()
	tp, rec := newRecorderTracerProvider()
	step := flow.NoOp("Original")
	namer := func(s flow.Steper, n uint64) string {
		return fmt.Sprintf("X-%s-%d", flow.String(s), n)
	}
	w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(
		flowotel.WithTracerProvider(tp),
		flowotel.WithAttemptSpanNamer(namer),
	))
	w.Add(flow.Step(step))
	require.NoError(t, w.Do(context.Background()))

	spans := rec.Ended()
	require.Len(t, spans, 1)
	s := spans[0]
	assert.Equal(t, "X-Original-0", s.Name(), "custom attempt namer should win")
	// Canonical attributes still present despite the custom name.
	assertAttr(t, s.Attributes(), "workflow.step.name", "Original")
	a, ok := findAttr(s.Attributes(), "workflow.step.attempt")
	require.True(t, ok)
	assert.Equal(t, int64(0), a.Value.AsInt64())
}

func TestAttemptInterceptor_CustomAttributes(t *testing.T) {
	t.Parallel()
	tp, rec := newRecorderTracerProvider()
	step := flow.NoOp("Hello")
	extras := func(flow.Steper, uint64) []attribute.KeyValue {
		return []attribute.KeyValue{
			attribute.String("env", "test"),
			attribute.Int("answer", 42),
			// Regression: a malicious user attribute trying to override the
			// canonical workflow.step.attempt key MUST be superseded by the
			// interceptor's own value.
			attribute.Int64("workflow.step.attempt", 999),
		}
	}
	w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(
		flowotel.WithTracerProvider(tp),
		flowotel.WithAttemptAttributes(extras),
	))
	w.Add(flow.Step(step))
	require.NoError(t, w.Do(context.Background()))

	spans := rec.Ended()
	require.Len(t, spans, 1)
	attrs := spans[0].Attributes()

	// Defaults still present.
	assertAttr(t, attrs, "workflow.step.name", "Hello")
	canonical, ok := findAttr(attrs, "workflow.step.attempt")
	require.True(t, ok)
	assert.Equal(t, int64(0), canonical.Value.AsInt64(),
		"canonical workflow.step.attempt must win over user-supplied override")

	// User attributes appended.
	assertAttr(t, attrs, "env", "test")
	a, ok := findAttr(attrs, "answer")
	require.True(t, ok, "custom int attribute missing")
	assert.Equal(t, int64(42), a.Value.AsInt64())
}

func TestAttemptInterceptor_ContextCanceled(t *testing.T) {
	t.Parallel()
	tp, rec := newRecorderTracerProvider()
	step := &alwaysFail{Name: "S", Err: context.Canceled}
	w := newTestWorkflow(nil, flowotel.NewAttemptInterceptor(flowotel.WithTracerProvider(tp)))
	w.Add(flow.Step(step))
	_ = w.Do(context.Background()) // workflow itself errors

	spans := rec.Ended()
	require.NotEmpty(t, spans)
	s := spans[0]
	assert.Equal(t, codes.Error, s.Status().Code)
	require.NotEmpty(t, s.Events())
	assert.Equal(t, exceptionEventName, s.Events()[0].Name)
}
11 changes: 11 additions & 0 deletions contrib/otel/consts.go
@@ -0,0 +1,11 @@
package flowotel

// Attribute keys and status values emitted by the contrib/otel interceptors.
const (
	attrStepName    = "workflow.step.name"
	attrStepStatus  = "workflow.step.status"
	attrStepAttempt = "workflow.step.attempt"

	statusSuccess = "success"
	statusError   = "error"
)