diff --git a/crates/agentic-core/benches/executor_throughput.rs b/crates/agentic-core/benches/executor_throughput.rs index 8774e5d..b4f577f 100644 --- a/crates/agentic-core/benches/executor_throughput.rs +++ b/crates/agentic-core/benches/executor_throughput.rs @@ -153,13 +153,7 @@ fn build_exec_ctx(rt: &tokio::runtime::Runtime, mock_url: String) -> (Arc + Send>>; - -/// Wire-format marker signalling end-of-stream to the client. -const DONE_MARKER: &str = "data: [DONE]\n\n"; - -/// Fetch the next raw bytes chunk from a streaming response. -/// -/// Returns `Ok(Some(bytes))` on data, `Ok(None)` when the stream ends cleanly, -/// and `Err` on a network failure or chunk timeout. -async fn next_chunk(stream: &mut S, timeout: Duration) -> ExecutorResult> -where - S: futures::Stream> + Unpin, -{ - let item = if timeout.is_zero() { - stream.next().await - } else { - tokio::time::timeout(timeout, stream.next()).await.map_err(|_| { - ExecutorError::StreamError("chunk timeout: no data received within the configured window".into()) - })? - }; - item.transpose().map_err(ExecutorError::NetworkError) -} - -/// Build, send, and validate an HTTP POST to the LLM backend. -/// -/// Shared by both the blocking path (caller reads `.text()`) and the streaming -/// path (caller reads `.bytes_stream()`). Maps connect/timeout failures and -/// non-2xx status codes to [`ExecutorError::LLMRequest`]. -async fn send_request( - client: &reqwest::Client, - url: &str, - body: String, - auth: Option<&str>, -) -> ExecutorResult { - let mut req = client.post(url).header("Content-Type", "application/json").body(body); - if let Some(key) = auth { - req = req.bearer_auth(key); - } - - let resp = req.send().await.map_err(|e| ExecutorError::LLMRequest { - status: if e.is_timeout() { - http::StatusCode::GATEWAY_TIMEOUT - } else { - http::StatusCode::BAD_GATEWAY - }, - body: if e.is_timeout() { - "upstream timeout".into() - } else { - "upstream unavailable".into() - }, - })?; - - if !resp.status().is_success() { - let status = resp.status().as_u16(); - // Log and discard any error reading the error body — the status code - // is the primary signal; an empty body is acceptable here. - let body = resp - .text() - .await - .inspect_err(|e| tracing::debug!("failed to read error response body: {e}")) - .unwrap_or_default(); - return Err(ExecutorError::LLMRequest { - status: http::StatusCode::from_u16(status).unwrap_or(http::StatusCode::INTERNAL_SERVER_ERROR), - body, - }); - } - - Ok(resp) -} - -/// Makes a non-streaming HTTP POST to the LLM backend and returns the full JSON body. -/// -/// Used by [`run_blocking`] so it can pass the result to [`ResponseAccumulator::from_json`]. -async fn fetch_response_json( - upstream_json: String, - url: &str, - client: &reqwest::Client, - auth: Option<&str>, -) -> ExecutorResult { - let resp = send_request(client, url, upstream_json, auth).await?; - // Preserve the reqwest::Error as the typed source (NetworkError). - resp.text().await.map_err(ExecutorError::NetworkError) -} - -/// Step 1 — Build [`RequestContext`] by rehydrating conversation history. -/// -/// `request` is moved into the context as `enriched_request`; one clone is taken -/// for `original_request` so the engine retains an unmodified copy for persistence -/// and ID resolution. -/// -/// Dispatches based on `store` flag and which ID is present: -/// - `previous_response_id`: rehydrate from the prior response checkpoint -/// - `conversation_id`: rehydrate from the conversation -/// - no ids: forward only the new input -/// -/// # Errors -/// Returns [`ExecutorError`] if storage is unavailable or a referenced ID does not exist. -pub async fn rehydrate_conversation( - request: RequestPayload, - exec_ctx: &ExecutionContext, -) -> ExecutorResult { - let response_id = uuid7_str("resp_"); - let new_input_items: Vec = Vec::from(&request.input); - - // One clone for the unmodified original; `request` is moved as enriched_request. - let original_request = request.clone(); - let mut ctx = RequestContext { - enriched_request: request, - original_request, - new_input_items, - response_id, - conversation_id: None, - }; - - if ctx.original_request.conversation_id.is_some() && ctx.original_request.previous_response_id.is_some() { - return Err(ExecutorError::InvalidRequest( - "provide only one of conversation_id or previous_response_id".into(), - )); - } - - if ctx.original_request.conversation_id.is_some() { - rehydrate_from_conversation(&mut ctx, exec_ctx).await?; - return Ok(ctx); - } - - if ctx.original_request.previous_response_id.is_some() { - rehydrate_from_response(&mut ctx, exec_ctx).await?; - return Ok(ctx); - } - - ctx.enriched_request.input = ResponsesInput::Items(ctx.new_input_items.clone()); - Ok(ctx) -} - -/// Hydrates `ctx` from the previous response chain. -/// -/// Loads the stored response, rehydrates its history items, resolves effective -/// tools and tool choice from the stored metadata, and prepends the history to -/// the enriched request input. -async fn rehydrate_from_response(ctx: &mut RequestContext, exec_ctx: &ExecutionContext) -> ExecutorResult<()> { - let stored = exec_ctx.resp_handler.get(ctx).await?; - let history = exec_ctx.resp_handler.rehydrate(ctx).await?; - - let mut items = InOutItem::into_input_items(history); - items.reserve(ctx.new_input_items.len()); - items.extend(ctx.new_input_items.iter().cloned()); - - ctx.enriched_request.previous_response_id = None; - ctx.enriched_request.input = ResponsesInput::Items(items); - ctx.enriched_request.tools = resolve_tools( - ctx.original_request.tools.as_deref(), - stored.metadata.effective_tools.as_deref(), - ctx.original_request.tools.is_some(), - ); - ctx.enriched_request.tool_choice = resolve_tool_choice( - &ctx.original_request.tool_choice, - &stored.metadata.effective_tool_choice, - false, - ); - ctx.conversation_id = stored.conversation_id; - Ok(()) -} - -/// Hydrates `ctx` from the conversation store. -/// -/// Gets or creates the conversation (depending on `store`) and rehydrates its -/// history in parallel, then prepends the history items to the enriched request input. -async fn rehydrate_from_conversation(ctx: &mut RequestContext, exec_ctx: &ExecutionContext) -> ExecutorResult<()> { - let (conv_data, history) = tokio::try_join!( - async { - if ctx.original_request.store { - exec_ctx.conv_handler.get_or_create(ctx).await - } else { - exec_ctx.conv_handler.get(ctx).await - } - }, - exec_ctx.conv_handler.rehydrate(ctx), - )?; - - let mut items = InOutItem::into_input_items(history); - items.reserve(ctx.new_input_items.len()); - items.extend(ctx.new_input_items.iter().cloned()); - - ctx.enriched_request.input = ResponsesInput::Items(items); - ctx.conversation_id = Some(conv_data.conversation_id); - Ok(()) -} - -/// Step 2 — Call the LLM inference backend; yields raw SSE lines (`data: …`). -/// -/// Always requests `stream=true` upstream. Stops on `[DONE]`. -/// -/// # Errors -/// Each stream item is `Result`. The stream yields `Err` on: -/// - [`ExecutorError::LLMRequest`] — connect timeout (504), connection failure (502), -/// or non-2xx HTTP status from the backend -/// - [`ExecutorError::NetworkError`] — network failure while reading the response body -pub fn call_inference( - upstream_json: String, - url: String, - client: Arc, - auth: Option, - chunk_timeout: Duration, -) -> impl Stream> + Send + 'static { - stream! { - let resp = match send_request(&client, &url, upstream_json, auth.as_deref()).await { - Ok(r) => r, - Err(e) => { yield Err(e); return; } - }; - - let mut bytes = resp.bytes_stream(); - let mut buf = String::with_capacity(8192); - - loop { - let chunk = match next_chunk(&mut bytes, chunk_timeout).await { - Ok(Some(c)) => c, - Ok(None) => break, - Err(e) => { yield Err(e); return; } - }; - - match std::str::from_utf8(&chunk) { - Ok(s) => buf.push_str(s), - Err(_) => buf.push_str(&String::from_utf8_lossy(&chunk)), - } - - while let Some(pos) = buf.find('\n') { - let line = buf[..pos].trim_end_matches('\r'); - match line { - "data: [DONE]" => return, - l if l.starts_with("data: ") => yield Ok(l.to_string()), - _ => {} - } - buf.drain(..=pos); - } - } - } -} - -/// Step 3 — Persist the completed response to storage. -/// -/// Skipped if [`ResponseStatus`] is not `Completed`/`Incomplete` or `payload.id` is empty. -/// Routes to [`ConversationHandler`] when `ctx.conversation_id` is set, -/// otherwise [`ResponseHandler`]. -/// -/// # Errors -/// Returns [`ExecutorError`] if the storage operation fails. -pub async fn persist_response( - payload: ResponsePayload, +async fn run_blocking( ctx: RequestContext, - conv_handler: ConversationHandler, - resp_handler: ResponseHandler, -) -> ExecutorResult<()> { - // Use typed enum — no hardcoded status strings. - if !matches!( - payload.status.parse::().unwrap_or_default(), - ResponseStatus::Completed | ResponseStatus::Incomplete - ) || payload.id.is_empty() - { - return Ok(()); - } - - // Move output items from payload; handlers build ResponseMetadata from ctx internally. - let output_items = payload.output; - - if ctx.conversation_id.is_some() { - conv_handler.execute_turn(ctx, output_items).await - } else { - resp_handler.execute_turn(ctx, output_items).await - } -} - -async fn run_blocking(ctx: RequestContext, exec_ctx: &ExecutionContext) -> ExecutorResult { + exec_ctx: &ExecutionContext, + auth: Option<&str>, +) -> ExecutorResult { let url = exec_ctx.responses_url(); // Non-streaming request: stream=false → full JSON body → from_json. let upstream_json = serialize_to_string(&ctx.enriched_request.to_upstream_request(false)).map_err(ExecutorError::JsonError)?; - let body = fetch_response_json(upstream_json, &url, &exec_ctx.client, exec_ctx.client_auth.as_deref()).await?; + let body = fetch_response_json(upstream_json, &url, &exec_ctx.client, auth).await?; let acc = ResponseAccumulator::from_json(&body, ctx.conversation_id.as_deref())?; let mut payload = acc.finalize( @@ -327,7 +56,7 @@ async fn run_blocking(ctx: RequestContext, exec_ctx: &ExecutionContext) -> Execu Ok(payload) } -fn run_stream(ctx: RequestContext, exec_ctx: Arc) -> BoxStream { +fn run_stream(ctx: RequestContext, exec_ctx: Arc, auth: Option) -> BoxStream { let url = exec_ctx.responses_url(); // Streaming request: stream=true → SSE lines → from_stream. let upstream_json = match serialize_to_string(&ctx.enriched_request.to_upstream_request(true)) { @@ -351,7 +80,7 @@ fn run_stream(ctx: RequestContext, exec_ctx: Arc) -> BoxStream upstream_json, url, Arc::clone(&exec_ctx.client), - exec_ctx.client_auth.clone(), + auth, exec_ctx.streaming_timeout, )); @@ -396,12 +125,57 @@ pub async fn create_conversation(exec_ctx: &ExecutionContext) -> ExecutorResult< exec_ctx.conv_handler.create().await } -/// Run the full agentic loop. +/// Builder for a stateful conversation turn. /// -/// Returns `Either::Left(ResponsePayload)` for non-streaming requests, or -// TODO: replace with a builder — ExecuteRequest::new(payload, ctx).auth(token).run().await -/// `Either::Right(BoxStream)` for streaming, each yielded `String` is an SSE -/// line ready to forward to the client. +/// ```ignore +/// ExecuteRequest::new(payload, exec_ctx).with_auth(token).run().await +/// ``` +pub struct ExecuteRequest { + payload: RequestPayload, + exec_ctx: Arc, + client_auth: Option, +} + +impl ExecuteRequest { + #[must_use] + pub fn new(payload: RequestPayload, exec_ctx: Arc) -> Self { + Self { + payload, + exec_ctx, + client_auth: None, + } + } + + /// Override the bearer token for this request only; does not touch the shared [`ExecutionContext`]. + #[must_use] + pub fn with_auth(mut self, token: Option) -> Self { + self.client_auth = token; + self + } + + /// Execute one stateful conversation turn. + /// + /// Returns `Either::Left(ResponsePayload)` for non-streaming requests, or + /// `Either::Right(BoxStream)` for streaming, each yielded `String` is an SSE + /// line ready to forward to the client. + /// + /// # Errors + /// Returns [`ExecutorError`] if rehydration or (non-streaming) LLM inference fails. + pub async fn run(self) -> ExecutorResult> { + let ctx = rehydrate_conversation(self.payload, &self.exec_ctx).await?; + if ctx.original_request.stream { + Ok(Either::Right(run_stream(ctx, self.exec_ctx, self.client_auth))) + } else { + Ok(Either::Left( + run_blocking(ctx, &self.exec_ctx, self.client_auth.as_deref()).await?, + )) + } + } +} + +/// Execute one stateful conversation turn. +/// +/// Thin shim over [`ExecuteRequest`] for callers that don't need per-request auth override. /// /// # Errors /// Returns [`ExecutorError`] if rehydration or (non-streaming) LLM inference fails. @@ -409,10 +183,5 @@ pub async fn execute( request: RequestPayload, exec_ctx: Arc, ) -> ExecutorResult> { - let ctx = rehydrate_conversation(request, &exec_ctx).await?; - if ctx.original_request.stream { - Ok(Either::Right(run_stream(ctx, exec_ctx))) - } else { - Ok(Either::Left(run_blocking(ctx, &exec_ctx).await?)) - } + ExecuteRequest::new(request, exec_ctx).run().await } diff --git a/crates/agentic-core/src/executor/inference.rs b/crates/agentic-core/src/executor/inference.rs new file mode 100644 index 0000000..6743607 --- /dev/null +++ b/crates/agentic-core/src/executor/inference.rs @@ -0,0 +1,147 @@ +//! HTTP transport layer for LLM backend communication. +//! +//! Handles sending requests, reading streaming chunks, and mapping network +//! and HTTP errors to [`ExecutorError`]. + +use std::sync::Arc; +use std::time::Duration; + +use async_stream::stream; +use futures::{Stream, StreamExt}; + +use crate::executor::error::{ExecutorError, ExecutorResult}; + +/// SSE stream of raw lines sent to the client (`data: …\n\n` per event). +pub type BoxStream = std::pin::Pin + Send>>; + +/// Wire-format marker signalling end-of-stream to the client. +pub(super) const DONE_MARKER: &str = "data: [DONE]\n\n"; + +/// Fetch the next raw bytes chunk from a streaming response. +/// +/// Returns `Ok(Some(bytes))` on data, `Ok(None)` when the stream ends cleanly, +/// and `Err` on a network failure or chunk timeout. +pub(super) async fn next_chunk(stream: &mut S, timeout: Duration) -> ExecutorResult> +where + S: futures::Stream> + Unpin, +{ + let item = if timeout.is_zero() { + stream.next().await + } else { + tokio::time::timeout(timeout, stream.next()).await.map_err(|_| { + ExecutorError::StreamError("chunk timeout: no data received within the configured window".into()) + })? + }; + item.transpose().map_err(ExecutorError::NetworkError) +} + +/// Build, send, and validate an HTTP POST to the LLM backend. +/// +/// Shared by both the blocking path (caller reads `.text()`) and the streaming +/// path (caller reads `.bytes_stream()`). Maps connect/timeout failures and +/// non-2xx status codes to [`ExecutorError::LLMRequest`]. +pub(super) async fn send_request( + client: &reqwest::Client, + url: &str, + body: String, + auth: Option<&str>, +) -> ExecutorResult { + let mut req = client.post(url).header("Content-Type", "application/json").body(body); + if let Some(key) = auth { + req = req.bearer_auth(key); + } + + let resp = req.send().await.map_err(|e| ExecutorError::LLMRequest { + status: if e.is_timeout() { + http::StatusCode::GATEWAY_TIMEOUT + } else { + http::StatusCode::BAD_GATEWAY + }, + body: if e.is_timeout() { + "upstream timeout".into() + } else { + "upstream unavailable".into() + }, + })?; + + if !resp.status().is_success() { + let status = resp.status().as_u16(); + // Log and discard any error reading the error body — the status code + // is the primary signal; an empty body is acceptable here. + let body = resp + .text() + .await + .inspect_err(|e| tracing::debug!("failed to read error response body: {e}")) + .unwrap_or_default(); + return Err(ExecutorError::LLMRequest { + status: http::StatusCode::from_u16(status).unwrap_or(http::StatusCode::INTERNAL_SERVER_ERROR), + body, + }); + } + + Ok(resp) +} + +/// Makes a non-streaming HTTP POST to the LLM backend and returns the full JSON body. +/// +/// Used by `run_blocking` so it can pass the result to [`ResponseAccumulator::from_json`](crate::executor::accumulator::ResponseAccumulator::from_json). +pub(super) async fn fetch_response_json( + upstream_json: String, + url: &str, + client: &reqwest::Client, + auth: Option<&str>, +) -> ExecutorResult { + let resp = send_request(client, url, upstream_json, auth).await?; + // Preserve the reqwest::Error as the typed source (NetworkError). + resp.text().await.map_err(ExecutorError::NetworkError) +} + +/// Step 2 — Call the LLM inference backend; yields raw SSE lines (`data: …`). +/// +/// Always requests `stream=true` upstream. Stops on `[DONE]`. +/// +/// # Errors +/// Each stream item is `Result`. The stream yields `Err` on: +/// - [`ExecutorError::LLMRequest`] — connect timeout (504), connection failure (502), +/// or non-2xx HTTP status from the backend +/// - [`ExecutorError::NetworkError`] — network failure while reading the response body +pub fn call_inference( + upstream_json: String, + url: String, + client: Arc, + auth: Option, + chunk_timeout: Duration, +) -> impl Stream> + Send + 'static { + stream! { + let resp = match send_request(&client, &url, upstream_json, auth.as_deref()).await { + Ok(r) => r, + Err(e) => { yield Err(e); return; } + }; + + let mut bytes = resp.bytes_stream(); + let mut buf = String::with_capacity(8192); + + loop { + let chunk = match next_chunk(&mut bytes, chunk_timeout).await { + Ok(Some(c)) => c, + Ok(None) => break, + Err(e) => { yield Err(e); return; } + }; + + match std::str::from_utf8(&chunk) { + Ok(s) => buf.push_str(s), + Err(_) => buf.push_str(&String::from_utf8_lossy(&chunk)), + } + + while let Some(pos) = buf.find('\n') { + let line = buf[..pos].trim_end_matches('\r'); + match line { + "data: [DONE]" => return, + l if l.starts_with("data: ") => yield Ok(l.to_string()), + _ => {} + } + buf.drain(..=pos); + } + } + } +} diff --git a/crates/agentic-core/src/executor/mod.rs b/crates/agentic-core/src/executor/mod.rs index 32fbabc..925e34b 100644 --- a/crates/agentic-core/src/executor/mod.rs +++ b/crates/agentic-core/src/executor/mod.rs @@ -1,13 +1,19 @@ -//! Agentic loop executor. +//! Agentic executor. pub mod accumulator; pub mod engine; pub mod error; +pub mod inference; pub mod modes; +pub mod persist; +pub mod rehydrate; pub mod request; -pub use engine::{BoxStream, call_inference, create_conversation, execute, persist_response, rehydrate_conversation}; +pub use engine::{BoxStream, ExecuteRequest, create_conversation, execute}; pub use error::{ExecutorError, ExecutorResult}; +pub use inference::call_inference; pub use modes::{ConversationHandler, ResponseHandler}; +pub use persist::persist_response; +pub use rehydrate::rehydrate_conversation; pub use request::ExecutionContext; pub use request::RequestContext; diff --git a/crates/agentic-core/src/executor/persist.rs b/crates/agentic-core/src/executor/persist.rs new file mode 100644 index 0000000..6c106e4 --- /dev/null +++ b/crates/agentic-core/src/executor/persist.rs @@ -0,0 +1,43 @@ +//! Step 3 of the conversation pipeline — response persistence. +//! +//! Writes the completed response and output items to storage, routing to the +//! appropriate handler based on whether the turn belongs to a conversation. + +use crate::executor::error::ExecutorResult; +use crate::executor::modes::{ConversationHandler, ResponseHandler}; +use crate::executor::request::RequestContext; +use crate::types::event::ResponseStatus; +use crate::types::request_response::ResponsePayload; + +/// Step 3 — Persist the completed response to storage. +/// +/// Skipped if [`ResponseStatus`] is not `Completed`/`Incomplete` or `payload.id` is empty. +/// Routes to [`ConversationHandler`] when `ctx.conversation_id` is set, +/// otherwise [`ResponseHandler`]. +/// +/// # Errors +/// Returns [`ExecutorError`] if the storage operation fails. +pub async fn persist_response( + payload: ResponsePayload, + ctx: RequestContext, + conv_handler: ConversationHandler, + resp_handler: ResponseHandler, +) -> ExecutorResult<()> { + // Use typed enum — no hardcoded status strings. + if !matches!( + payload.status.parse::().unwrap_or_default(), + ResponseStatus::Completed | ResponseStatus::Incomplete + ) || payload.id.is_empty() + { + return Ok(()); + } + + // Move output items from payload; handlers build ResponseMetadata from ctx internally. + let output_items = payload.output; + + if ctx.conversation_id.is_some() { + conv_handler.execute_turn(ctx, output_items).await + } else { + resp_handler.execute_turn(ctx, output_items).await + } +} diff --git a/crates/agentic-core/src/executor/rehydrate.rs b/crates/agentic-core/src/executor/rehydrate.rs new file mode 100644 index 0000000..3a038ff --- /dev/null +++ b/crates/agentic-core/src/executor/rehydrate.rs @@ -0,0 +1,115 @@ +//! Step 1 of the conversation pipeline — history rehydration. +//! +//! Builds a [`RequestContext`] by loading prior turns from storage and +//! injecting them into the enriched request before it is forwarded to the LLM. + +use crate::executor::error::{ExecutorError, ExecutorResult}; +use crate::executor::request::{ExecutionContext, RequestContext}; +use crate::storage::InOutItem; +use crate::types::io::{InputItem, ResponsesInput, resolve_tool_choice, resolve_tools}; +use crate::types::request_response::RequestPayload; +use crate::utils::uuid7_str; + +/// Step 1 — Build [`RequestContext`] by rehydrating conversation history. +/// +/// `request` is moved into the context as `enriched_request`; one clone is taken +/// for `original_request` so the engine retains an unmodified copy for persistence +/// and ID resolution. +/// +/// Dispatches based on `store` flag and which ID is present: +/// - `previous_response_id`: rehydrate from the prior response checkpoint +/// - `conversation_id`: rehydrate from the conversation +/// - no ids: forward only the new input +/// +/// # Errors +/// Returns [`ExecutorError`] if storage is unavailable or a referenced ID does not exist. +pub async fn rehydrate_conversation( + request: RequestPayload, + exec_ctx: &ExecutionContext, +) -> ExecutorResult { + let response_id = uuid7_str("resp_"); + let new_input_items: Vec = Vec::from(&request.input); + + // One clone for the unmodified original; `request` is moved as enriched_request. + let original_request = request.clone(); + let mut ctx = RequestContext { + enriched_request: request, + original_request, + new_input_items, + response_id, + conversation_id: None, + }; + + if ctx.original_request.conversation_id.is_some() && ctx.original_request.previous_response_id.is_some() { + return Err(ExecutorError::InvalidRequest( + "provide only one of conversation_id or previous_response_id".into(), + )); + } + + if ctx.original_request.conversation_id.is_some() { + from_conversation(&mut ctx, exec_ctx).await?; + return Ok(ctx); + } + + if ctx.original_request.previous_response_id.is_some() { + from_response(&mut ctx, exec_ctx).await?; + return Ok(ctx); + } + + ctx.enriched_request.input = ResponsesInput::Items(ctx.new_input_items.clone()); + Ok(ctx) +} + +/// Hydrates `ctx` from the previous response chain. +/// +/// Loads the stored response, rehydrates its history items, resolves effective +/// tools and tool choice from the stored metadata, and prepends the history to +/// the enriched request input. +async fn from_response(ctx: &mut RequestContext, exec_ctx: &ExecutionContext) -> ExecutorResult<()> { + let stored = exec_ctx.resp_handler.get(ctx).await?; + let history = exec_ctx.resp_handler.rehydrate(ctx).await?; + + let mut items = InOutItem::into_input_items(history); + items.reserve(ctx.new_input_items.len()); + items.extend(ctx.new_input_items.iter().cloned()); + + ctx.enriched_request.previous_response_id = None; + ctx.enriched_request.input = ResponsesInput::Items(items); + ctx.enriched_request.tools = resolve_tools( + ctx.original_request.tools.as_deref(), + stored.metadata.effective_tools.as_deref(), + ctx.original_request.tools.is_some(), + ); + ctx.enriched_request.tool_choice = resolve_tool_choice( + &ctx.original_request.tool_choice, + &stored.metadata.effective_tool_choice, + false, + ); + ctx.conversation_id = stored.conversation_id; + Ok(()) +} + +/// Hydrates `ctx` from the conversation store. +/// +/// Gets or creates the conversation (depending on `store`) and rehydrates its +/// history in parallel, then prepends the history items to the enriched request input. +async fn from_conversation(ctx: &mut RequestContext, exec_ctx: &ExecutionContext) -> ExecutorResult<()> { + let (conv_data, history) = tokio::try_join!( + async { + if ctx.original_request.store { + exec_ctx.conv_handler.get_or_create(ctx).await + } else { + exec_ctx.conv_handler.get(ctx).await + } + }, + exec_ctx.conv_handler.rehydrate(ctx), + )?; + + let mut items = InOutItem::into_input_items(history); + items.reserve(ctx.new_input_items.len()); + items.extend(ctx.new_input_items.iter().cloned()); + + ctx.enriched_request.input = ResponsesInput::Items(items); + ctx.conversation_id = Some(conv_data.conversation_id); + Ok(()) +} diff --git a/crates/agentic-core/src/executor/request.rs b/crates/agentic-core/src/executor/request.rs index 66f7fe0..4c8ee2c 100644 --- a/crates/agentic-core/src/executor/request.rs +++ b/crates/agentic-core/src/executor/request.rs @@ -39,6 +39,8 @@ impl RequestContext { /// Runtime dependencies passed into `execute()`. /// /// Owns the storage handlers, HTTP client, and LLM endpoint configuration. +/// Per-request auth is supplied via [`crate::executor::engine::ExecuteRequest::with_auth`] +/// rather than stored here, keeping this context purely shared and immutable. #[derive(Clone, Debug)] pub struct ExecutionContext { pub conv_handler: ConversationHandler, @@ -46,8 +48,6 @@ pub struct ExecutionContext { pub client: Arc, /// Base URL for the LLM backend, e.g. `"http://localhost:8000"`. pub llm_base_url: String, - /// Bearer token forwarded from the client, if any. - pub client_auth: Option, /// Maximum wait time for the next SSE chunk. `Duration::ZERO` disables the timeout. /// Sourced from [`Config::streaming_chunk_timeout_s`](crate::config::Config::streaming_chunk_timeout_s). pub streaming_timeout: Duration, @@ -72,14 +72,12 @@ impl ExecutionContext { resp_handler: ResponseHandler, client: Arc, llm_base_url: String, - client_auth: Option, ) -> Self { Self { conv_handler, resp_handler, client, llm_base_url, - client_auth, streaming_timeout: Duration::from_secs(30), } } @@ -108,7 +106,6 @@ impl ExecutionContext { resp_handler, client, llm_base_url: cfg.llm_api_base.clone(), - client_auth: cfg.openai_api_key.clone(), streaming_timeout: Duration::from_secs(30), }) } diff --git a/crates/agentic-core/tests/support/mod.rs b/crates/agentic-core/tests/support/mod.rs index 76a4109..4837593 100644 --- a/crates/agentic-core/tests/support/mod.rs +++ b/crates/agentic-core/tests/support/mod.rs @@ -264,7 +264,6 @@ impl TestFixture { resp_handler, client, server.url().to_string(), - None, )); Self { exec_ctx, server } @@ -282,7 +281,6 @@ impl TestFixture { resp_handler, client, server.url().to_string(), - None, )); Self { exec_ctx, server } diff --git a/crates/agentic-server/benches/gateway_bench.rs b/crates/agentic-server/benches/gateway_bench.rs index 53860ab..f9e9abe 100644 --- a/crates/agentic-server/benches/gateway_bench.rs +++ b/crates/agentic-server/benches/gateway_bench.rs @@ -166,13 +166,13 @@ async fn spawn_gateway(llm_url: &str) -> (Arc, String) { ResponseHandler::new(ResponseStore::new(pool)), Arc::new(reqwest::Client::new()), config.llm_api_base.clone(), - config.openai_api_key.clone(), )); let state = AppState { proxy_state, exec_ctx, shutdown_token: CancellationToken::new(), llm_api_base: config.llm_api_base, + openai_api_key: config.openai_api_key, }; let router = build_router(state, &ServerConfig::from_env()); @@ -281,19 +281,18 @@ pub fn gateway_benchmarks(c: &mut Criterion) { " [seed] conversation_rehydration/non_streaming turns={turns} prior={}", turns - 1 ); - let (conv_id, prev_id) = rt.block_on(seed_conversation(&client, &gw_url, &model, turns - 1)); + let (conv_id, _) = rt.block_on(seed_conversation(&client, &gw_url, &model, turns - 1)); group.bench_with_input(BenchmarkId::new("turns", turns), &turns, |b, _| { b.to_async(Runtime::new().unwrap()).iter_batched( - || (conv_id.clone(), prev_id.clone(), model.clone()), - |(cid, pid, mdl)| { + || (conv_id.clone(), model.clone()), + |(cid, mdl)| { let client = Arc::clone(&client); let url = format!("{gw_url}/v1/responses"); async move { - let mut body = serde_json::json!({"model": mdl, "input": "bench", - "store": true, "stream": false, "conversation_id": cid}); - if let Some(id) = pid { - body["previous_response_id"] = serde_json::Value::String(id); - } + let body = serde_json::json!({ + "model": mdl, "input": "bench", + "store": true, "stream": false, "conversation_id": cid + }); client .post(&url) .json(&body) @@ -317,19 +316,18 @@ pub fn gateway_benchmarks(c: &mut Criterion) { " [seed] conversation_rehydration/streaming turns={turns} prior={}", turns - 1 ); - let (conv_id, prev_id) = rt.block_on(seed_conversation(&client, &gw_url, &model, turns - 1)); + let (conv_id, _) = rt.block_on(seed_conversation(&client, &gw_url, &model, turns - 1)); group.bench_with_input(BenchmarkId::new("turns", turns), &turns, |b, _| { b.to_async(Runtime::new().unwrap()).iter_batched( - || (conv_id.clone(), prev_id.clone(), model.clone()), - |(cid, pid, mdl)| { + || (conv_id.clone(), model.clone()), + |(cid, mdl)| { let client = Arc::clone(&client); let url = format!("{gw_url}/v1/responses"); async move { - let mut body = serde_json::json!({"model": mdl, "input": "bench", - "store": true, "stream": true, "conversation_id": cid}); - if let Some(id) = pid { - body["previous_response_id"] = serde_json::Value::String(id); - } + let body = serde_json::json!({ + "model": mdl, "input": "bench", + "store": true, "stream": true, "conversation_id": cid + }); client .post(&url) .json(&body) diff --git a/crates/agentic-server/benches/proxy_bench.rs b/crates/agentic-server/benches/proxy_bench.rs index 73317ac..8628da0 100644 --- a/crates/agentic-server/benches/proxy_bench.rs +++ b/crates/agentic-server/benches/proxy_bench.rs @@ -82,13 +82,13 @@ async fn spawn_gateway(config: Config) -> String { ResponseHandler::new(ResponseStore::disabled()), Arc::new(reqwest::Client::new()), config.llm_api_base.clone(), - config.openai_api_key.clone(), )); let state = AppState { proxy_state, exec_ctx, shutdown_token: CancellationToken::new(), llm_api_base: config.llm_api_base, + openai_api_key: config.openai_api_key, }; let server_config = ServerConfig::from_env(); let router = build_router(state, &server_config); diff --git a/crates/agentic-server/src/app.rs b/crates/agentic-server/src/app.rs index 44b43c1..f3e018e 100644 --- a/crates/agentic-server/src/app.rs +++ b/crates/agentic-server/src/app.rs @@ -64,6 +64,9 @@ pub struct AppState { pub shutdown_token: CancellationToken, /// vLLM base URL — used by the `/ready` health probe. pub llm_api_base: String, + /// Server-configured API key; used as fallback when the request carries no + /// `Authorization` header on the executor path. + pub openai_api_key: Option, } pub fn build_router(state: AppState, server_config: &ServerConfig) -> Router { diff --git a/crates/agentic-server/src/handler/common.rs b/crates/agentic-server/src/handler/common.rs index 7e96c62..25eac2d 100644 --- a/crates/agentic-server/src/handler/common.rs +++ b/crates/agentic-server/src/handler/common.rs @@ -1,20 +1,15 @@ -use std::sync::Arc; - use axum::body::Body; use axum::http::HeaderMap; -use axum::http::request::Parts; use axum::response::Response; use bytes::Bytes; use futures::StreamExt; use http::StatusCode; use tracing::warn; -use agentic_core::executor::{BoxStream, ExecutionContext, ExecutorError}; +use agentic_core::executor::{BoxStream, ExecutorError}; use agentic_core::proxy::{ProxyBody, ProxyResponse, error_response}; use agentic_core::types::request_response::RequestPayload; -use crate::app::AppState; - pub(super) const MAX_BODY_SIZE: usize = 10 * 1024 * 1024; /// # Panics @@ -68,25 +63,14 @@ pub(super) fn extract_store(bytes: &[u8]) -> bool { .unwrap_or(true) } -pub(super) fn resolve_exec_ctx_from_headers(state: &AppState, headers: &HeaderMap) -> Arc { - let request_auth = headers +pub(super) fn extract_bearer(headers: &HeaderMap, config_key: Option<&str>) -> Option { + headers .get("authorization") .and_then(|v| v.to_str().ok()) .and_then(|v| v.strip_prefix("Bearer ")) .filter(|s| !s.is_empty()) - .map(str::to_string); - - if request_auth.is_some() && request_auth != state.exec_ctx.client_auth { - let mut ctx = (*state.exec_ctx).clone(); - ctx.client_auth = request_auth; - Arc::new(ctx) - } else { - Arc::clone(&state.exec_ctx) - } -} - -pub(super) fn resolve_exec_ctx(state: &AppState, parts: &Parts) -> Arc { - resolve_exec_ctx_from_headers(state, &parts.headers) + .map(str::to_string) + .or_else(|| config_key.filter(|s| !s.is_empty()).map(str::to_string)) } pub(super) fn sse_response(stream: BoxStream) -> Response { diff --git a/crates/agentic-server/src/handler/http/responses.rs b/crates/agentic-server/src/handler/http/responses.rs index 5d22034..7a79e5c 100644 --- a/crates/agentic-server/src/handler/http/responses.rs +++ b/crates/agentic-server/src/handler/http/responses.rs @@ -4,11 +4,13 @@ use axum::response::{IntoResponse, Response}; use bytes::Bytes; use either::Either; -use agentic_core::executor::execute; +use std::sync::Arc; + +use agentic_core::executor::ExecuteRequest; use agentic_core::proxy::{ProxyRequest, proxy_request}; use agentic_core::types::request_response::RequestPayload; -use super::super::common::{convert_response, executor_error_response, read_and_parse, resolve_exec_ctx, sse_response}; +use super::super::common::{convert_response, executor_error_response, extract_bearer, read_and_parse, sse_response}; use crate::app::AppState; async fn proxy_responses(state: &AppState, parts: Parts, body: Bytes) -> Response { @@ -21,7 +23,12 @@ async fn proxy_responses(state: &AppState, parts: Parts, body: Bytes) -> Respons } async fn execute_responses(state: &AppState, parts: Parts, payload: RequestPayload) -> Response { - match execute(payload, resolve_exec_ctx(state, &parts)).await { + let auth = extract_bearer(&parts.headers, state.openai_api_key.as_deref()); + match ExecuteRequest::new(payload, Arc::clone(&state.exec_ctx)) + .with_auth(auth) + .run() + .await + { Ok(Either::Left(response_payload)) => axum::Json(response_payload).into_response(), Ok(Either::Right(stream)) => sse_response(stream), Err(e) => executor_error_response(e), diff --git a/crates/agentic-server/src/handler/websocket/responses.rs b/crates/agentic-server/src/handler/websocket/responses.rs index 6a7c11a..d2ac068 100644 --- a/crates/agentic-server/src/handler/websocket/responses.rs +++ b/crates/agentic-server/src/handler/websocket/responses.rs @@ -19,7 +19,7 @@ use agentic_core::types::ResponsePayload; use agentic_core::types::request_response::RequestPayload; use agentic_core::utils::common::serialize_to_string; -use super::super::common::{MAX_BODY_SIZE, resolve_exec_ctx_from_headers}; +use super::super::common::{MAX_BODY_SIZE, extract_bearer}; use super::error::WsError; use crate::app::AppState; @@ -119,12 +119,26 @@ async fn handle_ws_text( payload.stream = true; payload.store = true; - let exec_ctx = resolve_exec_ctx_from_headers(state, headers); + let auth = extract_bearer(headers, state.openai_api_key.as_deref()); + let exec_ctx = Arc::clone(&state.exec_ctx); let ctx = rehydrate_conversation(payload, &exec_ctx).await?; let upstream_json = serialize_to_string(&ctx.enriched_request.to_upstream_request(true)).map_err(ExecutorError::from)?; - stream_ws_response(sender, receiver, exec_ctx, ctx, upstream_json, shutdown_token, queue).await + let req = WsStreamRequest { + exec_ctx, + ctx, + upstream_json, + auth, + }; + stream_ws_response(sender, receiver, req, shutdown_token, queue).await +} + +struct WsStreamRequest { + exec_ctx: Arc, + ctx: RequestContext, + upstream_json: String, + auth: Option, } /// Stream a response from the upstream LLM to the client. @@ -134,12 +148,16 @@ async fn handle_ws_text( async fn stream_ws_response( sender: &mut WsSender, receiver: &mut WsReceiver, - exec_ctx: Arc, - ctx: RequestContext, - upstream_json: String, + req: WsStreamRequest, shutdown_token: &CancellationToken, queue: &mut VecDeque, ) -> Result<(), WsError> { + let WsStreamRequest { + exec_ctx, + ctx, + upstream_json, + auth, + } = req; let should_persist = ctx.original_request.store || ctx.original_request.previous_response_id.is_some() || ctx.conversation_id.is_some(); @@ -148,7 +166,7 @@ async fn stream_ws_response( upstream_json, exec_ctx.responses_url(), Arc::clone(&exec_ctx.client), - exec_ctx.client_auth.clone(), + auth, exec_ctx.streaming_timeout, )); diff --git a/crates/agentic-server/src/server.rs b/crates/agentic-server/src/server.rs index 34aea6c..f907688 100644 --- a/crates/agentic-server/src/server.rs +++ b/crates/agentic-server/src/server.rs @@ -19,6 +19,7 @@ async fn build_state(config: &Config, shutdown_token: CancellationToken) -> Resu exec_ctx, shutdown_token, llm_api_base: config.llm_api_base.clone(), + openai_api_key: config.openai_api_key.clone(), }) } diff --git a/crates/agentic-server/tests/common/mod.rs b/crates/agentic-server/tests/common/mod.rs index eb87a5d..0a64a49 100644 --- a/crates/agentic-server/tests/common/mod.rs +++ b/crates/agentic-server/tests/common/mod.rs @@ -29,7 +29,6 @@ pub fn test_state(config: &Config) -> AppState { ResponseHandler::new(ResponseStore::disabled()), Arc::new(reqwest::Client::new()), config.llm_api_base.clone(), - config.openai_api_key.clone(), )); let proxy_state = ProxyState::new(config.clone()).expect("proxy state"); AppState { @@ -37,6 +36,7 @@ pub fn test_state(config: &Config) -> AppState { exec_ctx, shutdown_token: CancellationToken::new(), llm_api_base: config.llm_api_base.clone(), + openai_api_key: config.openai_api_key.clone(), } } diff --git a/crates/agentic-server/tests/responses_websocket_test.rs b/crates/agentic-server/tests/responses_websocket_test.rs index 5fb99f3..e0edd93 100644 --- a/crates/agentic-server/tests/responses_websocket_test.rs +++ b/crates/agentic-server/tests/responses_websocket_test.rs @@ -178,7 +178,6 @@ async fn storage_backed_state(llm_url: &str) -> StorageBackedState { ResponseHandler::new(ResponseStore::new(pool)), Arc::new(reqwest::Client::new()), config.llm_api_base.clone(), - config.openai_api_key.clone(), )); let proxy_state = ProxyState::new(config.clone()).expect("proxy state"); @@ -187,6 +186,7 @@ async fn storage_backed_state(llm_url: &str) -> StorageBackedState { exec_ctx, shutdown_token: CancellationToken::new(), llm_api_base: config.llm_api_base, + openai_api_key: config.openai_api_key, }; StorageBackedState { state, _db: db } }