From cc645785c7440f3b9df940fcf92177efedc34719 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 16 Jun 2026 22:43:18 -0700 Subject: [PATCH 1/4] feat: accumulate function_call events in streaming path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ResponseAccumulator's process_event previously dropped all function_call SSE events via the wildcard arm, causing streaming responses to lose tool-call output items. This adds handlers for OutputItemAdded(function_call), FunctionCallArgumentsDelta, and FunctionCallArgumentsDone — matching the blocking JSON path's behavior and unblocking execute_loop for streaming tool dispatch. Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/accumulator.rs | 534 +++++++++++++++++- 1 file changed, 523 insertions(+), 11 deletions(-) diff --git a/crates/agentic-core/src/executor/accumulator.rs b/crates/agentic-core/src/executor/accumulator.rs index f71bcd0..42ba718 100644 --- a/crates/agentic-core/src/executor/accumulator.rs +++ b/crates/agentic-core/src/executor/accumulator.rs @@ -16,7 +16,8 @@ use crate::events::{EventFrame, EventPayload, SSEEventType, normalize_sse_line}; use crate::executor::error::{ExecutorError, ExecutorResult}; use crate::types::event::{MessageStatus, ResponseStatus}; use crate::types::io::{ - OutputItem, OutputMessage, OutputTextContent, ReasoningOutput, ReasoningTextContent, ResponseUsage, + FunctionToolCall, OutputItem, OutputMessage, OutputTextContent, ReasoningOutput, ReasoningTextContent, + ResponseUsage, }; use crate::types::request_response::{IncompleteDetails, ResponsePayload}; use crate::utils::common::{deserialize_from_str, deserialize_from_value_opt}; @@ -37,6 +38,9 @@ pub struct ResponseAccumulator { // In-flight reasoning state. current_reasoning: Option, accumulated_reasoning_text: String, + // In-flight function call state. + current_function_call: Option, + accumulated_arguments: String, } impl ResponseAccumulator { @@ -54,6 +58,8 @@ impl ResponseAccumulator { accumulated_text: String::new(), current_reasoning: None, accumulated_reasoning_text: String::new(), + current_function_call: None, + accumulated_arguments: String::new(), } } @@ -99,6 +105,8 @@ impl ResponseAccumulator { accumulated_text: String::new(), current_reasoning: None, accumulated_reasoning_text: String::new(), + current_function_call: None, + accumulated_arguments: String::new(), }) } @@ -151,6 +159,7 @@ impl ResponseAccumulator { acc.process_sse_line(&line); } acc.finalize_current_reasoning(); + acc.finalize_current_function_call(); acc.finalize_current_message(); if acc.status == ResponseStatus::InProgress { acc.status = ResponseStatus::Completed; @@ -170,6 +179,7 @@ impl ResponseAccumulator { acc.process_sse_line(&line); } acc.finalize_current_reasoning(); + acc.finalize_current_function_call(); acc.finalize_current_message(); acc } @@ -187,6 +197,18 @@ impl ResponseAccumulator { self.accumulated_reasoning_text.clear(); } + /// Closes the in-flight function call, pushing it to `output` with accumulated arguments. + fn finalize_current_function_call(&mut self) { + if let Some(mut fc) = self.current_function_call.take() { + if !self.accumulated_arguments.is_empty() && fc.arguments.is_empty() { + fc.arguments = std::mem::take(&mut self.accumulated_arguments); + } + fc.status = "completed".to_string(); + self.output.push(OutputItem::FunctionCall(fc)); + } + self.accumulated_arguments.clear(); + } + /// Closes the in-flight message, pushing it to `output` with accumulated text. fn finalize_current_message(&mut self) { if let Some(mut msg) = self.current_message.take() { @@ -218,21 +240,52 @@ impl ResponseAccumulator { (SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => { self.response_id.clone_from(id); } - (SSEEventType::OutputItemAdded, EventPayload::OutputItemAdded { item_id, item_type, .. }) => { + ( + SSEEventType::OutputItemAdded, + EventPayload::OutputItemAdded { + item_id, + item_type, + name, + call_id, + .. + }, + ) => { let item_id = if item_id.is_empty() { - let prefix = if item_type == "reasoning" { "rs_" } else { "msg_" }; + let prefix = match item_type.as_str() { + "reasoning" => "rs_", + "function_call" => "fc_", + _ => "msg_", + }; uuid7_str(prefix) } else { item_id.clone() }; - if item_type == "reasoning" { - self.finalize_current_message(); - self.finalize_current_reasoning(); - self.current_reasoning = Some(ReasoningOutput::new(item_id)); - } else { - self.finalize_current_reasoning(); - self.finalize_current_message(); - self.current_message = Some(OutputMessage::new(item_id, MessageStatus::InProgress.as_str())); + match item_type.as_str() { + "reasoning" => { + self.finalize_current_function_call(); + self.finalize_current_message(); + self.finalize_current_reasoning(); + self.current_reasoning = Some(ReasoningOutput::new(item_id)); + } + "function_call" => { + self.finalize_current_reasoning(); + self.finalize_current_message(); + self.finalize_current_function_call(); + self.current_function_call = Some(FunctionToolCall { + id: item_id, + call_id: call_id.clone().unwrap_or_default(), + name: name.clone().unwrap_or_default(), + arguments: String::new(), + status: "in_progress".to_string(), + }); + } + _ => { + self.finalize_current_reasoning(); + self.finalize_current_function_call(); + self.finalize_current_message(); + self.current_message = + Some(OutputMessage::new(item_id, MessageStatus::InProgress.as_str())); + } } } (SSEEventType::ReasoningTextDelta, EventPayload::ReasoningDelta { delta, .. }) => { @@ -253,11 +306,42 @@ impl ResponseAccumulator { } } } + (SSEEventType::FunctionCallArgumentsDelta, EventPayload::FunctionCallArgsDelta { delta, .. }) => { + self.accumulated_arguments.push_str(delta); + } + ( + SSEEventType::FunctionCallArgumentsDone, + EventPayload::FunctionCallArgsDone { + arguments, + call_id, + name, + .. + }, + ) => { + if let Some(fc) = self.current_function_call.as_mut() { + fc.arguments = if arguments.is_empty() { + std::mem::take(&mut self.accumulated_arguments) + } else { + arguments.clone() + }; + if let Some(cid) = call_id { + if !cid.is_empty() { + fc.call_id.clone_from(cid); + } + } + if !name.is_empty() { + fc.name.clone_from(name); + } + self.accumulated_arguments.clear(); + } + self.finalize_current_function_call(); + } (SSEEventType::OutputTextDelta, EventPayload::TextDelta { delta, .. }) => { self.accumulated_text.push_str(delta); } (SSEEventType::ResponseCompleted, EventPayload::Response { usage, .. }) => { self.finalize_current_reasoning(); + self.finalize_current_function_call(); self.finalize_current_message(); self.status = ResponseStatus::Completed; if let Some(u) = usage { @@ -517,6 +601,8 @@ mod tests { assert!(acc.output.is_empty()); } + // --- Reasoning accumulation tests --- + #[test] fn test_accumulator_reasoning_and_message_from_sse() { let lines = vec![ @@ -619,4 +705,430 @@ mod tests { assert!(matches!(acc.output[0], OutputItem::Reasoning(_))); assert!(matches!(acc.output[1], OutputItem::Message(_))); } + + // --- Function call accumulation tests --- + + /// Full `function_call` lifecycle: `OutputItemAdded` → deltas → Done → `ResponseCompleted`. + #[test] + fn test_function_call_accumulation_basic() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "fc_1".into(), + item_type: "function_call".into(), + output_index: 0, + name: Some("get_weather".into()), + call_id: Some("call_abc".into()), + }, + sequence_number: Some(1), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDelta, + payload: EventPayload::FunctionCallArgsDelta { + delta: r#"{"location""#.into(), + call_id: Some("call_abc".into()), + item_id: "fc_1".into(), + output_index: 0, + }, + sequence_number: Some(2), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDelta, + payload: EventPayload::FunctionCallArgsDelta { + delta: r#":"Paris"}"#.into(), + call_id: Some("call_abc".into()), + item_id: "fc_1".into(), + output_index: 0, + }, + sequence_number: Some(3), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDone, + payload: EventPayload::FunctionCallArgsDone { + arguments: r#"{"location":"Paris"}"#.into(), + call_id: Some("call_abc".into()), + item_id: "fc_1".into(), + name: "get_weather".into(), + output_index: 0, + }, + sequence_number: Some(4), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: None, + }, + sequence_number: Some(5), + }); + + assert_eq!(acc.status, ResponseStatus::Completed); + assert_eq!(acc.output.len(), 1); + if let OutputItem::FunctionCall(fc) = &acc.output[0] { + assert_eq!(fc.id, "fc_1"); + assert_eq!(fc.call_id, "call_abc"); + assert_eq!(fc.name, "get_weather"); + assert_eq!(fc.arguments, r#"{"location":"Paris"}"#); + assert_eq!(fc.status, "completed"); + } else { + panic!("expected FunctionCall"); + } + } + + /// `FunctionCallArgumentsDone` uses accumulated deltas when its own `arguments` field is empty. + #[test] + fn test_function_call_done_uses_deltas_when_arguments_empty() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "fc_1".into(), + item_type: "function_call".into(), + output_index: 0, + name: Some("search".into()), + call_id: Some("call_1".into()), + }, + sequence_number: Some(1), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDelta, + payload: EventPayload::FunctionCallArgsDelta { + delta: r#"{"q":"rust"}"#.into(), + call_id: Some("call_1".into()), + item_id: "fc_1".into(), + output_index: 0, + }, + sequence_number: Some(2), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDone, + payload: EventPayload::FunctionCallArgsDone { + arguments: String::new(), + call_id: Some("call_1".into()), + item_id: "fc_1".into(), + name: "search".into(), + output_index: 0, + }, + sequence_number: Some(3), + }); + + assert_eq!(acc.output.len(), 1); + if let OutputItem::FunctionCall(fc) = &acc.output[0] { + assert_eq!(fc.arguments, r#"{"q":"rust"}"#); + } else { + panic!("expected FunctionCall"); + } + } + + /// Multiple function calls in one response (parallel tool use). + #[test] + fn test_function_call_multiple_parallel() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + // First function call + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "fc_1".into(), + item_type: "function_call".into(), + output_index: 0, + name: Some("get_weather".into()), + call_id: Some("call_1".into()), + }, + sequence_number: Some(1), + }); + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDone, + payload: EventPayload::FunctionCallArgsDone { + arguments: r#"{"city":"NYC"}"#.into(), + call_id: Some("call_1".into()), + item_id: "fc_1".into(), + name: "get_weather".into(), + output_index: 0, + }, + sequence_number: Some(2), + }); + + // Second function call + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "fc_2".into(), + item_type: "function_call".into(), + output_index: 1, + name: Some("get_time".into()), + call_id: Some("call_2".into()), + }, + sequence_number: Some(3), + }); + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDone, + payload: EventPayload::FunctionCallArgsDone { + arguments: r#"{"tz":"EST"}"#.into(), + call_id: Some("call_2".into()), + item_id: "fc_2".into(), + name: "get_time".into(), + output_index: 1, + }, + sequence_number: Some(4), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: None, + }, + sequence_number: Some(5), + }); + + assert_eq!(acc.output.len(), 2); + assert!(matches!(&acc.output[0], OutputItem::FunctionCall(fc) if fc.name == "get_weather")); + assert!(matches!(&acc.output[1], OutputItem::FunctionCall(fc) if fc.name == "get_time")); + } + + /// Function call interleaved with a message output item. + #[test] + fn test_function_call_interleaved_with_message() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + // Message first + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "msg_1".into(), + item_type: "message".into(), + output_index: 0, + name: None, + call_id: None, + }, + sequence_number: Some(1), + }); + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputTextDelta, + payload: EventPayload::TextDelta { + delta: "Let me check".into(), + item_id: "msg_1".into(), + output_index: 0, + content_index: 0, + }, + sequence_number: Some(2), + }); + + // Then function call + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "fc_1".into(), + item_type: "function_call".into(), + output_index: 1, + name: Some("lookup".into()), + call_id: Some("call_x".into()), + }, + sequence_number: Some(3), + }); + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDone, + payload: EventPayload::FunctionCallArgsDone { + arguments: "{}".into(), + call_id: Some("call_x".into()), + item_id: "fc_1".into(), + name: "lookup".into(), + output_index: 1, + }, + sequence_number: Some(4), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: None, + }, + sequence_number: Some(5), + }); + + assert_eq!(acc.output.len(), 2); + assert!(matches!(&acc.output[0], OutputItem::Message(m) if m.content[0].text == "Let me check")); + assert!(matches!(&acc.output[1], OutputItem::FunctionCall(fc) if fc.name == "lookup")); + } + + /// `FunctionCallArgumentsDone` updates `call_id` and `name` if provided. + #[test] + fn test_function_call_done_updates_metadata() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "fc_1".into(), + item_type: "function_call".into(), + output_index: 0, + name: Some("old_name".into()), + call_id: Some("old_call".into()), + }, + sequence_number: Some(1), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDone, + payload: EventPayload::FunctionCallArgsDone { + arguments: "{}".into(), + call_id: Some("new_call".into()), + item_id: "fc_1".into(), + name: "new_name".into(), + output_index: 0, + }, + sequence_number: Some(2), + }); + + if let OutputItem::FunctionCall(fc) = &acc.output[0] { + assert_eq!(fc.call_id, "new_call"); + assert_eq!(fc.name, "new_name"); + } else { + panic!("expected FunctionCall"); + } + } + + /// A `function_call` `OutputItemAdded` auto-generates an id when the server sends an empty one. + #[test] + fn test_function_call_empty_item_id_generates_uuid() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: String::new(), + item_type: "function_call".into(), + output_index: 0, + name: Some("tool".into()), + call_id: Some("c1".into()), + }, + sequence_number: Some(1), + }); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDone, + payload: EventPayload::FunctionCallArgsDone { + arguments: "{}".into(), + call_id: Some("c1".into()), + item_id: String::new(), + name: "tool".into(), + output_index: 0, + }, + sequence_number: Some(2), + }); + + if let OutputItem::FunctionCall(fc) = &acc.output[0] { + assert!(fc.id.starts_with("fc_"), "expected fc_ prefix, got: {}", fc.id); + } else { + panic!("expected FunctionCall"); + } + } + + /// Orphaned `FunctionCallArgumentsDelta` events (no active function call) are harmless. + #[test] + fn test_function_call_orphaned_delta_safe() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDelta, + payload: EventPayload::FunctionCallArgsDelta { + delta: "orphan".into(), + call_id: None, + item_id: String::new(), + output_index: 0, + }, + sequence_number: Some(1), + }); + + assert!(acc.output.is_empty()); + assert_eq!(acc.accumulated_arguments, "orphan"); + } + + /// `ResponseCompleted` finalizes any in-flight function call even without a Done event. + #[test] + fn test_function_call_finalized_on_response_completed() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "fc_1".into(), + item_type: "function_call".into(), + output_index: 0, + name: Some("partial".into()), + call_id: Some("c1".into()), + }, + sequence_number: Some(1), + }); + acc.process_event(&EventFrame { + event_type: SSEEventType::FunctionCallArgumentsDelta, + payload: EventPayload::FunctionCallArgsDelta { + delta: r#"{"x":1}"#.into(), + call_id: Some("c1".into()), + item_id: "fc_1".into(), + output_index: 0, + }, + sequence_number: Some(2), + }); + + // No ArgumentsDone — jump straight to ResponseCompleted + acc.process_event(&EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: None, + }, + sequence_number: Some(3), + }); + + assert_eq!(acc.output.len(), 1); + if let OutputItem::FunctionCall(fc) = &acc.output[0] { + assert_eq!(fc.arguments, r#"{"x":1}"#); + assert_eq!(fc.status, "completed"); + } else { + panic!("expected FunctionCall"); + } + } + + /// `from_sse_lines` end-to-end with function call SSE data. + #[test] + fn test_function_call_from_sse_lines() { + let lines = vec![ + r#"data: {"type":"response.created","response":{"id":"resp_fc"}}"#.to_string(), + r#"data: {"type":"response.output_item.added","item":{"id":"fc_1","type":"function_call","name":"get_weather","call_id":"call_abc"}}"#.to_string(), + r#"data: {"type":"response.function_call_arguments.delta","delta":"{\"city\":"}"#.to_string(), + r#"data: {"type":"response.function_call_arguments.delta","delta":"\"SF\"}"}"#.to_string(), + r#"data: {"type":"response.function_call_arguments.done","arguments":"{\"city\":\"SF\"}","call_id":"call_abc","name":"get_weather"}"#.to_string(), + r#"data: {"type":"response.done","response":{"id":"resp_fc","usage":{"input_tokens":10,"output_tokens":5,"total_tokens":15}}}"#.to_string(), + ]; + + let acc = ResponseAccumulator::from_sse_lines(lines, Some("conv_1")); + assert_eq!(acc.status, ResponseStatus::Completed); + assert_eq!(acc.output.len(), 1); + + if let OutputItem::FunctionCall(fc) = &acc.output[0] { + assert_eq!(fc.name, "get_weather"); + assert_eq!(fc.arguments, r#"{"city":"SF"}"#); + assert_eq!(fc.call_id, "call_abc"); + } else { + panic!("expected FunctionCall"); + } + + assert_eq!(acc.usage.unwrap().total_tokens, 15); + } } From ea44db71d94fe44c2340ee7fa8d63507a6327355 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 16 Jun 2026 23:04:08 -0700 Subject: [PATCH 2/4] test: add cassette-driven integration test for function_call accumulation Feeds real vLLM SSE recordings (gemma-4-26B) through the full accumulator pipeline and verifies OutputItem::FunctionCall is produced with correct name, arguments, call_id, and usage. Also adds a text-only regression guard ensuring no function_call items leak from text-only streams. Signed-off-by: Ashwin Giridharan --- .../tests/accumulator_cassette_test.rs | 80 +++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 crates/agentic-core/tests/accumulator_cassette_test.rs diff --git a/crates/agentic-core/tests/accumulator_cassette_test.rs b/crates/agentic-core/tests/accumulator_cassette_test.rs new file mode 100644 index 0000000..2d66eb0 --- /dev/null +++ b/crates/agentic-core/tests/accumulator_cassette_test.rs @@ -0,0 +1,80 @@ +//! Cassette-driven integration test: feeds real vLLM SSE recordings through +//! the full accumulator pipeline (normalize → `process_event` → finalize) and +//! verifies the resulting `OutputItem::FunctionCall` matches the expected values. + +use serde::Deserialize; + +use agentic_core::executor::accumulator::ResponseAccumulator; +use agentic_core::types::io::OutputItem; + +const CASSETTE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/events"); + +#[derive(Deserialize)] +struct EventCassette { + sse: Vec, + expected_function_call: Option, + #[allow(dead_code)] + expected_text: Option, +} + +#[derive(Deserialize)] +struct ExpectedFunctionCall { + name: String, + arguments: String, +} + +fn load_cassette(filename: &str) -> EventCassette { + let path = format!("{CASSETTE_DIR}/{filename}"); + let text = std::fs::read_to_string(&path).unwrap_or_else(|e| panic!("read {path}: {e}")); + serde_yml::from_str(&text).unwrap_or_else(|e| panic!("parse {path}: {e}")) +} + +/// Feeds a real vLLM `function_call` SSE recording through the accumulator and +/// verifies the output contains the correct `FunctionCall` item. +#[test] +fn test_accumulator_cassette_function_call_vllm_gemma4() { + let cassette = load_cassette("function-call-vllm-gemma4.yaml"); + let expected_fc = cassette + .expected_function_call + .expect("cassette must have expected_function_call"); + + let acc = ResponseAccumulator::from_sse_lines(cassette.sse, None); + let payload = acc.finalize("google/gemma-4-26B-A4B-it", None, None); + + assert_eq!(payload.status, "completed"); + assert_eq!(payload.output.len(), 1, "expected exactly one output item"); + + if let OutputItem::FunctionCall(fc) = &payload.output[0] { + assert_eq!(fc.name, expected_fc.name); + assert_eq!(fc.arguments, expected_fc.arguments); + assert_eq!(fc.status, "completed"); + assert!(!fc.call_id.is_empty(), "call_id should be populated"); + assert!(!fc.id.is_empty(), "id should be populated"); + } else { + panic!("expected OutputItem::FunctionCall, got {:?}", payload.output[0]); + } + + assert!(payload.usage.is_some(), "usage should be present"); + let usage = payload.usage.unwrap(); + assert_eq!(usage.input_tokens, 66); + assert_eq!(usage.output_tokens, 21); + assert_eq!(usage.total_tokens, 87); +} + +/// Feeds the text-only cassette through the accumulator and verifies no +/// `function_call` items leak in — regression guard for type-aware branching. +#[test] +fn test_accumulator_cassette_text_only_no_function_calls() { + let cassette = load_cassette("text-only-vllm-gemma4.yaml"); + + let acc = ResponseAccumulator::from_sse_lines(cassette.sse, None); + let payload = acc.finalize("google/gemma-4-26B-A4B-it", None, None); + + assert_eq!(payload.status, "completed"); + for item in &payload.output { + assert!( + matches!(item, OutputItem::Message(_)), + "text-only cassette should only produce Message items, got {item:?}" + ); + } +} From b044b284217633d9dd1b33f04e2ed46e83817522 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 17 Jun 2026 11:42:52 -0700 Subject: [PATCH 3/4] fix: clippy too_many_lines and fmt for merged state machine Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/accumulator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/agentic-core/src/executor/accumulator.rs b/crates/agentic-core/src/executor/accumulator.rs index 42ba718..bdd5cd0 100644 --- a/crates/agentic-core/src/executor/accumulator.rs +++ b/crates/agentic-core/src/executor/accumulator.rs @@ -235,6 +235,7 @@ impl ResponseAccumulator { /// This is the core state machine — callers that already have a normalized /// frame (e.g. [`StreamTee`](future)) can call this directly without /// re-parsing from a raw line. + #[allow(clippy::too_many_lines)] pub(crate) fn process_event(&mut self, frame: &EventFrame) { match (&frame.event_type, &frame.payload) { (SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => { @@ -283,8 +284,7 @@ impl ResponseAccumulator { self.finalize_current_reasoning(); self.finalize_current_function_call(); self.finalize_current_message(); - self.current_message = - Some(OutputMessage::new(item_id, MessageStatus::InProgress.as_str())); + self.current_message = Some(OutputMessage::new(item_id, MessageStatus::InProgress.as_str())); } } } From 569a1f538f90397fae6a4ea84649627b05b1e3da Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Wed, 17 Jun 2026 13:43:12 -0700 Subject: [PATCH 4/4] test: exercise PR #60 tool_calls cassettes through accumulator Add 10 new cassette tests covering: - All tool_choice modes streaming (auto, required, named, none) - All tool_choice modes non-streaming (validates from_json path) - Reasoning streaming (Qwen3 and GPT-oss) Validates the accumulator correctly handles real multi-tool streaming responses and non-streaming JSON responses from multiple model families. Signed-off-by: Ashwin Giridharan --- .../tests/accumulator_cassette_test.rs | 393 +++++++++++++++++- 1 file changed, 391 insertions(+), 2 deletions(-) diff --git a/crates/agentic-core/tests/accumulator_cassette_test.rs b/crates/agentic-core/tests/accumulator_cassette_test.rs index 2d66eb0..3a06524 100644 --- a/crates/agentic-core/tests/accumulator_cassette_test.rs +++ b/crates/agentic-core/tests/accumulator_cassette_test.rs @@ -1,6 +1,9 @@ -//! Cassette-driven integration test: feeds real vLLM SSE recordings through +//! Cassette-driven integration tests: feed real vLLM SSE recordings through //! the full accumulator pipeline (normalize → `process_event` → finalize) and -//! verifies the resulting `OutputItem::FunctionCall` matches the expected values. +//! verify the resulting output items match expected values. +//! +//! Tests cover both the legacy `events/` cassettes (flat SSE list) and the +//! newer `tool_calls/` cassettes from PR #60 (multi-turn `turns` format). use serde::Deserialize; @@ -8,6 +11,10 @@ use agentic_core::executor::accumulator::ResponseAccumulator; use agentic_core::types::io::OutputItem; const CASSETTE_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/events"); +const TOOL_CALLS_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/tool_calls"); +const REASONING_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/reasoning/responses"); + +// --- Legacy event cassette format --- #[derive(Deserialize)] struct EventCassette { @@ -29,6 +36,58 @@ fn load_cassette(filename: &str) -> EventCassette { serde_yml::from_str(&text).unwrap_or_else(|e| panic!("parse {path}: {e}")) } +// --- New multi-turn cassette format (PR #60) --- + +#[derive(Deserialize)] +struct TurnCassette { + turns: Vec, +} + +#[derive(Deserialize)] +struct Turn { + #[allow(dead_code)] + filename: String, + #[allow(dead_code)] + request: serde_yml::Value, + response: TurnResponse, +} + +#[derive(Deserialize)] +struct TurnResponse { + #[allow(dead_code)] + headers: serde_yml::Value, + #[serde(default)] + sse: Vec, + body: Option, +} + +fn load_turn_cassette_from(dir: &str, filename: &str) -> TurnCassette { + let path = format!("{dir}/{filename}"); + let text = std::fs::read_to_string(&path).unwrap_or_else(|e| panic!("read {path}: {e}")); + serde_yml::from_str(&text).unwrap_or_else(|e| panic!("parse {path}: {e}")) +} + +fn load_turn_cassette(filename: &str) -> TurnCassette { + load_turn_cassette_from(TOOL_CALLS_DIR, filename) +} + +fn load_reasoning_cassette(filename: &str) -> TurnCassette { + load_turn_cassette_from(REASONING_DIR, filename) +} + +/// Extracts `data: ...` lines from raw SSE entries (which may include +/// `event:` lines and blank separators). +fn extract_data_lines(sse_entries: &[String]) -> Vec { + sse_entries + .iter() + .flat_map(|entry| entry.lines()) + .filter(|line| line.starts_with("data: ")) + .map(ToString::to_string) + .collect() +} + +// === Legacy cassette tests === + /// Feeds a real vLLM `function_call` SSE recording through the accumulator and /// verifies the output contains the correct `FunctionCall` item. #[test] @@ -78,3 +137,333 @@ fn test_accumulator_cassette_text_only_no_function_calls() { ); } } + +// === PR #60 tool_calls cassette tests === + +/// `tool_choice=auto` streaming: model decides to call multiple tools (parallel tool use). +#[test] +fn test_tool_calls_cassette_auto_streaming() { + let cassette = load_turn_cassette("tool-call-auto-Qwen-Qwen3-30B-A3B-FP8-streaming.yaml"); + let turn = &cassette.turns[0]; + let data_lines = extract_data_lines(&turn.response.sse); + + let acc = ResponseAccumulator::from_sse_lines(data_lines, None); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + !function_calls.is_empty(), + "auto mode should produce at least one function call" + ); + + for item in &function_calls { + if let OutputItem::FunctionCall(fc) = item { + assert!(!fc.name.is_empty(), "function call name must not be empty"); + assert!(!fc.arguments.is_empty(), "function call arguments must not be empty"); + assert_eq!(fc.status, "completed"); + assert!(!fc.call_id.is_empty(), "call_id must be populated"); + } + } + + assert!(payload.usage.is_some()); +} + +/// `tool_choice=required` streaming: model is forced to call a tool. +#[test] +fn test_tool_calls_cassette_required_streaming() { + let cassette = load_turn_cassette("tool-call-required-Qwen-Qwen3-30B-A3B-FP8-streaming.yaml"); + let turn = &cassette.turns[0]; + let data_lines = extract_data_lines(&turn.response.sse); + + let acc = ResponseAccumulator::from_sse_lines(data_lines, None); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + !function_calls.is_empty(), + "required mode must produce at least one function call" + ); + + for item in &function_calls { + if let OutputItem::FunctionCall(fc) = item { + assert_eq!(fc.status, "completed"); + } + } +} + +/// `tool_choice=named` streaming: model calls a specific named tool. +#[test] +fn test_tool_calls_cassette_named_streaming() { + let cassette = load_turn_cassette("tool-call-named-Qwen-Qwen3-30B-A3B-FP8-streaming.yaml"); + let turn = &cassette.turns[0]; + let data_lines = extract_data_lines(&turn.response.sse); + + let acc = ResponseAccumulator::from_sse_lines(data_lines, None); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + !function_calls.is_empty(), + "named mode must produce at least one function call" + ); +} + +/// `tool_choice=none` streaming: model should NOT call any tools. +#[test] +fn test_tool_calls_cassette_none_streaming() { + let cassette = load_turn_cassette("tool-call-none-Qwen-Qwen3-30B-A3B-FP8-streaming.yaml"); + let turn = &cassette.turns[0]; + let data_lines = extract_data_lines(&turn.response.sse); + + let acc = ResponseAccumulator::from_sse_lines(data_lines, None); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + function_calls.is_empty(), + "none mode should produce zero function calls, got {}", + function_calls.len() + ); + + assert!( + !payload.output.is_empty(), + "none mode should still produce message output" + ); +} + +// === Non-streaming tool_calls cassette tests (exercises `from_json` path) === + +/// `tool_choice=auto` non-streaming: JSON response with parallel function calls. +#[test] +fn test_tool_calls_cassette_auto_nonstreaming() { + let cassette = load_turn_cassette("tool-call-auto-Qwen-Qwen3-30B-A3B-FP8-nonstreaming.yaml"); + let body = cassette.turns[0] + .response + .body + .as_ref() + .expect("non-streaming cassette must have body"); + let body_str = serde_json::to_string(body).unwrap(); + + let acc = ResponseAccumulator::from_json(&body_str, None).unwrap(); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + !function_calls.is_empty(), + "auto mode should produce at least one function call" + ); + + for item in &function_calls { + if let OutputItem::FunctionCall(fc) = item { + assert!(!fc.name.is_empty()); + assert!(!fc.arguments.is_empty()); + assert_eq!(fc.status, "completed"); + assert!(!fc.call_id.is_empty()); + } + } +} + +/// `tool_choice=required` non-streaming: forced tool call in JSON response. +#[test] +fn test_tool_calls_cassette_required_nonstreaming() { + let cassette = load_turn_cassette("tool-call-required-Qwen-Qwen3-30B-A3B-FP8-nonstreaming.yaml"); + let body = cassette.turns[0] + .response + .body + .as_ref() + .expect("non-streaming cassette must have body"); + let body_str = serde_json::to_string(body).unwrap(); + + let acc = ResponseAccumulator::from_json(&body_str, None).unwrap(); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + !function_calls.is_empty(), + "required mode must produce at least one function call" + ); +} + +/// `tool_choice=named` non-streaming: specific named tool in JSON response. +#[test] +fn test_tool_calls_cassette_named_nonstreaming() { + let cassette = load_turn_cassette("tool-call-named-Qwen-Qwen3-30B-A3B-FP8-nonstreaming.yaml"); + let body = cassette.turns[0] + .response + .body + .as_ref() + .expect("non-streaming cassette must have body"); + let body_str = serde_json::to_string(body).unwrap(); + + let acc = ResponseAccumulator::from_json(&body_str, None).unwrap(); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + !function_calls.is_empty(), + "named mode must produce at least one function call" + ); +} + +/// `tool_choice=none` non-streaming: no function calls in JSON response. +#[test] +fn test_tool_calls_cassette_none_nonstreaming() { + let cassette = load_turn_cassette("tool-call-none-Qwen-Qwen3-30B-A3B-FP8-nonstreaming.yaml"); + let body = cassette.turns[0] + .response + .body + .as_ref() + .expect("non-streaming cassette must have body"); + let body_str = serde_json::to_string(body).unwrap(); + + let acc = ResponseAccumulator::from_json(&body_str, None).unwrap(); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + + assert!( + function_calls.is_empty(), + "none mode should produce zero function calls, got {}", + function_calls.len() + ); +} + +// === Reasoning cassette tests (regression guard for reasoning + function_call coexistence) === + +/// Reasoning streaming (Qwen3): accumulator produces `Reasoning` + `Message` items. +#[test] +fn test_reasoning_cassette_qwen3_streaming() { + let cassette = load_reasoning_cassette("reasoning-single-Qwen-Qwen3-30B-A3B-FP8-streaming.yaml"); + let turn = &cassette.turns[0]; + let data_lines = extract_data_lines(&turn.response.sse); + + let acc = ResponseAccumulator::from_sse_lines(data_lines, None); + let payload = acc.finalize("Qwen/Qwen3-30B-A3B-FP8", None, None); + + assert_eq!(payload.status, "completed"); + + let reasoning_items: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::Reasoning(_))) + .collect(); + + let message_items: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::Message(_))) + .collect(); + + assert!( + !reasoning_items.is_empty(), + "reasoning cassette must produce at least one Reasoning item" + ); + assert!( + !message_items.is_empty(), + "reasoning cassette should also produce a Message item" + ); + + // No function calls should leak in + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + assert!( + function_calls.is_empty(), + "reasoning-only cassette should not produce function calls" + ); +} + +/// Reasoning streaming (GPT-oss): validates accumulator handles different model's reasoning format. +/// Note: GPT-oss emits `output_text.done` without a preceding `output_item.added` for the +/// message, so the accumulator only captures the reasoning item from the streaming path. +/// The message content is available in the `response.completed` payload's output array. +#[test] +fn test_reasoning_cassette_gpt_oss_streaming() { + let cassette = load_reasoning_cassette("reasoning-single-openai-gpt-oss-20b-streaming.yaml"); + let turn = &cassette.turns[0]; + let data_lines = extract_data_lines(&turn.response.sse); + + let acc = ResponseAccumulator::from_sse_lines(data_lines, None); + let payload = acc.finalize("openai/gpt-oss-20b", None, None); + + assert_eq!(payload.status, "completed"); + + let reasoning_items: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::Reasoning(_))) + .collect(); + + assert!( + !reasoning_items.is_empty(), + "GPT-oss reasoning cassette must produce at least one Reasoning item" + ); + + // No function calls should leak in + let function_calls: Vec<_> = payload + .output + .iter() + .filter(|item| matches!(item, OutputItem::FunctionCall(_))) + .collect(); + assert!( + function_calls.is_empty(), + "reasoning-only cassette should not produce function calls" + ); +}