
spike(workflow): concurrent dynamic dispatch harness (DynamicNodeSupervisor + ctx.pipeline)#3

Draft: caohy1988 wants to merge 25 commits into main from spike/dynamic-supervisor-concurrency



caohy1988 commented Jun 1, 2026

Owner

Draft / staging PR (fork-internal), rebased onto current main. Three RFC spike artifacts under contributing/samples/workflows/. Not the upstream PR to google/adk-python.
Diff is additive-only (no unrelated src/ or tests/ changes) and adds three new sample directories:

1. dynamic_supervisor_spike/ — RFC google#92 (concurrent dynamic dispatch)

Prototype DynamicNodeSupervisor (gate-on-leaf, TaskGroup fan-out) + ctx.pipeline/ctx.parallel on the real ADK engine.

  • supervisor.py, test_dynamic_supervisor_spike.py (11 deterministic tests), test_live_gemini_e2e.py (env-gated), README.md.
  • Barrier-free execution, failed-item isolation, control-exception cancellation (requires TaskGroup, not gather), nested no-deadlock with leaf gating (+ driver-gating deadlock contrast), resume exactly-once for completed children.

2. authored_workflow_spike/ — RFC google#93 (agent-authored typed Workflows)

A model emits a declarative, validated WorkflowSpec (typed data, not code) executed on the real engine via the google#92 supervisor.

  • authoring.py (WorkflowSpec plain kind-tagged-union tree, CapabilityRegistry, WorkflowSpecValidator, SpecInterpreter for step/fan_out/pipeline/branch/loop_until, plus FrozenWorkflowRecord/export_plan/import_plan), test_authoring.py (25 deterministic tests, incl. a barrier-free pipeline proof, per-stage max_fan_out enforcement, and plan export/import round-trip + tamper / capability+registry version drift / schema_version checks, plus ADK-config lowering of the static subset), test_live_planner_sweep.py (env-gated; multi-stage/branch/loop on gemini-3.5-flash), DESIGN.md (canonical technical design incl. plan export/storage tiers), README.md.
  • Findings folded into RFC google#93: open-dict maps are a structured-output hazard (Branch.routes -> list[Route]); Gemini response_schema rejects Field(discriminator=...), so the vocabulary is a plain kind-tagged union; planning quality and capability quality are separable.
  • Convergence with ADK Workflow config / root_agent.yaml (DESIGN §11, per reviewer feedback on google#93): static graph skeletons should lower/export toward the contributing/samples/workflows/loop_config/root_agent.yaml style (agent_class: Workflow, static edges, child YAML), while WorkflowSpec remains the model-facing source of truth. Raw YAML is not the planner output because loop_config intentionally resolves Python function refs (.agent.route_headline), _code refs, child config paths, tools/callbacks, and possibly FQNs. branch, runtime fan_out, and pipeline stay new typed blocks because config does not directly express runtime per-item dispatch or barrier-free multi-stage flow; YAML would need a wrapper node. Caveat: Workflow itself is not deprecated, but the current config loader path and agent-config sugar classes are @deprecated + @experimental; this is convergence with the Workflow config shape for compatibility, not a long-term dependency on today's loader or deprecated sugar.
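
To make the "plain kind-tagged union" and the `list[Route]` finding concrete, here is a small sketch using stdlib dataclasses. It is not the spike's authoring.py: the field names (`capability`, `on`, `when`, `then`) and the `validate` helper are assumptions chosen for illustration.

```python
from dataclasses import dataclass
from typing import Union


@dataclass
class Step:
  capability: str
  kind: str = "step"  # plain string tag, no discriminator metadata


@dataclass
class Route:
  when: str
  then: "Block"


@dataclass
class Branch:
  on: str
  routes: list  # list[Route] with fixed keys, instead of an open dict
  kind: str = "branch"


Block = Union[Step, Branch]


def validate(block, allowed):
  """Return error strings for capabilities that are not on the allow-list."""
  if block.kind == "step":
    if block.capability in allowed:
      return []
    return [f"unknown capability: {block.capability}"]
  if block.kind == "branch":
    errors = []
    for route in block.routes:
      errors.extend(validate(route.then, allowed))
    return errors
  return [f"unknown kind: {block.kind}"]
```

Every object in the tree has a fixed set of keys, which is the property the open-dict finding is about; validation walks the tree by inspecting `kind`.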

3. authored_workflow_demo/ — ADK Web demo wrapper

A discoverable root_agent (a Workflow) that exposes the flow in ADK Web — authors → validates → freezes-to-state → exports → lowers static config shape → executes, surfacing each step as a chat message (authored plan, validation, capabilities, frozen hash, exported plan, config projection, output).

  • security_audit_planner/agent.py, test_demo_agent.py (5 CI-safe tests, incl. a no-LLM reuse-path test + the config-lowering assertion), README.md (the ~7-min recording script), DEMO_NARRATIVE.md (beat-by-beat narration from a real run).
  • Load-or-author: an existing frozen spec is reused (planner not re-invoked) — the resume/reproducibility claim is real and CI-verified.
  • Run: adk web contributing/samples/workflows/authored_workflow_demo.
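
The load-or-author behaviour can be sketched in a few lines. This assumes session state behaves like a dict and the planner is a zero-argument callable; `load_or_author` and the `frozen_spec` key are hypothetical names, not the demo's agent.py.

```python
def load_or_author(state, planner, key="frozen_spec"):
  """Return (spec, source); call the planner only when nothing is frozen."""
  frozen = state.get(key)
  if frozen is not None:
    return frozen, "reused"  # planner is not invoked on this path
  spec = planner()
  state[key] = spec  # freeze so the next run replays the same plan
  return spec, "authored"
```

A reuse-path test in this style can count planner calls with a stub and assert that the second run does not increase the count, which needs no LLM.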

Hygiene

pyink / isort / mdformat / pre-commit clean. Deterministic suites: 11 + 25 + 5 = 41 green. Live tests env-gated (skip without SPIKE_LIVE + project). The fork-only agent-triage-pull-request check fails on an empty GITHUB_TOKEN (non-actionable; won't occur upstream).

caohy1988 added 2 commits June 1, 2026 17:03
…rvisor + ctx.pipeline)

RFC google#92 reference harness. DynamicNodeSupervisor (gate-on-leaf, TaskGroup
fan-out) + ctx.pipeline/ctx.parallel on the real ADK Workflow engine.
11 deterministic CI-safe tests (no LLM) + an env-gated live Gemini E2E.
Proves barrier-free execution, failed-item isolation, control-exception
cancellation (requires TaskGroup, not gather), nested no-deadlock with leaf
gating (+ driver-gating deadlock contrast), and resume exactly-once for
completed children. pyink/isort/mdformat clean.

A model emits a declarative, validated WorkflowSpec (typed data, not code) that
the framework validates and executes on the real ADK engine via the google#92
supervisor. authoring.py (WorkflowSpec plain kind-tagged recursive union;
CapabilityRegistry; WorkflowSpecValidator; SpecInterpreter for step/fan_out/
branch/loop_until), 10 deterministic tests, env-gated live planner sweep
(multi-stage/branch/loop on gemini-3.5-flash, shape-specific assertions).

Findings folded into the RFC: open dict[str,X] maps are a structured-output
hazard (Branch.routes -> list[Route]); Gemini response_schema rejects
Field(discriminator=...), so the vocabulary is a plain kind-tagged union;
planning vs capability quality are separable. pyink/isort/mdformat clean.
caohy1988 force-pushed the spike/dynamic-supervisor-concurrency branch 3 times, most recently from 4a70aa3 to 8e8905f, June 2, 2026 07:13

A discoverable `root_agent` (a Workflow) that exposes the google#93 flow in ADK Web:
the model authors a typed WorkflowSpec, ADK validates it against the capability
registry, freezes spec+hash to session state, and executes it on the real
engine via the google#92 supervisor — each step surfaced as a chat message so the
authored plan, validation, capabilities, frozen hash, and final output are all
visible in the ADK Web chat / State / Events surfaces.

  adk web contributing/samples/workflows/authored_workflow_demo

Load-or-author: if a frozen spec already exists in the session it is REUSED
(planner not re-invoked) and replayed — so the resume/reproducibility claim is
real, verified by a CI-safe no-LLM test (test_demo_agent.py: import + name +
registry + spec validation + reuse-path-with-stub-registry). Reuses the
authored_workflow_spike/ stack; model from env (default gemini-2.5-flash;
gemini-3.5-flash needs location=global); no hardcoded project. README is the
~7-min recording script. pyink/isort/mdformat clean; 4 demo tests pass.
caohy1988 force-pushed the spike/dynamic-supervisor-concurrency branch from 8e8905f to fd3bd52, June 2, 2026 07:27
caohy1988 added 20 commits June 2, 2026 00:36
…rage tiers)

Standalone design for the authored-workflow spike: data model, validator,
interpreter, frozen-spec contract, security model, testing, empirical findings,
and the plan export/storage tiers (v1 per-run persist; v1.1 portable JSON
export envelope; v2 reusable templates with import-time registry revalidation;
compiled Workflow is a derived artifact, never the stored source of truth).
…a, import-input rule

DESIGN.md §10: spec_hash/task_input_digest defined as sha256 over canonical JSON;
envelope carries an optional task_input_schema; import contract — digest is
advisory provenance for replaying the original run, template reuse validates a
new task input against task_input_schema, else replay-only on matching digest or
explicit template promotion. Never silently bind a stored plan to an
incompatible task shape.

One FrozenWorkflowRecord backs session state, audit event, and export envelope
(§5/§10) — v1 persists the full record under authored_workflow:frozen_record,
not a weaker {spec,hash} subset. import_plan recomputes spec_hash (reject on
mismatch) and re-runs validation against the current registry rather than
trusting envelope.validation; replay vs template execution-input rule made
explicit. 'discriminated-by-kind' -> 'plain kind-tagged union' wording fix.

Demo persists only {spec, hash}; production v1 stores the full
FrozenWorkflowRecord (DESIGN.md §5). State it explicitly in DESIGN.md, the demo
agent.py freeze step, and DEMO_NARRATIVE.md so the demo isn't misread as the
canonical persistence contract.
…aude Code comparison

Pipeline/PipelineStage make barrier-free multi-stage per-item flow first-class so
the authoring vocabulary is not less expressive than its google#92 executor. Add a
candid 'Comparison to Claude Code Dynamic Workflows' (wins: audit/safety; gaps:
expressiveness/maturity, plan-size ceiling, quality-pattern templates, scale).
Hierarchical/sub-plan authoring noted as post-gate future, not MVP.

authoring.py now covers step/fan_out/pipeline/branch/loop_until. Pipeline +
PipelineStage compile to google#92 ctx.pipeline: each item threads ALL stages
barrier-free (item A in stage k while item B in stage 1) — NOT two barriered
fan_outs. Validator: over is a list, every stage capability exists and takes an
item, stage input scope. Interpreter: stage[0] input defaults to the per-item
element, stage[n] to stage[n-1] output; collect=list.

3 new deterministic tests (13 total): validator accept/reject pipeline, and an
ordered + BARRIER-FREE proof (a verifier starts before the slow reviewer
finishes — impossible with two barriered fan_outs).
P2: each Pipeline stage dispatches once per item, so every stage capability is
subject to the same data-dependent fan_out cap as FanOut. The interpreter now
rejects (pre-dispatch) when len(items) > stage cap.max_fan_out, closing a gap
where a pipeline over N items bypassed a capability's cap — the RFC security
model relies on runtime enforcement of these caps. New deterministic test
(14 total) asserts rejection before any stage runs.
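
The pre-dispatch check described in P2 can be sketched as follows. The names `FanOutLimitError` and `run_pipeline`, and the shape of the `max_fan_out` mapping, are assumptions for illustration; the spike's interpreter may structure this differently.

```python
class FanOutLimitError(Exception):
  """Hypothetical error raised when a stage capability's cap is exceeded."""


def run_pipeline(items, stages, max_fan_out, dispatch):
  # Check every stage's cap BEFORE dispatching anything, because each
  # stage runs once per item and therefore fans out len(items) times.
  for stage in stages:
    cap = max_fan_out[stage]
    if len(items) > cap:
      raise FanOutLimitError(
          f"{stage}: {len(items)} items exceeds max_fan_out={cap}"
      )
  results = []
  for item in items:
    value = item
    for stage in stages:
      value = dispatch(stage, value)
    results.append(value)
  return results
```

Because the loop over caps finishes before the first `dispatch` call, a rejected pipeline leaves no partially executed stages behind.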

P3: sync stale shape lists (add pipeline) and test counts after the prior
Pipeline addition — authoring 10/13 -> 14, totals 11+14+4 = 29, across
authoring.py, test_authoring.py, both READMEs, DESIGN.md, DEMO_NARRATIVE.md.
…y-audit demo

The demo now exercises Pipeline — the construct that closes the Claude Code gap
— without adding visual complexity. The planner authors pipeline -> step ->
step: a reviewer->verifier pipeline over the files (each file reviewed then its
finding verified, barrier-free per item), then triager, then formatter.

- agent.py: add a 'verifier' capability (Finding -> confirmed Finding); planner
  instruction authors the pipeline; capability collection walks pipeline stages
  so the displayed/audited capability set includes stage caps.
- test_demo_agent.py: demo spec + stub registry use Pipeline/verifier; reuse
  path still proves no-LLM frozen replay.
- Narrative + README: Beat 1 plan is pipeline -> step -> step; Beat 4 calls out
  reviewer/verifier interleaving per file; fresh captured hash 1f4c0883beb6.

Validated live on gemini-3.5-flash: planner authored the pipeline 3/3 trials
(no flakiness); reuse replays the same hash without re-invoking the model.
…e registry

The first chat message hardcoded (reviewer, triager, formatter) and so
contradicted the validation list after verifier was added. Derive it from
CapabilityRegistry.names() (new accessor) so the recording can't drift from the
registered set again.
…plan + demo beat

Makes the frozen plan a first-class, portable artifact (DESIGN.md §10), so the
RFC's enterprise claim (reviewable / diffable / replayable model-authored plans)
is real, not paper.

authoring.py:
- FrozenWorkflowRecord (the single §5 shape) + ValidationResult; FrozenWorkflowRecord.freeze() captures spec_hash, planner_model, registry + per-capability versions, validation, task_input_digest.
- canonical_json / sha256_hex — the one fixed hash definition (sort_keys + tight separators) so two exporters agree.
- export_plan(record) -> dict; import_plan(envelope, registry, task_input=None) that NEVER trusts the envelope: recompute sha256 (reject tamper), re-validate vs CURRENT registry (reject dropped capability), reject per-capability version drift; execution-input contract (replay needs matching input digest; template needs task_input_schema).
- Capability.version + CapabilityRegistry.capability_versions() for drift detection; referenced_capabilities() walker.

test_authoring.py (+5 -> 19): round-trip replays same hash; tamper rejected; dropped capability rejected; version drift rejected; new input without template schema rejected (and accepted once a schema is attached).

demo: an 'Export plan' beat writes the full envelope to security_audit_plan.json and re-imports it (proving defensive import). Unifies the displayed hash on the canonical definition. Narrative/README gain Beat 3b; counts 11+19+4 = 34. Generated envelope + demo session dbs gitignored.

Validated live on gemini-3.5-flash: export beat writes a complete envelope, re-import passes, reuse replays the same hash.
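
A minimal sketch of the hashing and tamper check described in this commit. The names `canonical_json` and `sha256_hex` come from the commit message, but the bodies below are an assumption based on its "sort_keys + tight separators" description; `export_envelope` and `verify_envelope` are hypothetical helpers, not the spike's export_plan/import_plan.

```python
import hashlib
import json


def canonical_json(obj):
  # Sorted keys and tight separators give one byte sequence per value,
  # regardless of dict insertion order or whitespace preferences.
  return json.dumps(obj, sort_keys=True, separators=(",", ":"))


def sha256_hex(text):
  return hashlib.sha256(text.encode("utf-8")).hexdigest()


def export_envelope(spec):
  return {
      "schema_version": "v1",
      "spec": spec,
      "spec_hash": sha256_hex(canonical_json(spec)),
  }


def verify_envelope(envelope):
  # Never trust the stored hash: recompute it from the spec itself.
  recomputed = sha256_hex(canonical_json(envelope["spec"]))
  if recomputed != envelope["spec_hash"]:
    raise ValueError("spec_hash mismatch: envelope changed after export")
  return envelope["spec"]
```

Two exporters that build the same spec with different key order produce the same hash, and editing the spec inside an exported envelope makes `verify_envelope` fail.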
…sort

P1: isort ordering in test_authoring.py imports (PlanImportError after
PipelineStage) — pre-commit clean.

P2: import_plan now hard-errors on registry_version drift (envelope vs current
registry.version), matching DESIGN.md §10 'registry-version match … drift = hard
error'. Previously only dropped capabilities + per-capability versions were
checked.

P3: import_plan rejects an unsupported schema_version (only 'v1' supported) — a
defensive importer refuses formats it can't read.

+2 deterministic tests (-> 21): registry-version drift and unsupported
schema_version both rejected. Counts 11+21+4 = 36.
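
The version checks listed in P2 and P3 might look like the sketch below. The envelope field names (`schema_version`, `registry_version`, `capability_versions`) and the `DriftError` and `check_versions` names are assumptions; only the set of checks is taken from the commit messages.

```python
SUPPORTED_SCHEMA_VERSIONS = {"v1"}


class DriftError(Exception):
  """Hypothetical error for any version mismatch found at import time."""


def check_versions(envelope, registry_version, capability_versions):
  # Refuse formats this importer cannot read.
  if envelope.get("schema_version") not in SUPPORTED_SCHEMA_VERSIONS:
    raise DriftError("unsupported schema_version")
  # Registry-level drift is a hard error, not a warning.
  if envelope.get("registry_version") != registry_version:
    raise DriftError("registry_version drift")
  # Each capability the plan was frozen against must still exist unchanged.
  for name, version in envelope.get("capability_versions", {}).items():
    if name not in capability_versions:
      raise DriftError(f"capability dropped: {name}")
    if capability_versions[name] != version:
      raise DriftError(f"capability version drift: {name}")
```

All comparisons are made against the current registry passed in by the caller, so nothing recorded in the envelope is trusted on its own.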
…bservability

Adds DESIGN §11 'Convergence with ADK AgentConfig' (renumbers Future -> §12) in
response to reviewer questions on issue google#93:

- Lower the static subset (sequence/parallel/loop) to ADK's Sequential/Parallel/
  LoopAgentConfig instead of reinventing serialization; keep branch/fan_out/
  pipeline as new types only because config can't express them (static sub_agents
  resolved once at load; no ConditionalAgent; needs google#92 ctx.pipeline).
- Why the planner does NOT emit raw AgentConfig: static graph; Discriminator union
  rejected by response_schema; FQN tool/agent/callback refs (importlib, no
  allow-list) re-open the code-exec surface the declarative+allow-list model closes.
- Q1 storage: FrozenWorkflowRecord in session State + audit event + export envelope.
- Q2 custom tools: registered capability by registry name (allow-list), not FQN.
- Q3 version/observability: spec_hash + registry/capability versions -> drift
  rejected on import; compiled Workflow runs on the real engine so ADK tracing applies.
All claims source-verified against agents/agent_config.py and config_agent_utils.py.

Address review on the convergence section:
- 'design converges / should lower', not 'now lowers' — the spike does not yet
  implement an AgentConfig-lowering compiler (explicit caveat added).
- precise table: static parallel block -> ParallelAgentConfig; runtime
  fan_out/pipeline/branch have no direct config equivalent (ParallelAgentConfig
  is static parallel sub-agents, not data-mapping over a runtime list).
- soften FQN wording to a trust-boundary mismatch (FQN imports are fine for
  developer-authored config; the concern is a MODEL authoring raw FQNs), not
  'config is unsafe'.

Tie the demo to RFC google#93 §11 without overclaiming: this plan's top-level
sequence is the kind of static shape that should lower to SequentialAgentConfig,
while the reviewer->verifier pipeline (per-item over a runtime list) is exactly
what AgentConfig can't express. Explicitly notes this is a design direction —
the demo runs via SpecInterpreter and does NOT lower to AgentConfig (no such
compiler in the spike). README section + a presenter aside in the narrative.
… (§11)

Make the convergence concrete instead of paper. lower_to_agent_config() projects
a WorkflowSpec's static skeleton onto ADK AgentConfig shapes:
- sequence -> SequentialAgent; loop -> LoopAgent (max_iterations); leaf step ->
  LlmAgent, referenced by ALLOW-LISTED capability name (never an importable FQN);
- dynamic blocks (fan_out over a runtime list, pipeline, branch) are emitted as
  explicit <no-AgentConfig-equivalent> markers, never fabricated as config.
Illustrative structural projection — NOT a loadable root_agent.yaml. A full
loadable-config compiler stays future (DESIGN §12).

- authoring.py: lower_to_agent_config / agent_config_coverage / _lower_block.
- test_authoring.py (+4 -> 25): pure-sequence lowers to SequentialAgent; loop ->
  LoopAgent; dynamic blocks flagged unsupported; projection never emits an FQN.
- demo: a '🧬 AgentConfig lowering' beat prints the projection (2/3 of the demo
  plan lowers; pipeline flagged) — validated live on gemini-3.5-flash.
- demo test (+1 -> 5): demo plan lowers to SequentialAgent + 2 LlmAgent leaves,
  pipeline no-equivalent, no FQN.
- docs: README/DESIGN §11/narrative updated to 'demonstrated' (not 'not shown');
  counts 11+25+5 = 41.
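
The lowering rules in this commit can be sketched over plain dicts. This is an illustrative projection, not the spike's lower_to_agent_config: the spec keys (`steps`, `body`, `max_iterations`, `capability`), the output keys, and the `lower` and `coverage` helpers are assumptions. The marker text and the `workflowspec_kind` field follow the wording in the commit messages.

```python
def lower(block, allowed):
  kind = block["kind"]
  if kind == "sequence":
    return {
        "agent_class": "SequentialAgent",
        "sub_agents": [lower(b, allowed) for b in block["steps"]],
    }
  if kind == "loop_until":
    # Only the max_iterations skeleton lowers; the until-predicate does not.
    return {
        "agent_class": "LoopAgent",
        "max_iterations": block["max_iterations"],
        "sub_agents": [lower(block["body"], allowed)],
    }
  if kind == "step":
    if block["capability"] not in allowed:
      raise ValueError(f"capability not allow-listed: {block['capability']}")
    # Referenced by registry name, never by an importable FQN.
    return {"agent_class": "LlmAgent", "name": block["capability"]}
  # Dynamic blocks are flagged, never fabricated as config.
  return {"unsupported": "<no-AgentConfig-equivalent>", "workflowspec_kind": kind}


def coverage(lowered):
  """Count (lowered, total) direct children of a lowered sequence."""
  children = lowered.get("sub_agents", [])
  done = sum(1 for child in children if "unsupported" not in child)
  return done, len(children)
```

For a pipeline -> step -> step plan like the demo's, this yields a SequentialAgent with one flagged child and two LlmAgent leaves, matching the "2/3 of the demo plan lowers" figure above.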
…loop lowering; fix count

- §11: AgentConfig + Sequential/Parallel/LoopAgentConfig + BaseAgentConfig are
  @deprecated + @experimental in this checkout (agent_config.py:72-73,
  sequential_agent_config.py:28, loop_agent_config.py:30) — so this is
  convergence with the existing config SHAPE for compatibility, not a long-term
  dependency on deprecated YAML config.
- qualify LoopUntil lowering: only the max_iterations skeleton; the
  until-predicate has no AgentConfig field (enforced by the interpreter).
- §9: demo count 4 -> 5 CI-safe tests.
…o docs

Match RFC §11 in the demo-facing materials so an ADK TL watching the demo gets
the same one-sentence caveat: AgentConfig + the concrete config classes are
@deprecated + @experimental in ADK source, so the lowering is convergence with
the config SHAPE for compatibility, not a long-term dependency on YAML config.
Added to the README talking point and the narrative aside.
caohy1988 added 2 commits June 3, 2026 16:59
…e trailer

Declutter the 🧬 ADK config lowering beat:
- remove the 'reason' field from the fan_out/pipeline/branch unsupported markers
  (workflowspec_kind already names the construct); no test referenced it.
- drop the trailing italic 'illustrative structural projection…' line from the
  demo chat message (the README/narrative/DESIGN already carry that framing).
caohy1988 added a commit that referenced this pull request Jun 8, 2026
Caught in review of #6: the C7 pair keys
(pause_kind, function_call_id) were being passed via
EventData.extra_attributes, which _enrich_attributes() copies at the
top of attrs *before* attrs["adk"] = _build_adk_envelope(...). That
landed them at attributes.pause_kind / attributes.function_call_id,
not attributes.adk.pause_kind / attributes.adk.function_call_id.

The customer SQL pinned in google#293 v5 acceptance #3 is:

  JSON_VALUE(attributes, '$.adk.function_call_id') = JSON_VALUE(...)

so the pair join would have returned null on every row. This commit
makes the contract match the SQL.

Changes:
* EventData gains adk_extras: dict[str, Any], a sibling of
  extra_attributes that lives INSIDE attributes.adk.
* _enrich_attributes merges adk_extras into the envelope after
  _build_adk_envelope (envelope wins on conflict — producer-derived
  identity fields like source_event_id are the source of truth).
* The two emit sites (TOOL_PAUSED in on_event_callback,
  TOOL_COMPLETED in on_user_message_callback) pass the pair keys via
  adk_extras= instead of extra_attributes=.
* The three C7 tests are updated to assert
  json.loads(row["attributes"])["adk"]["pause_kind"] etc., locking
  in the right shape this time.

Full plugin suite: 252 passed.
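
The merge order described in the change list can be shown with a small sketch. The function name and signature here are assumptions (the real _enrich_attributes and _build_adk_envelope are not shown in this thread), and the sample values are invented for illustration.

```python
import json


def enrich_attributes_sketch(extra_attributes, adk_extras, envelope):
  # extra_attributes stay at the top level of attributes.
  attrs = dict(extra_attributes)
  # adk_extras land INSIDE attributes.adk; envelope keys win on conflict.
  attrs["adk"] = {**adk_extras, **envelope}
  return attrs


row = json.dumps(
    enrich_attributes_sketch(
        {"trace": "t1"},
        {"pause_kind": "tool", "function_call_id": "fc-1"},
        {"source_event_id": "evt-9"},
    )
)
print(json.loads(row)["adk"]["function_call_id"])
```

With this shape, a JSON path such as `$.adk.function_call_id` resolves to the pair key, whereas passing the keys through the top-level attributes would leave that path empty.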