From 5a08f64c7cfb9c4ae0fdef22d875487bd96307a2 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 12:28:23 -0700 Subject: [PATCH 01/17] =?UTF-8?q?feat:=20add=20tool=20dispatch=20layer=20?= =?UTF-8?q?=E2=80=94=20ToolContext,=20traits,=20and=20LoopDecision?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces the tool execution framework for the agentic loop: - `tools/` module (crate root): McpToolExecutor, WebSearchProvider, VectorStoreClient traits with Pin<Box<dyn Future>> for dyn-compatibility - `executor/dispatch.rs`: dispatch_tools() inspects output for FunctionCall items, returns LoopDecision (Continue/Done/Incomplete) - `executor/tool_context.rs`: ToolContext holds provider Arcs, execute_all() runs tool calls in parallel via join_all, individual failures produce error output per call_id (not total failure) For MVP, all FunctionCall items route to configured providers (MCP first, then web_search, then vector_store). Client-side function routing (checking the request's tools array) is deferred to a follow-up. 10 integration tests with mock executors covering: empty output, single/ parallel calls, max iterations, failing tools, no provider, mixed results. 
Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/dispatch.rs | 57 ++++ crates/agentic-core/src/executor/mod.rs | 4 + .../agentic-core/src/executor/tool_context.rs | 66 ++++ crates/agentic-core/src/lib.rs | 1 + crates/agentic-core/src/tools/mcp.rs | 18 ++ crates/agentic-core/src/tools/mod.rs | 7 + crates/agentic-core/src/tools/vector_store.rs | 17 + crates/agentic-core/src/tools/web_search.rs | 15 + crates/agentic-core/tests/dispatch_test.rs | 292 ++++++++++++++++++ 9 files changed, 477 insertions(+) create mode 100644 crates/agentic-core/src/executor/dispatch.rs create mode 100644 crates/agentic-core/src/executor/tool_context.rs create mode 100644 crates/agentic-core/src/tools/mcp.rs create mode 100644 crates/agentic-core/src/tools/mod.rs create mode 100644 crates/agentic-core/src/tools/vector_store.rs create mode 100644 crates/agentic-core/src/tools/web_search.rs create mode 100644 crates/agentic-core/tests/dispatch_test.rs diff --git a/crates/agentic-core/src/executor/dispatch.rs b/crates/agentic-core/src/executor/dispatch.rs new file mode 100644 index 0000000..ea228fb --- /dev/null +++ b/crates/agentic-core/src/executor/dispatch.rs @@ -0,0 +1,57 @@ +use crate::executor::error::ExecutorResult; +use crate::executor::tool_context::ToolContext; +use crate::types::io::{InputItem, OutputItem}; + +/// Decision returned by [`dispatch_tools`] to drive the agentic loop. +#[derive(Debug)] +#[non_exhaustive] +pub enum LoopDecision { + /// Gateway-executed tools returned results. Inject into input and re-infer. + Continue(Vec), + /// No tool calls found, or only client-side functions. Return response to client. + Done, + /// Max iterations reached or unrecoverable tool failure. + Incomplete(String), +} + +/// Inspect executor output for function calls and dispatch them via [`ToolContext`]. +/// +/// Returns [`LoopDecision`] indicating whether the caller should re-enter +/// inference (Continue), return the response (Done), or mark incomplete. 
+/// +/// For MVP, all `FunctionCall` items are treated as gateway-executable. +/// Client-side function routing (checking the request's `tools` array) is +/// deferred to a follow-up PR. +/// +/// # Errors +/// +/// Returns `ExecutorError` only on internal failures (e.g. serialization). +/// Individual tool execution failures are captured as error output strings +/// in the returned `InputItem` list — they do NOT propagate as errors. +pub async fn dispatch_tools( + output: &[OutputItem], + tool_ctx: &ToolContext, + iteration: usize, +) -> ExecutorResult { + let function_calls: Vec<_> = output + .iter() + .filter_map(|item| match item { + OutputItem::FunctionCall(fc) => Some(fc), + _ => None, + }) + .collect(); + + if function_calls.is_empty() { + return Ok(LoopDecision::Done); + } + + if iteration >= tool_ctx.max_iterations { + return Ok(LoopDecision::Incomplete(format!( + "max tool iterations reached ({iteration}/{})", + tool_ctx.max_iterations + ))); + } + + let results = tool_ctx.execute_all(&function_calls).await; + Ok(LoopDecision::Continue(results)) +} diff --git a/crates/agentic-core/src/executor/mod.rs b/crates/agentic-core/src/executor/mod.rs index 32fbabc..3025400 100644 --- a/crates/agentic-core/src/executor/mod.rs +++ b/crates/agentic-core/src/executor/mod.rs @@ -1,13 +1,17 @@ //! Agentic loop executor. 
pub mod accumulator; +pub mod dispatch; pub mod engine; pub mod error; pub mod modes; pub mod request; +pub mod tool_context; +pub use dispatch::{LoopDecision, dispatch_tools}; pub use engine::{BoxStream, call_inference, create_conversation, execute, persist_response, rehydrate_conversation}; pub use error::{ExecutorError, ExecutorResult}; pub use modes::{ConversationHandler, ResponseHandler}; pub use request::ExecutionContext; pub use request::RequestContext; +pub use tool_context::ToolContext; diff --git a/crates/agentic-core/src/executor/tool_context.rs b/crates/agentic-core/src/executor/tool_context.rs new file mode 100644 index 0000000..b9ee7d0 --- /dev/null +++ b/crates/agentic-core/src/executor/tool_context.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; + +use crate::executor::ExecutorError; +use crate::tools::{McpToolExecutor, VectorStoreClient, WebSearchProvider}; +use crate::types::io::{FunctionToolCall, FunctionToolResultMessage, InputItem}; + +/// Holds references to tool execution providers. +/// +/// Passed into [`dispatch_tools`](super::dispatch::dispatch_tools) to resolve +/// and execute tool calls. Each provider is optional — calls to unconfigured +/// providers produce an error result (not a panic). +#[derive(Default)] +pub struct ToolContext { + pub mcp: Option>, + pub web_search: Option>, + pub vector_store: Option>, + pub max_iterations: usize, +} + +impl ToolContext { + /// Execute all tool calls in parallel. + /// + /// Individual failures produce an error string as the tool output for that + /// `call_id` — the dispatch does NOT fail as a whole. This matches the + /// Responses API behavior where partial tool results are acceptable. 
+ pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec { + let futures: Vec<_> = calls.iter().map(|call| self.execute_one(call)).collect(); + futures::future::join_all(futures).await + } + + async fn execute_one(&self, call: &FunctionToolCall) -> InputItem { + let result = self.route_call(call).await; + + let output = match result { + Ok(s) => s, + Err(e) => format!("{{\"error\": \"{e}\"}}"), + }; + + InputItem::FunctionCallOutput(FunctionToolResultMessage { + call_id: call.call_id.clone(), + output, + }) + } + + async fn route_call(&self, call: &FunctionToolCall) -> Result { + // MVP: route all calls to MCP executor. + // Future: inspect request tools array to determine provider. + if let Some(mcp) = &self.mcp { + return mcp.execute(&call.name, &call.arguments, &serde_json::Value::Null).await; + } + + if let Some(web) = &self.web_search { + return web.search(&call.arguments, "medium").await; + } + + if let Some(vs) = &self.vector_store { + let results = vs.search("", &call.arguments, 5).await?; + return Ok(serde_json::to_string(&results).unwrap_or_default()); + } + + Err(ExecutorError::InvalidRequest(format!( + "no tool provider configured for '{}'", + call.name + ))) + } +} diff --git a/crates/agentic-core/src/lib.rs b/crates/agentic-core/src/lib.rs index 5828a68..3826b12 100644 --- a/crates/agentic-core/src/lib.rs +++ b/crates/agentic-core/src/lib.rs @@ -5,6 +5,7 @@ pub mod executor; pub mod proxy; pub mod readiness; pub mod storage; +pub mod tools; pub mod types; pub mod utils; diff --git a/crates/agentic-core/src/tools/mcp.rs b/crates/agentic-core/src/tools/mcp.rs new file mode 100644 index 0000000..45e8349 --- /dev/null +++ b/crates/agentic-core/src/tools/mcp.rs @@ -0,0 +1,18 @@ +use std::future::Future; +use std::pin::Pin; + +use crate::executor::ExecutorError; + +/// Execute a tool call via the Model Context Protocol. +/// +/// Implementations connect to an MCP server and invoke the named tool +/// with the provided arguments. 
The result is returned as a serialized +/// JSON string suitable for injection into `FunctionToolResultMessage.output`. +pub trait McpToolExecutor: Send + Sync { + fn execute( + &self, + tool_name: &str, + arguments: &str, + server_config: &serde_json::Value, + ) -> Pin> + Send + '_>>; +} diff --git a/crates/agentic-core/src/tools/mod.rs b/crates/agentic-core/src/tools/mod.rs new file mode 100644 index 0000000..331cd07 --- /dev/null +++ b/crates/agentic-core/src/tools/mod.rs @@ -0,0 +1,7 @@ +pub mod mcp; +pub mod vector_store; +pub mod web_search; + +pub use mcp::McpToolExecutor; +pub use vector_store::VectorStoreClient; +pub use web_search::WebSearchProvider; diff --git a/crates/agentic-core/src/tools/vector_store.rs b/crates/agentic-core/src/tools/vector_store.rs new file mode 100644 index 0000000..bfbc3d2 --- /dev/null +++ b/crates/agentic-core/src/tools/vector_store.rs @@ -0,0 +1,17 @@ +use std::future::Future; +use std::pin::Pin; + +use crate::executor::ExecutorError; + +/// Search a vector store (e.g. OGX) and return matching documents. +/// +/// Results are returned as a JSON array of document objects. The caller +/// serializes them into `FunctionToolResultMessage.output`. +pub trait VectorStoreClient: Send + Sync { + fn search( + &self, + store_id: &str, + query: &str, + max_results: u32, + ) -> Pin, ExecutorError>> + Send + '_>>; +} diff --git a/crates/agentic-core/src/tools/web_search.rs b/crates/agentic-core/src/tools/web_search.rs new file mode 100644 index 0000000..1e6032b --- /dev/null +++ b/crates/agentic-core/src/tools/web_search.rs @@ -0,0 +1,15 @@ +use std::future::Future; +use std::pin::Pin; + +use crate::executor::ExecutorError; + +/// Perform a web search and return results as a serialized string. +/// +/// `context_size` controls result verbosity: `"low"`, `"medium"`, or `"high"`. 
+pub trait WebSearchProvider: Send + Sync { + fn search( + &self, + query: &str, + context_size: &str, + ) -> Pin> + Send + '_>>; +} diff --git a/crates/agentic-core/tests/dispatch_test.rs b/crates/agentic-core/tests/dispatch_test.rs new file mode 100644 index 0000000..fd7b535 --- /dev/null +++ b/crates/agentic-core/tests/dispatch_test.rs @@ -0,0 +1,292 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use agentic_core::executor::{ExecutorError, LoopDecision, ToolContext, dispatch_tools}; +use agentic_core::tools::McpToolExecutor; +use agentic_core::types::io::{FunctionToolCall, InputItem, OutputItem, OutputMessage, OutputTextContent}; + +// --- Mock implementations --- + +struct MockMcp { + responses: std::collections::HashMap, +} + +impl MockMcp { + fn new(pairs: &[(&str, &str)]) -> Self { + Self { + responses: pairs + .iter() + .map(|(k, v)| ((*k).to_string(), (*v).to_string())) + .collect(), + } + } +} + +impl McpToolExecutor for MockMcp { + fn execute( + &self, + tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + let name = tool_name.to_string(); + Box::pin(async move { + self.responses + .get(&name) + .cloned() + .ok_or_else(|| ExecutorError::InvalidRequest(format!("unknown tool: {name}"))) + }) + } +} + +struct FailingMcp; + +impl McpToolExecutor for FailingMcp { + fn execute( + &self, + tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + let name = tool_name.to_string(); + Box::pin(async move { Err(ExecutorError::StreamError(format!("tool '{name}' crashed"))) }) + } +} + +// --- Helpers --- + +fn make_function_call(name: &str, args: &str, call_id: &str) -> OutputItem { + OutputItem::FunctionCall(FunctionToolCall { + id: format!("fc_{call_id}"), + call_id: call_id.to_string(), + name: name.to_string(), + arguments: args.to_string(), + status: "completed".to_string(), + }) +} + +fn make_message(text: &str) -> OutputItem { + 
OutputItem::Message(OutputMessage { + id: "msg_1".to_string(), + role: "assistant".to_string(), + status: "completed".to_string(), + content: vec![OutputTextContent::new(text)], + }) +} + +fn tool_ctx_with_mcp(mcp: impl McpToolExecutor + 'static) -> ToolContext { + ToolContext { + mcp: Some(Arc::new(mcp)), + max_iterations: 10, + ..ToolContext::default() + } +} + +// --- Tests --- + +#[tokio::test] +async fn test_no_function_calls_returns_done() { + let output = vec![make_message("Hello world")]; + let ctx = ToolContext::default(); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + assert!(matches!(decision, LoopDecision::Done)); +} + +#[tokio::test] +async fn test_empty_output_returns_done() { + let output: Vec = vec![]; + let ctx = ToolContext::default(); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + assert!(matches!(decision, LoopDecision::Done)); +} + +#[tokio::test] +async fn test_single_function_call_returns_continue() { + let output = vec![make_function_call("get_weather", r#"{"city":"SF"}"#, "call_1")]; + let mcp = MockMcp::new(&[("get_weather", r#"{"temp": 72}"#)]); + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert_eq!(result.call_id, "call_1"); + assert_eq!(result.output, r#"{"temp": 72}"#); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + panic!("expected Continue, got {decision:?}"); + } +} + +#[tokio::test] +async fn test_parallel_function_calls() { + let output = vec![ + make_function_call("get_weather", r#"{"city":"SF"}"#, "call_1"), + make_function_call("get_time", r#"{"tz":"PST"}"#, "call_2"), + ]; + let mcp = MockMcp::new(&[("get_weather", "sunny"), ("get_time", "10:30 AM")]); + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + 
+ if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 2); + let outputs: Vec<_> = items + .iter() + .filter_map(|item| match item { + InputItem::FunctionCallOutput(r) => Some((r.call_id.as_str(), r.output.as_str())), + _ => None, + }) + .collect(); + assert!(outputs.contains(&("call_1", "sunny"))); + assert!(outputs.contains(&("call_2", "10:30 AM"))); + } else { + panic!("expected Continue"); + } +} + +#[tokio::test] +async fn test_max_iterations_returns_incomplete() { + let output = vec![make_function_call("get_weather", "{}", "call_1")]; + let mcp = MockMcp::new(&[("get_weather", "sunny")]); + let ctx = ToolContext { + mcp: Some(Arc::new(mcp)), + max_iterations: 3, + ..ToolContext::default() + }; + + let decision = dispatch_tools(&output, &ctx, 3).await.unwrap(); + + if let LoopDecision::Incomplete(reason) = decision { + assert!(reason.contains("max tool iterations")); + } else { + panic!("expected Incomplete, got {decision:?}"); + } +} + +#[tokio::test] +async fn test_failing_tool_produces_error_output_not_total_failure() { + let output = vec![make_function_call("bad_tool", "{}", "call_1")]; + let ctx = tool_ctx_with_mcp(FailingMcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert_eq!(result.call_id, "call_1"); + assert!(result.output.contains("error")); + assert!(result.output.contains("crashed")); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + panic!("expected Continue (with error output), got {decision:?}"); + } +} + +#[tokio::test] +async fn test_no_provider_configured_produces_error_output() { + let output = vec![make_function_call("get_weather", "{}", "call_1")]; + let ctx = ToolContext { + max_iterations: 10, + ..ToolContext::default() + }; // no providers, but iterations allowed + + let decision = dispatch_tools(&output, &ctx, 
0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert!(result.output.contains("error")); + assert!(result.output.contains("no tool provider configured")); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + panic!("expected Continue (with error output), got {decision:?}"); + } +} + +#[tokio::test] +async fn test_mixed_success_and_failure() { + let output = vec![ + make_function_call("good_tool", "{}", "call_1"), + make_function_call("bad_tool", "{}", "call_2"), + ]; + let mcp = MockMcp::new(&[("good_tool", "success result")]); + // bad_tool not in map → returns error from MockMcp + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 2); + let results: std::collections::HashMap<_, _> = items + .iter() + .filter_map(|item| match item { + InputItem::FunctionCallOutput(r) => Some((r.call_id.as_str(), r.output.as_str())), + _ => None, + }) + .collect(); + assert_eq!(results["call_1"], "success result"); + assert!(results["call_2"].contains("error")); + } else { + panic!("expected Continue"); + } +} + +#[tokio::test] +async fn test_function_call_mixed_with_message_output() { + let output = vec![ + make_message("Let me check the weather"), + make_function_call("get_weather", r#"{"city":"NYC"}"#, "call_1"), + ]; + let mcp = MockMcp::new(&[("get_weather", "rainy")]); + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); // only the function call, not the message + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert_eq!(result.call_id, "call_1"); + assert_eq!(result.output, "rainy"); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + 
panic!("expected Continue"); + } +} + +#[tokio::test] +async fn test_iteration_zero_under_max_executes() { + let output = vec![make_function_call("tool", "{}", "call_1")]; + let mcp = MockMcp::new(&[("tool", "ok")]); + let ctx = ToolContext { + mcp: Some(Arc::new(mcp)), + max_iterations: 1, + ..ToolContext::default() + }; + + // iteration=0, max=1 → should execute (0 < 1) + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + assert!(matches!(decision, LoopDecision::Continue(_))); + + // iteration=1, max=1 → should be incomplete (1 >= 1) + let mcp2 = MockMcp::new(&[("tool", "ok")]); + let ctx2 = ToolContext { + mcp: Some(Arc::new(mcp2)), + max_iterations: 1, + ..ToolContext::default() + }; + let decision2 = dispatch_tools(&output, &ctx2, 1).await.unwrap(); + assert!(matches!(decision2, LoopDecision::Incomplete(_))); +} From ba9ed8722dd446f2e51c9c8fac3b53b55784945a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 13:39:49 -0700 Subject: [PATCH 02/17] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20JSO?= =?UTF-8?q?N=20escaping,=20default=20iterations,=20expect?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix JSON injection: use serde_json::json! for error output (not format!) 
- Fix default max_iterations: manual Default impl with 10 (was 0 via derive) - Fix unwrap_or_default: use expect() for infallible Vec serialization - Add concurrency comment to execute_all - Document MVP routing order in route_call Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/tool_context.rs | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/crates/agentic-core/src/executor/tool_context.rs b/crates/agentic-core/src/executor/tool_context.rs index b9ee7d0..043e28a 100644 --- a/crates/agentic-core/src/executor/tool_context.rs +++ b/crates/agentic-core/src/executor/tool_context.rs @@ -9,7 +9,6 @@ use crate::types::io::{FunctionToolCall, FunctionToolResultMessage, InputItem}; /// Passed into [`dispatch_tools`](super::dispatch::dispatch_tools) to resolve /// and execute tool calls. Each provider is optional — calls to unconfigured /// providers produce an error result (not a panic). -#[derive(Default)] pub struct ToolContext { pub mcp: Option>, pub web_search: Option>, @@ -17,12 +16,24 @@ pub struct ToolContext { pub max_iterations: usize, } +impl Default for ToolContext { + fn default() -> Self { + Self { + mcp: None, + web_search: None, + vector_store: None, + max_iterations: 10, + } + } +} + impl ToolContext { - /// Execute all tool calls in parallel. + /// Execute all tool calls concurrently via `join_all`. /// + /// Concurrency note: futures run on the tokio runtime's thread pool. + /// True parallelism depends on the runtime being multi-threaded. /// Individual failures produce an error string as the tool output for that - /// `call_id` — the dispatch does NOT fail as a whole. This matches the - /// Responses API behavior where partial tool results are acceptable. + /// `call_id` — the dispatch does NOT fail as a whole. 
pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec { let futures: Vec<_> = calls.iter().map(|call| self.execute_one(call)).collect(); futures::future::join_all(futures).await @@ -33,7 +44,7 @@ impl ToolContext { let output = match result { Ok(s) => s, - Err(e) => format!("{{\"error\": \"{e}\"}}"), + Err(e) => serde_json::json!({"error": e.to_string()}).to_string(), }; InputItem::FunctionCallOutput(FunctionToolResultMessage { @@ -42,9 +53,9 @@ impl ToolContext { }) } + /// MVP routing: tries providers in order (MCP → `web_search` → `vector_store`). + /// Future: route based on tool type from the request's tools array. async fn route_call(&self, call: &FunctionToolCall) -> Result { - // MVP: route all calls to MCP executor. - // Future: inspect request tools array to determine provider. if let Some(mcp) = &self.mcp { return mcp.execute(&call.name, &call.arguments, &serde_json::Value::Null).await; } @@ -55,7 +66,7 @@ impl ToolContext { if let Some(vs) = &self.vector_store { let results = vs.search("", &call.arguments, 5).await?; - return Ok(serde_json::to_string(&results).unwrap_or_default()); + return Ok(serde_json::to_string(&results).expect("Vec is always serializable")); } Err(ExecutorError::InvalidRequest(format!( From 7865964c06180531af9e27dbe9377162dcc13335 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 14:45:54 -0700 Subject: [PATCH 03/17] docs: document failure model and retry policy in execute_all Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/tool_context.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/crates/agentic-core/src/executor/tool_context.rs b/crates/agentic-core/src/executor/tool_context.rs index 043e28a..2071cf3 100644 --- a/crates/agentic-core/src/executor/tool_context.rs +++ b/crates/agentic-core/src/executor/tool_context.rs @@ -32,8 +32,17 @@ impl ToolContext { /// /// Concurrency note: futures run on the tokio runtime's thread pool. 
/// True parallelism depends on the runtime being multi-threaded. - /// Individual failures produce an error string as the tool output for that - /// `call_id` — the dispatch does NOT fail as a whole. + /// + /// Failure model: individual failures produce an error JSON string as the + /// tool output for that `call_id` — the dispatch does NOT fail as a whole. + /// The model sees the error and decides whether to retry (next iteration), + /// try a different approach, or answer without that result. + /// + /// Retry policy: this layer does NOT retry. Providers handle their own + /// retries internally (transient network errors, 503s, etc.). By the time + /// an error reaches here, the provider already exhausted its retry budget. + /// The agentic loop itself serves as a higher-level retry — the model can + /// re-call a failed tool on the next iteration if it chooses to. pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec { let futures: Vec<_> = calls.iter().map(|call| self.execute_one(call)).collect(); futures::future::join_all(futures).await From f8310fe4882aeab56637554c7b89d2deb5ca7c16 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 15:00:24 -0700 Subject: [PATCH 04/17] docs: add comments to dispatch tests explaining intent and expectations Signed-off-by: Ashwin Giridharan --- crates/agentic-core/tests/dispatch_test.rs | 40 +++++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/crates/agentic-core/tests/dispatch_test.rs b/crates/agentic-core/tests/dispatch_test.rs index fd7b535..ff09959 100644 --- a/crates/agentic-core/tests/dispatch_test.rs +++ b/crates/agentic-core/tests/dispatch_test.rs @@ -1,3 +1,5 @@ +#![allow(clippy::doc_markdown)] + use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -8,6 +10,8 @@ use agentic_core::types::io::{FunctionToolCall, InputItem, OutputItem, OutputMes // --- Mock implementations --- +/// Mock MCP executor that returns pre-configured responses by 
tool name. +/// If a tool name is not in the map, returns an error (simulating unknown tool). struct MockMcp { responses: std::collections::HashMap, } @@ -40,6 +44,7 @@ impl McpToolExecutor for MockMcp { } } +/// Mock MCP executor that always fails — simulates a crashed/unavailable tool provider. struct FailingMcp; impl McpToolExecutor for FailingMcp { @@ -85,6 +90,8 @@ fn tool_ctx_with_mcp(mcp: impl McpToolExecutor + 'static) -> ToolContext { // --- Tests --- +/// When output contains only text messages (no FunctionCall items), +/// dispatch should return Done — nothing to execute. #[tokio::test] async fn test_no_function_calls_returns_done() { let output = vec![make_message("Hello world")]; @@ -94,6 +101,7 @@ async fn test_no_function_calls_returns_done() { assert!(matches!(decision, LoopDecision::Done)); } +/// When output is completely empty, dispatch returns Done. #[tokio::test] async fn test_empty_output_returns_done() { let output: Vec = vec![]; @@ -103,6 +111,8 @@ async fn test_empty_output_returns_done() { assert!(matches!(decision, LoopDecision::Done)); } +/// Single FunctionCall in output → execute via MCP → return Continue with +/// the tool result wrapped as InputItem::FunctionCallOutput. #[tokio::test] async fn test_single_function_call_returns_continue() { let output = vec![make_function_call("get_weather", r#"{"city":"SF"}"#, "call_1")]; @@ -124,6 +134,8 @@ async fn test_single_function_call_returns_continue() { } } +/// Multiple FunctionCall items in output → all execute concurrently via join_all → +/// Continue with results for each call_id. Order may vary (parallel execution). #[tokio::test] async fn test_parallel_function_calls() { let output = vec![ @@ -151,6 +163,8 @@ async fn test_parallel_function_calls() { } } +/// When iteration count reaches max_iterations, dispatch returns Incomplete +/// WITHOUT executing any tools — prevents infinite tool loops. 
#[tokio::test] async fn test_max_iterations_returns_incomplete() { let output = vec![make_function_call("get_weather", "{}", "call_1")]; @@ -161,6 +175,7 @@ async fn test_max_iterations_returns_incomplete() { ..ToolContext::default() }; + // iteration=3, max=3 → 3 >= 3 is true → Incomplete let decision = dispatch_tools(&output, &ctx, 3).await.unwrap(); if let LoopDecision::Incomplete(reason) = decision { @@ -170,6 +185,9 @@ async fn test_max_iterations_returns_incomplete() { } } +/// When a tool provider returns an error, it becomes an error JSON string +/// in the output (not a total dispatch failure). The model sees the error +/// and can decide to retry on the next iteration. #[tokio::test] async fn test_failing_tool_produces_error_output_not_total_failure() { let output = vec![make_function_call("bad_tool", "{}", "call_1")]; @@ -191,13 +209,16 @@ async fn test_failing_tool_produces_error_output_not_total_failure() { } } +/// When no providers are configured at all, each call gets an error output +/// saying "no tool provider configured". dispatch still returns Continue +/// (the model sees the errors and handles them). #[tokio::test] async fn test_no_provider_configured_produces_error_output() { let output = vec![make_function_call("get_weather", "{}", "call_1")]; let ctx = ToolContext { max_iterations: 10, ..ToolContext::default() - }; // no providers, but iterations allowed + }; let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); @@ -214,6 +235,9 @@ async fn test_no_provider_configured_produces_error_output() { } } +/// When multiple tools are called and some succeed while others fail, +/// ALL results are returned — successes with their output, failures with +/// error JSON. The model gets partial results and decides what to do. 
#[tokio::test] async fn test_mixed_success_and_failure() { let output = vec![ @@ -221,7 +245,7 @@ async fn test_mixed_success_and_failure() { make_function_call("bad_tool", "{}", "call_2"), ]; let mcp = MockMcp::new(&[("good_tool", "success result")]); - // bad_tool not in map → returns error from MockMcp + // bad_tool not in MockMcp map → returns InvalidRequest error let ctx = tool_ctx_with_mcp(mcp); let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); @@ -242,6 +266,9 @@ async fn test_mixed_success_and_failure() { } } +/// When output contains both Message and FunctionCall items, only the +/// FunctionCall items are dispatched. Messages are ignored by dispatch +/// (they're part of the response, not actionable tool calls). #[tokio::test] async fn test_function_call_mixed_with_message_output() { let output = vec![ @@ -254,7 +281,7 @@ async fn test_function_call_mixed_with_message_output() { let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); if let LoopDecision::Continue(items) = decision { - assert_eq!(items.len(), 1); // only the function call, not the message + assert_eq!(items.len(), 1); if let InputItem::FunctionCallOutput(result) = &items[0] { assert_eq!(result.call_id, "call_1"); assert_eq!(result.output, "rainy"); @@ -266,6 +293,9 @@ async fn test_function_call_mixed_with_message_output() { } } +/// Boundary test: iteration=0 with max_iterations=1 should execute (0 < 1). +/// iteration=1 with max_iterations=1 should return Incomplete (1 >= 1). +/// Verifies the >= comparison is correct. 
#[tokio::test] async fn test_iteration_zero_under_max_executes() { let output = vec![make_function_call("tool", "{}", "call_1")]; @@ -276,11 +306,11 @@ async fn test_iteration_zero_under_max_executes() { ..ToolContext::default() }; - // iteration=0, max=1 → should execute (0 < 1) + // iteration=0, max=1 → 0 < 1 → should execute let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); assert!(matches!(decision, LoopDecision::Continue(_))); - // iteration=1, max=1 → should be incomplete (1 >= 1) + // iteration=1, max=1 → 1 >= 1 → should be Incomplete let mcp2 = MockMcp::new(&[("tool", "ok")]); let ctx2 = ToolContext { mcp: Some(Arc::new(mcp2)), From 574c94b92e41bdf88b09381ec9c31ce6d0476b0b Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 15:06:42 -0700 Subject: [PATCH 05/17] fix: add comment before inner attribute to avoid shebang false positive Pre-commit hook interprets #![...] at line 1 as a shebang. Adding a comment line before it prevents the false positive. Signed-off-by: Ashwin Giridharan --- crates/agentic-core/tests/dispatch_test.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/agentic-core/tests/dispatch_test.rs b/crates/agentic-core/tests/dispatch_test.rs index ff09959..ab2f40f 100644 --- a/crates/agentic-core/tests/dispatch_test.rs +++ b/crates/agentic-core/tests/dispatch_test.rs @@ -1,3 +1,4 @@ +// Integration tests for the tool dispatch layer. #![allow(clippy::doc_markdown)] use std::future::Future; From 6379bac875220beb5ed71ef990158e7cdfbc8206 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 13:13:55 -0700 Subject: [PATCH 06/17] =?UTF-8?q?feat:=20add=20execute=5Floop=20=E2=80=94?= =?UTF-8?q?=20agentic=20loop=20orchestrator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Composes execute() + dispatch_tools() in a tool-call loop: 1. Execute request (non-streaming) 2. dispatch_tools inspects output for FunctionCall items 3. 
If Continue → inject tool results into input, re-enter 4. If Done/Incomplete → persist and return final payload MVP: non-streaming only. Streaming + tool dispatch requires StreamTee (future PR). Rejects stream=true with clear error. This completes Phase 2 of the core API design (PR #44): traits + dispatch + loop orchestration in one PR. Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/execute_loop.rs | 86 +++++++++++++++++++ crates/agentic-core/src/executor/mod.rs | 2 + 2 files changed, 88 insertions(+) create mode 100644 crates/agentic-core/src/executor/execute_loop.rs diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs new file mode 100644 index 0000000..77742e5 --- /dev/null +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use either::Either; +use tracing::debug; + +use crate::executor::dispatch::{LoopDecision, dispatch_tools}; +use crate::executor::engine::{execute, persist_response}; +use crate::executor::error::ExecutorResult; +use crate::executor::request::ExecutionContext; +use crate::executor::tool_context::ToolContext; +use crate::types::io::ResponsesInput; +use crate::types::request_response::{RequestPayload, ResponsePayload}; + +/// Run the agentic loop: execute → dispatch tools → re-enter until done. +/// +/// Non-streaming MVP: each iteration calls the LLM in blocking mode, +/// inspects output for tool calls, executes them, and re-enters. +/// +/// Returns the final `ResponsePayload` once `dispatch_tools` returns +/// `Done` or `Incomplete`. +/// +/// # Errors +/// +/// Returns `ExecutorError` if any step (execute, dispatch, persist) fails. +pub async fn execute_loop( + mut request: RequestPayload, + exec_ctx: Arc, + tool_ctx: &ToolContext, +) -> ExecutorResult { + for iteration in 0.. 
{ + debug!(iteration, "execute_loop iteration"); + + let result = execute(request.clone(), Arc::clone(&exec_ctx)).await?; + + let payload = match result { + Either::Left(payload) => payload, + Either::Right(_stream) => { + // Streaming + tool dispatch requires StreamTee (future PR). + // For now, execute_loop only supports non-streaming. + // Callers should set stream=false when using execute_loop. + return Err(crate::executor::ExecutorError::InvalidRequest( + "execute_loop does not support streaming yet — set stream=false".into(), + )); + } + }; + + let decision = dispatch_tools(&payload.output, tool_ctx, iteration).await?; + + match decision { + LoopDecision::Done | LoopDecision::Incomplete(_) => { + if request.store { + let ch = exec_ctx.conv_handler.clone(); + let rh = exec_ctx.resp_handler.clone(); + let ctx = crate::executor::engine::rehydrate_conversation(request, &exec_ctx).await?; + if let Err(e) = persist_response(payload.clone(), ctx, ch, rh).await { + tracing::warn!("persist failed in execute_loop: {e}"); + } + } + return Ok(payload); + } + LoopDecision::Continue(tool_results) => { + debug!( + iteration, + results = tool_results.len(), + "tool results received, re-entering" + ); + // Inject tool results into the request input for next iteration. + // Use previous_response_id=None since we're managing state internally. 
+ let mut items = match &request.input { + ResponsesInput::Items(existing) => existing.clone(), + ResponsesInput::Text(t) => { + vec![crate::types::io::InputItem::Message(crate::types::io::InputMessage { + role: "user".into(), + content: crate::types::io::InputMessageContent::Text(t.clone()), + })] + } + }; + items.extend(tool_results); + request.input = ResponsesInput::Items(items); + request.previous_response_id = None; + } + } + } + + unreachable!("loop is infinite with break via return") +} diff --git a/crates/agentic-core/src/executor/mod.rs b/crates/agentic-core/src/executor/mod.rs index 3025400..d3e463c 100644 --- a/crates/agentic-core/src/executor/mod.rs +++ b/crates/agentic-core/src/executor/mod.rs @@ -4,6 +4,7 @@ pub mod accumulator; pub mod dispatch; pub mod engine; pub mod error; +pub mod execute_loop; pub mod modes; pub mod request; pub mod tool_context; @@ -11,6 +12,7 @@ pub mod tool_context; pub use dispatch::{LoopDecision, dispatch_tools}; pub use engine::{BoxStream, call_inference, create_conversation, execute, persist_response, rehydrate_conversation}; pub use error::{ExecutorError, ExecutorResult}; +pub use execute_loop::execute_loop; pub use modes::{ConversationHandler, ResponseHandler}; pub use request::ExecutionContext; pub use request::RequestContext; From 84b80c46532104e62716f93cb05884b4750d0a44 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 13:32:10 -0700 Subject: [PATCH 07/17] test: add execute_loop integration tests with mock LLM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 13 tests covering the full agentic loop orchestrator: P0 — streaming rejection: - Rejects stream=true with clear error, no LLM call made P1 — core loop paths: - No tool calls → returns directly after 1 LLM call - Persistence when store=true - Single tool call → dispatch → re-enter → text response - Parallel tool calls → both executed → re-enter - Mixed message + function_call → only FC dispatched - 
Max iterations → stops loop, returns last payload - Max iterations boundary (1 allows single dispatch) Failure cases: - Tool provider fails → error fed to model, loop continues - No providers configured → error per call_id - Empty model output → returns Done immediately - Multi-hop (3 iterations: tool A → tool B → text) - LLM server error → propagated as Err Uses existing TestFixture infrastructure (MockServer + SQLite). Signed-off-by: Ashwin Giridharan --- .../agentic-core/tests/execute_loop_test.rs | 535 ++++++++++++++++++ 1 file changed, 535 insertions(+) create mode 100644 crates/agentic-core/tests/execute_loop_test.rs diff --git a/crates/agentic-core/tests/execute_loop_test.rs b/crates/agentic-core/tests/execute_loop_test.rs new file mode 100644 index 0000000..2972526 --- /dev/null +++ b/crates/agentic-core/tests/execute_loop_test.rs @@ -0,0 +1,535 @@ +// Integration tests for execute_loop — the agentic loop orchestrator. +#![allow(clippy::doc_markdown)] + +mod support; + +use std::sync::Arc; + +use agentic_core::executor::{ExecutionContext, ExecutorError, ToolContext, execute_loop}; +use agentic_core::storage::{ConversationStore, ResponseStore}; +use agentic_core::tools::McpToolExecutor; +use agentic_core::types::io::{ResponsesInput, ToolChoice}; +use agentic_core::types::request_response::RequestPayload; +use support::{MockResponse, MockServer, setup_pool}; + +use std::future::Future; +use std::pin::Pin; + +// --- Mock tool implementations --- + +/// Returns a configured response for any tool call. +struct MockMcp { + response: String, +} + +impl MockMcp { + fn new(response: &str) -> Self { + Self { + response: response.to_string(), + } + } +} + +impl McpToolExecutor for MockMcp { + fn execute( + &self, + _tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + Box::pin(async { Ok(self.response.clone()) }) + } +} + +/// Always fails — simulates a crashed tool provider. 
+struct FailingMcp; + +impl McpToolExecutor for FailingMcp { + fn execute( + &self, + _tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + Box::pin(async { Err(ExecutorError::StreamError("MCP server unreachable".into())) }) + } +} + +// --- Helpers --- + +fn make_request(input: &str, stream: bool, store: bool) -> RequestPayload { + RequestPayload { + model: "test-model".to_string(), + input: ResponsesInput::Text(input.to_string()), + instructions: None, + previous_response_id: None, + conversation_id: None, + tools: None, + tool_choice: ToolChoice::Auto, + stream, + store, + include: None, + temperature: None, + top_p: None, + max_output_tokens: None, + truncation: None, + metadata: None, + } +} + +/// Build an ExecutionContext backed by a mock LLM server. +async fn build_exec_ctx(server: &MockServer) -> Arc { + let pool = setup_pool().await; + Arc::new(ExecutionContext::new( + agentic_core::executor::ConversationHandler::new(ConversationStore::new(pool.clone())), + agentic_core::executor::ResponseHandler::new(ResponseStore::new(pool)), + Arc::new(reqwest::Client::new()), + server.url().to_string(), + None, + )) +} + +/// Create a mock LLM response that contains only a text message. +fn text_llm_response(text: &str) -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_text", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [{ + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": text, "annotations": []}] + }], + "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + }) + .to_string(), + ) +} + +/// Create a mock LLM response that contains a function_call output item. 
+fn function_call_llm_response(name: &str, args: &str, call_id: &str) -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_fc", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [{ + "id": format!("fc_{call_id}"), + "type": "function_call", + "call_id": call_id, + "name": name, + "arguments": args, + "status": "completed" + }], + "usage": {"input_tokens": 10, "output_tokens": 8, "total_tokens": 18} + }) + .to_string(), + ) +} + +/// Create a mock LLM response with multiple function calls (parallel tool use). +fn parallel_function_calls_response() -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_parallel", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [ + { + "id": "fc_1", + "type": "function_call", + "call_id": "call_weather", + "name": "get_weather", + "arguments": "{\"city\":\"SF\"}", + "status": "completed" + }, + { + "id": "fc_2", + "type": "function_call", + "call_id": "call_time", + "name": "get_time", + "arguments": "{\"tz\":\"PST\"}", + "status": "completed" + } + ], + "usage": {"input_tokens": 10, "output_tokens": 12, "total_tokens": 22} + }) + .to_string(), + ) +} + +/// Create a mock LLM response with both a message AND a function call. 
+fn mixed_message_and_function_call_response() -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_mixed", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [ + { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": "Let me check that for you.", "annotations": []}] + }, + { + "id": "fc_1", + "type": "function_call", + "call_id": "call_1", + "name": "lookup", + "arguments": "{\"q\":\"test\"}", + "status": "completed" + } + ], + "usage": {"input_tokens": 10, "output_tokens": 15, "total_tokens": 25} + }) + .to_string(), + ) +} + +// --- P0: Streaming rejection --- + +/// execute_loop only supports non-streaming (MVP). Passing stream=true should +/// return an immediate error without making any LLM calls. +#[tokio::test] +async fn test_rejects_streaming_request() { + let server = MockServer::start_deque(vec![text_llm_response("should not reach")]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("hello", true, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("streaming"), + "error should mention streaming: {err}" + ); + + // Verify no LLM call was made + assert_eq!(server.request_bodies().await.len(), 0); +} + +// --- P1: No tools → single iteration → Done --- + +/// When the model responds with only text (no FunctionCall items), execute_loop +/// should return immediately after one iteration without re-entering. 
+#[tokio::test] +async fn test_no_tool_calls_returns_directly() { + let server = MockServer::start_deque(vec![text_llm_response("Hello world")]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("hi", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 1, "should call LLM exactly once"); +} + +/// When store=true and no tool calls, the response should be persisted. +/// NOTE: this test only asserts that the loop succeeds; it does not query the DB for the stored record. +#[tokio::test] +async fn test_no_tool_calls_persists_when_store_true() { + let server = MockServer::start_deque(vec![text_llm_response("Persisted response")]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("save me", false, true); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + // If persist failed, we'd see a warning but not an error — the function still succeeds +} + +// --- P1: Tool call → re-enter → text response --- + +/// Model calls a tool on first iteration, gets result, produces text on second. +/// This is the core agentic loop path. 
+#[tokio::test] +async fn test_one_tool_call_then_text_response() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("get_weather", r#"{"city":"SF"}"#, "call_1"), + text_llm_response("The weather in SF is sunny, 72°F"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new(r#"{"temp":72,"condition":"sunny"}"#))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("What's the weather in SF?", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // Final response is the text from iteration 2 + assert_eq!(result.status, "completed"); + + // LLM was called twice: once for initial request, once with tool results + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2, "should call LLM exactly twice"); + + // Second request should contain the tool result in its input + let second_body = &bodies[1]; + let input_str = serde_json::to_string(&second_body["input"]).unwrap(); + assert!( + input_str.contains("function_call_output"), + "second request should have tool result: {input_str}" + ); + assert!(input_str.contains("call_1"), "should reference the original call_id"); +} + +/// Model calls two tools in parallel, gets both results, produces final text. 
+#[tokio::test] +async fn test_parallel_tool_calls_then_text() { + let server = MockServer::start_deque(vec![ + parallel_function_calls_response(), + text_llm_response("Weather is sunny and time is 10:30 AM"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("weather and time?", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); + + // Second request input should have 2 function_call_output items + let bodies = server.request_bodies().await; + let input_str = serde_json::to_string(&bodies[1]["input"]).unwrap(); + assert!(input_str.contains("call_weather"), "should have weather result"); + assert!(input_str.contains("call_time"), "should have time result"); +} + +/// Mixed output: message + function_call. Only the function_call triggers dispatch. +/// On second iteration, model returns final text. +#[tokio::test] +async fn test_mixed_message_and_function_call() { + let server = MockServer::start_deque(vec![ + mixed_message_and_function_call_response(), + text_llm_response("Here's what I found."), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("lookup result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("find something", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); +} + +// --- P1: Max iterations --- + +/// When the model keeps returning tool calls and max_iterations is hit, +/// execute_loop should stop and return the last payload (not error). 
+#[tokio::test] +async fn test_max_iterations_stops_loop() { + // LLM always returns a function call — would loop forever without max_iterations + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + function_call_llm_response("tool", "{}", "c2"), + function_call_llm_response("tool", "{}", "c3"), + text_llm_response("should not reach this"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 2, // will stop after 2 iterations + ..ToolContext::default() + }; + + let request = make_request("loop forever", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // Should have called LLM 3 times (iteration 0, 1, 2) then stopped at dispatch + // iteration 0: execute → FC → dispatch(iter=0) → Continue + // iteration 1: execute → FC → dispatch(iter=1) → Continue + // iteration 2: execute → FC → dispatch(iter=2) → Incomplete (2 >= 2) + assert_eq!(server.request_bodies().await.len(), 3); + + // Returns the last payload (the one from iteration 2) + assert_eq!(result.status, "completed"); +} + +/// max_iterations=1 means only 1 tool dispatch is allowed. 
+#[tokio::test] +async fn test_max_iterations_one_allows_single_dispatch() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + function_call_llm_response("tool", "{}", "c2"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 1, + ..ToolContext::default() + }; + + let request = make_request("once", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // iteration 0: execute → FC → dispatch(iter=0) → Continue (0 < 1) + // iteration 1: execute → FC → dispatch(iter=1) → Incomplete (1 >= 1) + assert_eq!(server.request_bodies().await.len(), 2); + assert_eq!(result.status, "completed"); +} + +// --- P1: Tool failure doesn't kill the loop --- + +/// When a tool provider fails, the error becomes output that the model sees. +/// The loop continues and the model responds to the error gracefully. +#[tokio::test] +async fn test_tool_failure_feeds_error_to_model() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("broken_tool", "{}", "call_err"), + text_llm_response("Sorry, the tool failed. 
Here's what I know..."), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(FailingMcp)), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("try the broken tool", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); + + // The second request should contain the error string as tool output + let bodies = server.request_bodies().await; + let input_str = serde_json::to_string(&bodies[1]["input"]).unwrap(); + assert!(input_str.contains("error"), "should contain error output: {input_str}"); + assert!( + input_str.contains("MCP server unreachable"), + "should contain error message" + ); +} + +// --- Edge cases --- + +/// No tool providers configured at all — calls still produce error output +/// and the model handles it on the next iteration. +#[tokio::test] +async fn test_no_providers_configured() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("any_tool", "{}", "call_1"), + text_llm_response("I can't use tools right now"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + max_iterations: 10, + ..ToolContext::default() // no providers + }; + + let request = make_request("use a tool", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); + + // Error message about no provider should be in the second request + let bodies = server.request_bodies().await; + let input_str = serde_json::to_string(&bodies[1]["input"]).unwrap(); + assert!(input_str.contains("no tool provider configured")); +} + +/// Empty model output (no message, no function calls) — should return Done. 
+#[tokio::test] +async fn test_empty_model_output() { + let empty_response = MockResponse::Json( + serde_json::json!({ + "id": "resp_empty", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [], + "usage": null + }) + .to_string(), + ); + + let server = MockServer::start_deque(vec![empty_response]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("silence", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert!(result.output.is_empty()); + assert_eq!(server.request_bodies().await.len(), 1); +} + +/// Multi-hop: model calls tool A, then uses result to call tool B, then responds. +/// Tests 3 iterations of the loop. +#[tokio::test] +async fn test_multi_hop_tool_calls() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("search", "cats", "call_search"), + function_call_llm_response("summarize", "cat article text", "call_summarize"), + text_llm_response("Cats are wonderful pets."), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool output"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("tell me about cats", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 3, "should make 3 LLM calls"); +} + +/// LLM returns an error (non-2xx) — execute_loop should propagate the error. +#[tokio::test] +async fn test_llm_returns_error() { + // The current MockServer always returns 200, so we use an empty queue + // which causes "mock queue exhausted" panic — simulating server failure. 
+ let server = MockServer::start_deque(vec![]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("fail", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await; + + // Should propagate the error (mock queue exhausted = panic in mock, caught as error) + // In practice this tests that execute_loop doesn't swallow execute() errors + assert!(result.is_err(), "should propagate LLM error"); +} From bb570e00292ecb88dd9a7ce8444ff6c9921a2166 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 13:34:34 -0700 Subject: [PATCH 08/17] test: add cassette-driven execute_loop test from live vLLM MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Records a 2-turn tool-call session from google/gemma-4-26B-A4B-it (vLLM v0.21.0) and replays it through execute_loop: Turn 1: model calls get_weather → FunctionCall output Turn 2: model receives tool result → text response Cassette file: tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml Validates that execute_loop produces the same final text as the real model session (2 iterations, correct tool result injection). 
Signed-off-by: Ashwin Giridharan --- .../function-call-loop-vllm-gemma4.yaml | 43 ++++++++++ .../agentic-core/tests/execute_loop_test.rs | 82 +++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100644 crates/agentic-core/tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml diff --git a/crates/agentic-core/tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml b/crates/agentic-core/tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml new file mode 100644 index 0000000..1ad3f55 --- /dev/null +++ b/crates/agentic-core/tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml @@ -0,0 +1,43 @@ +# Recorded from google/gemma-4-26B-A4B-it via vLLM v0.21.0 +# Date: 2026-06-12 +# Scenario: User asks weather → model calls get_weather → tool returns result → model answers +# +# This cassette drives execute_loop through a complete 2-iteration tool loop: +# Iteration 0: model returns FunctionCall (get_weather) +# Iteration 1: model returns Message (final answer using tool result) + +model: google/gemma-4-26B-A4B-it +request: + input: "What is the weather in San Francisco?" + tools: + - type: function + name: get_weather + description: "Get current weather for a city" + parameters: + type: object + properties: + city: + type: string + required: ["city"] + tool_choice: required + stream: false + +tool_mock: + get_weather: '{"temperature": 72, "condition": "sunny", "humidity": 45}' + +expected: + iterations: 2 + final_text: "The current weather in San Francisco is sunny with a temperature of 72°F and 45% humidity." 
+ function_call: + name: get_weather + arguments: '{"city": "San Francisco"}' + call_id: "chatcmpl-tool-a2bac67597e69338" + +turns: + - description: "Turn 1: model calls get_weather" + response: + body: '{"id":"resp_9ef6bc922b19eb44","created_at":1781296371,"incomplete_details":null,"instructions":null,"metadata":null,"model":"google/gemma-4-26B-A4B-it","object":"response","output":[{"arguments":"{\"city\": \"San Francisco\"}","call_id":"chatcmpl-tool-a2bac67597e69338","name":"get_weather","type":"function_call","id":"fc_a55c74f95b5c8fcd","namespace":null,"status":"completed"}],"parallel_tool_calls":true,"temperature":1.0,"tool_choice":"required","tools":[{"name":"get_weather","parameters":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"]},"strict":null,"type":"function","defer_loading":null,"description":"Get current weather for a city"}],"top_p":0.95,"background":false,"max_output_tokens":32699,"max_tool_calls":null,"previous_response_id":null,"prompt":null,"reasoning":null,"service_tier":"auto","status":"completed","text":{"format":{"name":"tool_calling_response","schema":{"type":"array","minItems":1,"items":{"type":"object","anyOf":[{"properties":{"name":{"type":"string","enum":["get_weather"]},"parameters":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"]}},"required":["name","parameters"]}]}},"type":"json_schema","description":null,"strict":true},"verbosity":null},"top_logprobs":null,"truncation":"disabled","usage":{"input_tokens":69,"input_tokens_details":{"cached_tokens":64,"input_tokens_per_turn":[],"cached_tokens_per_turn":[]},"output_tokens":21,"output_tokens_details":{"reasoning_tokens":0,"tool_output_tokens":0,"output_tokens_per_turn":[],"tool_output_tokens_per_turn":[]},"total_tokens":90},"user":null,"presence_penalty":0.0,"frequency_penalty":0.0,"kv_transfer_params":null,"input_messages":null,"output_messages":null}' + + - description: "Turn 2: model responds with final answer after receiving tool 
result" + response: + body: '{"id":"resp_a443bc5c4af1a46b","created_at":1781296379,"incomplete_details":null,"instructions":null,"metadata":null,"model":"google/gemma-4-26B-A4B-it","object":"response","output":[{"id":"msg_90c2c001ac5517da","content":[{"annotations":[],"text":"The current weather in San Francisco is sunny with a temperature of 72°F and 45% humidity.","type":"output_text","logprobs":null}],"role":"assistant","status":"completed","type":"message","phase":null}],"parallel_tool_calls":true,"temperature":1.0,"tool_choice":"none","tools":[],"top_p":0.95,"background":false,"max_output_tokens":32703,"max_tool_calls":null,"previous_response_id":null,"prompt":null,"reasoning":null,"service_tier":"auto","status":"completed","text":null,"top_logprobs":null,"truncation":"disabled","usage":{"input_tokens":65,"input_tokens_details":{"cached_tokens":64,"input_tokens_per_turn":[],"cached_tokens_per_turn":[]},"output_tokens":29,"output_tokens_details":{"reasoning_tokens":2,"tool_output_tokens":0,"output_tokens_per_turn":[],"tool_output_tokens_per_turn":[]},"total_tokens":94},"user":null,"presence_penalty":0.0,"frequency_penalty":0.0,"kv_transfer_params":null,"input_messages":null,"output_messages":null}' diff --git a/crates/agentic-core/tests/execute_loop_test.rs b/crates/agentic-core/tests/execute_loop_test.rs index 2972526..a8d8696 100644 --- a/crates/agentic-core/tests/execute_loop_test.rs +++ b/crates/agentic-core/tests/execute_loop_test.rs @@ -3,6 +3,8 @@ mod support; +use serde::Deserialize; + use std::sync::Arc; use agentic_core::executor::{ExecutionContext, ExecutorError, ToolContext, execute_loop}; @@ -533,3 +535,83 @@ async fn test_llm_returns_error() { // In practice this tests that execute_loop doesn't swallow execute() errors assert!(result.is_err(), "should propagate LLM error"); } + +// --- P2: Cassette-driven integration test (real vLLM output) --- + +const TOOL_LOOP_CASSETTE: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/tool_loop"); + 
+#[derive(Deserialize)] +struct ToolLoopCassette { + turns: Vec, + expected: CassetteExpected, + tool_mock: std::collections::HashMap, +} + +#[derive(Deserialize)] +struct CassetteTurn { + response: CassetteTurnResponse, +} + +#[derive(Deserialize)] +struct CassetteTurnResponse { + body: String, +} + +#[derive(Deserialize)] +struct CassetteExpected { + iterations: usize, + final_text: String, +} + +/// Replays a recorded vLLM tool-call session through execute_loop. +/// Validates the loop produces the same final text as the real model. +#[tokio::test] +async fn test_cassette_tool_loop_vllm_gemma4() { + let path = format!("{TOOL_LOOP_CASSETTE}/function-call-loop-vllm-gemma4.yaml"); + let text = std::fs::read_to_string(&path).unwrap(); + let cassette: ToolLoopCassette = serde_yml::from_str(&text).unwrap(); + + // Build mock server with the recorded responses queued + let responses: Vec = cassette + .turns + .iter() + .map(|t| MockResponse::Json(t.response.body.clone())) + .collect(); + let server = MockServer::start_deque(responses).await; + let exec_ctx = build_exec_ctx(&server).await; + + // Build ToolContext with mock that returns the cassette's tool_mock value + let tool_response = cassette.tool_mock.get("get_weather").cloned().unwrap_or_default(); + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new(&tool_response))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("What is the weather in San Francisco?", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // Verify the loop ran the expected number of iterations + let bodies = server.request_bodies().await; + assert_eq!( + bodies.len(), + cassette.expected.iterations, + "expected {} LLM calls, got {}", + cassette.expected.iterations, + bodies.len() + ); + + // Verify final response contains the expected text + assert_eq!(result.status, "completed"); + let output_text: String = result + .output + .iter() + .filter_map(|item| 
match item { + agentic_core::types::io::OutputItem::Message(msg) => { + Some(msg.content.iter().map(|c| c.text.as_str()).collect::()) + } + _ => None, + }) + .collect(); + assert_eq!(output_text, cassette.expected.final_text); +} From 9594f09ce839c99fe851725804d99ad45ef7ba58 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 13:42:53 -0700 Subject: [PATCH 09/17] =?UTF-8?q?fix:=20address=20review=20round=201+2=20?= =?UTF-8?q?=E2=80=94=20loop=20guard,=20persistence,=20status?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 1 fixes: - Add MAX_LOOP_GUARD (128) — defense-in-depth hard cap - Remove broken persistence from execute_loop (caller handles it) - Capture original_previous_response_id, restore on final payload - Set payload.status = "incomplete" for Incomplete path - Set request.store = false for intermediate iterations Round 2 findings: - TODO: InputItem needs FunctionCall variant to inject assistant's tool-call items into context (type system limitation, follow-up PR) - Document store=false semantics and performance characteristics Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/execute_loop.rs | 63 +++++++++++++------ .../agentic-core/tests/execute_loop_test.rs | 10 ++- 2 files changed, 52 insertions(+), 21 deletions(-) diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs index 77742e5..20da8f5 100644 --- a/crates/agentic-core/src/executor/execute_loop.rs +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -4,13 +4,16 @@ use either::Either; use tracing::debug; use crate::executor::dispatch::{LoopDecision, dispatch_tools}; -use crate::executor::engine::{execute, persist_response}; +use crate::executor::engine::execute; use crate::executor::error::ExecutorResult; use crate::executor::request::ExecutionContext; use crate::executor::tool_context::ToolContext; use crate::types::io::ResponsesInput; use 
crate::types::request_response::{RequestPayload, ResponsePayload}; +/// Hard cap on loop iterations — defense-in-depth independent of `tool_ctx.max_iterations`. +const MAX_LOOP_GUARD: usize = 128; + /// Run the agentic loop: execute → dispatch tools → re-enter until done. /// /// Non-streaming MVP: each iteration calls the LLM in blocking mode, @@ -19,25 +22,37 @@ use crate::types::request_response::{RequestPayload, ResponsePayload}; /// Returns the final `ResponsePayload` once `dispatch_tools` returns /// `Done` or `Incomplete`. /// +/// Performance note: `request` is cloned each iteration because `execute()` +/// takes ownership. The input grows with each tool result injection. This is +/// acceptable for MVP but should be optimized for long tool chains. +/// /// # Errors /// -/// Returns `ExecutorError` if any step (execute, dispatch, persist) fails. +/// Returns `ExecutorError` if any step (execute, dispatch) fails, or if +/// the loop guard is breached. pub async fn execute_loop( mut request: RequestPayload, exec_ctx: Arc, tool_ctx: &ToolContext, ) -> ExecutorResult { - for iteration in 0.. { + // Capture original previous_response_id before the loop mutates it. + // Restored on the final payload to maintain correct response chain. + let original_previous_response_id = request.previous_response_id.clone(); + + for iteration in 0_usize.. { + if iteration >= MAX_LOOP_GUARD { + return Err(crate::executor::ExecutorError::InvalidRequest(format!( + "execute_loop exceeded hard iteration cap ({MAX_LOOP_GUARD})" + ))); + } + debug!(iteration, "execute_loop iteration"); let result = execute(request.clone(), Arc::clone(&exec_ctx)).await?; - let payload = match result { + let mut payload = match result { Either::Left(payload) => payload, Either::Right(_stream) => { - // Streaming + tool dispatch requires StreamTee (future PR). - // For now, execute_loop only supports non-streaming. - // Callers should set stream=false when using execute_loop. 
return Err(crate::executor::ExecutorError::InvalidRequest( "execute_loop does not support streaming yet — set stream=false".into(), )); @@ -47,15 +62,18 @@ pub async fn execute_loop( let decision = dispatch_tools(&payload.output, tool_ctx, iteration).await?; match decision { - LoopDecision::Done | LoopDecision::Incomplete(_) => { - if request.store { - let ch = exec_ctx.conv_handler.clone(); - let rh = exec_ctx.resp_handler.clone(); - let ctx = crate::executor::engine::rehydrate_conversation(request, &exec_ctx).await?; - if let Err(e) = persist_response(payload.clone(), ctx, ch, rh).await { - tracing::warn!("persist failed in execute_loop: {e}"); - } - } + LoopDecision::Done => { + // Restore original previous_response_id for correct chain persistence. + payload.previous_response_id = original_previous_response_id; + // Persistence is handled by the caller (server handler) which has + // the full RequestContext. execute_loop returns the payload only. + return Ok(payload); + } + LoopDecision::Incomplete(reason) => { + debug!(iteration, %reason, "loop incomplete"); + // Mark the payload as incomplete so the caller/client knows. + payload.status = "incomplete".to_string(); + payload.previous_response_id = original_previous_response_id; return Ok(payload); } LoopDecision::Continue(tool_results) => { @@ -64,8 +82,6 @@ pub async fn execute_loop( results = tool_results.len(), "tool results received, re-entering" ); - // Inject tool results into the request input for next iteration. - // Use previous_response_id=None since we're managing state internally. let mut items = match &request.input { ResponsesInput::Items(existing) => existing.clone(), ResponsesInput::Text(t) => { @@ -75,12 +91,21 @@ pub async fn execute_loop( })] } }; + // TODO: Append assistant's FunctionCall output items to context. + // The Responses API wire format includes `type: "function_call"` in + // input, but InputItem doesn't have that variant yet. 
Adding it + // requires a type system change (follow-up PR). For now the model + // only sees function_call_output results, not its own tool requests. items.extend(tool_results); request.input = ResponsesInput::Items(items); + // Clear previous_response_id — we're managing context internally. request.previous_response_id = None; + // Suppress persistence for intermediate iterations — only the + // final response should be stored. + request.store = false; } } } - unreachable!("loop is infinite with break via return") + unreachable!("loop exits via return") } diff --git a/crates/agentic-core/tests/execute_loop_test.rs b/crates/agentic-core/tests/execute_loop_test.rs index a8d8696..885af2a 100644 --- a/crates/agentic-core/tests/execute_loop_test.rs +++ b/crates/agentic-core/tests/execute_loop_test.rs @@ -377,7 +377,10 @@ async fn test_max_iterations_stops_loop() { assert_eq!(server.request_bodies().await.len(), 3); // Returns the last payload (the one from iteration 2) - assert_eq!(result.status, "completed"); + assert_eq!( + result.status, "incomplete", + "should be marked incomplete when max iterations hit" + ); } /// max_iterations=1 means only 1 tool dispatch is allowed. 
@@ -401,7 +404,10 @@ async fn test_max_iterations_one_allows_single_dispatch() { // iteration 0: execute → FC → dispatch(iter=0) → Continue (0 < 1) // iteration 1: execute → FC → dispatch(iter=1) → Incomplete (1 >= 1) assert_eq!(server.request_bodies().await.len(), 2); - assert_eq!(result.status, "completed"); + assert_eq!( + result.status, "incomplete", + "should be marked incomplete when max iterations hit" + ); } // --- P1: Tool failure doesn't kill the loop --- From b4dc7ec4ef2e28ccf7ac53a2c2d391bdebe939ed Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 13:51:11 -0700 Subject: [PATCH 10/17] feat: add tool_timeout + additional test coverage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add tool_timeout (Duration, default 30s) to ToolContext - Wrap each tool execution in tokio::time::timeout — hung providers produce timeout error string per call_id (not total failure) - Add test: Items input extended correctly on Continue - Add test: previous_response_id=None preserved through loop - Add test: cassette-driven loop from live vLLM (P2) Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/tool_context.rs | 18 +++- .../agentic-core/tests/execute_loop_test.rs | 87 +++++++++++++++++++ 2 files changed, 104 insertions(+), 1 deletion(-) diff --git a/crates/agentic-core/src/executor/tool_context.rs b/crates/agentic-core/src/executor/tool_context.rs index 2071cf3..6d53a63 100644 --- a/crates/agentic-core/src/executor/tool_context.rs +++ b/crates/agentic-core/src/executor/tool_context.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use crate::executor::ExecutorError; use crate::tools::{McpToolExecutor, VectorStoreClient, WebSearchProvider}; @@ -14,6 +15,10 @@ pub struct ToolContext { pub web_search: Option>, pub vector_store: Option>, pub max_iterations: usize, + /// Per-tool-call timeout. 
If a provider takes longer than this, the call + /// produces a timeout error string (not a total dispatch failure). + /// `Duration::ZERO` disables the timeout. + pub tool_timeout: Duration, } impl Default for ToolContext { @@ -23,6 +28,7 @@ impl Default for ToolContext { web_search: None, vector_store: None, max_iterations: 10, + tool_timeout: Duration::from_secs(30), } } } @@ -49,7 +55,17 @@ impl ToolContext { } async fn execute_one(&self, call: &FunctionToolCall) -> InputItem { - let result = self.route_call(call).await; + let result = if self.tool_timeout.is_zero() { + self.route_call(call).await + } else { + match tokio::time::timeout(self.tool_timeout, self.route_call(call)).await { + Ok(r) => r, + Err(_elapsed) => Err(ExecutorError::StreamError(format!( + "tool '{}' timed out after {:?}", + call.name, self.tool_timeout + ))), + } + }; let output = match result { Ok(s) => s, diff --git a/crates/agentic-core/tests/execute_loop_test.rs b/crates/agentic-core/tests/execute_loop_test.rs index 885af2a..d77f627 100644 --- a/crates/agentic-core/tests/execute_loop_test.rs +++ b/crates/agentic-core/tests/execute_loop_test.rs @@ -621,3 +621,90 @@ async fn test_cassette_tool_loop_vllm_gemma4() { .collect(); assert_eq!(output_text, cassette.expected.final_text); } + +// --- Additional coverage tests --- + +/// When the request already has Items input (not Text), the loop should +/// extend the existing items with tool results on Continue. 
+#[tokio::test] +async fn test_items_input_extended_correctly() { + use agentic_core::types::io::{InputItem, InputMessage, InputMessageContent}; + + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "call_1"), + text_llm_response("done"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + // Start with Items input (not Text) + let request = RequestPayload { + model: "test-model".to_string(), + input: ResponsesInput::Items(vec![InputItem::Message(InputMessage { + role: "user".into(), + content: InputMessageContent::Text("hello from items".into()), + })]), + instructions: None, + previous_response_id: None, + conversation_id: None, + tools: None, + tool_choice: ToolChoice::Auto, + stream: false, + store: false, + include: None, + temperature: None, + top_p: None, + max_output_tokens: None, + truncation: None, + metadata: None, + }; + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + assert_eq!(result.status, "completed"); + + // Second request should have: original item + tool result + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + let second_input = &bodies[1]["input"]; + let input_array = second_input.as_array().unwrap(); + // Should have at least 2 items: original message + function_call_output + assert!( + input_array.len() >= 2, + "expected at least 2 items, got {}", + input_array.len() + ); +} + +/// Verify that previous_response_id=None on input produces None on output payload. +/// (Testing with Some(id) requires a seeded DB — deferred to full integration test.) 
+#[tokio::test] +async fn test_previous_response_id_none_preserved() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("done"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("test", false, false); + // request.previous_response_id is None + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // After the loop mutated previous_response_id internally (set to None), + // the original value (None) should be restored on the payload. + assert_eq!( + result.previous_response_id, None, + "should preserve original None previous_response_id" + ); +} From 9261e787c57a689db3a10d1363a31d090b825570 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 13:53:56 -0700 Subject: [PATCH 11/17] feat: add inference timeout to execute_loop Wraps each execute() call in tokio::time::timeout using exec_ctx.streaming_timeout (default 30s). If the LLM hangs, produces a clear error with iteration number. Duration::ZERO disables the timeout (same pattern as streaming_timeout and tool_timeout). Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/execute_loop.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs index 20da8f5..690615c 100644 --- a/crates/agentic-core/src/executor/execute_loop.rs +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -48,7 +48,18 @@ pub async fn execute_loop( debug!(iteration, "execute_loop iteration"); - let result = execute(request.clone(), Arc::clone(&exec_ctx)).await?; + let inference_timeout = exec_ctx.streaming_timeout; + let result = if inference_timeout.is_zero() { + execute(request.clone(), Arc::clone(&exec_ctx)).await? 
+ } else { + tokio::time::timeout(inference_timeout, execute(request.clone(), Arc::clone(&exec_ctx))) + .await + .map_err(|_| { + crate::executor::ExecutorError::StreamError(format!( + "LLM inference timed out after {inference_timeout:?} on iteration {iteration}" + )) + })?? + }; let mut payload = match result { Either::Left(payload) => payload, From 13a8812cb8c39462b1f7d57595a14fcc7048e736 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 14:01:27 -0700 Subject: [PATCH 12/17] docs: add detailed code comments to dispatch, tool_context, execute_loop - execute_loop.rs: module-level architecture diagram, contract docs, timeout documentation, known limitations, inline step explanations - dispatch.rs: module-level decision flow, step-by-step inline comments, error semantics documentation - tool_context.rs: design opinions, concurrency model, failure model, retry policy, routing explanation with follow-up notes All doc comments pass clippy::doc_markdown. Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/dispatch.rs | 60 ++++++++-- .../agentic-core/src/executor/execute_loop.rs | 111 ++++++++++++++---- .../agentic-core/src/executor/tool_context.rs | 100 +++++++++++++--- 3 files changed, 224 insertions(+), 47 deletions(-) diff --git a/crates/agentic-core/src/executor/dispatch.rs b/crates/agentic-core/src/executor/dispatch.rs index ea228fb..d18f77a 100644 --- a/crates/agentic-core/src/executor/dispatch.rs +++ b/crates/agentic-core/src/executor/dispatch.rs @@ -1,27 +1,65 @@ +//! Tool dispatch decision logic. +//! +//! This module is STATELESS — it inspects the model's output items, decides +//! whether to execute tools, and returns a decision. It does not manage loop +//! state, persistence, or re-entry. That's `execute_loop`'s job. +//! +//! Decision flow: +//! ```text +//! output items → filter FunctionCall → empty? → Done +//! → iteration >= max? → Incomplete +//! → execute all → Continue(results) +//! 
``` + use crate::executor::error::ExecutorResult; use crate::executor::tool_context::ToolContext; use crate::types::io::{InputItem, OutputItem}; /// Decision returned by [`dispatch_tools`] to drive the agentic loop. +/// +/// `#[non_exhaustive]` allows adding variants (e.g. `Partial` for mixed +/// gateway + client tools) without breaking downstream match arms. #[derive(Debug)] #[non_exhaustive] pub enum LoopDecision { - /// Gateway-executed tools returned results. Inject into input and re-infer. + /// Gateway-executed tools returned results. Caller should inject these + /// `InputItem::FunctionCallOutput` items into the request and re-infer. Continue(Vec), - /// No tool calls found, or only client-side functions. Return response to client. + + /// No tool calls found in output, OR only client-side functions present. + /// Caller should return the response to the client as-is. + /// If `FunctionCall` items exist in output with this decision, the response + /// status should be `requires_action` (client executes them externally). Done, - /// Max iterations reached or unrecoverable tool failure. + + /// Max iterations reached. The model wanted to call more tools but we're + /// cutting it off to prevent runaway loops. The reason string is included + /// for logging/debugging. Caller should set `payload.status = "incomplete"`. Incomplete(String), } /// Inspect executor output for function calls and dispatch them via [`ToolContext`]. /// -/// Returns [`LoopDecision`] indicating whether the caller should re-enter -/// inference (Continue), return the response (Done), or mark incomplete. +/// # Decision Logic +/// +/// 1. Filter `OutputItem::FunctionCall` items from `output` +/// 2. If none found → `Done` (model produced only text/messages) +/// 3. If `iteration >= tool_ctx.max_iterations` → `Incomplete` (safety cap) +/// 4. 
Otherwise → execute all calls via `tool_ctx.execute_all()` → `Continue` +/// +/// # MVP Routing +/// +/// Currently ALL `FunctionCall` items are treated as gateway-executable (routed +/// to MCP/`web_search`/`vector_store` in priority order). The distinction between +/// client-side functions (`type: "function"` in the request) and gateway-executed +/// tools (`type: "mcp"`) requires access to the request's tools array — deferred +/// to a follow-up PR that changes this function's signature. +/// +/// # Error Semantics /// -/// For MVP, all `FunctionCall` items are treated as gateway-executable. -/// Client-side function routing (checking the request's `tools` array) is -/// deferred to a follow-up PR. +/// - Individual tool failures → error JSON string in the result (model sees it) +/// - This function only returns `Err` on internal/structural failures +/// - The function itself NEVER panics /// /// # Errors /// @@ -33,6 +71,7 @@ pub async fn dispatch_tools( tool_ctx: &ToolContext, iteration: usize, ) -> ExecutorResult { + // Step 1: Extract FunctionCall items. Messages and Unknown are ignored. let function_calls: Vec<_> = output .iter() .filter_map(|item| match item { @@ -41,10 +80,13 @@ pub async fn dispatch_tools( }) .collect(); + // Step 2: No tool calls → model is done generating. if function_calls.is_empty() { return Ok(LoopDecision::Done); } + // Step 3: Safety cap — prevent infinite tool loops. + // This fires BEFORE execution, so no work is wasted on the capped iteration. if iteration >= tool_ctx.max_iterations { return Ok(LoopDecision::Incomplete(format!( "max tool iterations reached ({iteration}/{})", @@ -52,6 +94,8 @@ pub async fn dispatch_tools( ))); } + // Step 4: Execute all tool calls in parallel and return results. + // Individual failures produce error strings (not Err), so this always succeeds. 
let results = tool_ctx.execute_all(&function_calls).await; Ok(LoopDecision::Continue(results)) } diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs index 690615c..35a3510 100644 --- a/crates/agentic-core/src/executor/execute_loop.rs +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -1,3 +1,27 @@ +//! Agentic loop orchestrator. +//! +//! Composes `execute()` (LLM inference) with `dispatch_tools()` (tool routing) +//! in a loop that continues until the model stops producing tool calls. +//! +//! Architecture: +//! ```text +//! ┌─────────────────────────────────────────────────────────┐ +//! │ execute_loop │ +//! │ │ +//! │ for each iteration: │ +//! │ 1. execute(request) → ResponsePayload │ +//! │ 2. dispatch_tools(output) → LoopDecision │ +//! │ 3. if Continue: inject results, goto 1 │ +//! │ if Done/Incomplete: return payload │ +//! └─────────────────────────────────────────────────────────┘ +//! ``` +//! +//! Timeout budget per iteration: +//! - LLM inference: `exec_ctx.streaming_timeout` (default 30s) +//! - Each tool call: `tool_ctx.tool_timeout` (default 30s) +//! - Hard loop cap: `MAX_LOOP_GUARD` (128 iterations) +//! - Soft tool cap: `tool_ctx.max_iterations` (default 10) + use std::sync::Arc; use either::Either; @@ -11,35 +35,59 @@ use crate::executor::tool_context::ToolContext; use crate::types::io::ResponsesInput; use crate::types::request_response::{RequestPayload, ResponsePayload}; -/// Hard cap on loop iterations — defense-in-depth independent of `tool_ctx.max_iterations`. +/// Defense-in-depth hard cap, independent of `tool_ctx.max_iterations`. +/// Prevents infinite loops even if dispatch logic has a bug. +/// Set high enough to never trigger in normal operation (`max_iterations`=10 +/// would stop far earlier), but low enough to catch runaway loops quickly. const MAX_LOOP_GUARD: usize = 128; /// Run the agentic loop: execute → dispatch tools → re-enter until done. 
/// -/// Non-streaming MVP: each iteration calls the LLM in blocking mode, -/// inspects output for tool calls, executes them, and re-enters. +/// # Contract +/// +/// - **Caller provides:** request, execution context (LLM + DB), tool context (providers) +/// - **This function returns:** the final `ResponsePayload` (caller persists it) +/// - **Persistence:** NOT done here. Caller (server handler) owns persistence because +/// it has the full `RequestContext` with correct `new_input_items`. We set +/// `request.store = false` for intermediate iterations to suppress the internal +/// `execute()` from persisting partial state. +/// - **Response chain:** `previous_response_id` on the returned payload reflects the +/// ORIGINAL caller-supplied value, not the internal mutations. +/// +/// # Timeouts /// -/// Returns the final `ResponsePayload` once `dispatch_tools` returns -/// `Done` or `Incomplete`. +/// - Each `execute()` call is wrapped in `tokio::time::timeout(exec_ctx.streaming_timeout)` +/// - Each tool call is wrapped in `tokio::time::timeout(tool_ctx.tool_timeout)` +/// - `Duration::ZERO` on either disables that timeout (provider manages its own) /// -/// Performance note: `request` is cloned each iteration because `execute()` -/// takes ownership. The input grows with each tool result injection. This is -/// acceptable for MVP but should be optimized for long tool chains. +/// # Known Limitations (MVP) +/// +/// - Non-streaming only. `stream=true` returns an immediate error. +/// - `request.clone()` every iteration is O(n) in accumulated input size. +/// - `InputItem` lacks a `FunctionCall` variant, so the assistant's tool-call +/// items are not injected into context (the model doesn't see its own calls). +/// Follow-up PR needed to add the variant. /// /// # Errors /// -/// Returns `ExecutorError` if any step (execute, dispatch) fails, or if -/// the loop guard is breached. 
+/// Returns `ExecutorError` if: +/// - LLM inference fails or times out +/// - Tool dispatch encounters a fatal error (individual tool failures are NOT fatal) +/// - `stream=true` is passed +/// - Hard loop guard is breached pub async fn execute_loop( mut request: RequestPayload, exec_ctx: Arc, tool_ctx: &ToolContext, ) -> ExecutorResult { - // Capture original previous_response_id before the loop mutates it. - // Restored on the final payload to maintain correct response chain. + // Capture before the loop mutates the request. Restored on final payload + // so the response chain (previous_response_id linkage) remains correct + // for the caller's persistence logic. let original_previous_response_id = request.previous_response_id.clone(); for iteration in 0_usize.. { + // Defense-in-depth: even if dispatch_tools has a bug that never returns + // Incomplete, we won't loop forever. if iteration >= MAX_LOOP_GUARD { return Err(crate::executor::ExecutorError::InvalidRequest(format!( "execute_loop exceeded hard iteration cap ({MAX_LOOP_GUARD})" @@ -48,6 +96,9 @@ pub async fn execute_loop( debug!(iteration, "execute_loop iteration"); + // --- Step 1: Call the LLM --- + // Timeout prevents hanging on unresponsive LLM backends. + // Duration::ZERO = no timeout (provider/reqwest manages its own). let inference_timeout = exec_ctx.streaming_timeout; let result = if inference_timeout.is_zero() { execute(request.clone(), Arc::clone(&exec_ctx)).await? @@ -61,6 +112,8 @@ pub async fn execute_loop( })?? }; + // execute() returns Either. + // We only support non-streaming in execute_loop (streaming requires StreamTee). let mut payload = match result { Either::Left(payload) => payload, Either::Right(_stream) => { @@ -70,31 +123,39 @@ pub async fn execute_loop( } }; + // --- Step 2: Inspect output for tool calls --- + // dispatch_tools filters FunctionCall items, executes them via ToolContext, + // and returns a decision: Continue (with results), Done, or Incomplete. 
let decision = dispatch_tools(&payload.output, tool_ctx, iteration).await?; match decision { + // No tool calls (or only client-side functions) — we're done. LoopDecision::Done => { - // Restore original previous_response_id for correct chain persistence. payload.previous_response_id = original_previous_response_id; - // Persistence is handled by the caller (server handler) which has - // the full RequestContext. execute_loop returns the payload only. return Ok(payload); } + // Hit max_iterations — stop looping, mark as incomplete. + // The model may have wanted to call more tools, but we're cutting it off. LoopDecision::Incomplete(reason) => { debug!(iteration, %reason, "loop incomplete"); - // Mark the payload as incomplete so the caller/client knows. payload.status = "incomplete".to_string(); payload.previous_response_id = original_previous_response_id; return Ok(payload); } + // Tools were executed — inject results and re-enter inference. LoopDecision::Continue(tool_results) => { debug!( iteration, results = tool_results.len(), "tool results received, re-entering" ); + + // Build the input for the next iteration: + // existing context + tool results appended. let mut items = match &request.input { + // Already structured items — clone and extend. ResponsesInput::Items(existing) => existing.clone(), + // First iteration with plain text — convert to structured item. ResponsesInput::Text(t) => { vec![crate::types::io::InputItem::Message(crate::types::io::InputMessage { role: "user".into(), @@ -102,21 +163,31 @@ pub async fn execute_loop( })] } }; + // TODO: Append assistant's FunctionCall output items to context. // The Responses API wire format includes `type: "function_call"` in // input, but InputItem doesn't have that variant yet. Adding it // requires a type system change (follow-up PR). For now the model // only sees function_call_output results, not its own tool requests. 
+ // This may cause the model to re-call tools or behave unexpectedly + // in complex multi-hop scenarios. + + // Append tool execution results (function_call_output items). items.extend(tool_results); request.input = ResponsesInput::Items(items); - // Clear previous_response_id — we're managing context internally. + + // Clear previous_response_id — on re-entry, we don't want execute() + // to rehydrate from DB (we're managing context in-memory via items). request.previous_response_id = None; - // Suppress persistence for intermediate iterations — only the - // final response should be stored. + + // Suppress persistence for intermediate iterations. Only the final + // response (returned to caller) should be persisted. If execute() + // tried to persist each intermediate response, we'd get N partial + // records in the DB instead of 1 complete one. request.store = false; } } } - unreachable!("loop exits via return") + unreachable!("loop exits via return in Done/Incomplete/guard arms") } diff --git a/crates/agentic-core/src/executor/tool_context.rs b/crates/agentic-core/src/executor/tool_context.rs index 6d53a63..5422291 100644 --- a/crates/agentic-core/src/executor/tool_context.rs +++ b/crates/agentic-core/src/executor/tool_context.rs @@ -1,3 +1,17 @@ +//! Tool execution context and provider routing. +//! +//! `ToolContext` is the runtime container for tool providers. It holds +//! references to MCP servers, web search backends, and vector stores. +//! When `dispatch_tools` decides to execute tool calls, it delegates here. +//! +//! Design opinions: +//! - Each provider is `Option<Arc<dyn Trait>>` — missing providers produce errors, not panics +//! - Execution is parallel via `join_all` — all tool calls in one response run concurrently +//! - Individual failures are isolated — one hung tool doesn't block others (each has a timeout) +//! - The error output format is JSON (`{"error": "..."}`) — the model can parse and react to it +//! 
- Routing is MVP priority-order, NOT by tool type. Real routing requires the request's +//! tools array (follow-up PR changes `dispatch_tools` signature) + use std::sync::Arc; use std::time::Duration; @@ -5,19 +19,37 @@ use crate::executor::ExecutorError; use crate::tools::{McpToolExecutor, VectorStoreClient, WebSearchProvider}; use crate::types::io::{FunctionToolCall, FunctionToolResultMessage, InputItem}; -/// Holds references to tool execution providers. +/// Runtime configuration for tool execution. +/// +/// Constructed once at server startup (or per-request if tools vary) +/// and passed into `execute_loop` / `dispatch_tools`. /// -/// Passed into [`dispatch_tools`](super::dispatch::dispatch_tools) to resolve -/// and execute tool calls. Each provider is optional — calls to unconfigured -/// providers produce an error result (not a panic). +/// # Defaults +/// +/// - `max_iterations`: 10 (soft cap checked by `dispatch_tools`) +/// - `tool_timeout`: 30s per individual tool call +/// - All providers: None (calls produce "no provider configured" errors) pub struct ToolContext { + /// MCP tool executor — connects to external MCP servers. + /// Used for user-defined tools declared as `type: "mcp"` in the request. pub mcp: Option>, + + /// Web search provider (e.g., Brave, Google). + /// Used for the built-in `web_search` tool type. pub web_search: Option>, + + /// Vector store client (e.g., OGX). + /// Used for the built-in `file_search` tool type. pub vector_store: Option>, + + /// Maximum number of tool dispatch iterations before returning Incomplete. + /// This is a SOFT cap — `dispatch_tools` checks `iteration >= max_iterations`. + /// The HARD cap is `MAX_LOOP_GUARD` in `execute_loop.rs` (128). pub max_iterations: usize, + /// Per-tool-call timeout. If a provider takes longer than this, the call /// produces a timeout error string (not a total dispatch failure). - /// `Duration::ZERO` disables the timeout. 
+ /// `Duration::ZERO` disables the timeout — use when providers manage their own. pub tool_timeout: Duration, } @@ -34,27 +66,45 @@ impl Default for ToolContext { } impl ToolContext { - /// Execute all tool calls concurrently via `join_all`. + /// Execute all tool calls concurrently via `futures::future::join_all`. + /// + /// # Concurrency Model /// - /// Concurrency note: futures run on the tokio runtime's thread pool. - /// True parallelism depends on the runtime being multi-threaded. + /// All calls start immediately and run in parallel on tokio's thread pool. + /// `join_all` awaits ALL futures before returning — wall-clock time is + /// bounded by the slowest individual call (not sum of all calls). /// - /// Failure model: individual failures produce an error JSON string as the - /// tool output for that `call_id` — the dispatch does NOT fail as a whole. - /// The model sees the error and decides whether to retry (next iteration), - /// try a different approach, or answer without that result. + /// With `tool_timeout = 30s` and N calls, worst case is 30s total (not N×30s). /// - /// Retry policy: this layer does NOT retry. Providers handle their own - /// retries internally (transient network errors, 503s, etc.). By the time - /// an error reaches here, the provider already exhausted its retry budget. - /// The agentic loop itself serves as a higher-level retry — the model can - /// re-call a failed tool on the next iteration if it chooses to. + /// # Failure Model + /// + /// Individual failures produce an error JSON string as the tool output for + /// that `call_id`. The dispatch does NOT fail as a whole. This matches the + /// Responses API behavior where partial tool results are acceptable. + /// + /// The model sees `{"error": "..."}` as the tool output and decides: + /// - Retry the tool on the next iteration + /// - Answer without that result + /// - Try a different approach + /// + /// # Retry Policy + /// + /// This layer does NOT retry. 
Providers handle their own retries internally + /// (transient network errors, 503s, etc.). By the time an error reaches here, + /// the provider already exhausted its retry budget. The agentic loop itself + /// serves as a higher-level retry — the model can re-call a failed tool on + /// the next iteration if it chooses to. pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec { let futures: Vec<_> = calls.iter().map(|call| self.execute_one(call)).collect(); futures::future::join_all(futures).await } + /// Execute a single tool call with timeout protection. + /// + /// Always returns an `InputItem::FunctionCallOutput` — either with the real + /// result or with an error JSON string. Never panics, never returns Err. async fn execute_one(&self, call: &FunctionToolCall) -> InputItem { + // Apply per-call timeout. Duration::ZERO = no timeout (opt-out). let result = if self.tool_timeout.is_zero() { self.route_call(call).await } else { @@ -67,6 +117,9 @@ impl ToolContext { } }; + // Convert Result → String (error becomes JSON). + // Using serde_json::json! ensures proper escaping of error messages + // that might contain quotes, newlines, or other special characters. let output = match result { Ok(s) => s, Err(e) => serde_json::json!({"error": e.to_string()}).to_string(), @@ -78,8 +131,17 @@ impl ToolContext { }) } - /// MVP routing: tries providers in order (MCP → `web_search` → `vector_store`). - /// Future: route based on tool type from the request's tools array. + /// Route a tool call to the appropriate provider. + /// + /// MVP: Priority-order routing. Tries MCP first (most general), then + /// `web_search`, then `vector_store`. First configured provider wins. + /// + /// This is intentionally simple — real routing needs the request's `tools` + /// array to distinguish `type: "function"` (client-side) from `type: "mcp"` + /// (gateway-side). That requires changing `dispatch_tools`'s signature to + /// accept the tools array, which is a follow-up PR. 
+ /// + /// When no provider is configured, returns a clear error that the model sees. async fn route_call(&self, call: &FunctionToolCall) -> Result { if let Some(mcp) = &self.mcp { return mcp.execute(&call.name, &call.arguments, &serde_json::Value::Null).await; From a95b6142967dc295fbbd1abf6787de6b031cbea9 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Fri, 12 Jun 2026 14:06:55 -0700 Subject: [PATCH 13/17] fix: suppress persistence on first iteration, populate incomplete_details Round 3 review fixes: - Move request.store=false BEFORE the loop (was inside Continue arm, too late for first iteration). Prevents persisting partial responses containing only tool-call output without final answer. - Populate payload.incomplete_details with the reason string when max_iterations is hit. Clients need this per Responses API spec. Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/execute_loop.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs index 35a3510..1010f3e 100644 --- a/crates/agentic-core/src/executor/execute_loop.rs +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -85,6 +85,12 @@ pub async fn execute_loop( // for the caller's persistence logic. let original_previous_response_id = request.previous_response_id.clone(); + // Suppress persistence for ALL iterations inside the loop. The caller + // (server handler) owns final persistence with the correct RequestContext. + // Without this, the first iteration would persist a partial response + // (containing only the tool-call output, not the final answer) to the DB. + request.store = false; + for iteration in 0_usize.. { // Defense-in-depth: even if dispatch_tools has a bug that never returns // Incomplete, we won't loop forever. @@ -136,9 +142,12 @@ pub async fn execute_loop( } // Hit max_iterations — stop looping, mark as incomplete. 
// The model may have wanted to call more tools, but we're cutting it off. + // Attach the reason to incomplete_details so the client knows why. LoopDecision::Incomplete(reason) => { debug!(iteration, %reason, "loop incomplete"); payload.status = "incomplete".to_string(); + payload.incomplete_details = + Some(crate::types::request_response::IncompleteDetails { reason: Some(reason) }); payload.previous_response_id = original_previous_response_id; return Ok(payload); } @@ -179,12 +188,8 @@ pub async fn execute_loop( // Clear previous_response_id — on re-entry, we don't want execute() // to rehydrate from DB (we're managing context in-memory via items). request.previous_response_id = None; - - // Suppress persistence for intermediate iterations. Only the final - // response (returned to caller) should be persisted. If execute() - // tried to persist each intermediate response, we'd get N partial - // records in the DB instead of 1 complete one. - request.store = false; + // request.store already set to false before the loop — no need + // to re-set here. } } } From bebda591b18dc3bc6d63199c45681e3745fe9e6a Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 16 Jun 2026 12:01:54 -0700 Subject: [PATCH 14/17] =?UTF-8?q?refactor:=20address=20review=20feedback?= =?UTF-8?q?=20=E2=80=94=20imports,=20in-place=20mutation,=20lazy=20alloc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Move types to top-level imports (ExecutorError, InputItem, InputMessage, etc.) 
- Mutate request.input in-place instead of clone→extend→reassign each iteration - Use peekable iterator in dispatch to skip Vec allocation on early-exit paths - Pass iterator directly to join_all, removing intermediate Vec in execute_all Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/dispatch.rs | 14 ++--- .../agentic-core/src/executor/execute_loop.rs | 52 +++++++------------ .../agentic-core/src/executor/tool_context.rs | 3 +- 3 files changed, 28 insertions(+), 41 deletions(-) diff --git a/crates/agentic-core/src/executor/dispatch.rs b/crates/agentic-core/src/executor/dispatch.rs index d18f77a..7020b9f 100644 --- a/crates/agentic-core/src/executor/dispatch.rs +++ b/crates/agentic-core/src/executor/dispatch.rs @@ -71,17 +71,17 @@ pub async fn dispatch_tools( tool_ctx: &ToolContext, iteration: usize, ) -> ExecutorResult { - // Step 1: Extract FunctionCall items. Messages and Unknown are ignored. - let function_calls: Vec<_> = output + // Step 1: Extract FunctionCall items lazily — no allocation until we know we need it. + let mut function_calls = output .iter() .filter_map(|item| match item { OutputItem::FunctionCall(fc) => Some(fc), _ => None, }) - .collect(); + .peekable(); // Step 2: No tool calls → model is done generating. - if function_calls.is_empty() { + if function_calls.peek().is_none() { return Ok(LoopDecision::Done); } @@ -94,8 +94,8 @@ pub async fn dispatch_tools( ))); } - // Step 4: Execute all tool calls in parallel and return results. - // Individual failures produce error strings (not Err), so this always succeeds. - let results = tool_ctx.execute_all(&function_calls).await; + // Step 4: Only allocate when we'll actually execute. 
+ let calls: Vec<_> = function_calls.collect(); + let results = tool_ctx.execute_all(&calls).await; Ok(LoopDecision::Continue(results)) } diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs index 1010f3e..36673b5 100644 --- a/crates/agentic-core/src/executor/execute_loop.rs +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -27,13 +27,14 @@ use std::sync::Arc; use either::Either; use tracing::debug; +use crate::executor::ExecutorError; use crate::executor::dispatch::{LoopDecision, dispatch_tools}; use crate::executor::engine::execute; use crate::executor::error::ExecutorResult; use crate::executor::request::ExecutionContext; use crate::executor::tool_context::ToolContext; -use crate::types::io::ResponsesInput; -use crate::types::request_response::{RequestPayload, ResponsePayload}; +use crate::types::io::{InputItem, InputMessage, InputMessageContent, ResponsesInput}; +use crate::types::request_response::{IncompleteDetails, RequestPayload, ResponsePayload}; /// Defense-in-depth hard cap, independent of `tool_ctx.max_iterations`. /// Prevents infinite loops even if dispatch logic has a bug. @@ -95,7 +96,7 @@ pub async fn execute_loop( // Defense-in-depth: even if dispatch_tools has a bug that never returns // Incomplete, we won't loop forever. if iteration >= MAX_LOOP_GUARD { - return Err(crate::executor::ExecutorError::InvalidRequest(format!( + return Err(ExecutorError::InvalidRequest(format!( "execute_loop exceeded hard iteration cap ({MAX_LOOP_GUARD})" ))); } @@ -112,7 +113,7 @@ pub async fn execute_loop( tokio::time::timeout(inference_timeout, execute(request.clone(), Arc::clone(&exec_ctx))) .await .map_err(|_| { - crate::executor::ExecutorError::StreamError(format!( + ExecutorError::StreamError(format!( "LLM inference timed out after {inference_timeout:?} on iteration {iteration}" )) })?? 
@@ -123,7 +124,7 @@ pub async fn execute_loop( let mut payload = match result { Either::Left(payload) => payload, Either::Right(_stream) => { - return Err(crate::executor::ExecutorError::InvalidRequest( + return Err(ExecutorError::InvalidRequest( "execute_loop does not support streaming yet — set stream=false".into(), )); } @@ -146,8 +147,7 @@ pub async fn execute_loop( LoopDecision::Incomplete(reason) => { debug!(iteration, %reason, "loop incomplete"); payload.status = "incomplete".to_string(); - payload.incomplete_details = - Some(crate::types::request_response::IncompleteDetails { reason: Some(reason) }); + payload.incomplete_details = Some(IncompleteDetails { reason: Some(reason) }); payload.previous_response_id = original_previous_response_id; return Ok(payload); } @@ -159,31 +159,19 @@ pub async fn execute_loop( "tool results received, re-entering" ); - // Build the input for the next iteration: - // existing context + tool results appended. - let mut items = match &request.input { - // Already structured items — clone and extend. - ResponsesInput::Items(existing) => existing.clone(), - // First iteration with plain text — convert to structured item. - ResponsesInput::Text(t) => { - vec![crate::types::io::InputItem::Message(crate::types::io::InputMessage { - role: "user".into(), - content: crate::types::io::InputMessageContent::Text(t.clone()), - })] - } - }; - - // TODO: Append assistant's FunctionCall output items to context. - // The Responses API wire format includes `type: "function_call"` in - // input, but InputItem doesn't have that variant yet. Adding it - // requires a type system change (follow-up PR). For now the model - // only sees function_call_output results, not its own tool requests. - // This may cause the model to re-call tools or behave unexpectedly - // in complex multi-hop scenarios. - - // Append tool execution results (function_call_output items). 
- items.extend(tool_results); - request.input = ResponsesInput::Items(items); + // Convert text input to structured items on first tool call, + // then extend in-place on subsequent iterations (no clone). + if let ResponsesInput::Text(t) = &request.input { + let msg = InputItem::Message(InputMessage { + role: "user".into(), + content: InputMessageContent::Text(t.clone()), + }); + request.input = ResponsesInput::Items(vec![msg]); + } + if let ResponsesInput::Items(ref mut items) = request.input { + items.reserve(tool_results.len()); + items.extend(tool_results); + } // Clear previous_response_id — on re-entry, we don't want execute() // to rehydrate from DB (we're managing context in-memory via items). diff --git a/crates/agentic-core/src/executor/tool_context.rs b/crates/agentic-core/src/executor/tool_context.rs index 5422291..ea4f07b 100644 --- a/crates/agentic-core/src/executor/tool_context.rs +++ b/crates/agentic-core/src/executor/tool_context.rs @@ -95,8 +95,7 @@ impl ToolContext { /// serves as a higher-level retry — the model can re-call a failed tool on /// the next iteration if it chooses to. pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec { - let futures: Vec<_> = calls.iter().map(|call| self.execute_one(call)).collect(); - futures::future::join_all(futures).await + futures::future::join_all(calls.iter().map(|call| self.execute_one(call))).await } /// Execute a single tool call with timeout protection. From a075e77a2381f724ba454391ac3bb69b0107e59c Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 16 Jun 2026 12:17:15 -0700 Subject: [PATCH 15/17] fix: clear all persistence triggers before loop iterations PR #56 changed persist logic to fire when ANY of store/previous_response_id/ conversation_id is set. Our loop was only clearing store=false, leaving conversation_id as a persistence trigger for intermediate iterations. 
Now clear all three before the loop starts and restore both IDs on the returned payload (matching the original request's response chain). Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/execute_loop.rs | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs index 36673b5..c446d99 100644 --- a/crates/agentic-core/src/executor/execute_loop.rs +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -82,15 +82,19 @@ pub async fn execute_loop( tool_ctx: &ToolContext, ) -> ExecutorResult { // Capture before the loop mutates the request. Restored on final payload - // so the response chain (previous_response_id linkage) remains correct - // for the caller's persistence logic. + // so the response chain remains correct for the caller. let original_previous_response_id = request.previous_response_id.clone(); + let original_conversation_id = request.conversation_id.clone(); // Suppress persistence for ALL iterations inside the loop. The caller // (server handler) owns final persistence with the correct RequestContext. - // Without this, the first iteration would persist a partial response - // (containing only the tool-call output, not the final answer) to the DB. + // Without this, intermediate iterations would persist partial responses + // (containing only tool-call output, not the final answer). + // Must clear all three persistence triggers (PR #56 persists when any is set): + // store=true, previous_response_id.is_some(), conversation_id.is_some() request.store = false; + request.previous_response_id = None; + request.conversation_id = None; for iteration in 0_usize.. { // Defense-in-depth: even if dispatch_tools has a bug that never returns @@ -139,6 +143,7 @@ pub async fn execute_loop( // No tool calls (or only client-side functions) — we're done. 
LoopDecision::Done => { payload.previous_response_id = original_previous_response_id; + payload.conversation_id = original_conversation_id; return Ok(payload); } // Hit max_iterations — stop looping, mark as incomplete. @@ -149,6 +154,7 @@ pub async fn execute_loop( payload.status = "incomplete".to_string(); payload.incomplete_details = Some(IncompleteDetails { reason: Some(reason) }); payload.previous_response_id = original_previous_response_id; + payload.conversation_id = original_conversation_id; return Ok(payload); } // Tools were executed — inject results and re-enter inference. @@ -169,15 +175,11 @@ pub async fn execute_loop( request.input = ResponsesInput::Items(vec![msg]); } if let ResponsesInput::Items(ref mut items) = request.input { + // TODO: also inject assistant's FunctionCall output items here + // (requires InputItem::FunctionCall variant — follow-up PR) items.reserve(tool_results.len()); items.extend(tool_results); } - - // Clear previous_response_id — on re-entry, we don't want execute() - // to rehydrate from DB (we're managing context in-memory via items). - request.previous_response_id = None; - // request.store already set to false before the loop — no need - // to re-set here. } } } From ab7ada61f83b8972039ffc8301e7a2458b4427fa Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 16 Jun 2026 13:03:53 -0700 Subject: [PATCH 16/17] test: add persistence trigger suppression & ID restoration tests Add 6 new tests covering the critical invariants: - previous_response_id=Some restored on Done path - conversation_id=Some restored on Done path - store=true suppressed to false in internal LLM calls - all 3 persistence triggers cleared in multi-hop scenario - both IDs restored on Incomplete (max_iterations) path - Items input + conversation_id works with in-place extend Also tighten doc comment on execute_loop to accurately describe clearing all 3 triggers and restoring both IDs. 
Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/execute_loop.rs | 22 +- .../agentic-core/tests/execute_loop_test.rs | 278 ++++++++++++++++++ 2 files changed, 287 insertions(+), 13 deletions(-) diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs index c446d99..c162a63 100644 --- a/crates/agentic-core/src/executor/execute_loop.rs +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -49,11 +49,13 @@ const MAX_LOOP_GUARD: usize = 128; /// - **Caller provides:** request, execution context (LLM + DB), tool context (providers) /// - **This function returns:** the final `ResponsePayload` (caller persists it) /// - **Persistence:** NOT done here. Caller (server handler) owns persistence because -/// it has the full `RequestContext` with correct `new_input_items`. We set -/// `request.store = false` for intermediate iterations to suppress the internal -/// `execute()` from persisting partial state. -/// - **Response chain:** `previous_response_id` on the returned payload reflects the -/// ORIGINAL caller-supplied value, not the internal mutations. +/// it has the full `RequestContext` with correct `new_input_items`. We clear +/// all three persistence triggers (`store`, `previous_response_id`, +/// `conversation_id`) to suppress intermediate `execute()` calls from +/// persisting partial state (PR #56 persists when ANY of the three is set). +/// - **ID restoration:** Both `previous_response_id` and `conversation_id` on the +/// returned payload reflect the ORIGINAL caller-supplied values, not the +/// internal mutations. This is critical for the caller's persist step. /// /// # Timeouts /// @@ -81,17 +83,11 @@ pub async fn execute_loop( exec_ctx: Arc, tool_ctx: &ToolContext, ) -> ExecutorResult { - // Capture before the loop mutates the request. Restored on final payload - // so the response chain remains correct for the caller. 
let original_previous_response_id = request.previous_response_id.clone(); let original_conversation_id = request.conversation_id.clone(); - // Suppress persistence for ALL iterations inside the loop. The caller - // (server handler) owns final persistence with the correct RequestContext. - // Without this, intermediate iterations would persist partial responses - // (containing only tool-call output, not the final answer). - // Must clear all three persistence triggers (PR #56 persists when any is set): - // store=true, previous_response_id.is_some(), conversation_id.is_some() + // Clear all three persistence triggers so intermediate execute() calls + // don't write partial tool-call-only responses to the store. request.store = false; request.previous_response_id = None; request.conversation_id = None; diff --git a/crates/agentic-core/tests/execute_loop_test.rs b/crates/agentic-core/tests/execute_loop_test.rs index d77f627..c57cbae 100644 --- a/crates/agentic-core/tests/execute_loop_test.rs +++ b/crates/agentic-core/tests/execute_loop_test.rs @@ -708,3 +708,281 @@ async fn test_previous_response_id_none_preserved() { "should preserve original None previous_response_id" ); } + +// --- P2: Persistence trigger suppression & ID restoration tests --- +// +// These tests verify the critical invariants from Section 14 of the design doc: +// - Internal LLM calls have store=false, prev_resp_id=null, conv_id=null +// - Original prev_resp_id and conv_id are restored on the returned payload +// - Both Done and Incomplete exit paths restore correctly + +/// previous_response_id=Some is captured before the loop and restored on Done. 
+#[tokio::test] +async fn test_previous_response_id_some_restored_on_done() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("final answer"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("test", false, false); + request.previous_response_id = Some("resp_original_123".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!( + result.previous_response_id, + Some("resp_original_123".to_string()), + "previous_response_id must be restored from pre-loop capture" + ); + + // Verify internal LLM calls had prev_resp_id cleared + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + for (i, body) in bodies.iter().enumerate() { + assert!( + body["previous_response_id"].is_null(), + "internal LLM call {i} should have previous_response_id=null, got: {}", + body["previous_response_id"] + ); + } +} + +/// conversation_id=Some is captured before the loop and restored on Done. 
+#[tokio::test] +async fn test_conversation_id_restored_on_done() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("done"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("test", false, false); + request.conversation_id = Some("conv_abc_456".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!( + result.conversation_id, + Some("conv_abc_456".to_string()), + "conversation_id must be restored from pre-loop capture" + ); + + // Verify internal LLM calls had conv_id cleared + let bodies = server.request_bodies().await; + for (i, body) in bodies.iter().enumerate() { + assert!( + body["conversation_id"].is_null(), + "internal LLM call {i} should have conversation_id=null, got: {}", + body["conversation_id"] + ); + } +} + +/// store=true is suppressed to false for ALL internal iterations. 
+#[tokio::test] +async fn test_store_suppressed_in_internal_iterations() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("stored result"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("test", false, true); // store=true + request.store = true; + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + assert_eq!(result.status, "completed"); + + // Both internal LLM calls should have store=false (or absent/null — serde skips false) + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + for (i, body) in bodies.iter().enumerate() { + let store_val = &body["store"]; + assert!( + store_val.is_null() || store_val == false, + "internal LLM call {i} should have store=false/absent, got: {store_val}" + ); + } +} + +/// All three persistence triggers cleared in internal iterations: combined scenario. 
+/// Request: store=true, prev_resp_id=Some, conv_id=Some +/// Assert: all internal calls have store=false, both IDs null +/// Assert: returned payload has both IDs restored +#[tokio::test] +async fn test_all_persistence_triggers_cleared_internally() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("search", "{\"q\":\"rust\"}", "call_s"), + function_call_llm_response("summarize", "{\"t\":\"text\"}", "call_sum"), + text_llm_response("final summary"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool output"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("multi-hop with IDs", false, true); + request.previous_response_id = Some("resp_prev_999".to_string()); + request.conversation_id = Some("conv_session_42".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!( + result.previous_response_id, + Some("resp_prev_999".to_string()), + ); + assert_eq!( + result.conversation_id, + Some("conv_session_42".to_string()), + ); + + // All 3 internal LLM calls should have all persistence triggers cleared + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 3, "3 iterations: FC → FC → text"); + for (i, body) in bodies.iter().enumerate() { + let store_val = &body["store"]; + assert!( + store_val.is_null() || store_val == false, + "call {i}: store should be false/absent, got: {store_val}" + ); + assert!( + body["previous_response_id"].is_null(), + "call {i}: previous_response_id should be null, got: {}", + body["previous_response_id"] + ); + assert!( + body["conversation_id"].is_null(), + "call {i}: conversation_id should be null, got: {}", + body["conversation_id"] + ); + } +} + +/// Incomplete path (max_iterations hit) also restores both IDs correctly. 
+#[tokio::test] +async fn test_incomplete_restores_both_ids() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + function_call_llm_response("tool", "{}", "c2"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 1, // stops at iteration 1 + ..ToolContext::default() + }; + + let mut request = make_request("will be incomplete", false, false); + request.previous_response_id = Some("resp_incomplete_orig".to_string()); + request.conversation_id = Some("conv_incomplete_orig".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "incomplete"); + assert_eq!( + result.previous_response_id, + Some("resp_incomplete_orig".to_string()), + "Incomplete path must restore previous_response_id" + ); + assert_eq!( + result.conversation_id, + Some("conv_incomplete_orig".to_string()), + "Incomplete path must restore conversation_id" + ); + + // Verify incomplete_details has a reason + assert!( + result.incomplete_details.is_some(), + "incomplete should have details" + ); + let reason = result.incomplete_details.unwrap().reason.unwrap(); + assert!( + reason.contains("max tool iterations"), + "reason should mention max iterations: {reason}" + ); +} + +/// Input already as Items + conversation_id set: verify in-place extend works +/// and conversation_id appears on the output. 
+#[tokio::test] +async fn test_items_input_with_conversation_id() { + use agentic_core::types::io::{InputItem, InputMessage, InputMessageContent}; + + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("done with items+conv"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = RequestPayload { + model: "test-model".to_string(), + input: ResponsesInput::Items(vec![InputItem::Message(InputMessage { + role: "user".into(), + content: InputMessageContent::Text("hello from items".into()), + })]), + instructions: None, + previous_response_id: Some("resp_items_prev".to_string()), + conversation_id: Some("conv_items_session".to_string()), + tools: None, + tool_choice: ToolChoice::Auto, + stream: false, + store: true, + include: None, + temperature: None, + top_p: None, + max_output_tokens: None, + truncation: None, + metadata: None, + }; + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(result.previous_response_id, Some("resp_items_prev".to_string())); + assert_eq!(result.conversation_id, Some("conv_items_session".to_string())); + + // Second request should have original item + tool result (Items extended in-place) + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + let second_input = bodies[1]["input"].as_array().unwrap(); + assert!( + second_input.len() >= 2, + "should have original item + function_call_output, got {}", + second_input.len() + ); + // All internal calls should have persistence suppressed + assert!(bodies[0]["store"].is_null() || bodies[0]["store"] == false); + assert!(bodies[1]["store"].is_null() || bodies[1]["store"] == false); +} From 4c4a28eb77c53ee4d9ad5cc6c41ab072c71d0a64 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan 
Date: Tue, 16 Jun 2026 14:29:18 -0700 Subject: [PATCH 17/17] ci: fix rustfmt formatting in new tests Signed-off-by: Ashwin Giridharan --- crates/agentic-core/tests/execute_loop_test.rs | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/crates/agentic-core/tests/execute_loop_test.rs b/crates/agentic-core/tests/execute_loop_test.rs index c57cbae..db5e6cb 100644 --- a/crates/agentic-core/tests/execute_loop_test.rs +++ b/crates/agentic-core/tests/execute_loop_test.rs @@ -852,14 +852,8 @@ async fn test_all_persistence_triggers_cleared_internally() { let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); assert_eq!(result.status, "completed"); - assert_eq!( - result.previous_response_id, - Some("resp_prev_999".to_string()), - ); - assert_eq!( - result.conversation_id, - Some("conv_session_42".to_string()), - ); + assert_eq!(result.previous_response_id, Some("resp_prev_999".to_string()),); + assert_eq!(result.conversation_id, Some("conv_session_42".to_string()),); // All 3 internal LLM calls should have all persistence triggers cleared let bodies = server.request_bodies().await; @@ -917,10 +911,7 @@ async fn test_incomplete_restores_both_ids() { ); // Verify incomplete_details has a reason - assert!( - result.incomplete_details.is_some(), - "incomplete should have details" - ); + assert!(result.incomplete_details.is_some(), "incomplete should have details"); let reason = result.incomplete_details.unwrap().reason.unwrap(); assert!( reason.contains("max tool iterations"),