examples: dynamic workflow with WithUseAsOutput#922
Draft
wolo-lab wants to merge 13 commits into
Draft
Conversation
982aa73 to
1ebaccb
Compare
3b93b94 to
01ce85a
Compare
1ebaccb to
f864f49
Compare
8e5abba to
f04ccc5
Compare
f864f49 to
94fd0ec
Compare
b1644a5 to
7948949
Compare
1156e83 to
f6ed1c3
Compare
…tion Workflow-engine support for human-in-the-loop, unified on a single mechanism — history rehydration — matching adk-python (no persisted run-state event, no PendingRequest field). - scheduler: per-event back-pressure handshake (a non-partial function-response is persisted before the node's flow rebuilds the next model request, fixing a non-deterministic re-issue race); pause a node on accumulated Event.LongRunningToolIDs (RequestInput rides on them); stamp NodeInfo.Path = node name on static node events so rehydration can attribute interrupts (dynamic children fold into their static ancestor). - persistence: ReconstructRunState ports adk-python's _reconstruct_node_states + _infer_node_state — per-node scan (interrupts, resolved user responses, schemas, output), status inference (WAITING / PENDING+ResumedInputs re-entry / COMPLETED+Output handoff), backward-edge predecessor input, and schema validation on the surviving (last-wins) response. - resume: single path over the rehydrated state, gated on the current turn's responses for idempotency; already-run handoff successors are skipped (RunState.completed). - state: NodeState.Interrupts + unexported interruptSchemas; RunState.completed; HasWaiting. No PendingRequest, no persisted run-state blob. - workflowagent: detectResume uses ReconstructRunState and surfaces reconstruction (schema-validation) errors. A node may raise multiple interrupts per activation. workflow and workflowagent suites pass with -race.
467581b to
3f8bec4
Compare
…, Routes) AppendEvent (in-memory) and the database storage layer dropped Event's workflow fields when persisting: the in-memory copy omitted NodeInfo, RequestedInput and Routes, and the database layer never serialized NodeInfo or RequestedInput. History-based resume attributes interrupts by NodeInfo.Path, so losing it broke HITL resume — a RequestInput workflow (e.g. examples/workflow/hitl_simple) would re-prompt instead of continuing after the reply. Persist all three fields in both backends and add round-trip regression tests for each.
6c18731 to
915433d
Compare
915433d to
1bbcff8
Compare
ba4809e to
dce4f28
Compare
Two resume-correctness fixes for dynamic orchestrators and HITL. 1. Cross-resume dedup. A dynamic node body re-runs from the top on resume, so every RunNode before the pause point would re-execute its child. rehydrateCache rebuilds the sub-scheduler's resultByPath from session events (child terminal events carry NodeInfo.Path + Output), so completed children with a stable WithRunID are served from cache. Mirrors adk-python's _rehydrate_from_events / DynamicNodeScheduler. 2. Terminal handoff asker now resumes. Resume only bumped its scheduled counter per scheduled successor, so a single-asker workflow (no successors) wrongly returned ErrNothingToResume. A matched handoff asker now counts as an effective resume itself, gated on answeredThisTurn (from a per-interrupt resolvedCount during rehydration) so a duplicate resume stays an idempotent no-op.
RunNode gains a per-call WithUseAsOutput() option that promotes a dynamic child's output to the parent dynamic node's terminal Event.Output, suppressing the orchestrator body's own return value. At most one delegating child per parent activation is allowed; a second attempt returns ErrOutputAlreadyDelegated without invoking the child. Builds on the idempotent cache from the previous CL: a WithRunID replay re-honours the delegation but does not re-run the child. BUG=515645490
dce4f28 to
43c93bd
Compare
…sOutput Add NodeInfo.MessageAsOutput: when set and Event.Output is nil, readers derive the node's output from the event's model text. The static and dynamic schedulers both honor it (Output wins, message text is the fallback), mirroring adk-python's _track_event_in_context. Empty text is a valid output, matching python; AgentNode's own empty-text-skips behavior is unchanged. This lets a delegated child whose message IS its output (e.g. an LlmAgent node) promote its text to the parent via WithUseAsOutput, and feeds it to a successor on a normal handoff.
…ution Add NodeInfo.OutputFor: the node paths an event's Output counts for — the emitter plus any WithUseAsOutput delegating ancestors. A delegating child's single event is stamped OutputFor=[child, parent, ...] and flows up, and the parent no longer re-emits a duplicate terminal output event (full suppression, matching adk-python's _output_delegated + output_for). Resume attributes a descendant's output to its delegating ancestors via OutputFor. Every output event records OutputFor (own path minimum), mirroring adk-python _enrich_event. Built on the temp integration branch (#960 + #920 + #966); rebase onto v2 once those merge.
collectNodeOutputs dereferenced ev.NodeInfo.OutputFor while only guarding ev.Output != nil, but an output event can carry a nil NodeInfo (non-workflow output events; eventNodeName in the same function already guards for this). Add the nil check to avoid a panic on resume. Add regression tests: collectNodeOutputs with a nil-NodeInfo output event and with delegated OutputFor attribution to ancestors, plus extend the DB roundtrip test to cover NodeInfo.OutputFor and MessageAsOutput.
Replace the single delegation-attribution test with two: one mirroring a real single-rooted delegation chain (all OutputFor entries share the emitting event's root segment, as the runtime always produces), and one that explicitly documents the cross-static-owner attribution branch in collectNodeOutputs as forward-looking — the current runtime cannot emit such an event, so the prior test asserted an unreachable state.
Rename staticOwner -> staticNodeName: it returns the static graph node name (matching eventNodeName's prose), and "owner" was overloaded in persistence.go with the unrelated interrupt-owner map. Condense the collectNodeOutputs test comments to their essentials while keeping the single-rooted vs forward-looking distinction explicit.
Minimal thin-dispatcher sample: a dynamic orchestrator delegates its terminal output to a single LlmAgent child via WithUseAsOutput, so the agent's streamed tokens become the orchestrator's output without buffering. BUG=515645490
The dynamic node runs AgentNodes wrapping the drafter/reviser LlmAgents, whose events are authored by the agent name. Without listing them in workflowagent.Config.SubAgents the runner cannot resolve those authors and logs "Event from an unknown agent". Registering them silences the noise and shows the correct usage pattern; sender stays out as it is a FunctionNode, not an agent.Agent.
43c93bd to
656ebe4
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Minimal thin-dispatcher sample: a dynamic orchestrator delegates its terminal output to a single LlmAgent child via WithUseAsOutput, so the agent's streamed tokens become the orchestrator's output without buffering.