From 74890a9661bd0b46799aad423f08a72d7fdb36b5 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 15:30:10 -0700 Subject: [PATCH 1/4] refactor: accumulator consumes EventFrame instead of inline JSON MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces inline JSON parsing in process_sse_line with: - normalize_sse_line() from events/ module → typed EventFrame - New pub process_event(&EventFrame) method for direct consumption process_sse_line is now a thin wrapper — backwards compatible. process_event is pub for future StreamTee to call directly. No behavioral change — same output for same input. Pure refactor that wires the events/ module (PR #49) into the executor (PR #46). Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/accumulator.rs | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/crates/agentic-core/src/executor/accumulator.rs b/crates/agentic-core/src/executor/accumulator.rs index 5b94e8e..f561fce 100644 --- a/crates/agentic-core/src/executor/accumulator.rs +++ b/crates/agentic-core/src/executor/accumulator.rs @@ -12,11 +12,12 @@ use std::sync::mpsc; use futures::{Stream, StreamExt}; +use crate::events::{EventFrame, EventPayload, SSEEventType, normalize_sse_line}; use crate::executor::error::{ExecutorError, ExecutorResult}; -use crate::types::event::{MessageStatus, ResponseStatus, SSEEventType}; +use crate::types::event::{MessageStatus, ResponseStatus}; use crate::types::io::{OutputItem, OutputMessage, OutputTextContent, ResponseUsage}; use crate::types::request_response::{IncompleteDetails, ResponsePayload}; -use crate::utils::common::{deserialize_from_str, deserialize_from_value, deserialize_from_value_opt}; +use crate::utils::common::{deserialize_from_str, deserialize_from_value_opt}; use crate::utils::uuid7_str; /// Accumulates LLM response chunks from streaming or non-streaming sources. @@ -178,45 +179,43 @@ impl ResponseAccumulator { /// /// Non-`data:` lines, `[DONE]`, and malformed JSON are silently skipped. fn process_sse_line(&mut self, line: &str) { - let Some(data_str) = line.strip_prefix("data: ") else { - return; - }; - if data_str == "[DONE]" { - return; + if let Some(frame) = normalize_sse_line(line) { + self.process_event(&frame); } - let Ok(json) = deserialize_from_str::(data_str) else { - return; - }; + } - match json["type"] - .as_str() - .map_or(SSEEventType::Other, |s| s.parse().unwrap_or_default()) - { - SSEEventType::ResponseCreated => { - if let Some(id) = json["response"]["id"].as_str() { - self.response_id = id.to_string(); - } + /// Processes a typed [`EventFrame`], updating accumulator state. + /// + /// This is the core state machine — callers that already have a normalized + /// frame (e.g. [`StreamTee`](future)) can call this directly without + /// re-parsing from a raw line. + pub fn process_event(&mut self, frame: &EventFrame) { + match (&frame.event_type, &frame.payload) { + (SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => { + self.response_id.clone_from(id); } - SSEEventType::ResponseOutputItemAdded => { + (_, EventPayload::OutputItemAdded { item_id, .. }) => { self.finalize_current_message(); - let item_id = json["item"]["id"] - .as_str() - .map_or_else(|| uuid7_str("msg_"), str::to_string); - self.current_message = Some(OutputMessage::new(&item_id, MessageStatus::InProgress.as_str())); + let id = if item_id.is_empty() { + uuid7_str("msg_") + } else { + item_id.clone() + }; + self.current_message = Some(OutputMessage::new(&id, MessageStatus::InProgress.as_str())); } - SSEEventType::ResponseOutputTextDelta => { - if let Some(delta) = json["delta"].as_str() { - self.accumulated_text.push_str(delta); - } + (_, EventPayload::TextDelta { delta, .. }) => { + self.accumulated_text.push_str(delta); } - SSEEventType::ResponseDone => { + (SSEEventType::ResponseCompleted, EventPayload::Response { usage, .. }) => { self.finalize_current_message(); self.status = ResponseStatus::Completed; - if let Ok(usage) = deserialize_from_value::(json["response"]["usage"].clone()) { - self.usage = Some(usage); + if let Some(u) = usage { + if let Ok(parsed) = serde_json::from_value::(u.clone()) { + self.usage = Some(parsed); + } } } - SSEEventType::Other => {} + _ => {} } } From eaca01f88110b154fa4391edd8f22cf2973c492c Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 15:34:17 -0700 Subject: [PATCH 2/4] fix: restrict process_event visibility, match event types explicitly MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Change pub → pub(crate) for process_event (internal to crate only) - Match (SSEEventType::OutputItemAdded, EventPayload::OutputItemAdded) explicitly instead of wildcard (defensive correctness) - Same for (SSEEventType::OutputTextDelta, EventPayload::TextDelta) Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/accumulator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/agentic-core/src/executor/accumulator.rs b/crates/agentic-core/src/executor/accumulator.rs index f561fce..621b55c 100644 --- a/crates/agentic-core/src/executor/accumulator.rs +++ b/crates/agentic-core/src/executor/accumulator.rs @@ -189,12 +189,12 @@ impl ResponseAccumulator { /// This is the core state machine — callers that already have a normalized /// frame (e.g. [`StreamTee`](future)) can call this directly without /// re-parsing from a raw line. - pub fn process_event(&mut self, frame: &EventFrame) { + pub(crate) fn process_event(&mut self, frame: &EventFrame) { match (&frame.event_type, &frame.payload) { (SSEEventType::ResponseCreated, EventPayload::Response { id, .. }) if !id.is_empty() => { self.response_id.clone_from(id); } - (_, EventPayload::OutputItemAdded { item_id, .. }) => { + (SSEEventType::OutputItemAdded, EventPayload::OutputItemAdded { item_id, .. }) => { self.finalize_current_message(); let id = if item_id.is_empty() { uuid7_str("msg_") @@ -203,7 +203,7 @@ impl ResponseAccumulator { }; self.current_message = Some(OutputMessage::new(&id, MessageStatus::InProgress.as_str())); } - (_, EventPayload::TextDelta { delta, .. }) => { + (SSEEventType::OutputTextDelta, EventPayload::TextDelta { delta, .. }) => { self.accumulated_text.push_str(delta); } (SSEEventType::ResponseCompleted, EventPayload::Response { usage, .. }) => { From 6f08ef4e1d5f483b4f08c1034e77371e60c4fce8 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Thu, 11 Jun 2026 15:41:11 -0700 Subject: [PATCH 3/4] test: add process_event unit tests for accumulator refactor 5 new tests exercising the refactored process_event method directly: - ResponseCreated sets response_id - ResponseCreated with empty id doesn't overwrite - TextDelta accumulates and attaches to current message - ResponseCompleted extracts usage token counts - Unknown event types silently ignored Signed-off-by: Ashwin Giridharan --- .../agentic-core/src/executor/accumulator.rs | 140 ++++++++++++++++++ 1 file changed, 140 insertions(+) diff --git a/crates/agentic-core/src/executor/accumulator.rs b/crates/agentic-core/src/executor/accumulator.rs index 621b55c..193657f 100644 --- a/crates/agentic-core/src/executor/accumulator.rs +++ b/crates/agentic-core/src/executor/accumulator.rs @@ -325,4 +325,144 @@ mod tests { assert_eq!(MessageStatus::Completed.as_str(), "completed"); assert_eq!(MessageStatus::InProgress.as_str(), "in_progress"); } + + // --- process_event tests (exercises the refactored path directly) --- + + /// Feeding a `ResponseCreated` `EventFrame` sets the `response_id` on the accumulator. + #[test] + fn test_process_event_response_created_sets_id() { + let mut acc = ResponseAccumulator::new("resp_old".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ResponseCreated, + payload: EventPayload::Response { + id: "resp_new".into(), + status: "in_progress".into(), + usage: None, + }, + sequence_number: Some(0), + }; + + acc.process_event(&frame); + assert_eq!(acc.response_id, "resp_new"); + } + + /// `ResponseCreated` with empty id should NOT overwrite the existing `response_id`. + #[test] + fn test_process_event_response_created_empty_id_no_overwrite() { + let mut acc = ResponseAccumulator::new("resp_keep".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ResponseCreated, + payload: EventPayload::Response { + id: String::new(), + status: "in_progress".into(), + usage: None, + }, + sequence_number: Some(0), + }; + + acc.process_event(&frame); + assert_eq!(acc.response_id, "resp_keep"); + } + + /// `TextDelta` events accumulate text which gets attached to the current message. + #[test] + fn test_process_event_text_delta_accumulates() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + + // Start a message + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputItemAdded, + payload: EventPayload::OutputItemAdded { + item_id: "msg_1".into(), + item_type: "message".into(), + output_index: 0, + name: None, + call_id: None, + }, + sequence_number: Some(1), + }); + + // Feed deltas + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputTextDelta, + payload: EventPayload::TextDelta { + delta: "Hello".into(), + item_id: "msg_1".into(), + output_index: 0, + content_index: 0, + }, + sequence_number: Some(2), + }); + acc.process_event(&EventFrame { + event_type: SSEEventType::OutputTextDelta, + payload: EventPayload::TextDelta { + delta: " world".into(), + item_id: "msg_1".into(), + output_index: 0, + content_index: 0, + }, + sequence_number: Some(3), + }); + + // Finalize + acc.process_event(&EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: None, + }, + sequence_number: Some(4), + }); + + assert_eq!(acc.status, ResponseStatus::Completed); + assert_eq!(acc.output.len(), 1); + if let OutputItem::Message(msg) = &acc.output[0] { + assert_eq!(msg.content[0].text, "Hello world"); + } else { + panic!("expected Message"); + } + } + + /// `ResponseCompleted` with usage extracts token counts correctly. + #[test] + fn test_process_event_completed_with_usage() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ResponseCompleted, + payload: EventPayload::Response { + id: "resp_1".into(), + status: "completed".into(), + usage: Some(serde_json::json!({ + "input_tokens": 10, + "output_tokens": 5, + "total_tokens": 15 + })), + }, + sequence_number: Some(9), + }; + + acc.process_event(&frame); + assert_eq!(acc.status, ResponseStatus::Completed); + assert!(acc.usage.is_some()); + assert_eq!(acc.usage.unwrap().total_tokens, 15); + } + + /// Unknown/unhandled event types are silently ignored — no panic or state change. + /// Verifies the wildcard `_ => {}` arm works correctly. + #[test] + fn test_process_event_unknown_payload_ignored() { + let mut acc = ResponseAccumulator::new("resp_1".into(), None); + let frame = EventFrame { + event_type: SSEEventType::ContentPartAdded, + payload: EventPayload::Raw(serde_json::json!({"type": "response.content_part.added"})), + sequence_number: Some(3), + }; + + acc.process_event(&frame); + // No state change — still initial state + assert_eq!(acc.response_id, "resp_1"); + assert_eq!(acc.status, ResponseStatus::InProgress); + assert!(acc.output.is_empty()); + } } From 72e430a52eebc53a65ba8a9aa8202caa1869fee9 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Tue, 16 Jun 2026 11:52:44 -0700 Subject: [PATCH 4/4] refactor: pass id by value to avoid redundant allocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OutputMessage::new takes `impl Into`, so passing the owned String directly avoids a second allocation from &String → String. Signed-off-by: Ashwin Giridharan --- crates/agentic-core/src/executor/accumulator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/agentic-core/src/executor/accumulator.rs b/crates/agentic-core/src/executor/accumulator.rs index 193657f..f64b53d 100644 --- a/crates/agentic-core/src/executor/accumulator.rs +++ b/crates/agentic-core/src/executor/accumulator.rs @@ -201,7 +201,7 @@ impl ResponseAccumulator { } else { item_id.clone() }; - self.current_message = Some(OutputMessage::new(&id, MessageStatus::InProgress.as_str())); + self.current_message = Some(OutputMessage::new(id, MessageStatus::InProgress.as_str())); } (SSEEventType::OutputTextDelta, EventPayload::TextDelta { delta, .. }) => { self.accumulated_text.push_str(delta);