Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
5a08f64
feat: add tool dispatch layer — ToolContext, traits, and LoopDecision
ashwing Jun 11, 2026
ba9ed87
fix: address review — JSON escaping, default iterations, expect
ashwing Jun 11, 2026
7865964
docs: document failure model and retry policy in execute_all
ashwing Jun 11, 2026
f8310fe
docs: add comments to dispatch tests explaining intent and expectations
ashwing Jun 11, 2026
574c94b
fix: add comment before inner attribute to avoid shebang false positive
ashwing Jun 11, 2026
6379bac
feat: add execute_loop — agentic loop orchestrator
ashwing Jun 12, 2026
84b80c4
test: add execute_loop integration tests with mock LLM
ashwing Jun 12, 2026
bb570e0
test: add cassette-driven execute_loop test from live vLLM
ashwing Jun 12, 2026
9594f09
fix: address review round 1+2 — loop guard, persistence, status
ashwing Jun 12, 2026
b4dc7ec
feat: add tool_timeout + additional test coverage
ashwing Jun 12, 2026
9261e78
feat: add inference timeout to execute_loop
ashwing Jun 12, 2026
13a8812
docs: add detailed code comments to dispatch, tool_context, execute_loop
ashwing Jun 12, 2026
a95b614
fix: suppress persistence on first iteration, populate incomplete_det…
ashwing Jun 12, 2026
bebda59
refactor: address review feedback — imports, in-place mutation, lazy …
ashwing Jun 16, 2026
a075e77
fix: clear all persistence triggers before loop iterations
ashwing Jun 16, 2026
ab7ada6
test: add persistence trigger suppression & ID restoration tests
ashwing Jun 16, 2026
4c4a28e
ci: fix rustfmt formatting in new tests
ashwing Jun 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions crates/agentic-core/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//! Tool dispatch decision logic.
//!
//! This module is STATELESS — it inspects the model's output items, decides
//! whether to execute tools, and returns a decision. It does not manage loop
//! state, persistence, or re-entry. That's `execute_loop`'s job.
//!
//! Decision flow:
//! ```text
//! output items → filter FunctionCall → empty? → Done
//! → iteration >= max? → Incomplete
//! → execute all → Continue(results)
//! ```

use crate::executor::error::ExecutorResult;
use crate::executor::tool_context::ToolContext;
use crate::types::io::{InputItem, OutputItem};

/// Decision returned by [`dispatch_tools`] to drive the agentic loop.
///
/// `#[non_exhaustive]` allows adding variants (e.g. `Partial` for mixed
/// gateway + client tools) without breaking downstream match arms.
#[derive(Debug)]
#[non_exhaustive]
pub enum LoopDecision {
/// Gateway-executed tools returned results. Caller should inject these
/// `InputItem::FunctionCallOutput` items into the request and re-infer.
Continue(Vec<InputItem>),

/// No tool calls found in output, OR only client-side functions present.
/// Caller should return the response to the client as-is.
/// If `FunctionCall` items exist in output with this decision, the response
/// status should be `requires_action` (client executes them externally).
Done,

/// Max iterations reached. The model wanted to call more tools but we're
/// cutting it off to prevent runaway loops. The reason string is included
/// for logging/debugging. Caller should set `payload.status = "incomplete"`.
Incomplete(String),
}

