diff --git a/crates/agentic-core/src/executor/dispatch.rs b/crates/agentic-core/src/executor/dispatch.rs new file mode 100644 index 0000000..7020b9f --- /dev/null +++ b/crates/agentic-core/src/executor/dispatch.rs @@ -0,0 +1,101 @@ +//! Tool dispatch decision logic. +//! +//! This module is STATELESS — it inspects the model's output items, decides +//! whether to execute tools, and returns a decision. It does not manage loop +//! state, persistence, or re-entry. That's `execute_loop`'s job. +//! +//! Decision flow: +//! ```text +//! output items → filter FunctionCall → empty? → Done +//! → iteration >= max? → Incomplete +//! → execute all → Continue(results) +//! ``` + +use crate::executor::error::ExecutorResult; +use crate::executor::tool_context::ToolContext; +use crate::types::io::{InputItem, OutputItem}; + +/// Decision returned by [`dispatch_tools`] to drive the agentic loop. +/// +/// `#[non_exhaustive]` allows adding variants (e.g. `Partial` for mixed +/// gateway + client tools) without breaking downstream match arms. +#[derive(Debug)] +#[non_exhaustive] +pub enum LoopDecision { + /// Gateway-executed tools returned results. Caller should inject these + /// `InputItem::FunctionCallOutput` items into the request and re-infer. + Continue(Vec), + + /// No tool calls found in output, OR only client-side functions present. + /// Caller should return the response to the client as-is. + /// If `FunctionCall` items exist in output with this decision, the response + /// status should be `requires_action` (client executes them externally). + Done, + + /// Max iterations reached. The model wanted to call more tools but we're + /// cutting it off to prevent runaway loops. The reason string is included + /// for logging/debugging. Caller should set `payload.status = "incomplete"`. + Incomplete(String), +} + +/// Inspect executor output for function calls and dispatch them via [`ToolContext`]. +/// +/// # Decision Logic +/// +/// 1. Filter `OutputItem::FunctionCall` items from `output` +/// 2. If none found → `Done` (model produced only text/messages) +/// 3. If `iteration >= tool_ctx.max_iterations` → `Incomplete` (safety cap) +/// 4. Otherwise → execute all calls via `tool_ctx.execute_all()` → `Continue` +/// +/// # MVP Routing +/// +/// Currently ALL `FunctionCall` items are treated as gateway-executable (routed +/// to MCP/`web_search`/`vector_store` in priority order). The distinction between +/// client-side functions (`type: "function"` in the request) and gateway-executed +/// tools (`type: "mcp"`) requires access to the request's tools array — deferred +/// to a follow-up PR that changes this function's signature. +/// +/// # Error Semantics +/// +/// - Individual tool failures → error JSON string in the result (model sees it) +/// - This function only returns `Err` on internal/structural failures +/// - The function itself NEVER panics +/// +/// # Errors +/// +/// Returns `ExecutorError` only on internal failures (e.g. serialization). +/// Individual tool execution failures are captured as error output strings +/// in the returned `InputItem` list — they do NOT propagate as errors. +pub async fn dispatch_tools( + output: &[OutputItem], + tool_ctx: &ToolContext, + iteration: usize, +) -> ExecutorResult { + // Step 1: Extract FunctionCall items lazily — no allocation until we know we need it. + let mut function_calls = output + .iter() + .filter_map(|item| match item { + OutputItem::FunctionCall(fc) => Some(fc), + _ => None, + }) + .peekable(); + + // Step 2: No tool calls → model is done generating. + if function_calls.peek().is_none() { + return Ok(LoopDecision::Done); + } + + // Step 3: Safety cap — prevent infinite tool loops. + // This fires BEFORE execution, so no work is wasted on the capped iteration. + if iteration >= tool_ctx.max_iterations { + return Ok(LoopDecision::Incomplete(format!( + "max tool iterations reached ({iteration}/{})", + tool_ctx.max_iterations + ))); + } + + // Step 4: Only allocate when we'll actually execute. + let calls: Vec<_> = function_calls.collect(); + let results = tool_ctx.execute_all(&calls).await; + Ok(LoopDecision::Continue(results)) +} diff --git a/crates/agentic-core/src/executor/execute_loop.rs b/crates/agentic-core/src/executor/execute_loop.rs new file mode 100644 index 0000000..c162a63 --- /dev/null +++ b/crates/agentic-core/src/executor/execute_loop.rs @@ -0,0 +1,184 @@ +//! Agentic loop orchestrator. +//! +//! Composes `execute()` (LLM inference) with `dispatch_tools()` (tool routing) +//! in a loop that continues until the model stops producing tool calls. +//! +//! Architecture: +//! ```text +//! ┌─────────────────────────────────────────────────────────┐ +//! │ execute_loop │ +//! │ │ +//! │ for each iteration: │ +//! │ 1. execute(request) → ResponsePayload │ +//! │ 2. dispatch_tools(output) → LoopDecision │ +//! │ 3. if Continue: inject results, goto 1 │ +//! │ if Done/Incomplete: return payload │ +//! └─────────────────────────────────────────────────────────┘ +//! ``` +//! +//! Timeout budget per iteration: +//! - LLM inference: `exec_ctx.streaming_timeout` (default 30s) +//! - Each tool call: `tool_ctx.tool_timeout` (default 30s) +//! - Hard loop cap: `MAX_LOOP_GUARD` (128 iterations) +//! - Soft tool cap: `tool_ctx.max_iterations` (default 10) + +use std::sync::Arc; + +use either::Either; +use tracing::debug; + +use crate::executor::ExecutorError; +use crate::executor::dispatch::{LoopDecision, dispatch_tools}; +use crate::executor::engine::execute; +use crate::executor::error::ExecutorResult; +use crate::executor::request::ExecutionContext; +use crate::executor::tool_context::ToolContext; +use crate::types::io::{InputItem, InputMessage, InputMessageContent, ResponsesInput}; +use crate::types::request_response::{IncompleteDetails, RequestPayload, ResponsePayload}; + +/// Defense-in-depth hard cap, independent of `tool_ctx.max_iterations`. +/// Prevents infinite loops even if dispatch logic has a bug. +/// Set high enough to never trigger in normal operation (`max_iterations`=10 +/// would stop far earlier), but low enough to catch runaway loops quickly. +const MAX_LOOP_GUARD: usize = 128; + +/// Run the agentic loop: execute → dispatch tools → re-enter until done. +/// +/// # Contract +/// +/// - **Caller provides:** request, execution context (LLM + DB), tool context (providers) +/// - **This function returns:** the final `ResponsePayload` (caller persists it) +/// - **Persistence:** NOT done here. Caller (server handler) owns persistence because +/// it has the full `RequestContext` with correct `new_input_items`. We clear +/// all three persistence triggers (`store`, `previous_response_id`, +/// `conversation_id`) to suppress intermediate `execute()` calls from +/// persisting partial state (PR #56 persists when ANY of the three is set). +/// - **ID restoration:** Both `previous_response_id` and `conversation_id` on the +/// returned payload reflect the ORIGINAL caller-supplied values, not the +/// internal mutations. This is critical for the caller's persist step. +/// +/// # Timeouts +/// +/// - Each `execute()` call is wrapped in `tokio::time::timeout(exec_ctx.streaming_timeout)` +/// - Each tool call is wrapped in `tokio::time::timeout(tool_ctx.tool_timeout)` +/// - `Duration::ZERO` on either disables that timeout (provider manages its own) +/// +/// # Known Limitations (MVP) +/// +/// - Non-streaming only. `stream=true` returns an immediate error. +/// - `request.clone()` every iteration is O(n) in accumulated input size. +/// - `InputItem` lacks a `FunctionCall` variant, so the assistant's tool-call +/// items are not injected into context (the model doesn't see its own calls). +/// Follow-up PR needed to add the variant. +/// +/// # Errors +/// +/// Returns `ExecutorError` if: +/// - LLM inference fails or times out +/// - Tool dispatch encounters a fatal error (individual tool failures are NOT fatal) +/// - `stream=true` is passed +/// - Hard loop guard is breached +pub async fn execute_loop( + mut request: RequestPayload, + exec_ctx: Arc, + tool_ctx: &ToolContext, +) -> ExecutorResult { + let original_previous_response_id = request.previous_response_id.clone(); + let original_conversation_id = request.conversation_id.clone(); + + // Clear all three persistence triggers so intermediate execute() calls + // don't write partial tool-call-only responses to the store. + request.store = false; + request.previous_response_id = None; + request.conversation_id = None; + + for iteration in 0_usize.. { + // Defense-in-depth: even if dispatch_tools has a bug that never returns + // Incomplete, we won't loop forever. + if iteration >= MAX_LOOP_GUARD { + return Err(ExecutorError::InvalidRequest(format!( + "execute_loop exceeded hard iteration cap ({MAX_LOOP_GUARD})" + ))); + } + + debug!(iteration, "execute_loop iteration"); + + // --- Step 1: Call the LLM --- + // Timeout prevents hanging on unresponsive LLM backends. + // Duration::ZERO = no timeout (provider/reqwest manages its own). + let inference_timeout = exec_ctx.streaming_timeout; + let result = if inference_timeout.is_zero() { + execute(request.clone(), Arc::clone(&exec_ctx)).await? + } else { + tokio::time::timeout(inference_timeout, execute(request.clone(), Arc::clone(&exec_ctx))) + .await + .map_err(|_| { + ExecutorError::StreamError(format!( + "LLM inference timed out after {inference_timeout:?} on iteration {iteration}" + )) + })?? + }; + + // execute() returns Either. + // We only support non-streaming in execute_loop (streaming requires StreamTee). + let mut payload = match result { + Either::Left(payload) => payload, + Either::Right(_stream) => { + return Err(ExecutorError::InvalidRequest( + "execute_loop does not support streaming yet — set stream=false".into(), + )); + } + }; + + // --- Step 2: Inspect output for tool calls --- + // dispatch_tools filters FunctionCall items, executes them via ToolContext, + // and returns a decision: Continue (with results), Done, or Incomplete. + let decision = dispatch_tools(&payload.output, tool_ctx, iteration).await?; + + match decision { + // No tool calls (or only client-side functions) — we're done. + LoopDecision::Done => { + payload.previous_response_id = original_previous_response_id; + payload.conversation_id = original_conversation_id; + return Ok(payload); + } + // Hit max_iterations — stop looping, mark as incomplete. + // The model may have wanted to call more tools, but we're cutting it off. + // Attach the reason to incomplete_details so the client knows why. + LoopDecision::Incomplete(reason) => { + debug!(iteration, %reason, "loop incomplete"); + payload.status = "incomplete".to_string(); + payload.incomplete_details = Some(IncompleteDetails { reason: Some(reason) }); + payload.previous_response_id = original_previous_response_id; + payload.conversation_id = original_conversation_id; + return Ok(payload); + } + // Tools were executed — inject results and re-enter inference. + LoopDecision::Continue(tool_results) => { + debug!( + iteration, + results = tool_results.len(), + "tool results received, re-entering" + ); + + // Convert text input to structured items on first tool call, + // then extend in-place on subsequent iterations (no clone). + if let ResponsesInput::Text(t) = &request.input { + let msg = InputItem::Message(InputMessage { + role: "user".into(), + content: InputMessageContent::Text(t.clone()), + }); + request.input = ResponsesInput::Items(vec![msg]); + } + if let ResponsesInput::Items(ref mut items) = request.input { + // TODO: also inject assistant's FunctionCall output items here + // (requires InputItem::FunctionCall variant — follow-up PR) + items.reserve(tool_results.len()); + items.extend(tool_results); + } + } + } + } + + unreachable!("loop exits via return in Done/Incomplete/guard arms") +} diff --git a/crates/agentic-core/src/executor/mod.rs b/crates/agentic-core/src/executor/mod.rs index 32fbabc..d3e463c 100644 --- a/crates/agentic-core/src/executor/mod.rs +++ b/crates/agentic-core/src/executor/mod.rs @@ -1,13 +1,19 @@ //! Agentic loop executor. pub mod accumulator; +pub mod dispatch; pub mod engine; pub mod error; +pub mod execute_loop; pub mod modes; pub mod request; +pub mod tool_context; +pub use dispatch::{LoopDecision, dispatch_tools}; pub use engine::{BoxStream, call_inference, create_conversation, execute, persist_response, rehydrate_conversation}; pub use error::{ExecutorError, ExecutorResult}; +pub use execute_loop::execute_loop; pub use modes::{ConversationHandler, ResponseHandler}; pub use request::ExecutionContext; pub use request::RequestContext; +pub use tool_context::ToolContext; diff --git a/crates/agentic-core/src/executor/tool_context.rs b/crates/agentic-core/src/executor/tool_context.rs new file mode 100644 index 0000000..ea4f07b --- /dev/null +++ b/crates/agentic-core/src/executor/tool_context.rs @@ -0,0 +1,163 @@ +//! Tool execution context and provider routing. +//! +//! `ToolContext` is the runtime container for tool providers. It holds +//! references to MCP servers, web search backends, and vector stores. +//! When `dispatch_tools` decides to execute tool calls, it delegates here. +//! +//! Design opinions: +//! - Each provider is `Option>` — missing providers produce errors, not panics +//! - Execution is parallel via `join_all` — all tool calls in one response run concurrently +//! - Individual failures are isolated — one hung tool doesn't block others (each has a timeout) +//! - The error output format is JSON (`{"error": "..."}`) — the model can parse and react to it +//! - Routing is MVP priority-order, NOT by tool type. Real routing requires the request's +//! tools array (follow-up PR changes `dispatch_tools` signature) + +use std::sync::Arc; +use std::time::Duration; + +use crate::executor::ExecutorError; +use crate::tools::{McpToolExecutor, VectorStoreClient, WebSearchProvider}; +use crate::types::io::{FunctionToolCall, FunctionToolResultMessage, InputItem}; + +/// Runtime configuration for tool execution. +/// +/// Constructed once at server startup (or per-request if tools vary) +/// and passed into `execute_loop` / `dispatch_tools`. +/// +/// # Defaults +/// +/// - `max_iterations`: 10 (soft cap checked by `dispatch_tools`) +/// - `tool_timeout`: 30s per individual tool call +/// - All providers: None (calls produce "no provider configured" errors) +pub struct ToolContext { + /// MCP tool executor — connects to external MCP servers. + /// Used for user-defined tools declared as `type: "mcp"` in the request. + pub mcp: Option>, + + /// Web search provider (e.g., Brave, Google). + /// Used for the built-in `web_search` tool type. + pub web_search: Option>, + + /// Vector store client (e.g., OGX). + /// Used for the built-in `file_search` tool type. + pub vector_store: Option>, + + /// Maximum number of tool dispatch iterations before returning Incomplete. + /// This is a SOFT cap — `dispatch_tools` checks `iteration >= max_iterations`. + /// The HARD cap is ``MAX_LOOP_GUARD`` in `execute_loop`.rs (128). + pub max_iterations: usize, + + /// Per-tool-call timeout. If a provider takes longer than this, the call + /// produces a timeout error string (not a total dispatch failure). + /// `Duration::ZERO` disables the timeout — use when providers manage their own. + pub tool_timeout: Duration, +} + +impl Default for ToolContext { + fn default() -> Self { + Self { + mcp: None, + web_search: None, + vector_store: None, + max_iterations: 10, + tool_timeout: Duration::from_secs(30), + } + } +} + +impl ToolContext { + /// Execute all tool calls concurrently via `futures::future::join_all`. + /// + /// # Concurrency Model + /// + /// All calls start immediately and run in parallel on tokio's thread pool. + /// `join_all` awaits ALL futures before returning — wall-clock time is + /// bounded by the slowest individual call (not sum of all calls). + /// + /// With `tool_timeout = 30s` and N calls, worst case is 30s total (not N×30s). + /// + /// # Failure Model + /// + /// Individual failures produce an error JSON string as the tool output for + /// that `call_id`. The dispatch does NOT fail as a whole. This matches the + /// Responses API behavior where partial tool results are acceptable. + /// + /// The model sees `{"error": "..."}` as the tool output and decides: + /// - Retry the tool on the next iteration + /// - Answer without that result + /// - Try a different approach + /// + /// # Retry Policy + /// + /// This layer does NOT retry. Providers handle their own retries internally + /// (transient network errors, 503s, etc.). By the time an error reaches here, + /// the provider already exhausted its retry budget. The agentic loop itself + /// serves as a higher-level retry — the model can re-call a failed tool on + /// the next iteration if it chooses to. + pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec { + futures::future::join_all(calls.iter().map(|call| self.execute_one(call))).await + } + + /// Execute a single tool call with timeout protection. + /// + /// Always returns an `InputItem::FunctionCallOutput` — either with the real + /// result or with an error JSON string. Never panics, never returns Err. + async fn execute_one(&self, call: &FunctionToolCall) -> InputItem { + // Apply per-call timeout. Duration::ZERO = no timeout (opt-out). + let result = if self.tool_timeout.is_zero() { + self.route_call(call).await + } else { + match tokio::time::timeout(self.tool_timeout, self.route_call(call)).await { + Ok(r) => r, + Err(_elapsed) => Err(ExecutorError::StreamError(format!( + "tool '{}' timed out after {:?}", + call.name, self.tool_timeout + ))), + } + }; + + // Convert Result → String (error becomes JSON). + // Using serde_json::json! ensures proper escaping of error messages + // that might contain quotes, newlines, or other special characters. + let output = match result { + Ok(s) => s, + Err(e) => serde_json::json!({"error": e.to_string()}).to_string(), + }; + + InputItem::FunctionCallOutput(FunctionToolResultMessage { + call_id: call.call_id.clone(), + output, + }) + } + + /// Route a tool call to the appropriate provider. + /// + /// MVP: Priority-order routing. Tries MCP first (most general), then + /// `web_search`, then `vector_store`. First configured provider wins. + /// + /// This is intentionally simple — real routing needs the request's `tools` + /// array to distinguish `type: "function"` (client-side) from `type: "mcp"` + /// (gateway-side). That requires changing `dispatch_tools`'s signature to + /// accept the tools array, which is a follow-up PR. + /// + /// When no provider is configured, returns a clear error that the model sees. + async fn route_call(&self, call: &FunctionToolCall) -> Result { + if let Some(mcp) = &self.mcp { + return mcp.execute(&call.name, &call.arguments, &serde_json::Value::Null).await; + } + + if let Some(web) = &self.web_search { + return web.search(&call.arguments, "medium").await; + } + + if let Some(vs) = &self.vector_store { + let results = vs.search("", &call.arguments, 5).await?; + return Ok(serde_json::to_string(&results).expect("Vec is always serializable")); + } + + Err(ExecutorError::InvalidRequest(format!( + "no tool provider configured for '{}'", + call.name + ))) + } +} diff --git a/crates/agentic-core/src/lib.rs b/crates/agentic-core/src/lib.rs index 5828a68..3826b12 100644 --- a/crates/agentic-core/src/lib.rs +++ b/crates/agentic-core/src/lib.rs @@ -5,6 +5,7 @@ pub mod executor; pub mod proxy; pub mod readiness; pub mod storage; +pub mod tools; pub mod types; pub mod utils; diff --git a/crates/agentic-core/src/tools/mcp.rs b/crates/agentic-core/src/tools/mcp.rs new file mode 100644 index 0000000..45e8349 --- /dev/null +++ b/crates/agentic-core/src/tools/mcp.rs @@ -0,0 +1,18 @@ +use std::future::Future; +use std::pin::Pin; + +use crate::executor::ExecutorError; + +/// Execute a tool call via the Model Context Protocol. +/// +/// Implementations connect to an MCP server and invoke the named tool +/// with the provided arguments. The result is returned as a serialized +/// JSON string suitable for injection into `FunctionToolResultMessage.output`. +pub trait McpToolExecutor: Send + Sync { + fn execute( + &self, + tool_name: &str, + arguments: &str, + server_config: &serde_json::Value, + ) -> Pin> + Send + '_>>; +} diff --git a/crates/agentic-core/src/tools/mod.rs b/crates/agentic-core/src/tools/mod.rs new file mode 100644 index 0000000..331cd07 --- /dev/null +++ b/crates/agentic-core/src/tools/mod.rs @@ -0,0 +1,7 @@ +pub mod mcp; +pub mod vector_store; +pub mod web_search; + +pub use mcp::McpToolExecutor; +pub use vector_store::VectorStoreClient; +pub use web_search::WebSearchProvider; diff --git a/crates/agentic-core/src/tools/vector_store.rs b/crates/agentic-core/src/tools/vector_store.rs new file mode 100644 index 0000000..bfbc3d2 --- /dev/null +++ b/crates/agentic-core/src/tools/vector_store.rs @@ -0,0 +1,17 @@ +use std::future::Future; +use std::pin::Pin; + +use crate::executor::ExecutorError; + +/// Search a vector store (e.g. OGX) and return matching documents. +/// +/// Results are returned as a JSON array of document objects. The caller +/// serializes them into `FunctionToolResultMessage.output`. +pub trait VectorStoreClient: Send + Sync { + fn search( + &self, + store_id: &str, + query: &str, + max_results: u32, + ) -> Pin, ExecutorError>> + Send + '_>>; +} diff --git a/crates/agentic-core/src/tools/web_search.rs b/crates/agentic-core/src/tools/web_search.rs new file mode 100644 index 0000000..1e6032b --- /dev/null +++ b/crates/agentic-core/src/tools/web_search.rs @@ -0,0 +1,15 @@ +use std::future::Future; +use std::pin::Pin; + +use crate::executor::ExecutorError; + +/// Perform a web search and return results as a serialized string. +/// +/// `context_size` controls result verbosity: `"low"`, `"medium"`, or `"high"`. +pub trait WebSearchProvider: Send + Sync { + fn search( + &self, + query: &str, + context_size: &str, + ) -> Pin> + Send + '_>>; +} diff --git a/crates/agentic-core/tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml b/crates/agentic-core/tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml new file mode 100644 index 0000000..1ad3f55 --- /dev/null +++ b/crates/agentic-core/tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml @@ -0,0 +1,43 @@ +# Recorded from google/gemma-4-26B-A4B-it via vLLM v0.21.0 +# Date: 2026-06-12 +# Scenario: User asks weather → model calls get_weather → tool returns result → model answers +# +# This cassette drives execute_loop through a complete 2-iteration tool loop: +# Iteration 0: model returns FunctionCall (get_weather) +# Iteration 1: model returns Message (final answer using tool result) + +model: google/gemma-4-26B-A4B-it +request: + input: "What is the weather in San Francisco?" + tools: + - type: function + name: get_weather + description: "Get current weather for a city" + parameters: + type: object + properties: + city: + type: string + required: ["city"] + tool_choice: required + stream: false + +tool_mock: + get_weather: '{"temperature": 72, "condition": "sunny", "humidity": 45}' + +expected: + iterations: 2 + final_text: "The current weather in San Francisco is sunny with a temperature of 72°F and 45% humidity." + function_call: + name: get_weather + arguments: '{"city": "San Francisco"}' + call_id: "chatcmpl-tool-a2bac67597e69338" + +turns: + - description: "Turn 1: model calls get_weather" + response: + body: '{"id":"resp_9ef6bc922b19eb44","created_at":1781296371,"incomplete_details":null,"instructions":null,"metadata":null,"model":"google/gemma-4-26B-A4B-it","object":"response","output":[{"arguments":"{\"city\": \"San Francisco\"}","call_id":"chatcmpl-tool-a2bac67597e69338","name":"get_weather","type":"function_call","id":"fc_a55c74f95b5c8fcd","namespace":null,"status":"completed"}],"parallel_tool_calls":true,"temperature":1.0,"tool_choice":"required","tools":[{"name":"get_weather","parameters":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"]},"strict":null,"type":"function","defer_loading":null,"description":"Get current weather for a city"}],"top_p":0.95,"background":false,"max_output_tokens":32699,"max_tool_calls":null,"previous_response_id":null,"prompt":null,"reasoning":null,"service_tier":"auto","status":"completed","text":{"format":{"name":"tool_calling_response","schema":{"type":"array","minItems":1,"items":{"type":"object","anyOf":[{"properties":{"name":{"type":"string","enum":["get_weather"]},"parameters":{"type":"object","properties":{"city":{"type":"string"}},"required":["city"]}},"required":["name","parameters"]}]}},"type":"json_schema","description":null,"strict":true},"verbosity":null},"top_logprobs":null,"truncation":"disabled","usage":{"input_tokens":69,"input_tokens_details":{"cached_tokens":64,"input_tokens_per_turn":[],"cached_tokens_per_turn":[]},"output_tokens":21,"output_tokens_details":{"reasoning_tokens":0,"tool_output_tokens":0,"output_tokens_per_turn":[],"tool_output_tokens_per_turn":[]},"total_tokens":90},"user":null,"presence_penalty":0.0,"frequency_penalty":0.0,"kv_transfer_params":null,"input_messages":null,"output_messages":null}' + + - description: "Turn 2: model responds with final answer after receiving tool result" + response: + body: '{"id":"resp_a443bc5c4af1a46b","created_at":1781296379,"incomplete_details":null,"instructions":null,"metadata":null,"model":"google/gemma-4-26B-A4B-it","object":"response","output":[{"id":"msg_90c2c001ac5517da","content":[{"annotations":[],"text":"The current weather in San Francisco is sunny with a temperature of 72°F and 45% humidity.","type":"output_text","logprobs":null}],"role":"assistant","status":"completed","type":"message","phase":null}],"parallel_tool_calls":true,"temperature":1.0,"tool_choice":"none","tools":[],"top_p":0.95,"background":false,"max_output_tokens":32703,"max_tool_calls":null,"previous_response_id":null,"prompt":null,"reasoning":null,"service_tier":"auto","status":"completed","text":null,"top_logprobs":null,"truncation":"disabled","usage":{"input_tokens":65,"input_tokens_details":{"cached_tokens":64,"input_tokens_per_turn":[],"cached_tokens_per_turn":[]},"output_tokens":29,"output_tokens_details":{"reasoning_tokens":2,"tool_output_tokens":0,"output_tokens_per_turn":[],"tool_output_tokens_per_turn":[]},"total_tokens":94},"user":null,"presence_penalty":0.0,"frequency_penalty":0.0,"kv_transfer_params":null,"input_messages":null,"output_messages":null}' diff --git a/crates/agentic-core/tests/dispatch_test.rs b/crates/agentic-core/tests/dispatch_test.rs new file mode 100644 index 0000000..ab2f40f --- /dev/null +++ b/crates/agentic-core/tests/dispatch_test.rs @@ -0,0 +1,323 @@ +// Integration tests for the tool dispatch layer. +#![allow(clippy::doc_markdown)] + +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use agentic_core::executor::{ExecutorError, LoopDecision, ToolContext, dispatch_tools}; +use agentic_core::tools::McpToolExecutor; +use agentic_core::types::io::{FunctionToolCall, InputItem, OutputItem, OutputMessage, OutputTextContent}; + +// --- Mock implementations --- + +/// Mock MCP executor that returns pre-configured responses by tool name. +/// If a tool name is not in the map, returns an error (simulating unknown tool). +struct MockMcp { + responses: std::collections::HashMap, +} + +impl MockMcp { + fn new(pairs: &[(&str, &str)]) -> Self { + Self { + responses: pairs + .iter() + .map(|(k, v)| ((*k).to_string(), (*v).to_string())) + .collect(), + } + } +} + +impl McpToolExecutor for MockMcp { + fn execute( + &self, + tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + let name = tool_name.to_string(); + Box::pin(async move { + self.responses + .get(&name) + .cloned() + .ok_or_else(|| ExecutorError::InvalidRequest(format!("unknown tool: {name}"))) + }) + } +} + +/// Mock MCP executor that always fails — simulates a crashed/unavailable tool provider. +struct FailingMcp; + +impl McpToolExecutor for FailingMcp { + fn execute( + &self, + tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + let name = tool_name.to_string(); + Box::pin(async move { Err(ExecutorError::StreamError(format!("tool '{name}' crashed"))) }) + } +} + +// --- Helpers --- + +fn make_function_call(name: &str, args: &str, call_id: &str) -> OutputItem { + OutputItem::FunctionCall(FunctionToolCall { + id: format!("fc_{call_id}"), + call_id: call_id.to_string(), + name: name.to_string(), + arguments: args.to_string(), + status: "completed".to_string(), + }) +} + +fn make_message(text: &str) -> OutputItem { + OutputItem::Message(OutputMessage { + id: "msg_1".to_string(), + role: "assistant".to_string(), + status: "completed".to_string(), + content: vec![OutputTextContent::new(text)], + }) +} + +fn tool_ctx_with_mcp(mcp: impl McpToolExecutor + 'static) -> ToolContext { + ToolContext { + mcp: Some(Arc::new(mcp)), + max_iterations: 10, + ..ToolContext::default() + } +} + +// --- Tests --- + +/// When output contains only text messages (no FunctionCall items), +/// dispatch should return Done — nothing to execute. +#[tokio::test] +async fn test_no_function_calls_returns_done() { + let output = vec![make_message("Hello world")]; + let ctx = ToolContext::default(); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + assert!(matches!(decision, LoopDecision::Done)); +} + +/// When output is completely empty, dispatch returns Done. +#[tokio::test] +async fn test_empty_output_returns_done() { + let output: Vec = vec![]; + let ctx = ToolContext::default(); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + assert!(matches!(decision, LoopDecision::Done)); +} + +/// Single FunctionCall in output → execute via MCP → return Continue with +/// the tool result wrapped as InputItem::FunctionCallOutput. +#[tokio::test] +async fn test_single_function_call_returns_continue() { + let output = vec![make_function_call("get_weather", r#"{"city":"SF"}"#, "call_1")]; + let mcp = MockMcp::new(&[("get_weather", r#"{"temp": 72}"#)]); + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert_eq!(result.call_id, "call_1"); + assert_eq!(result.output, r#"{"temp": 72}"#); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + panic!("expected Continue, got {decision:?}"); + } +} + +/// Multiple FunctionCall items in output → all execute concurrently via join_all → +/// Continue with results for each call_id. Order may vary (parallel execution). +#[tokio::test] +async fn test_parallel_function_calls() { + let output = vec![ + make_function_call("get_weather", r#"{"city":"SF"}"#, "call_1"), + make_function_call("get_time", r#"{"tz":"PST"}"#, "call_2"), + ]; + let mcp = MockMcp::new(&[("get_weather", "sunny"), ("get_time", "10:30 AM")]); + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 2); + let outputs: Vec<_> = items + .iter() + .filter_map(|item| match item { + InputItem::FunctionCallOutput(r) => Some((r.call_id.as_str(), r.output.as_str())), + _ => None, + }) + .collect(); + assert!(outputs.contains(&("call_1", "sunny"))); + assert!(outputs.contains(&("call_2", "10:30 AM"))); + } else { + panic!("expected Continue"); + } +} + +/// When iteration count reaches max_iterations, dispatch returns Incomplete +/// WITHOUT executing any tools — prevents infinite tool loops. +#[tokio::test] +async fn test_max_iterations_returns_incomplete() { + let output = vec![make_function_call("get_weather", "{}", "call_1")]; + let mcp = MockMcp::new(&[("get_weather", "sunny")]); + let ctx = ToolContext { + mcp: Some(Arc::new(mcp)), + max_iterations: 3, + ..ToolContext::default() + }; + + // iteration=3, max=3 → 3 >= 3 is true → Incomplete + let decision = dispatch_tools(&output, &ctx, 3).await.unwrap(); + + if let LoopDecision::Incomplete(reason) = decision { + assert!(reason.contains("max tool iterations")); + } else { + panic!("expected Incomplete, got {decision:?}"); + } +} + +/// When a tool provider returns an error, it becomes an error JSON string +/// in the output (not a total dispatch failure). The model sees the error +/// and can decide to retry on the next iteration. +#[tokio::test] +async fn test_failing_tool_produces_error_output_not_total_failure() { + let output = vec![make_function_call("bad_tool", "{}", "call_1")]; + let ctx = tool_ctx_with_mcp(FailingMcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert_eq!(result.call_id, "call_1"); + assert!(result.output.contains("error")); + assert!(result.output.contains("crashed")); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + panic!("expected Continue (with error output), got {decision:?}"); + } +} + +/// When no providers are configured at all, each call gets an error output +/// saying "no tool provider configured". dispatch still returns Continue +/// (the model sees the errors and handles them). +#[tokio::test] +async fn test_no_provider_configured_produces_error_output() { + let output = vec![make_function_call("get_weather", "{}", "call_1")]; + let ctx = ToolContext { + max_iterations: 10, + ..ToolContext::default() + }; + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert!(result.output.contains("error")); + assert!(result.output.contains("no tool provider configured")); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + panic!("expected Continue (with error output), got {decision:?}"); + } +} + +/// When multiple tools are called and some succeed while others fail, +/// ALL results are returned — successes with their output, failures with +/// error JSON. The model gets partial results and decides what to do. +#[tokio::test] +async fn test_mixed_success_and_failure() { + let output = vec![ + make_function_call("good_tool", "{}", "call_1"), + make_function_call("bad_tool", "{}", "call_2"), + ]; + let mcp = MockMcp::new(&[("good_tool", "success result")]); + // bad_tool not in MockMcp map → returns InvalidRequest error + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 2); + let results: std::collections::HashMap<_, _> = items + .iter() + .filter_map(|item| match item { + InputItem::FunctionCallOutput(r) => Some((r.call_id.as_str(), r.output.as_str())), + _ => None, + }) + .collect(); + assert_eq!(results["call_1"], "success result"); + assert!(results["call_2"].contains("error")); + } else { + panic!("expected Continue"); + } +} + +/// When output contains both Message and FunctionCall items, only the +/// FunctionCall items are dispatched. Messages are ignored by dispatch +/// (they're part of the response, not actionable tool calls). +#[tokio::test] +async fn test_function_call_mixed_with_message_output() { + let output = vec![ + make_message("Let me check the weather"), + make_function_call("get_weather", r#"{"city":"NYC"}"#, "call_1"), + ]; + let mcp = MockMcp::new(&[("get_weather", "rainy")]); + let ctx = tool_ctx_with_mcp(mcp); + + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + + if let LoopDecision::Continue(items) = decision { + assert_eq!(items.len(), 1); + if let InputItem::FunctionCallOutput(result) = &items[0] { + assert_eq!(result.call_id, "call_1"); + assert_eq!(result.output, "rainy"); + } else { + panic!("expected FunctionCallOutput"); + } + } else { + panic!("expected Continue"); + } +} + +/// Boundary test: iteration=0 with max_iterations=1 should execute (0 < 1). +/// iteration=1 with max_iterations=1 should return Incomplete (1 >= 1). +/// Verifies the >= comparison is correct. +#[tokio::test] +async fn test_iteration_zero_under_max_executes() { + let output = vec![make_function_call("tool", "{}", "call_1")]; + let mcp = MockMcp::new(&[("tool", "ok")]); + let ctx = ToolContext { + mcp: Some(Arc::new(mcp)), + max_iterations: 1, + ..ToolContext::default() + }; + + // iteration=0, max=1 → 0 < 1 → should execute + let decision = dispatch_tools(&output, &ctx, 0).await.unwrap(); + assert!(matches!(decision, LoopDecision::Continue(_))); + + // iteration=1, max=1 → 1 >= 1 → should be Incomplete + let mcp2 = MockMcp::new(&[("tool", "ok")]); + let ctx2 = ToolContext { + mcp: Some(Arc::new(mcp2)), + max_iterations: 1, + ..ToolContext::default() + }; + let decision2 = dispatch_tools(&output, &ctx2, 1).await.unwrap(); + assert!(matches!(decision2, LoopDecision::Incomplete(_))); +} diff --git a/crates/agentic-core/tests/execute_loop_test.rs b/crates/agentic-core/tests/execute_loop_test.rs new file mode 100644 index 0000000..db5e6cb --- /dev/null +++ b/crates/agentic-core/tests/execute_loop_test.rs @@ -0,0 +1,979 @@ +// Integration tests for execute_loop — the agentic loop orchestrator. +#![allow(clippy::doc_markdown)] + +mod support; + +use serde::Deserialize; + +use std::sync::Arc; + +use agentic_core::executor::{ExecutionContext, ExecutorError, ToolContext, execute_loop}; +use agentic_core::storage::{ConversationStore, ResponseStore}; +use agentic_core::tools::McpToolExecutor; +use agentic_core::types::io::{ResponsesInput, ToolChoice}; +use agentic_core::types::request_response::RequestPayload; +use support::{MockResponse, MockServer, setup_pool}; + +use std::future::Future; +use std::pin::Pin; + +// --- Mock tool implementations --- + +/// Returns a configured response for any tool call. +struct MockMcp { + response: String, +} + +impl MockMcp { + fn new(response: &str) -> Self { + Self { + response: response.to_string(), + } + } +} + +impl McpToolExecutor for MockMcp { + fn execute( + &self, + _tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + Box::pin(async { Ok(self.response.clone()) }) + } +} + +/// Always fails — simulates a crashed tool provider. +struct FailingMcp; + +impl McpToolExecutor for FailingMcp { + fn execute( + &self, + _tool_name: &str, + _arguments: &str, + _server_config: &serde_json::Value, + ) -> Pin> + Send + '_>> { + Box::pin(async { Err(ExecutorError::StreamError("MCP server unreachable".into())) }) + } +} + +// --- Helpers --- + +fn make_request(input: &str, stream: bool, store: bool) -> RequestPayload { + RequestPayload { + model: "test-model".to_string(), + input: ResponsesInput::Text(input.to_string()), + instructions: None, + previous_response_id: None, + conversation_id: None, + tools: None, + tool_choice: ToolChoice::Auto, + stream, + store, + include: None, + temperature: None, + top_p: None, + max_output_tokens: None, + truncation: None, + metadata: None, + } +} + +/// Build an ExecutionContext backed by a mock LLM server. +async fn build_exec_ctx(server: &MockServer) -> Arc { + let pool = setup_pool().await; + Arc::new(ExecutionContext::new( + agentic_core::executor::ConversationHandler::new(ConversationStore::new(pool.clone())), + agentic_core::executor::ResponseHandler::new(ResponseStore::new(pool)), + Arc::new(reqwest::Client::new()), + server.url().to_string(), + None, + )) +} + +/// Create a mock LLM response that contains only a text message. +fn text_llm_response(text: &str) -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_text", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [{ + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": text, "annotations": []}] + }], + "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15} + }) + .to_string(), + ) +} + +/// Create a mock LLM response that contains a function_call output item. +fn function_call_llm_response(name: &str, args: &str, call_id: &str) -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_fc", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [{ + "id": format!("fc_{call_id}"), + "type": "function_call", + "call_id": call_id, + "name": name, + "arguments": args, + "status": "completed" + }], + "usage": {"input_tokens": 10, "output_tokens": 8, "total_tokens": 18} + }) + .to_string(), + ) +} + +/// Create a mock LLM response with multiple function calls (parallel tool use). +fn parallel_function_calls_response() -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_parallel", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [ + { + "id": "fc_1", + "type": "function_call", + "call_id": "call_weather", + "name": "get_weather", + "arguments": "{\"city\":\"SF\"}", + "status": "completed" + }, + { + "id": "fc_2", + "type": "function_call", + "call_id": "call_time", + "name": "get_time", + "arguments": "{\"tz\":\"PST\"}", + "status": "completed" + } + ], + "usage": {"input_tokens": 10, "output_tokens": 12, "total_tokens": 22} + }) + .to_string(), + ) +} + +/// Create a mock LLM response with both a message AND a function call. +fn mixed_message_and_function_call_response() -> MockResponse { + MockResponse::Json( + serde_json::json!({ + "id": "resp_mock_mixed", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [ + { + "id": "msg_1", + "type": "message", + "role": "assistant", + "status": "completed", + "content": [{"type": "output_text", "text": "Let me check that for you.", "annotations": []}] + }, + { + "id": "fc_1", + "type": "function_call", + "call_id": "call_1", + "name": "lookup", + "arguments": "{\"q\":\"test\"}", + "status": "completed" + } + ], + "usage": {"input_tokens": 10, "output_tokens": 15, "total_tokens": 25} + }) + .to_string(), + ) +} + +// --- P0: Streaming rejection --- + +/// execute_loop only supports non-streaming (MVP). Passing stream=true should +/// return an immediate error without making any LLM calls. +#[tokio::test] +async fn test_rejects_streaming_request() { + let server = MockServer::start_deque(vec![text_llm_response("should not reach")]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("hello", true, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await; + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!( + err.to_string().contains("streaming"), + "error should mention streaming: {err}" + ); + + // Verify no LLM call was made + assert_eq!(server.request_bodies().await.len(), 0); +} + +// --- P1: No tools → single iteration → Done --- + +/// When the model responds with only text (no FunctionCall items), execute_loop +/// should return immediately after one iteration without re-entering. +#[tokio::test] +async fn test_no_tool_calls_returns_directly() { + let server = MockServer::start_deque(vec![text_llm_response("Hello world")]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("hi", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 1, "should call LLM exactly once"); +} + +/// When store=true and no tool calls, the response should be persisted. +/// Verify by checking the DB has a response record after the loop. +#[tokio::test] +async fn test_no_tool_calls_persists_when_store_true() { + let server = MockServer::start_deque(vec![text_llm_response("Persisted response")]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("save me", false, true); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + // If persist failed, we'd see a warning but not an error — the function still succeeds +} + +// --- P1: Tool call → re-enter → text response --- + +/// Model calls a tool on first iteration, gets result, produces text on second. +/// This is the core agentic loop path. +#[tokio::test] +async fn test_one_tool_call_then_text_response() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("get_weather", r#"{"city":"SF"}"#, "call_1"), + text_llm_response("The weather in SF is sunny, 72°F"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new(r#"{"temp":72,"condition":"sunny"}"#))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("What's the weather in SF?", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // Final response is the text from iteration 2 + assert_eq!(result.status, "completed"); + + // LLM was called twice: once for initial request, once with tool results + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2, "should call LLM exactly twice"); + + // Second request should contain the tool result in its input + let second_body = &bodies[1]; + let input_str = serde_json::to_string(&second_body["input"]).unwrap(); + assert!( + input_str.contains("function_call_output"), + "second request should have tool result: {input_str}" + ); + assert!(input_str.contains("call_1"), "should reference the original call_id"); +} + +/// Model calls two tools in parallel, gets both results, produces final text. +#[tokio::test] +async fn test_parallel_tool_calls_then_text() { + let server = MockServer::start_deque(vec![ + parallel_function_calls_response(), + text_llm_response("Weather is sunny and time is 10:30 AM"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("weather and time?", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); + + // Second request input should have 2 function_call_output items + let bodies = server.request_bodies().await; + let input_str = serde_json::to_string(&bodies[1]["input"]).unwrap(); + assert!(input_str.contains("call_weather"), "should have weather result"); + assert!(input_str.contains("call_time"), "should have time result"); +} + +/// Mixed output: message + function_call. Only the function_call triggers dispatch. +/// On second iteration, model returns final text. +#[tokio::test] +async fn test_mixed_message_and_function_call() { + let server = MockServer::start_deque(vec![ + mixed_message_and_function_call_response(), + text_llm_response("Here's what I found."), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("lookup result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("find something", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); +} + +// --- P1: Max iterations --- + +/// When the model keeps returning tool calls and max_iterations is hit, +/// execute_loop should stop and return the last payload (not error). +#[tokio::test] +async fn test_max_iterations_stops_loop() { + // LLM always returns a function call — would loop forever without max_iterations + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + function_call_llm_response("tool", "{}", "c2"), + function_call_llm_response("tool", "{}", "c3"), + text_llm_response("should not reach this"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 2, // will stop after 2 iterations + ..ToolContext::default() + }; + + let request = make_request("loop forever", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // Should have called LLM 3 times (iteration 0, 1, 2) then stopped at dispatch + // iteration 0: execute → FC → dispatch(iter=0) → Continue + // iteration 1: execute → FC → dispatch(iter=1) → Continue + // iteration 2: execute → FC → dispatch(iter=2) → Incomplete (2 >= 2) + assert_eq!(server.request_bodies().await.len(), 3); + + // Returns the last payload (the one from iteration 2) + assert_eq!( + result.status, "incomplete", + "should be marked incomplete when max iterations hit" + ); +} + +/// max_iterations=1 means only 1 tool dispatch is allowed. +#[tokio::test] +async fn test_max_iterations_one_allows_single_dispatch() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + function_call_llm_response("tool", "{}", "c2"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 1, + ..ToolContext::default() + }; + + let request = make_request("once", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // iteration 0: execute → FC → dispatch(iter=0) → Continue (0 < 1) + // iteration 1: execute → FC → dispatch(iter=1) → Incomplete (1 >= 1) + assert_eq!(server.request_bodies().await.len(), 2); + assert_eq!( + result.status, "incomplete", + "should be marked incomplete when max iterations hit" + ); +} + +// --- P1: Tool failure doesn't kill the loop --- + +/// When a tool provider fails, the error becomes output that the model sees. +/// The loop continues and the model responds to the error gracefully. +#[tokio::test] +async fn test_tool_failure_feeds_error_to_model() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("broken_tool", "{}", "call_err"), + text_llm_response("Sorry, the tool failed. Here's what I know..."), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(FailingMcp)), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("try the broken tool", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); + + // The second request should contain the error string as tool output + let bodies = server.request_bodies().await; + let input_str = serde_json::to_string(&bodies[1]["input"]).unwrap(); + assert!(input_str.contains("error"), "should contain error output: {input_str}"); + assert!( + input_str.contains("MCP server unreachable"), + "should contain error message" + ); +} + +// --- Edge cases --- + +/// No tool providers configured at all — calls still produce error output +/// and the model handles it on the next iteration. +#[tokio::test] +async fn test_no_providers_configured() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("any_tool", "{}", "call_1"), + text_llm_response("I can't use tools right now"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + max_iterations: 10, + ..ToolContext::default() // no providers + }; + + let request = make_request("use a tool", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 2); + + // Error message about no provider should be in the second request + let bodies = server.request_bodies().await; + let input_str = serde_json::to_string(&bodies[1]["input"]).unwrap(); + assert!(input_str.contains("no tool provider configured")); +} + +/// Empty model output (no message, no function calls) — should return Done. +#[tokio::test] +async fn test_empty_model_output() { + let empty_response = MockResponse::Json( + serde_json::json!({ + "id": "resp_empty", + "object": "response", + "created_at": 0, + "model": "test-model", + "status": "completed", + "output": [], + "usage": null + }) + .to_string(), + ); + + let server = MockServer::start_deque(vec![empty_response]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("silence", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert!(result.output.is_empty()); + assert_eq!(server.request_bodies().await.len(), 1); +} + +/// Multi-hop: model calls tool A, then uses result to call tool B, then responds. +/// Tests 3 iterations of the loop. +#[tokio::test] +async fn test_multi_hop_tool_calls() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("search", "cats", "call_search"), + function_call_llm_response("summarize", "cat article text", "call_summarize"), + text_llm_response("Cats are wonderful pets."), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool output"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("tell me about cats", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(server.request_bodies().await.len(), 3, "should make 3 LLM calls"); +} + +/// LLM returns an error (non-2xx) — execute_loop should propagate the error. +#[tokio::test] +async fn test_llm_returns_error() { + // The current MockServer always returns 200, so we use an empty queue + // which causes "mock queue exhausted" panic — simulating server failure. + let server = MockServer::start_deque(vec![]).await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext::default(); + + let request = make_request("fail", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await; + + // Should propagate the error (mock queue exhausted = panic in mock, caught as error) + // In practice this tests that execute_loop doesn't swallow execute() errors + assert!(result.is_err(), "should propagate LLM error"); +} + +// --- P2: Cassette-driven integration test (real vLLM output) --- + +const TOOL_LOOP_CASSETTE: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/tests/cassettes/tool_loop"); + +#[derive(Deserialize)] +struct ToolLoopCassette { + turns: Vec, + expected: CassetteExpected, + tool_mock: std::collections::HashMap, +} + +#[derive(Deserialize)] +struct CassetteTurn { + response: CassetteTurnResponse, +} + +#[derive(Deserialize)] +struct CassetteTurnResponse { + body: String, +} + +#[derive(Deserialize)] +struct CassetteExpected { + iterations: usize, + final_text: String, +} + +/// Replays a recorded vLLM tool-call session through execute_loop. +/// Validates the loop produces the same final text as the real model. +#[tokio::test] +async fn test_cassette_tool_loop_vllm_gemma4() { + let path = format!("{TOOL_LOOP_CASSETTE}/function-call-loop-vllm-gemma4.yaml"); + let text = std::fs::read_to_string(&path).unwrap(); + let cassette: ToolLoopCassette = serde_yml::from_str(&text).unwrap(); + + // Build mock server with the recorded responses queued + let responses: Vec = cassette + .turns + .iter() + .map(|t| MockResponse::Json(t.response.body.clone())) + .collect(); + let server = MockServer::start_deque(responses).await; + let exec_ctx = build_exec_ctx(&server).await; + + // Build ToolContext with mock that returns the cassette's tool_mock value + let tool_response = cassette.tool_mock.get("get_weather").cloned().unwrap_or_default(); + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new(&tool_response))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("What is the weather in San Francisco?", false, false); + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // Verify the loop ran the expected number of iterations + let bodies = server.request_bodies().await; + assert_eq!( + bodies.len(), + cassette.expected.iterations, + "expected {} LLM calls, got {}", + cassette.expected.iterations, + bodies.len() + ); + + // Verify final response contains the expected text + assert_eq!(result.status, "completed"); + let output_text: String = result + .output + .iter() + .filter_map(|item| match item { + agentic_core::types::io::OutputItem::Message(msg) => { + Some(msg.content.iter().map(|c| c.text.as_str()).collect::()) + } + _ => None, + }) + .collect(); + assert_eq!(output_text, cassette.expected.final_text); +} + +// --- Additional coverage tests --- + +/// When the request already has Items input (not Text), the loop should +/// extend the existing items with tool results on Continue. +#[tokio::test] +async fn test_items_input_extended_correctly() { + use agentic_core::types::io::{InputItem, InputMessage, InputMessageContent}; + + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "call_1"), + text_llm_response("done"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + // Start with Items input (not Text) + let request = RequestPayload { + model: "test-model".to_string(), + input: ResponsesInput::Items(vec![InputItem::Message(InputMessage { + role: "user".into(), + content: InputMessageContent::Text("hello from items".into()), + })]), + instructions: None, + previous_response_id: None, + conversation_id: None, + tools: None, + tool_choice: ToolChoice::Auto, + stream: false, + store: false, + include: None, + temperature: None, + top_p: None, + max_output_tokens: None, + truncation: None, + metadata: None, + }; + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + assert_eq!(result.status, "completed"); + + // Second request should have: original item + tool result + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + let second_input = &bodies[1]["input"]; + let input_array = second_input.as_array().unwrap(); + // Should have at least 2 items: original message + function_call_output + assert!( + input_array.len() >= 2, + "expected at least 2 items, got {}", + input_array.len() + ); +} + +/// Verify that previous_response_id=None on input produces None on output payload. +/// (Testing with Some(id) requires a seeded DB — deferred to full integration test.) +#[tokio::test] +async fn test_previous_response_id_none_preserved() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("done"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = make_request("test", false, false); + // request.previous_response_id is None + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + // After the loop mutated previous_response_id internally (set to None), + // the original value (None) should be restored on the payload. + assert_eq!( + result.previous_response_id, None, + "should preserve original None previous_response_id" + ); +} + +// --- P2: Persistence trigger suppression & ID restoration tests --- +// +// These tests verify the critical invariants from Section 14 of the design doc: +// - Internal LLM calls have store=false, prev_resp_id=null, conv_id=null +// - Original prev_resp_id and conv_id are restored on the returned payload +// - Both Done and Incomplete exit paths restore correctly + +/// previous_response_id=Some is captured before the loop and restored on Done. +#[tokio::test] +async fn test_previous_response_id_some_restored_on_done() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("final answer"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("test", false, false); + request.previous_response_id = Some("resp_original_123".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!( + result.previous_response_id, + Some("resp_original_123".to_string()), + "previous_response_id must be restored from pre-loop capture" + ); + + // Verify internal LLM calls had prev_resp_id cleared + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + for (i, body) in bodies.iter().enumerate() { + assert!( + body["previous_response_id"].is_null(), + "internal LLM call {i} should have previous_response_id=null, got: {}", + body["previous_response_id"] + ); + } +} + +/// conversation_id=Some is captured before the loop and restored on Done. +#[tokio::test] +async fn test_conversation_id_restored_on_done() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("done"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("test", false, false); + request.conversation_id = Some("conv_abc_456".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!( + result.conversation_id, + Some("conv_abc_456".to_string()), + "conversation_id must be restored from pre-loop capture" + ); + + // Verify internal LLM calls had conv_id cleared + let bodies = server.request_bodies().await; + for (i, body) in bodies.iter().enumerate() { + assert!( + body["conversation_id"].is_null(), + "internal LLM call {i} should have conversation_id=null, got: {}", + body["conversation_id"] + ); + } +} + +/// store=true is suppressed to false for ALL internal iterations. +#[tokio::test] +async fn test_store_suppressed_in_internal_iterations() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("stored result"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("test", false, true); // store=true + request.store = true; + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + assert_eq!(result.status, "completed"); + + // Both internal LLM calls should have store=false (or absent/null — serde skips false) + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + for (i, body) in bodies.iter().enumerate() { + let store_val = &body["store"]; + assert!( + store_val.is_null() || store_val == false, + "internal LLM call {i} should have store=false/absent, got: {store_val}" + ); + } +} + +/// All three persistence triggers cleared in internal iterations: combined scenario. +/// Request: store=true, prev_resp_id=Some, conv_id=Some +/// Assert: all internal calls have store=false, both IDs null +/// Assert: returned payload has both IDs restored +#[tokio::test] +async fn test_all_persistence_triggers_cleared_internally() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("search", "{\"q\":\"rust\"}", "call_s"), + function_call_llm_response("summarize", "{\"t\":\"text\"}", "call_sum"), + text_llm_response("final summary"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("tool output"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let mut request = make_request("multi-hop with IDs", false, true); + request.previous_response_id = Some("resp_prev_999".to_string()); + request.conversation_id = Some("conv_session_42".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(result.previous_response_id, Some("resp_prev_999".to_string()),); + assert_eq!(result.conversation_id, Some("conv_session_42".to_string()),); + + // All 3 internal LLM calls should have all persistence triggers cleared + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 3, "3 iterations: FC → FC → text"); + for (i, body) in bodies.iter().enumerate() { + let store_val = &body["store"]; + assert!( + store_val.is_null() || store_val == false, + "call {i}: store should be false/absent, got: {store_val}" + ); + assert!( + body["previous_response_id"].is_null(), + "call {i}: previous_response_id should be null, got: {}", + body["previous_response_id"] + ); + assert!( + body["conversation_id"].is_null(), + "call {i}: conversation_id should be null, got: {}", + body["conversation_id"] + ); + } +} + +/// Incomplete path (max_iterations hit) also restores both IDs correctly. +#[tokio::test] +async fn test_incomplete_restores_both_ids() { + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + function_call_llm_response("tool", "{}", "c2"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("ok"))), + max_iterations: 1, // stops at iteration 1 + ..ToolContext::default() + }; + + let mut request = make_request("will be incomplete", false, false); + request.previous_response_id = Some("resp_incomplete_orig".to_string()); + request.conversation_id = Some("conv_incomplete_orig".to_string()); + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "incomplete"); + assert_eq!( + result.previous_response_id, + Some("resp_incomplete_orig".to_string()), + "Incomplete path must restore previous_response_id" + ); + assert_eq!( + result.conversation_id, + Some("conv_incomplete_orig".to_string()), + "Incomplete path must restore conversation_id" + ); + + // Verify incomplete_details has a reason + assert!(result.incomplete_details.is_some(), "incomplete should have details"); + let reason = result.incomplete_details.unwrap().reason.unwrap(); + assert!( + reason.contains("max tool iterations"), + "reason should mention max iterations: {reason}" + ); +} + +/// Input already as Items + conversation_id set: verify in-place extend works +/// and conversation_id appears on the output. +#[tokio::test] +async fn test_items_input_with_conversation_id() { + use agentic_core::types::io::{InputItem, InputMessage, InputMessageContent}; + + let server = MockServer::start_deque(vec![ + function_call_llm_response("tool", "{}", "c1"), + text_llm_response("done with items+conv"), + ]) + .await; + let exec_ctx = build_exec_ctx(&server).await; + let tool_ctx = ToolContext { + mcp: Some(Arc::new(MockMcp::new("result"))), + max_iterations: 10, + ..ToolContext::default() + }; + + let request = RequestPayload { + model: "test-model".to_string(), + input: ResponsesInput::Items(vec![InputItem::Message(InputMessage { + role: "user".into(), + content: InputMessageContent::Text("hello from items".into()), + })]), + instructions: None, + previous_response_id: Some("resp_items_prev".to_string()), + conversation_id: Some("conv_items_session".to_string()), + tools: None, + tool_choice: ToolChoice::Auto, + stream: false, + store: true, + include: None, + temperature: None, + top_p: None, + max_output_tokens: None, + truncation: None, + metadata: None, + }; + + let result = execute_loop(request, exec_ctx, &tool_ctx).await.unwrap(); + + assert_eq!(result.status, "completed"); + assert_eq!(result.previous_response_id, Some("resp_items_prev".to_string())); + assert_eq!(result.conversation_id, Some("conv_items_session".to_string())); + + // Second request should have original item + tool result (Items extended in-place) + let bodies = server.request_bodies().await; + assert_eq!(bodies.len(), 2); + let second_input = bodies[1]["input"].as_array().unwrap(); + assert!( + second_input.len() >= 2, + "should have original item + function_call_output, got {}", + second_input.len() + ); + // All internal calls should have persistence suppressed + assert!(bodies[0]["store"].is_null() || bodies[0]["store"] == false); + assert!(bodies[1]["store"].is_null() || bodies[1]["store"] == false); +}