-
Notifications
You must be signed in to change notification settings - Fork 14
feat: add tool dispatch layer — ToolContext, traits, and LoopDecision #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5a08f64
ba9ed87
7865964
f8310fe
574c94b
6379bac
84b80c4
bb570e0
9594f09
b4dc7ec
9261e78
13a8812
a95b614
bebda59
a075e77
ab7ada6
4c4a28e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| //! Tool dispatch decision logic. | ||
| //! | ||
| //! This module is STATELESS — it inspects the model's output items, decides | ||
| //! whether to execute tools, and returns a decision. It does not manage loop | ||
| //! state, persistence, or re-entry. That's `execute_loop`'s job. | ||
| //! | ||
| //! Decision flow: | ||
| //! ```text | ||
| //! output items → filter FunctionCall → empty? → Done | ||
| //! → iteration >= max? → Incomplete | ||
| //! → execute all → Continue(results) | ||
| //! ``` | ||
|
|
||
| use crate::executor::error::ExecutorResult; | ||
| use crate::executor::tool_context::ToolContext; | ||
| use crate::types::io::{InputItem, OutputItem}; | ||
|
|
||
| /// Decision returned by [`dispatch_tools`] to drive the agentic loop. | ||
| /// | ||
| /// `#[non_exhaustive]` allows adding variants (e.g. `Partial` for mixed | ||
| /// gateway + client tools) without breaking downstream match arms. | ||
| #[derive(Debug)] | ||
| #[non_exhaustive] | ||
| pub enum LoopDecision { | ||
| /// Gateway-executed tools returned results. Caller should inject these | ||
| /// `InputItem::FunctionCallOutput` items into the request and re-infer. | ||
| Continue(Vec<InputItem>), | ||
|
|
||
| /// No tool calls found in output, OR only client-side functions present. | ||
| /// Caller should return the response to the client as-is. | ||
| /// If `FunctionCall` items exist in output with this decision, the response | ||
| /// status should be `requires_action` (client executes them externally). | ||
| Done, | ||
|
|
||
| /// Max iterations reached. The model wanted to call more tools but we're | ||
| /// cutting it off to prevent runaway loops. The reason string is included | ||
| /// for logging/debugging. Caller should set `payload.status = "incomplete"`. | ||
| Incomplete(String), | ||
| } | ||
|
|
||
| /// Inspect executor output for function calls and dispatch them via [`ToolContext`]. | ||
| /// | ||
| /// # Decision Logic | ||
| /// | ||
| /// 1. Filter `OutputItem::FunctionCall` items from `output` | ||
| /// 2. If none found → `Done` (model produced only text/messages) | ||
| /// 3. If `iteration >= tool_ctx.max_iterations` → `Incomplete` (safety cap) | ||
| /// 4. Otherwise → execute all calls via `tool_ctx.execute_all()` → `Continue` | ||
| /// | ||
| /// # MVP Routing | ||
| /// | ||
| /// Currently ALL `FunctionCall` items are treated as gateway-executable (routed | ||
| /// to MCP/`web_search`/`vector_store` in priority order). The distinction between | ||
| /// client-side functions (`type: "function"` in the request) and gateway-executed | ||
| /// tools (`type: "mcp"`) requires access to the request's tools array — deferred | ||
| /// to a follow-up PR that changes this function's signature. | ||
| /// | ||
| /// # Error Semantics | ||
| /// | ||
| /// - Individual tool failures → error JSON string in the result (model sees it) | ||
| /// - This function only returns `Err` on internal/structural failures | ||
| /// - The function itself NEVER panics | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns `ExecutorError` only on internal failures (e.g. serialization). | ||
| /// Individual tool execution failures are captured as error output strings | ||
| /// in the returned `InputItem` list — they do NOT propagate as errors. | ||
| pub async fn dispatch_tools( | ||
| output: &[OutputItem], | ||
| tool_ctx: &ToolContext, | ||
| iteration: usize, | ||
| ) -> ExecutorResult<LoopDecision> { | ||
| // Step 1: Extract FunctionCall items lazily — no allocation until we know we need it. | ||
| let mut function_calls = output | ||
| .iter() | ||
| .filter_map(|item| match item { | ||
| OutputItem::FunctionCall(fc) => Some(fc), | ||
| _ => None, | ||
| }) | ||
| .peekable(); | ||
|
|
||
| // Step 2: No tool calls → model is done generating. | ||
| if function_calls.peek().is_none() { | ||
| return Ok(LoopDecision::Done); | ||
| } | ||
|
|
||
| // Step 3: Safety cap — prevent infinite tool loops. | ||
| // This fires BEFORE execution, so no work is wasted on the capped iteration. | ||
| if iteration >= tool_ctx.max_iterations { | ||
| return Ok(LoopDecision::Incomplete(format!( | ||
| "max tool iterations reached ({iteration}/{})", | ||
| tool_ctx.max_iterations | ||
| ))); | ||
| } | ||
|
|
||
| // Step 4: Only allocate when we'll actually execute. | ||
| let calls: Vec<_> = function_calls.collect(); | ||
| let results = tool_ctx.execute_all(&calls).await; | ||
| Ok(LoopDecision::Continue(results)) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| //! Agentic loop orchestrator. | ||
| //! | ||
| //! Composes `execute()` (LLM inference) with `dispatch_tools()` (tool routing) | ||
| //! in a loop that continues until the model stops producing tool calls. | ||
| //! | ||
| //! Architecture: | ||
| //! ```text | ||
| //! ┌─────────────────────────────────────────────────────────┐ | ||
| //! │ execute_loop │ | ||
| //! │ │ | ||
| //! │ for each iteration: │ | ||
| //! │ 1. execute(request) → ResponsePayload │ | ||
| //! │ 2. dispatch_tools(output) → LoopDecision │ | ||
| //! │ 3. if Continue: inject results, goto 1 │ | ||
| //! │ if Done/Incomplete: return payload │ | ||
| //! └─────────────────────────────────────────────────────────┘ | ||
| //! ``` | ||
| //! | ||
| //! Timeout budget per iteration: | ||
| //! - LLM inference: `exec_ctx.streaming_timeout` (default 30s) | ||
| //! - Each tool call: `tool_ctx.tool_timeout` (default 30s) | ||
| //! - Hard loop cap: `MAX_LOOP_GUARD` (128 iterations) | ||
| //! - Soft tool cap: `tool_ctx.max_iterations` (default 10) | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use either::Either; | ||
| use tracing::debug; | ||
|
|
||
| use crate::executor::ExecutorError; | ||
| use crate::executor::dispatch::{LoopDecision, dispatch_tools}; | ||
| use crate::executor::engine::execute; | ||
| use crate::executor::error::ExecutorResult; | ||
| use crate::executor::request::ExecutionContext; | ||
| use crate::executor::tool_context::ToolContext; | ||
| use crate::types::io::{InputItem, InputMessage, InputMessageContent, ResponsesInput}; | ||
| use crate::types::request_response::{IncompleteDetails, RequestPayload, ResponsePayload}; | ||
|
|
||
| /// Defense-in-depth hard cap, independent of `tool_ctx.max_iterations`. | ||
| /// Prevents infinite loops even if dispatch logic has a bug. | ||
| /// Set high enough to never trigger in normal operation (`max_iterations`=10 | ||
| /// would stop far earlier), but low enough to catch runaway loops quickly. | ||
| const MAX_LOOP_GUARD: usize = 128; | ||
|
|
||
| /// Run the agentic loop: execute → dispatch tools → re-enter until done. | ||
| /// | ||
| /// # Contract | ||
| /// | ||
| /// - **Caller provides:** request, execution context (LLM + DB), tool context (providers) | ||
| /// - **This function returns:** the final `ResponsePayload` (caller persists it) | ||
| /// - **Persistence:** NOT done here. Caller (server handler) owns persistence because | ||
| /// it has the full `RequestContext` with correct `new_input_items`. We clear | ||
| /// all three persistence triggers (`store`, `previous_response_id`, | ||
| /// `conversation_id`) to suppress intermediate `execute()` calls from | ||
| /// persisting partial state (PR #56 persists when ANY of the three is set). | ||
| /// - **ID restoration:** Both `previous_response_id` and `conversation_id` on the | ||
| /// returned payload reflect the ORIGINAL caller-supplied values, not the | ||
| /// internal mutations. This is critical for the caller's persist step. | ||
| /// | ||
| /// # Timeouts | ||
| /// | ||
| /// - Each `execute()` call is wrapped in `tokio::time::timeout(exec_ctx.streaming_timeout)` | ||
| /// - Each tool call is wrapped in `tokio::time::timeout(tool_ctx.tool_timeout)` | ||
| /// - `Duration::ZERO` on either disables that timeout (provider manages its own) | ||
| /// | ||
| /// # Known Limitations (MVP) | ||
| /// | ||
| /// - Non-streaming only. `stream=true` returns an immediate error. | ||
| /// - `request.clone()` every iteration is O(n) in accumulated input size. | ||
| /// - `InputItem` lacks a `FunctionCall` variant, so the assistant's tool-call | ||
| /// items are not injected into context (the model doesn't see its own calls). | ||
| /// Follow-up PR needed to add the variant. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns `ExecutorError` if: | ||
| /// - LLM inference fails or times out | ||
| /// - Tool dispatch encounters a fatal error (individual tool failures are NOT fatal) | ||
| /// - `stream=true` is passed | ||
| /// - Hard loop guard is breached | ||
| pub async fn execute_loop( | ||
| mut request: RequestPayload, | ||
| exec_ctx: Arc<ExecutionContext>, | ||
| tool_ctx: &ToolContext, | ||
| ) -> ExecutorResult<ResponsePayload> { | ||
| let original_previous_response_id = request.previous_response_id.clone(); | ||
| let original_conversation_id = request.conversation_id.clone(); | ||
|
|
||
| // Clear all three persistence triggers so intermediate execute() calls | ||
| // don't write partial tool-call-only responses to the store. | ||
| request.store = false; | ||
| request.previous_response_id = None; | ||
| request.conversation_id = None; | ||
|
|
||
| for iteration in 0_usize.. { | ||
| // Defense-in-depth: even if dispatch_tools has a bug that never returns | ||
| // Incomplete, we won't loop forever. | ||
| if iteration >= MAX_LOOP_GUARD { | ||
| return Err(ExecutorError::InvalidRequest(format!( | ||
| "execute_loop exceeded hard iteration cap ({MAX_LOOP_GUARD})" | ||
| ))); | ||
| } | ||
|
|
||
| debug!(iteration, "execute_loop iteration"); | ||
|
|
||
| // --- Step 1: Call the LLM --- | ||
| // Timeout prevents hanging on unresponsive LLM backends. | ||
| // Duration::ZERO = no timeout (provider/reqwest manages its own). | ||
| let inference_timeout = exec_ctx.streaming_timeout; | ||
| let result = if inference_timeout.is_zero() { | ||
| execute(request.clone(), Arc::clone(&exec_ctx)).await? | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. to avoid frequent
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree this is the right long-term fix. The request.clone() is still there because execute() takes ownership today (it passes the payload into rehydrate_conversation which moves it). Changing that signature touches engine.rs internals (rehydrate, inject_ids, persist) which are outside our PR scope. I'll open a follow-up to refactor execute() to borrow — it'll be a clean diff once this lands. For now the clone cost is bounded by the input size which doesn't grow per-iteration anymore (tool results are just appended, not cloned on each loop). |
||
| } else { | ||
| tokio::time::timeout(inference_timeout, execute(request.clone(), Arc::clone(&exec_ctx))) | ||
| .await | ||
| .map_err(|_| { | ||
| ExecutorError::StreamError(format!( | ||
| "LLM inference timed out after {inference_timeout:?} on iteration {iteration}" | ||
| )) | ||
| })?? | ||
| }; | ||
|
|
||
| // execute() returns Either<ResponsePayload, BoxStream>. | ||
| // We only support non-streaming in execute_loop (streaming requires StreamTee). | ||
| let mut payload = match result { | ||
| Either::Left(payload) => payload, | ||
| Either::Right(_stream) => { | ||
| return Err(ExecutorError::InvalidRequest( | ||
| "execute_loop does not support streaming yet — set stream=false".into(), | ||
| )); | ||
| } | ||
| }; | ||
|
|
||
| // --- Step 2: Inspect output for tool calls --- | ||
| // dispatch_tools filters FunctionCall items, executes them via ToolContext, | ||
| // and returns a decision: Continue (with results), Done, or Incomplete. | ||
| let decision = dispatch_tools(&payload.output, tool_ctx, iteration).await?; | ||
|
|
||
| match decision { | ||
| // No tool calls (or only client-side functions) — we're done. | ||
| LoopDecision::Done => { | ||
| payload.previous_response_id = original_previous_response_id; | ||
| payload.conversation_id = original_conversation_id; | ||
| return Ok(payload); | ||
| } | ||
| // Hit max_iterations — stop looping, mark as incomplete. | ||
| // The model may have wanted to call more tools, but we're cutting it off. | ||
| // Attach the reason to incomplete_details so the client knows why. | ||
| LoopDecision::Incomplete(reason) => { | ||
| debug!(iteration, %reason, "loop incomplete"); | ||
| payload.status = "incomplete".to_string(); | ||
| payload.incomplete_details = Some(IncompleteDetails { reason: Some(reason) }); | ||
| payload.previous_response_id = original_previous_response_id; | ||
| payload.conversation_id = original_conversation_id; | ||
| return Ok(payload); | ||
| } | ||
| // Tools were executed — inject results and re-enter inference. | ||
| LoopDecision::Continue(tool_results) => { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Each iteration: clone → extend → reassign. On iteration 2, you clone the vec that already contains iteration 1's results. On iteration 3, you clone a vec with iterations 1+2's results. The allocation grows linearly and is thrown away each time. The in-place alternative:
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed — now mutates request.input in-place. Text gets converted to Items once on the first tool call, then subsequent iterations just reserve+extend on the same vec. No cloning. |
||
| debug!( | ||
| iteration, | ||
| results = tool_results.len(), | ||
| "tool results received, re-entering" | ||
| ); | ||
|
|
||
| // Convert text input to structured items on first tool call, | ||
| // then extend in-place on subsequent iterations (no clone). | ||
| if let ResponsesInput::Text(t) = &request.input { | ||
| let msg = InputItem::Message(InputMessage { | ||
| role: "user".into(), | ||
| content: InputMessageContent::Text(t.clone()), | ||
| }); | ||
| request.input = ResponsesInput::Items(vec![msg]); | ||
| } | ||
| if let ResponsesInput::Items(ref mut items) = request.input { | ||
| // TODO: also inject assistant's FunctionCall output items here | ||
| // (requires InputItem::FunctionCall variant — follow-up PR) | ||
| items.reserve(tool_results.len()); | ||
| items.extend(tool_results); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| unreachable!("loop exits via return in Done/Incomplete/guard arms") | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,13 +1,19 @@ | ||
| //! Agentic loop executor. | ||
|
|
||
| pub mod accumulator; | ||
| pub mod dispatch; | ||
| pub mod engine; | ||
| pub mod error; | ||
| pub mod execute_loop; | ||
| pub mod modes; | ||
| pub mod request; | ||
| pub mod tool_context; | ||
|
|
||
| pub use dispatch::{LoopDecision, dispatch_tools}; | ||
| pub use engine::{BoxStream, call_inference, create_conversation, execute, persist_response, rehydrate_conversation}; | ||
| pub use error::{ExecutorError, ExecutorResult}; | ||
| pub use execute_loop::execute_loop; | ||
| pub use modes::{ConversationHandler, ResponseHandler}; | ||
| pub use request::ExecutionContext; | ||
| pub use request::RequestContext; | ||
| pub use tool_context::ToolContext; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this hardcoded false? I see the comment
could you clarify what do you mean by partial response?
the contract is as following:
The
storevalue from request payload coming in from server initially if isfalsewe are passing into vllm proxy for stateless inference.if is set to true we would invoke
execute_loopwhich should not set this to false. from second turn onwards if thestore=Falsecontext ID is given meaning either of (conversation_idorpreviouse_response_idis present) we need to still persist. if context ID not provided then we proxy to vLLM server stateless.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The loop suppresses persistence for intermediate iterations — otherwise each internal execute() call would write a partial response (just tool-call output, not the final answer) to the store. We clear all three persistence triggers (store, previous_response_id, conversation_id) because PR #56 persists when any of them is set. The caller (server handler) does the final persist once with the complete response.
Updated the doc comment to make this clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy to discuss further, esp. the requirement to persist intermediate tool call/results and the layer responsible to persist.