feat(otel): Instrument dynamic workflows #947

Open
RKest wants to merge 1 commit into google:v2 from RKest:dynamic-node-o11y


RKest commented Jun 2, 2026


Problem:
Children executed via workflow.RunNode inside a DynamicNode were not instrumented, so RunNode-driven delegation was invisible in the trace tree.

Solution:
dynamicSubScheduler.runNode now opens an invoke_node span per delegated child, nested under the dynamic node's span. It records genuine runtime failures on that span and skips expected control flow (context.Canceled, ErrNodeInterrupted), matching the top scheduler. The telemetry schema test case was replaced (WorkflowChainCase -> WorkflowDynamicCase) to assert the nested span shape.

Testing Plan

Unit Tests:

  • I have added or updated unit tests for my change.
  • All unit tests pass locally.
ok  google.golang.org/adk/workflow                            0.111s
ok  google.golang.org/adk/internal/telemetry                  (cached)
ok  google.golang.org/adk/internal/telemetry/functionaltest   0.030s

Manual End-to-End (E2E) Tests:

go run ./examples/workflow/dynamic/hitl/main.go console -otel_to_cloud

Dynamic node is instrumented:
[screenshot]

Checklist

  • I have read the CONTRIBUTING.md document.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have added tests that prove my fix is effective or that my feature works.
  • New and existing unit tests pass locally with my changes.
  • I have manually tested my changes end-to-end.
  • Any dependent changes have been merged and published in downstream modules.

Additional context

N/A

 // Mirrors test_telemetry_schema in
 // adk-python/tests/unittests/telemetry/test_node_functional.py.
-func TestTelemetrySchema_WorkflowChain(t *testing.T) {
+func TestTelemetrySchema_Workflow(t *testing.T) {


We have two workflow types: static and dynamic. I think we should cover both, so please add a new test instead of replacing the existing one.

//
// Mirrors test_telemetry_schema in
// adk-python/tests/unittests/telemetry/test_node_functional.py.
func TestTelemetrySchema_WorkflowChain(t *testing.T) {


It would be nice to have a few more tests covering error handling, e.g.:

  1. upper_node returning an error.
  2. ErrNodeInterrupted being handled properly.

}
childPath := s.parentPath + "/" + name + "@" + runID

if cached, ok := s.lookupCachedOutput(childPath); ok {


Just to double-check that this is the desired behavior: a cache hit won't emit a span. If so, we could add a short comment so that future readers know it is intentional.
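
If the answer is yes, the comment could sit right at the early return. Below is a simplified sketch of that shape, not the PR's code: subScheduler, its spans slice, the run callback, and the span-name format are hypothetical stand-ins, and only childPath and lookupCachedOutput follow the snippet under review.

```go
package main

import "fmt"

// subScheduler is a simplified, hypothetical stand-in for dynamicSubScheduler.
type subScheduler struct {
	parentPath string
	cache      map[string]any // childPath -> previously produced output
	spans      []string       // names of spans "opened", for illustration only
}

func (s *subScheduler) lookupCachedOutput(path string) (any, bool) {
	out, ok := s.cache[path]
	return out, ok
}

func (s *subScheduler) runNode(name, runID string, run func() any) any {
	childPath := s.parentPath + "/" + name + "@" + runID

	// Intentionally no span on a cache hit: the child does not execute again,
	// so there is no invocation to trace.
	if cached, ok := s.lookupCachedOutput(childPath); ok {
		return cached
	}

	s.spans = append(s.spans, "invoke_node "+childPath)
	out := run()
	s.cache[childPath] = out
	return out
}

func main() {
	s := &subScheduler{parentPath: "wf/dyn", cache: map[string]any{}}
	upper := func() any { return "HELLO" }
	s.runNode("upper_node", "1", upper) // executes and opens a span
	s.runNode("upper_node", "1", upper) // cache hit, no span
	fmt.Println(len(s.spans))           // prints 1
}
```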

 // Session, invocation metadata, and cancellation come from
 // s.parentCtx. opts carries the resolved RunNodeOption arguments.
-func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions) (any, error) {
+func (s *dynamicSubScheduler) runNode(child Node, input any, opts runNodeOptions) (result any, err error) {


Super nit: the named result is called result, but the body has return out, nil. It would be a bit more consistent to use a single name, either out or result, in both the function signature and the body.
