Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,6 @@ type Event struct {

// NodeInfo carries the per-event metadata identifying which node in
// a workflow activation emitted it.
//
// TODO(wolo): adk-python's NodeInfo also has OutputFor []string
// (fan-in re-attribution) and MessageAsOutput bool (content-as-output
// shorthand). Add as the corresponding features land in adk-go.
type NodeInfo struct {
// Path is the composite path of the emitting node within its
// workflow activation. Empty for top-level static nodes;
Expand All @@ -150,6 +146,18 @@ type NodeInfo struct {
// invariants to the emitter, allowing dynamic nodes to forward
// children's terminal events alongside their own.
Path string `json:"path,omitempty"`

// MessageAsOutput marks that this event's content IS the node's
// output: when set and Event.Output is nil, readers derive the
// node output from the event's model text. Mirrors adk-python's
// node_info.message_as_output.
MessageAsOutput bool `json:"messageAsOutput,omitempty"`

// OutputFor lists the node paths this event's Output counts for: the
// emitter plus any WithUseAsOutput delegating ancestors, so one event
// stands in for a whole delegation chain rather than each level
// re-emitting a duplicate. Mirrors adk-python's node_info.output_for.
OutputFor []string `json:"outputFor,omitempty"`
}

// RequestInput describes a single human-in-the-loop prompt emitted
Expand Down
48 changes: 42 additions & 6 deletions workflow/agent_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ func (n *AgentNode) Run(ctx agent.InvocationContext, input any) iter.Seq2[*sessi

synthesizeAgentOutput(event)

// TODO: add output validation
// The output schema (if any) is applied by the scheduler via
// ValidateOutput; synthesizeAgentOutput leaves the raw model
// text for defaultValidateOutput to project onto the schema.
if !yield(event, nil) {
return
}
Expand All @@ -139,17 +141,40 @@ func (n *AgentNode) Run(ctx agent.InvocationContext, input any) iter.Seq2[*sessi

// synthesizeAgentOutput sets Event.Output from concatenated model
// text on final model responses so RunNode returns the agent's
// reply instead of the zero value.
// reply instead of the zero value. Empty model text yields an empty
// "" output (a value, not "no output"), matching adk-python and
// messageAsOutput; non-model events are left untouched.
//
// It also stamps NodeInfo.MessageAsOutput so readers (live and
// resume) know this event's output was derived from the model
// message, mirroring adk-python's process_llm_agent_output which
// sets event.output and node_info.message_as_output together.
func synthesizeAgentOutput(event *session.Event) {
	// Leave the event alone when it is missing, already carries an
	// explicit Output, or is not a final response.
	if event == nil || event.Output != nil || !event.IsFinalResponse() {
		return
	}

	text, ok := messageText(event)
	if !ok {
		// messageText found no model text to derive an output from.
		return
	}

	// Record that the output was derived from the message, creating
	// the NodeInfo holder on demand.
	info := event.NodeInfo
	if info == nil {
		info = &session.NodeInfo{}
		event.NodeInfo = info
	}
	info.MessageAsOutput = true
	event.Output = text
}

// messageText concatenates the non-thought model text of an event. ok
// is false when the event carries no model content, distinguishing it
// from a model message with empty text.
func messageText(event *session.Event) (text string, ok bool) {
if event == nil {
return "", false
}
content := event.LLMResponse.Content
if content == nil || content.Role != "model" {
return
return "", false
}
var b []byte
for _, p := range content.Parts {
Expand All @@ -158,8 +183,19 @@ func synthesizeAgentOutput(event *session.Event) {
}
b = append(b, p.Text...)
}
if len(b) == 0 {
return
return string(b), true
}

// childEventOutput returns the output an event carries: its Output, or
// the model text when MessageAsOutput is set.
func childEventOutput(event *session.Event) (any, bool) {
if event.Output != nil {
return event.Output, true
}
if event.NodeInfo != nil && event.NodeInfo.MessageAsOutput {
if text, ok := messageText(event); ok {
return text, true
}
}
event.Output = string(b)
return nil, false
}
66 changes: 66 additions & 0 deletions workflow/agent_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,70 @@ func TestAgentNode_SynthesizesOutputFromModelText(t *testing.T) {
if got, want := gotFinal.Output, "Hello, world!"; got != want {
t.Errorf("final event Output = %v, want %q", got, want)
}
if gotFinal.NodeInfo == nil || !gotFinal.NodeInfo.MessageAsOutput {
t.Errorf("final event NodeInfo.MessageAsOutput = %v, want true", gotFinal.NodeInfo)
}
if gotPartial.NodeInfo != nil && gotPartial.NodeInfo.MessageAsOutput {
t.Errorf("partial event MessageAsOutput = true, want false/unset")
}
}

// TestAgentNode_StructuredOutputProjectedViaValidation exercises the
// structured-output path end to end: a wrapped agent emits a single
// model event whose text is a JSON object, the AgentNode (built with
// an output schema) surfaces that text as Event.Output, and
// ValidateOutput turns it into a map[string]any whose "value" key is
// "hello".
func TestAgentNode_StructuredOutputProjectedViaValidation(t *testing.T) {
	// emitJSON is the wrapped agent's Run: one model event carrying
	// JSON text.
	emitJSON := func(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
		return func(yield func(*session.Event, error) bool) {
			ev := session.NewEvent(ctx.InvocationID())
			ev.LLMResponse.Content = &genai.Content{
				Role:  "model",
				Parts: []*genai.Part{{Text: `{"value":"hello"}`}},
			}
			yield(ev, nil)
		}
	}

	inner, err := agent.New(agent.Config{Name: "json-talky", Run: emitJSON})
	if err != nil {
		t.Fatalf("agent.New: %v", err)
	}
	schema, err := jsonschema.For[testSchemaInput](nil)
	if err != nil {
		t.Fatalf("jsonschema.For: %v", err)
	}
	node, err := NewAgentNodeWithSchemas(inner, nil, schema, NodeConfig{})
	if err != nil {
		t.Fatalf("NewAgentNodeWithSchemas: %v", err)
	}

	ctx := newMockCtx(t)
	ctx.sess = &mockSession{id: "test-session-id"}

	// Keep the last non-partial event the node yields.
	var gotFinal *session.Event
	for ev, runErr := range node.Run(ctx, "ignored") {
		if runErr != nil {
			t.Fatalf("node.Run: %v", runErr)
		}
		if ev.LLMResponse.Partial {
			continue
		}
		gotFinal = ev
	}
	if gotFinal == nil {
		t.Fatal("missing final event")
	}

	// The node's event output is handed to ValidateOutput, which is
	// expected to return the schema-shaped value.
	validated, err := node.ValidateOutput(gotFinal.Output)
	if err != nil {
		t.Fatalf("ValidateOutput: %v", err)
	}
	gotMap, isMap := validated.(map[string]any)
	if !isMap {
		t.Fatalf("ValidateOutput returned %T, want map[string]any", validated)
	}
	if v := gotMap["value"]; v != "hello" {
		t.Errorf("got %v, want value=hello", gotMap)
	}
}
83 changes: 80 additions & 3 deletions workflow/base_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
package workflow

import (
"encoding/json"
"strings"

"github.com/google/jsonschema-go/jsonschema"
"google.golang.org/genai"

"google.golang.org/adk/session"
)

// BaseNode provides identity and a default Config implementation for
Expand Down Expand Up @@ -85,12 +91,83 @@ func (b BaseNode) ValidateOutput(out any) (any, error) {
return defaultValidateOutput(out, b.outputSchema)
}

// defaultValidateOutput is the shared output-validation helper used by
// BaseNode.ValidateOutput.
//
// Framework control values (*session.Event, *session.RequestInput) ride
// through Event.Output on some nodes but are not user output payloads,
// so they bypass schema validation. When direct validation fails on a
// model-text output (a string, or a *genai.Content of model parts —
// see synthesizeAgentOutput), the text fallback projects it onto the
// schema. On total failure the original validation error is returned,
// not a downstream parse error. Mirrors ADK Python's
// _validate_output_data.
func defaultValidateOutput(out any, schema *jsonschema.Resolved) (any, error) {
if schema == nil {
return out, nil
}
if err := schema.Validate(out); err != nil {
return nil, err
switch out.(type) {
case *session.Event, *session.RequestInput:
return out, nil
}
err := schema.Validate(out)
if err == nil {
return out, nil
}
if text, ok := modelText(out); ok {
if v, ok := projectTextOntoSchema(text, schema); ok {
return v, nil
}
}
return nil, err
}

// modelText extracts the model text carried by an output value: the
// string itself, or the concatenated text parts of a *genai.Content.
//
// ok is false for any other type, and for a nil *genai.Content, which
// carries no text at all (as opposed to a content whose parts are
// empty, which yields "" with ok true).
//
// NOTE(review): every part's Text is concatenated here, regardless of
// the content's role or any per-part flags — confirm that is the
// intended projection input for all callers.
func modelText(out any) (string, bool) {
	switch v := out.(type) {
	case string:
		return v, true
	case *genai.Content:
		// A typed-nil pointer boxed in an interface still matches this
		// case; guard it so reading v.Parts cannot panic.
		if v == nil {
			return "", false
		}
		var text strings.Builder
		for _, part := range v.Parts {
			if part != nil {
				// Writing an empty Text is a no-op, so no extra check.
				text.WriteString(part.Text)
			}
		}
		return text.String(), true
	default:
		return "", false
	}
}

// projectTextOntoSchema projects model text onto schema.
//
// For a schema whose root type is "string", the raw text is returned
// when it satisfies the schema. Otherwise (including a string schema
// the raw text violates) the text is JSON-parsed and the parsed value
// is returned if it validates.
//
// ok is false when no valid value can be produced; the returned value
// is then nil and error reporting is left to the caller.
func projectTextOntoSchema(s string, schema *jsonschema.Resolved) (any, bool) {
	// The raw text must itself pass validation: this helper runs after
	// direct validation has already failed, so returning s unchecked
	// would let a string that violates the schema's constraints through.
	if rootSchemaIsString(schema) && schema.Validate(s) == nil {
		return s, true
	}
	// Blank text cannot be parsed as JSON; bail out early.
	if strings.TrimSpace(s) == "" {
		return nil, false
	}
	var parsed any
	if err := json.Unmarshal([]byte(s), &parsed); err != nil {
		return nil, false
	}
	if err := schema.Validate(parsed); err != nil {
		return nil, false
	}
	return parsed, true
}

// rootSchemaIsString reports whether schema's root type is "string".
func rootSchemaIsString(schema *jsonschema.Resolved) bool {
root := schema.Schema()
if root == nil {
return false
}
return out, nil
return root.Type == "string"
}
Loading