
feat(workflow): Support configurable workflows #936

Open: baptmont wants to merge 57 commits into v2 from baptmont/v2-configurable

@baptmont (Contributor) commented Jun 1, 2026

Passing test "TestLoadComplexWorkflowWithSubAgentsYAML" depends on #943

Overview

This Pull Request implements YAML-driven agent workflow graph loading and orchestration.


Key Changes & Capabilities

1. Configurable Declarative Workflows (internal/configurable)

  • YAML Loader & Registry (configurable_workflow.go): Loads, parses, and orchestrates a complete DAG from a Workflow agent YAML file.
  • Dynamic Node Resolution: Resolves, instantiates, and caches nodes of the following types:
    • FunctionNode: Instantiates a node wrapping custom functions registered in nodeFunctionRegistry.
    • JoinNode: Combines execution paths and buffers multi-branch inputs.
    • ToolNode: Resolves referenced tools and validates their configurations.
    • AgentNode: Resolves and wraps standard conversational/nested sub-agents.
  • Sequence & Edge Routing: Parses edge chains and maps custom route structures (both explicit string-based transitions and fallback default pathways).
  • Hierarchy Population: Automatically aggregates and registers sub-agents to populate the top-level agent's sub-agent dependencies.
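
The FunctionNode bullet above relies on a name-to-function registry. A minimal sketch of that idea follows; it is not the PR's code. The `NodeFunc` signature, the `Registry` type, and the name `example.upper` are assumptions made for illustration, and the real nodeFunctionRegistry may differ in shape.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// NodeFunc is a hypothetical signature; the real registry may use a richer one.
type NodeFunc func(input string) (string, error)

// Registry is an illustrative stand-in for nodeFunctionRegistry.
type Registry struct {
	mu  sync.RWMutex
	fns map[string]NodeFunc
}

func NewRegistry() *Registry {
	return &Registry{fns: make(map[string]NodeFunc)}
}

// Register stores fn under name and rejects duplicate names.
func (r *Registry) Register(name string, fn NodeFunc) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.fns[name]; exists {
		return fmt.Errorf("node function %q already registered", name)
	}
	r.fns[name] = fn
	return nil
}

// Lookup resolves a name referenced by a YAML node definition.
func (r *Registry) Lookup(name string) (NodeFunc, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	fn, ok := r.fns[name]
	if !ok {
		return nil, fmt.Errorf("node function %q is not registered", name)
	}
	return fn, nil
}

func main() {
	reg := NewRegistry()
	// "example.upper" is a made-up name used only in this sketch.
	if err := reg.Register("example.upper", func(in string) (string, error) {
		return strings.ToUpper(in), nil
	}); err != nil {
		panic(err)
	}
	fn, err := reg.Lookup("example.upper")
	if err != nil {
		panic(err)
	}
	out, _ := fn("hello")
	fmt.Println(out) // HELLO
}
```

Failing the lookup at load time, rather than at run time, would let a YAML file that references an unregistered function be rejected before any node executes.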

2. Testing & Conformance Integration

  • CLI Registration: Registered conformance.uppercase_formatter function in cmd/internal/adkcli/main.go on startup to support conformance suite execution.

wolo-lab and others added 30 commits May 25, 2026 08:26
Adds a naive implementation along with the basic structs and interfaces.

Includes the core workflow implementation, unit tests, and an example
to run it in the web UI.
…tional graph traversal (#768)

* feat: implement workflow routing support with tagged events and conditional graph traversal

* refactor: rename event Route field to Routes and update workflow traversal logic
* feat: add tool node to workflow agents

* update tool node so it only validates the output against the schema instead of converting it to the types

* check the tool is runnable inside constructor

* nit: fix node names in test
…ion (#803)

* workflow: introduce scheduler-based engine with goroutine-per-node execution

Squashes the workflow engine WIP work onto the latest wolo/workflows
base. The branch had 8 incremental commits; this single commit
captures the cumulative state on top of upstream PR #795 (EdgeBuilder),
PR #796 (NodeConfig), and PR #797 (node name validation).

Architecture:
  * Goroutine-per-node execution model: each scheduled node runs in
    its own goroutine, pushing events into a buffered channel.
  * Single consumer goroutine (runState.run) drains the channel,
    applies state-side effects, yields events to the caller, and
    schedules successors when nodes complete.
  * Replaces the legacy in-process BFS in Workflow.Run with the new
    scheduler, removing findNextNodes and the inline event loop.

New types and constructors:
  * BaseNode (and NewBaseNode) for shared Name/Description/Config
    bookkeeping; FunctionNode and toolNode now embed it.
  * Graph helpers in graph.go: indexed adjacency for O(1) successor
    lookup.
  * RunState (persistable) and runState (in-process scheduler bag)
    in state.go; node lifecycle map (NodeStatus + per-node accumulators).
  * NodeContext wraps InvocationContext with a per-node TriggeredBy
    accessor; agent.InvocationContext gains TriggeredBy() returning
    "" for non-workflow contexts (mocks updated accordingly).
  * Scheduler (scheduler.go): runNode goroutine wrapper, eventQueue,
    cancelAll, and the run consumer loop with sibling cancellation
    and per-node timeout.

Routing and validation:
  * findSuccessors honours unconditional edges, concrete Routes, and
    the Default fallback. Silent dead-ends remain intentional per
    adk-python parity.
  * Workflow.New now returns (*Workflow, error) — picks up the name
    validation introduced upstream by PR #797.
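
The routing rule for findSuccessors can be sketched as below. This is an illustration only: the `Edge` type and the `successors` function are hypothetical, and the sketch assumes the Default fallback fires exactly when no concrete route matched, which the commit message does not spell out.

```go
package main

import "fmt"

// Edge is a hypothetical, simplified edge: an empty Route with Default=false
// means the edge is unconditional.
type Edge struct {
	To      string
	Route   string
	Default bool
}

// successors returns the targets to schedule after a node emitted routes.
// Assumption: Default edges fire only when no concrete route matched.
func successors(edges []Edge, routes []string) []string {
	emitted := make(map[string]bool, len(routes))
	for _, r := range routes {
		emitted[r] = true
	}
	var out, fallback []string
	matched := false
	for _, e := range edges {
		switch {
		case e.Default:
			fallback = append(fallback, e.To)
		case e.Route == "":
			out = append(out, e.To) // unconditional edge
		case emitted[e.Route]:
			out = append(out, e.To) // concrete route match
			matched = true
		}
	}
	if !matched {
		out = append(out, fallback...)
	}
	// An empty result is a silent dead-end, not an error.
	return out
}

func main() {
	edges := []Edge{
		{To: "log"},
		{To: "publish", Route: "approve"},
		{To: "review", Default: true},
	}
	fmt.Println(successors(edges, []string{"approve"})) // [log publish]
	fmt.Println(successors(edges, []string{"other"}))   // [log review]
}
```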

NodeConfig timeout shape:
  * Timeout is time.Duration (not *time.Duration), with zero meaning
    "inherit parent context". Matches net.Dialer.Timeout and the
    http.Server.*Timeout convention; keeps call sites free of pointer
    boilerplate.
  * Adds RerunOnResumeOr / WaitForOutputOr accessor helpers for
    pointer-typed tri-state fields.
  * Adds TestDefaultRetryConfig from upstream PR #796.

Examples and tests:
  * examples/workflow/basic uses NodeConfig{RetryConfig: DefaultRetryConfig()}
    to demo the helper.
  * 7 New(edges) callers updated for the new (*Workflow, error)
    signature.

Tests verified: go build ./..., go vet ./..., go test -race
./workflow/... ./agent/workflowagent/... all pass.
Adds the engine-side scaffolding for human-in-the-loop pauses: a
workflow node can now emit a session.RequestInput, and the
scheduler will park the node in NodeWaiting and stop scheduling its
successors instead of finalising the run. This is the pause half of
HITL only; the resume half (Workflow.Resume, the agent.Agent
wrapper, schema validation, handoff vs. re-entry modes) is left for
follow-up PRs.

Type names and field names are aligned with adk-python's
RequestInput in src/google/adk/events/request_input.py:
interrupt_id / message / response_schema / payload.

API additions:
* session.RequestInput carries the prompt: InterruptID (stable
  correlation key), Message (UI text), ResponseSchema (reserved for
  the future validator), Payload (opaque UI context).
* session.Event.RequestedInput field, parallel to Routes; populated
  by the node, consumed by the scheduler and forwarded to the UI
  surface unchanged.
* workflow.NewRequestInputEvent(ctx, req) constructor, including
  UUID auto-generation when InterruptID is empty.
* workflow.NodeState.PendingRequest, persisted on the per-node
  state when the waiting branch fires.
* workflow.ErrMultipleInputRequests sentinel for the
  single-request-per-activation invariant.

Scheduler changes:
* nodeRun gains an inputRequest field with a setInputRequest
  method that mirrors the existing setRoutingEvent / setOutput
  pattern.
* handleEvent dispatches on ev.RequestedInput exactly parallel to
  the existing dispatch on ev.Routes.
* handleCompletion gains a waiting branch checked AFTER the
  error/cancel branches: a clean activation that recorded a
  request transitions to NodeWaiting, persists the request on
  NodeState, and skips successor scheduling. Failures take
  precedence so a node that recorded a request and then errored
  out lands in NodeFailed, not NodeWaiting.
* The scheduler.run loop is unchanged: it terminates when the
  runsByName map empties, which now happens when every live node
  has either completed or moved into NodeWaiting.

Tests:
* TestScheduler_HitlNode_PausesAndForwardsRequest pins the
  happy-path single-waiting-node behaviour.
* TestScheduler_HitlNode_AutoGeneratesInterruptID and
  TestScheduler_HitlNode_PreservesExplicitInterruptID lock in the
  InterruptID contract.
* TestScheduler_HitlNode_MultipleRequestsFails surfaces
  ErrMultipleInputRequests at completion.
* TestScheduler_HitlNode_ErrorAfterRequestFails pins the
  fail-over-park precedence in handleCompletion.
* TestScheduler_HitlNode_ConcurrentBranches_PausesOnlyWhenAllNonRunning
  exercises a parallel-branch graph: the non-HITL branch finishes
  normally while the HITL branch parks; the workflow ends only
  when both reach a terminal state.

All existing workflow tests still pass; the suite is race-free
under go test -race.
…sistence (#847)

Builds on the pause-side primitives in the previous PR by adding the
resume half of the human-in-the-loop cycle: Workflow.Resume routes
a user-supplied response back into a paused workflow, and the
existing workflowagent.New wrapper learns to detect and dispatch
resume turns automatically. Run state survives across processes by
riding in session.State.

Engine additions in workflow/:

* Workflow.Resume(ctx, state, responses) iter.Seq2 — for each
  NodeWaiting whose InterruptID matches an entry in responses,
  validates the payload against PendingRequest.ResponseSchema (when
  non-nil), consumes PendingRequest before re-scheduling for
  idempotency, then routes the response to the asker's successors
  via findSuccessors so handoff reuses the normal routing / fan-out
  / fan-in path. ErrInvalidResumeResponse surfaces validation
  failures and leaves the node parked so the caller can retry.

* newSchedulerFromState lets Resume seed a scheduler with a
  loaded RunState rather than a fresh one.

* Workflow.SetName / Workflow.Name expose the persistence-namespacing
  identifier set by workflowagent.New.

* Workflow.Run now persists the post-run state at the end of every
  invocation so a follow-up Resume can pick it up.

* WorkflowInputFunctionCallName constant ('adk_request_workflow_input')
  and the synthesised FunctionCall part on RequestInputEvent — the
  same shape tool confirmation uses, lets the runner's generic
  ID-based dispatch route the user's follow-up FunctionResponse
  back to this agent without runner-side changes.

* RunState / NodeState / RequestInput gain JSON tags. Persistence
  uses session.State.Get/Set with JSON marshalling; binary payloads
  must be stashed via agent.Artifacts and referenced by URI in
  Payload (mirrors how the Python Live API surfaces audio).

Persistence helpers (workflow/persistence.go):

* LoadRunState / SaveRunState exported so workflowagent (and any
  custom wrapper) can manage RunState lifecycle without taking a
  dependency on internal package details.

* Both helpers handle nil session and nil session.State as no-ops
  for tests using minimal mocks.

Wrapper changes in agent/workflowagent/:

* workflowAgent.run dispatches between Workflow.Run and
  Workflow.Resume by inspecting ctx.UserContent for a
  FunctionResponse with the magic name. Stateless: every run state
  lives in session.State.

* detectResume + decodeWorkflowInputResponse handle the dual wire
  format used by the existing tool-confirmation processor (ADK web
  wraps payloads as {response: <json string>}; other clients inline
  {payload: ...}).

* MockSession in the existing workflow_test gains a State() method
  returning nil, supported by the persistence helpers' nil guards.

Tests (agent/workflowagent/hitl_test.go, ~430 lines):

* TestWorkflowAgent_RunThenResume_Handoff — canonical round-trip;
  pause on RequestInput, resume with payload, handler receives it
  as input.
* TestWorkflowAgent_Resume_RestoresStateFromSession — fresh agent
  instance built from the same edges resumes successfully off the
  same session, verifying cross-instance persistence.
* TestWorkflowAgent_Resume_Idempotent — two Resume calls with the
  same payload run the handler exactly once.
* TestWorkflowAgent_Resume_NoMatchingResponse — Resume with an
  InterruptID that no longer matches a waiting node falls through
  cleanly without blocking on an empty scheduler.
* TestWorkflowAgent_Resume_SchemaValidation_{Pass,Fail} — engine
  validates responses against ResponseSchema; invalid payloads
  surface ErrInvalidResumeResponse and leave the node parked, so a
  retry with a corrected payload still succeeds.
* TestWorkflowAgent_Resume_FanOut — handoff into a multi-successor
  asker delivers the response to every successor.
* TestWorkflowAgent_FreshTurn_NotMistakenForResume — a fresh user
  message that happens to share a session with leftover state from
  a prior workflow does not get misinterpreted as a resume.

All existing workflow and workflowagent tests still pass; the suite
is race-free under go test -race.
## Summary

Builds on #831 (HITL handoff resume) by adding re-entry mode:
when a paused node is configured with `NodeConfig.RerunOnResume =
true`, `Workflow.Resume` re-activates the asker itself instead of
forwarding the response to its successors. The asker observes the
response via `ctx.ResumedInput(interruptID)` and decides what to
emit. Direct analogue of adk-python's `@node(rerun_on_resume=True)`.

Re-entry preserves the asker's original input, so the node can
recompute its decision with the same context it had on the first
turn — only the user's response arrives separately. Successors
fire only when the re-entry activation produces an output, not on
the bare resume call.

Support for multiple asks within a single node execution will be added in a follow-up PR.

## Test plan

- [x] `go build ./...` clean
- [x] `go test ./workflow/... ./agent/workflowagent/...` — all pass
- [x] `go test -race` clean
- [x] 4 new tests covering the canonical re-entry round-trip,
  original-input preservation across re-entry, the
  no-successor-before-output invariant, and a regression guard
  pinning that handoff is still the default mode
- [x] 1 new test (`TestNodeContext_ResumedInput`) pinning the
  per-node context-level contract
agent.InvocationContext.TriggeredBy() was added in the workflow
scheduler PR (#803) as 'engine-supplied metadata available to
nodes' but no production caller ever materialised: the method is
called only by its own round-trip test and by zero workflow nodes,
samples, agents, tools, callbacks, or telemetry sites in the repo.
The godoc on the interface method is aspirational; the
implementation behind it is a dead branch.

This change drops the public API surface (one interface method
plus six implementations: agent.invocationContext,
internal/context.InvocationContext, and the four MockInvocationContext
types in workflowagent, workflow, replayplugin, and llminternal).
The unrelated NodeState.TriggeredBy field remains: scheduler.go
populates it for resume bookkeeping (workflow.Resume reads it back
when re-scheduling a paused node), and it is part of the
JSON-serialised RunState so dropping it would break forward
compatibility for anyone already running the engine.

The TestNewNodeContext_TriggeredByRoundTrip test is removed (it
exercised the now-deleted method); TestNodeContext_ResumedInput
keeps its coverage of the surviving wrapper functionality.
* workflow: persist resume inputs across re-entry cycles

A re-entry-mode node (NodeConfig.RerunOnResume = true) that yields
RequestInput more than once across resume cycles previously lost
prior responses on each subsequent resume: ctx.ResumedInput exposed
only the response to the most recent InterruptID, and asking the
node about an earlier ID returned (nil, false) even though the user
had already answered it.

The fix accumulates response payloads on NodeState across resume
cycles. Each Resume call merges the new {InterruptID: response}
entry into ns.ResumedInputs; the scheduler hands the full map to
the per-node context on every re-entry activation. The node sees
every prior response, not only the most recent one.
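
The accumulation described above amounts to merging into a map instead of overwriting a single slot. A minimal sketch, assuming a cut-down `NodeState` and hypothetical method names (`recordResponse`, `resumedInput`):

```go
package main

import "fmt"

// NodeState is a simplified, hypothetical version of the persisted state.
type NodeState struct {
	ResumedInputs map[string]any `json:"resumed_inputs,omitempty"`
}

// recordResponse merges one {InterruptID: response} entry, keeping
// responses from earlier resume cycles.
func (ns *NodeState) recordResponse(interruptID string, response any) {
	if ns.ResumedInputs == nil {
		ns.ResumedInputs = make(map[string]any)
	}
	ns.ResumedInputs[interruptID] = response
}

// resumedInput mirrors the (value, ok) contract of ctx.ResumedInput.
func (ns *NodeState) resumedInput(interruptID string) (any, bool) {
	v, ok := ns.ResumedInputs[interruptID]
	return v, ok
}

func main() {
	ns := &NodeState{}
	ns.recordResponse("ask-1", "Alice") // first resume cycle
	ns.recordResponse("ask-2", 42)      // second resume cycle

	first, ok := ns.resumedInput("ask-1")
	fmt.Println(first, ok) // Alice true
}
```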
Adds a fan-in primitive built on the orchestrator-aggregator
model: the scheduler waits until every predecessor of a JoinNode
has completed, assembles a map[string]any keyed by predecessor
name, and triggers the JoinNode once with that aggregated input.
The node itself is a pass-through that emits the input as its
output.
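
The fan-in behaviour can be sketched with a small buffer that releases the aggregated map only once every predecessor has reported. The `joinBuffer` type and its methods are hypothetical; the real scheduler tracks this inside its own state.

```go
package main

import "fmt"

// joinBuffer is an illustrative fan-in accumulator for one JoinNode.
type joinBuffer struct {
	expected map[string]struct{}
	inputs   map[string]any
}

func newJoinBuffer(predecessors ...string) *joinBuffer {
	b := &joinBuffer{
		expected: make(map[string]struct{}, len(predecessors)),
		inputs:   make(map[string]any, len(predecessors)),
	}
	for _, p := range predecessors {
		b.expected[p] = struct{}{}
	}
	return b
}

// deliver records one predecessor's output. It returns the aggregated
// input and true only when every expected predecessor has completed.
func (b *joinBuffer) deliver(predecessor string, output any) (map[string]any, bool) {
	if _, ok := b.expected[predecessor]; !ok {
		return nil, false // not a predecessor of this join
	}
	b.inputs[predecessor] = output
	if len(b.inputs) < len(b.expected) {
		return nil, false
	}
	return b.inputs, true
}

func main() {
	join := newJoinBuffer("fetch", "score")
	_, ready := join.deliver("fetch", "doc")
	fmt.Println(ready) // false

	agg, ready := join.deliver("score", 0.9)
	fmt.Println(ready, agg["fetch"], agg["score"]) // true doc 0.9
}
```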
The Go workflow runtime synthesises a FunctionCall part on every
RequestInput event so the generic FunctionResponse-by-ID dispatch
can route the user's follow-up reply back to the agent that issued
the request. The synthesised call's Name was 'adk_request_workflow_input'.

adk-python uses 'adk_request_input' for the equivalent constant
(REQUEST_INPUT_FUNCTION_CALL_NAME in
google/adk/workflow/utils/_workflow_hitl_utils.py). The divergence
broke cross-runtime HITL workflows: a session recorded by Python
(with function_call.name='adk_request_input' in the interrupt event)
could not be replayed in Go, because Go-side conformance compare
would see 'adk_request_workflow_input' and flag the mismatch. The
reverse direction is broken too — a function_response addressed by
name to 'adk_request_input' from a Python-authored spec.yaml could
not be routed to a Go workflow agent's pending interrupt.

This change renames the constant value to 'adk_request_input'.
The exported symbol WorkflowInputFunctionCallName is unchanged, so
no Go consumer is affected. The only behaviour difference is the
literal value placed on the wire, which now matches Python.

Discovered while preparing the first cross-language conformance
test for graph-based Workflow + HITL.
…sample (#845)

* console: add HITL support for workflow input prompts

Adds engine-agnostic HITL handling to the console launcher: detect
interrupts emitted on the previous turn, render a prompt for the
operator, and forward the typed reply as a FunctionResponse on the
next turn.

Detection is uniform across interrupt kinds:

* collectPendingInterrupts walks the events yielded during a turn
  and returns one pendingInterrupt per FunctionCall part whose ID
  appears in Event.LongRunningToolIDs. The call's Name is only
  used for rendering and response shaping, never for detection.
  Workflow RequestInput and any future long-running call kind
  flow through the same path.

Per-name dispatch (renderInterruptPrompt / buildInterruptResponse):

* workflow.WorkflowInputFunctionCallName: prints '[HITL input]'
  with the message; pretty-prints the payload (the proposal /
  context attached by the asker node) and the JSON schema if
  either is present. Operator's reply is JSON-parsed first so
  structured replies (objects, arrays, scalars) round-trip as
  typed values; falls back to raw text. Wrapped under 'payload',
  the conventional key for workflow input responses.

* Anything else: prints a generic banner and wraps the raw input
  as {result: <text>}. Lets e.g. a future adk_request_credential
  path be answered through this launcher without a code change
  here, even before its dedicated renderer exists.

  Tool confirmation (toolconfirmation.FunctionCallName) currently
  hits this generic fallback and works at the transport layer
  (the reply still routes back to the tool by FunctionCall.ID),
  but the {result: <text>} envelope does not match what
  ctx.ToolConfirmation() expects to read. A follow-up adds a
  dedicated renderer and yes/no parser.

Main loop integration (console.go):

* After every r.Run iteration drains, scan collectedEvents for
  pending interrupts. If any, render the head's prompt and skip
  the normal '\nUser -> ' banner.

* The next stdin line is interpreted as the answer to that head;
  successive lines drain the rest. Once every interrupt has an
  answer, the assembled FunctionResponse parts are sent in one
  *genai.Content as the next 'turn' through the same r.Run path.
  The reply routes back to the agent by FunctionCall.ID.

* Multiple parallel pauses (rare but legal — e.g. two parallel
  workflow branches both yielding RequestInput on the same turn)
  are collected together and answered one prompt at a time, then
  submitted as one Content with multiple FunctionResponse parts.

Renamed the local 'session' variable to 'sess' inside Run to avoid
shadowing the session package import once the new code path needs
[]*session.Event.


Race-free under go test -race.

* examples/workflow/hitl_simple: minimal HITL sample for console launcher verification

A no-LLM, no-API-key sample that exercises the console launcher's
pause/resume support end-to-end. Two workflow nodes:

  Start → ask_name → greet

ask_name yields a RequestInput so the launcher renders the
prompt. greet receives the user's reply as plain text and emits
a greeting that the launcher prints.

Useful for verifying that wolo/workflow_hitl_console produces
the expected console output without any LLM streaming in the
mix. Run with:

	go run ./examples/workflow/hitl_simple/ console

	User -> hello
	Agent -> What's your name?
	User -> Alice
	Agent -> Hello, Alice!
* feat: add llmagent mode api
wolo-lab and others added 19 commits May 28, 2026 10:26
Wires the user-facing API for dynamic workflows on top of the
sub-scheduler skeleton from the previous PR. A dynamic node's
execution order is expressed as Go code (loops, branches, goroutines)
that calls other nodes inline via RunNode, branches on their typed
output, and pauses for HITL input.

The public surface:

* workflow.NewDynamicNode[IN, OUT](name, fn, cfg): orchestrator
  constructor; cfg.RerunOnResume defaults to &true (an explicit
  &false is respected).
* workflow.RunNode[OUT](ctx, child, input, opts...): generic
  helper for scheduling a child. Returns its typed output, or
  errors.Is-matchable ErrNodeInterrupted / ErrNodeFailed.
* workflow.WithRunID(id): option overriding the auto-counter
  with a stable id (rejected if empty, purely numeric, or
  containing / or @).
* session.NodeInfo: substruct on Event carrying the emitting
  node's composite path; shape mirrors adk-python's event.nodeInfo.

The scheduler's handleEvent scopes per-activation Output/Routes
invariants by NodeInfo.Path, so a dynamic node forwarding a child's
terminal output plus its own no longer trips ErrMultipleOutputs.
Descendant RequestedInput events are promoted onto the parent's
accumulator so Workflow.Resume matches the InterruptID against the
parent's NodeState.PendingRequest — enabling HITL inside a dynamic
orchestrator.
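
The run-id rules stated for workflow.WithRunID can be sketched as a standalone validator. `validateRunID` is a hypothetical name. The stated rules are the only part taken from the commit message; the suggestion that purely numeric ids are rejected to avoid clashing with the auto-counter is a guess.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateRunID applies the three stated rules: not empty, not purely
// numeric, and free of '/' and '@'.
func validateRunID(id string) error {
	switch {
	case id == "":
		return errors.New("run id must not be empty")
	case strings.ContainsAny(id, "/@"):
		return fmt.Errorf("run id %q must not contain '/' or '@'", id)
	case isAllDigits(id):
		return fmt.Errorf("run id %q must not be purely numeric", id)
	}
	return nil
}

// isAllDigits reports whether s consists only of ASCII digits.
func isAllDigits(s string) bool {
	for _, r := range s {
		if r < '0' || r > '9' {
			return false
		}
	}
	return s != ""
}

func main() {
	for _, id := range []string{"order-17", "", "42", "a/b", "x@1"} {
		fmt.Printf("%q -> %v\n", id, validateRunID(id))
	}
}
```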
* examples/workflow/dynamic/basic: minimal dynamic-workflow sample

Runnable mirror of the "Get started" snippet from
https://adk.dev/graphs/dynamic/: a parent dynamic node orchestrates a
single FunctionNode child via workflow.RunNode and emits its output
upstream. ~50 lines of actual logic, no API keys required.

Sits alongside examples/workflow/basic/ and uses the same launcher
integration so users can `go run` it and exercise the workflow through
the standard CLI/UI.

Stacked on the NewDynamicNode + RunNode public-API PR; intentionally
minimal so reviewers can validate the API shape against the canonical
Python example. Richer samples (loop, parallel, HITL) follow once the
corresponding Go features (resume, parallel HITL detection) land.

* examples/workflow/dynamic/llm: add LlmAgent-backed dynamic-workflow sample

Smallest sensible composition that puts an LLM into a dynamic
workflow: one llmagent.New (gemini-3.1-flash-lite, one-line greeting
instruction) wrapped via workflow.NewAgentNode and invoked from a
NewDynamicNode body through workflow.RunNode. Mirrors the existing
examples/workflow/dynamic sample but replaces the trivial FunctionNode
child with a real LlmAgent, demonstrating the agent->node->dynamic
composition path.

Verified end-to-end with 'echo hi | go run ./examples/workflow/dynamic/llm console':
greeter responds with a one-sentence greeting via the deployed Gemini
endpoint.

* examples/workflow/dynamic/hitl: add HITL dynamic-workflow sample

Smallest sensible composition that pauses for human input from inside
a dynamic-node orchestrator: an inline askName node yields
workflow.NewRequestInputEvent; the orchestrator drives it via
workflow.RunNode, swallows ErrNodeInterrupted on the pause activation,
and on resume re-entry reads the reply via NodeContext.ResumedInput
(RerunOnResume defaults to &true for dynamic nodes).

A mid-body emit publishes the greeting as Content so the console
launcher renders it; the terminal Output carries the same string for
downstream nodes.
Adds an opaque go-context value stash, populated by the scheduler at
each per-node activation, that lets tools running inside an LlmAgent
(which is itself running as a workflow node) recover the surrounding
NodeContext via context.Value lookup.

This is a temporary bridge to unblock NewSingleTurnTool (and any
future runnable tool that needs to schedule sub-nodes) without
modifying the public tool.Context interface. The longer-term
solution is the CallbackContext / ToolContext unification tracked in
workflow/node_context.go.

Mechanism: tool.Context embeds context.Context (transitively via
agent.CallbackContext -> agent.ReadonlyContext), so the value
survives every downstream NewInvocationContext / WithContext call on
the path scheduler -> AgentNode.Run -> LlmAgent.run ->
Flow.handleFunctionCalls -> NewToolContext. No interface changes,
no agent_node modifications.
… schedulers (#906)

Adds branch derivation across the three workflow schedulers so the
LLM flow's branch-prefix history filter (already present in
contents_processor.go) actually scopes events per parallel branch
instead of seeing every node run on the empty root branch.

Before this PR, Event.Branch and InvocationContext.Branch()
existed and the filter respected them, but no scheduler ever
derived a non-empty branch — so an LlmAgent wrapped by
ParallelWorker (or by a hand-written errgroup fan-out) saw
every sibling worker's events in its prompt history.

The static scheduler now derives <successor>@1 sub-branches
at fan-out, computes the longest common dot-prefix of predecessor
branches for JoinNode, stamps Event.Branch when the node leaves
it empty, and persists each activation's branch on NodeState for
resume. The ParallelWorker replaces its single shared
workerCtx with per-iteration sub-contexts derived as
<parent>.<wrapped>@<i+1>. The dynamic sub-scheduler gains
two opt-in RunNode options — WithUseSubBranch() and
WithOverrideBranch(base) — for dynamic-node bodies that want to
isolate child activations.
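
The JoinNode branch computation is a longest-common-prefix over dot-separated segments. A minimal sketch, with a hypothetical function name and made-up branch strings:

```go
package main

import (
	"fmt"
	"strings"
)

// commonDotPrefix returns the longest prefix shared by all branches,
// compared segment by segment (segments are separated by '.').
func commonDotPrefix(branches []string) string {
	if len(branches) == 0 {
		return ""
	}
	prefix := strings.Split(branches[0], ".")
	for _, b := range branches[1:] {
		segs := strings.Split(b, ".")
		n := 0
		for n < len(prefix) && n < len(segs) && prefix[n] == segs[n] {
			n++
		}
		prefix = prefix[:n]
	}
	return strings.Join(prefix, ".")
}

func main() {
	// Branch strings below are invented for illustration.
	fmt.Println(commonDotPrefix([]string{"root.a@1.x@1", "root.a@1.y@1"})) // root.a@1
	fmt.Printf("%q\n", commonDotPrefix([]string{"left@1", "right@1"}))     // ""
}
```

Comparing whole segments rather than characters keeps `a@1` and `a@10` from sharing a spurious prefix.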
PR #894 (feat(telemetry): add functional test infrastructure)
replaced the TelemetryOptions config in examples/tools/multipletools
with an os.Setenv call but left the telemetry import behind. This
broke 'go build ./...' on v2 with:

    examples/tools/multipletools/main.go:31:2: "google.golang.org/adk/telemetry" imported and not used

Removes the import to unblock the v2 CI.
* feat: add SingleTurnTool

* upd
Enables emission of the following spans:
- invoke_workflow <workflow_name>
- invoke_node <node_name>

Additionally:
- Adds a functional test for graph-based telemetry emission
  (workflow chain scenario).
…node parallelism (#917)

Add a workflow-level concurrency cap that limits how many nodes may run
concurrently within a single Workflow invocation. When the cap is
reached, additional ready-to-run nodes enter NodePending and queue in
FIFO order; the scheduler drains the queue as in-flight nodes complete.

API: workflow.New(name, edges, workflow.WithMaxConcurrency(n))
- n > 0: cap is enforced
- n == 0 (default): unlimited (no behavioural change for existing callers)
- n < 0: clamped to 0 (unlimited)

The cap applies to all top-level static scheduling paths: initial Start
dispatch, fan-out from completed nodes, retry scheduling, and Resume.
It explicitly does NOT apply to dynamic sub-nodes invoked via
workflow.RunNode from inside a DynamicNode body — gating them would
deadlock the parent that awaits the child inline. Same exclusion as
adk-python (see _workflow.py:164 comment).
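
The cap semantics (clamping, FIFO queueing, draining on completion) can be sketched without goroutines. The `limiter` type is hypothetical and only tracks bookkeeping; the real scheduler integrates this with its run loop.

```go
package main

import "fmt"

// limiter is an illustrative model of the workflow-level concurrency cap.
type limiter struct {
	max     int      // 0 means unlimited
	running int      // nodes currently in flight
	pending []string // FIFO queue of nodes waiting for a slot
}

func newLimiter(n int) *limiter {
	if n < 0 {
		n = 0 // negative values are clamped to "unlimited"
	}
	return &limiter{max: n}
}

// schedule reports whether node may start now; otherwise it is queued.
func (l *limiter) schedule(node string) bool {
	if l.max > 0 && l.running >= l.max {
		l.pending = append(l.pending, node)
		return false
	}
	l.running++
	return true
}

// complete frees one slot and, if a node is queued, starts the oldest one.
func (l *limiter) complete() (string, bool) {
	l.running--
	if len(l.pending) == 0 {
		return "", false
	}
	next := l.pending[0]
	l.pending = l.pending[1:]
	l.running++
	return next, true
}

func main() {
	l := newLimiter(2)
	fmt.Println(l.schedule("a"), l.schedule("b"), l.schedule("c")) // true true false
	next, ok := l.complete()
	fmt.Println(next, ok) // c true
}
```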
…ructors accordingly (#911)

Implements JSON schema support and default input validation for BaseNode and integrates them across all workflow node types.
AgentNode now sets Event.Output from concatenated model text on
final agent responses, so RunNode(agentNode, ...) returns the
agent's reply instead of the zero value. Without this, dynamic
workflows that wrap an LlmAgent via NewAgentNode and call
RunNode[string] received "" and could not chain on the agent's
output.

Mirrors adk-python's process_llm_agent_output
(workflow/_llm_agent_wrapper.py:251-279) minus the output_schema
and output_key side-effects, which have no equivalent here yet.

Partial streaming events are not promoted; thoughts are excluded
from the concatenation; the synthesis is skipped when Output is
already set so callers retain control.
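
The output synthesis described above can be sketched as follows. `Part` is a local stand-in with only the two fields the sketch needs, and `outputText` / `resolveOutput` are hypothetical names, not the AgentNode implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// Part is a simplified stand-in for a model response part.
type Part struct {
	Text    string
	Thought bool
}

// outputText concatenates the text of all non-thought parts.
func outputText(parts []Part) string {
	var sb strings.Builder
	for _, p := range parts {
		if p.Thought || p.Text == "" {
			continue
		}
		sb.WriteString(p.Text)
	}
	return sb.String()
}

// resolveOutput keeps an explicitly set output and synthesises one from
// the model text only when none was provided.
func resolveOutput(existing any, parts []Part) any {
	if existing != nil {
		return existing
	}
	return outputText(parts)
}

func main() {
	parts := []Part{
		{Text: "considering options", Thought: true},
		{Text: "Hello, "},
		{Text: "Alice!"},
	}
	fmt.Println(resolveOutput(nil, parts))        // Hello, Alice!
	fmt.Println(resolveOutput("explicit", parts)) // explicit
}
```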
dynamicSubScheduler gains an in-memory cache keyed by the resolved
childPath ("<parentPath>/<name>@<runID>"). A repeated RunNode call
with the same WithRunID inside one parent activation returns the
cached output without re-running the child, enabling idempotent
dispatch for replayed or reorderable children. Failures and HITL
interrupts are not cached.

Cross-resume idempotency (rehydrating the cache from session events
on parent re-run) is out of scope here; a follow-up CL will replicate
adk-python's DynamicNodeScheduler._rehydrate_from_events via an
OutputFor event annotation.
…t JSON values (#933)

ConvertToWithJSONSchema previously decoded the marshalled value into
map[string]any before validating against the resolved schema. That
worked for object-shaped inputs but failed for scalars, arrays, and
booleans with:

  json: cannot unmarshal string into Go value of type map[string]interface {}
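
The failure mode is easy to reproduce with encoding/json alone. The sketch below contrasts decoding into map[string]any with decoding into any. Both function names are hypothetical, and decoding into any is shown only as one way to accept non-object values, not necessarily the fix applied in ConvertToWithJSONSchema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeAsMap round-trips v through JSON into a map, which only works
// when v marshals to a JSON object.
func decodeAsMap(v any) (map[string]any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out map[string]any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

// decodeAsAny round-trips v into an untyped value, so scalars, arrays,
// and booleans decode as well as objects.
func decodeAsAny(v any) (any, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var out any
	if err := json.Unmarshal(raw, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	_, err := decodeAsMap("hello")
	fmt.Println(err != nil) // true

	for _, v := range []any{"hello", true, []int{1, 2}, map[string]int{"a": 1}} {
		out, err := decodeAsAny(v)
		fmt.Printf("%T %v\n", out, err)
	}
}
```

Note that numbers decoded into any arrive as float64, so a schema validator working on the decoded value sees JSON semantics rather than the original Go types.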
Introduce dynamic loading of complete declarative agent graph workflows from YAML
configuration files, bringing the Go SDK to full architectural feature parity
with the Python ADK 2.0 workflow runtime.

- Implement new YAML parser to dynamically resolve and cache nested nodes
  (FunctionNode, JoinNode, ToolNode, AgentNode) and map complex edge chains with routing.
- Support automatic output extraction in AgentNode to seamlessly propagate final
  non-thought text blocks as clean string outputs downstream.
- Enable self-healing JSON deserialization in ToolNode to convert raw string
  inputs into structured map[string]any args expected by the tool's schema.
- Allow direct event yielding in FunctionNode to support explicit, stateful event
  routing and conditional transition mechanics.
- Register conformance node functions at CLI startup to enable direct replay testing.
@baptmont baptmont changed the base branch from v2 to main June 1, 2026 13:28
@baptmont baptmont changed the base branch from main to v2 June 1, 2026 13:28
@baptmont baptmont requested a review from wolo-lab June 1, 2026 13:48
Comment thread cmd/internal/adkcli/main.go
@wolo-lab left a comment

Nice work!

type workflowYAMLConfig struct {
	baseAgentConfig `yaml:",inline"`
	Edges           []yaml.Node `yaml:"edges"`
	MaxConcurrency  int         `yaml:"max_concurrency,omitempty"`

IIUC it's not supported. It's not a critical field, so it's fine to add a comment/todo