/// Inspect executor output for function calls and dispatch them via [`ToolContext`].
///
/// # Decision Logic
///
/// 1. Filter `OutputItem::FunctionCall` items from `output`
/// 2. If none found → `Done` (model produced only text/messages)
/// 3. If `iteration >= tool_ctx.max_iterations` → `Incomplete` (safety cap)
/// 4. Otherwise → execute all calls via `tool_ctx.execute_all()` → `Continue`
///
/// # MVP Routing
///
/// Currently ALL `FunctionCall` items are treated as gateway-executable (routed
/// to MCP/`web_search`/`vector_store` in priority order). The distinction between
/// client-side functions (`type: "function"` in the request) and gateway-executed
/// tools (`type: "mcp"`) requires access to the request's tools array — deferred
/// to a follow-up PR that changes this function's signature.
///
/// # Error Semantics
///
/// - Individual tool failures → error JSON string in the result (model sees it)
/// - This function only returns `Err` on internal/structural failures
/// - The function itself NEVER panics
///
/// # Errors
///
/// Returns `ExecutorError` only on internal failures (e.g. serialization).
/// Individual tool execution failures are captured as error output strings
/// in the returned `InputItem` list — they do NOT propagate as errors.
pub async fn dispatch_tools(
output: &[OutputItem],
tool_ctx: &ToolContext,
iteration: usize,
) -> ExecutorResult<LoopDecision> {
// Step 1: Extract FunctionCall items lazily — no allocation until we know we need it.
let mut function_calls = output
.iter()
.filter_map(|item| match item {
OutputItem::FunctionCall(fc) => Some(fc),
_ => None,
})
.peekable();

// Step 2: No tool calls → model is done generating.
if function_calls.peek().is_none() {
return Ok(LoopDecision::Done);
}

// Step 3: Safety cap — prevent infinite tool loops.
// This fires BEFORE execution, so no work is wasted on the capped iteration.
if iteration >= tool_ctx.max_iterations {
return Ok(LoopDecision::Incomplete(format!(
"max tool iterations reached ({iteration}/{})",
tool_ctx.max_iterations
)));
}

// Step 4: Only allocate when we'll actually execute.
let calls: Vec<_> = function_calls.collect();
let results = tool_ctx.execute_all(&calls).await;
Ok(LoopDecision::Continue(results))
}
184 changes: 184 additions & 0 deletions crates/agentic-core/src/executor/execute_loop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
//! Agentic loop orchestrator.
//!
//! Composes `execute()` (LLM inference) with `dispatch_tools()` (tool routing)
//! in a loop that continues until the model stops producing tool calls.
//!
//! Architecture:
//! ```text
//! ┌─────────────────────────────────────────────────────────┐
//! │ execute_loop │
//! │ │
//! │ for each iteration: │
//! │ 1. execute(request) → ResponsePayload │
//! │ 2. dispatch_tools(output) → LoopDecision │
//! │ 3. if Continue: inject results, goto 1 │
//! │ if Done/Incomplete: return payload │
//! └─────────────────────────────────────────────────────────┘
//! ```
//!
//! Timeout budget per iteration:
//! - LLM inference: `exec_ctx.streaming_timeout` (default 30s)
//! - Each tool call: `tool_ctx.tool_timeout` (default 30s)
//! - Hard loop cap: `MAX_LOOP_GUARD` (128 iterations)
//! - Soft tool cap: `tool_ctx.max_iterations` (default 10)

use std::sync::Arc;

use either::Either;
use tracing::debug;

use crate::executor::ExecutorError;
use crate::executor::dispatch::{LoopDecision, dispatch_tools};
use crate::executor::engine::execute;
use crate::executor::error::ExecutorResult;
use crate::executor::request::ExecutionContext;
use crate::executor::tool_context::ToolContext;
use crate::types::io::{InputItem, InputMessage, InputMessageContent, ResponsesInput};
use crate::types::request_response::{IncompleteDetails, RequestPayload, ResponsePayload};

/// Defense-in-depth hard cap, independent of `tool_ctx.max_iterations`.
/// Prevents infinite loops even if dispatch logic has a bug.
/// Set high enough to never trigger in normal operation (`max_iterations`=10
/// would stop far earlier), but low enough to catch runaway loops quickly.
const MAX_LOOP_GUARD: usize = 128;

/// Run the agentic loop: execute → dispatch tools → re-enter until done.
///
/// # Contract
///
/// - **Caller provides:** request, execution context (LLM + DB), tool context (providers)
/// - **This function returns:** the final `ResponsePayload` (caller persists it)
/// - **Persistence:** NOT done here. Caller (server handler) owns persistence because
/// it has the full `RequestContext` with correct `new_input_items`. We clear
/// all three persistence triggers (`store`, `previous_response_id`,
/// `conversation_id`) to suppress intermediate `execute()` calls from
/// persisting partial state (PR #56 persists when ANY of the three is set).
/// - **ID restoration:** Both `previous_response_id` and `conversation_id` on the
/// returned payload reflect the ORIGINAL caller-supplied values, not the
/// internal mutations. This is critical for the caller's persist step.
///
/// # Timeouts
///
/// - Each `execute()` call is wrapped in `tokio::time::timeout(exec_ctx.streaming_timeout)`
/// - Each tool call is wrapped in `tokio::time::timeout(tool_ctx.tool_timeout)`
/// - `Duration::ZERO` on either disables that timeout (provider manages its own)
///
/// # Known Limitations (MVP)
///
/// - Non-streaming only. `stream=true` returns an immediate error.
/// - `request.clone()` every iteration is O(n) in accumulated input size.
/// - `InputItem` lacks a `FunctionCall` variant, so the assistant's tool-call
/// items are not injected into context (the model doesn't see its own calls).
/// Follow-up PR needed to add the variant.
///
/// # Errors
///
/// Returns `ExecutorError` if:
/// - LLM inference fails or times out
/// - Tool dispatch encounters a fatal error (individual tool failures are NOT fatal)
/// - `stream=true` is passed
/// - Hard loop guard is breached
pub async fn execute_loop(
mut request: RequestPayload,
exec_ctx: Arc<ExecutionContext>,
tool_ctx: &ToolContext,
) -> ExecutorResult<ResponsePayload> {
let original_previous_response_id = request.previous_response_id.clone();
let original_conversation_id = request.conversation_id.clone();

// Clear all three persistence triggers so intermediate execute() calls
// don't write partial tool-call-only responses to the store.
request.store = false;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this hardcoded false? I see the comment

// Without this, the first iteration would persist a partial response
 // (containing only the tool-call output, not the final answer) to the DB.

could you clarify what do you mean by partial response?

the contract is as following:
The store value from request payload coming in from server initially if is false we are passing into vllm proxy for stateless inference.
if is set to true we would invoke execute_loop which should not set this to false. from second turn onwards if the store=False context ID is given meaning either of (conversation_id or previouse_response_id is present) we need to still persist. if context ID not provided then we proxy to vLLM server stateless.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop suppresses persistence for intermediate iterations — otherwise each internal execute() call would write a partial response (just tool-call output, not the final answer) to the store. We clear all three persistence triggers (store, previous_response_id, conversation_id) because PR #56 persists when any of them is set. The caller (server handler) does the final persist once with the complete response.

Updated the doc comment to make this clearer.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to discuss further, esp. the requirement to persist intermediate tool call/results and the layer responsible to persist.

request.previous_response_id = None;
request.conversation_id = None;

for iteration in 0_usize.. {
// Defense-in-depth: even if dispatch_tools has a bug that never returns
// Incomplete, we won't loop forever.
if iteration >= MAX_LOOP_GUARD {
return Err(ExecutorError::InvalidRequest(format!(
"execute_loop exceeded hard iteration cap ({MAX_LOOP_GUARD})"
)));
}

debug!(iteration, "execute_loop iteration");

// --- Step 1: Call the LLM ---
// Timeout prevents hanging on unresponsive LLM backends.
// Duration::ZERO = no timeout (provider/reqwest manages its own).
let inference_timeout = exec_ctx.streaming_timeout;
let result = if inference_timeout.is_zero() {
execute(request.clone(), Arc::clone(&exec_ctx)).await?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid frequent request.clone() let's change the execute() signature to borrow the RequestPayload execute(request: &RequestPayload,...)
this change would require also change in rehydrate_conversation(request: &RequestPayload, ...)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree this is the right long-term fix. The request.clone() is still there because execute() takes ownership today (it passes the payload into rehydrate_conversation which moves it). Changing that signature touches engine.rs internals (rehydrate, inject_ids, persist) which are outside our PR scope.

I'll open a follow-up to refactor execute() to borrow — it'll be a clean diff once this lands. For now the clone cost is bounded by the input size which doesn't grow per-iteration anymore (tool results are just appended, not cloned on each loop).

} else {
tokio::time::timeout(inference_timeout, execute(request.clone(), Arc::clone(&exec_ctx)))
.await
.map_err(|_| {
ExecutorError::StreamError(format!(
"LLM inference timed out after {inference_timeout:?} on iteration {iteration}"
))
})??
};

// execute() returns Either<ResponsePayload, BoxStream>.
// We only support non-streaming in execute_loop (streaming requires StreamTee).
let mut payload = match result {
Either::Left(payload) => payload,
Either::Right(_stream) => {
return Err(ExecutorError::InvalidRequest(
"execute_loop does not support streaming yet — set stream=false".into(),
));
}
};

// --- Step 2: Inspect output for tool calls ---
// dispatch_tools filters FunctionCall items, executes them via ToolContext,
// and returns a decision: Continue (with results), Done, or Incomplete.
let decision = dispatch_tools(&payload.output, tool_ctx, iteration).await?;

match decision {
// No tool calls (or only client-side functions) — we're done.
LoopDecision::Done => {
payload.previous_response_id = original_previous_response_id;
payload.conversation_id = original_conversation_id;
return Ok(payload);
}
// Hit max_iterations — stop looping, mark as incomplete.
// The model may have wanted to call more tools, but we're cutting it off.
// Attach the reason to incomplete_details so the client knows why.
LoopDecision::Incomplete(reason) => {
debug!(iteration, %reason, "loop incomplete");
payload.status = "incomplete".to_string();
payload.incomplete_details = Some(IncompleteDetails { reason: Some(reason) });
payload.previous_response_id = original_previous_response_id;
payload.conversation_id = original_conversation_id;
return Ok(payload);
}
// Tools were executed — inject results and re-enter inference.
LoopDecision::Continue(tool_results) => {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each iteration: clone → extend → reassign. On iteration 2, you clone the vec that already contains iteration 1's results. On iteration 3, you clone a vec with iterations 1+2's results. The allocation grows linearly and is thrown away each time.

The in-place alternative:

LoopDecision::Continue(tool_results) => {
    // Convert Text → Items on first tool call, then just extend in-place
    if let ResponsesInput::Text(t) = &request.input {
        request.input = ResponsesInput::Items(vec![InputItem::Message(...)]);
    }
    if let ResponsesInput::Items(ref mut v) = request.input {
        v.extend(tool_results);   // ← no clone, mutates the existing Vec
    }
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — now mutates request.input in-place. Text gets converted to Items once on the first tool call, then subsequent iterations just reserve+extend on the same vec. No cloning.

debug!(
iteration,
results = tool_results.len(),
"tool results received, re-entering"
);

// Convert text input to structured items on first tool call,
// then extend in-place on subsequent iterations (no clone).
if let ResponsesInput::Text(t) = &request.input {
let msg = InputItem::Message(InputMessage {
role: "user".into(),
content: InputMessageContent::Text(t.clone()),
});
request.input = ResponsesInput::Items(vec![msg]);
}
if let ResponsesInput::Items(ref mut items) = request.input {
// TODO: also inject assistant's FunctionCall output items here
// (requires InputItem::FunctionCall variant — follow-up PR)
items.reserve(tool_results.len());
items.extend(tool_results);
}
}
}
}

unreachable!("loop exits via return in Done/Incomplete/guard arms")
}
6 changes: 6 additions & 0 deletions crates/agentic-core/src/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
//! Agentic loop executor.

pub mod accumulator;
pub mod dispatch;
pub mod engine;
pub mod error;
pub mod execute_loop;
pub mod modes;
pub mod request;
pub mod tool_context;

pub use dispatch::{LoopDecision, dispatch_tools};
pub use engine::{BoxStream, call_inference, create_conversation, execute, persist_response, rehydrate_conversation};
pub use error::{ExecutorError, ExecutorResult};
pub use execute_loop::execute_loop;
pub use modes::{ConversationHandler, ResponseHandler};
pub use request::ExecutionContext;
pub use request::RequestContext;
pub use tool_context::ToolContext;
Loading