Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions crates/loopal-runtime/src/agent_loop/llm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use loopal_message::{ContentBlock, Message};
use loopal_protocol::AgentEventPayload;
use loopal_provider_api::StreamChunk;
use std::time::Instant;
use tracing::{error, info};
use tracing::{error, info, warn};

impl AgentLoopRunner {
/// Stream the LLM response using a provided working copy of messages.
Expand Down Expand Up @@ -52,13 +52,14 @@ impl AgentLoopRunner {
.retry_stream_chat(&chat_params, &*provider, cancel)
.await?;
let mut result = LlmStreamResult::default();
let mut received_done = false;

loop {
tokio::select! {
biased;
chunk = stream.next() => {
let Some(chunk_result) = chunk else { break; };
if !self.handle_stream_chunk(chunk_result, &mut result).await? {
if !self.handle_stream_chunk(chunk_result, &mut result, &mut received_done).await? {
break;
}
}
Expand All @@ -70,6 +71,14 @@ impl AgentLoopRunner {
}
}

// Stream EOF without Done → connection dropped mid-stream.
// Exclude cancellation: retry_stream_chat returns empty stream on cancel,
// which would look like truncation but is intentional.
if !received_done && !result.stream_error && !cancel.is_cancelled() {
warn!("SSE stream ended without message_stop — treating as stream truncation");
result.stream_error = true;
}

self.emit_thinking_complete(&result).await?;
let llm_duration = llm_start.elapsed();
info!(
Expand All @@ -88,6 +97,7 @@ impl AgentLoopRunner {
&mut self,
chunk: std::result::Result<StreamChunk, loopal_error::LoopalError>,
result: &mut LlmStreamResult,
received_done: &mut bool,
) -> Result<bool> {
match chunk {
Ok(StreamChunk::Text { text }) => {
Expand Down Expand Up @@ -162,6 +172,7 @@ impl AgentLoopRunner {
.await?;
}
Ok(StreamChunk::Done { stop_reason }) => {
*received_done = true;
result.stop_reason = stop_reason;
return Ok(false);
}
Expand Down
3 changes: 3 additions & 0 deletions crates/loopal-runtime/src/agent_loop/llm_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use loopal_provider_api::StopReason;
pub struct LlmStreamResult {
pub assistant_text: String,
pub tool_uses: Vec<(String, String, serde_json::Value)>,
/// True when the stream did not complete normally: explicit error chunk,
/// user cancellation, or silent truncation (EOF without `message_stop`).
/// `turn_exec` uses this to decide whether to auto-continue or bail.
pub stream_error: bool,
pub stop_reason: StopReason,
pub thinking_text: String,
Expand Down
1 change: 1 addition & 0 deletions crates/loopal-runtime/src/agent_loop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod turn_context;
mod turn_exec;
pub(crate) mod turn_metrics;
pub mod turn_observer;
mod turn_observer_dispatch;

use std::collections::HashSet;
use std::sync::Arc;
Expand Down
114 changes: 49 additions & 65 deletions crates/loopal-runtime/src/agent_loop/turn_exec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Inner turn execution loop and observer dispatch.
//! Inner turn execution loop.
//!
//! Observer dispatch lives in `turn_observer_dispatch.rs`.

use loopal_error::Result;
use loopal_protocol::AgentEventPayload;
Expand All @@ -8,7 +10,6 @@ use tracing::{debug, info, warn};
use super::TurnOutput;
use super::runner::AgentLoopRunner;
use super::turn_context::TurnContext;
use super::turn_observer::ObserverAction;

impl AgentLoopRunner {
/// Inner loop: LLM → [tools → LLM]* → done.
Expand All @@ -26,9 +27,7 @@ impl AgentLoopRunner {
return Ok(TurnOutput { output: last_text });
}

// Persistent compaction (LLM summarization if over budget)
self.check_and_compact().await?;
// Prepare context for LLM (clone + strip old thinking)
let working = self.params.store.prepare_for_llm();
turn_ctx.metrics.llm_calls += 1;
let result = self.stream_llm_with(&working, &turn_ctx.cancel).await?;
Expand All @@ -45,12 +44,30 @@ impl AgentLoopRunner {
&result.tool_uses
};

// Auto-continue triggers: MaxTokens+tools, PauseTurn (server-side limit)
// Auto-continue: MaxTokens+tools, PauseTurn, or stream truncation.
// Exclude user cancellation — cancel should stop, not retry.
let needs_auto_continue = truncated || result.stop_reason == StopReason::PauseTurn;
if needs_auto_continue {
let stream_truncated = result.stream_error
&& !turn_ctx.cancel.is_cancelled()
&& !(result.assistant_text.is_empty() && result.tool_uses.is_empty());

if needs_auto_continue || stream_truncated {
if stream_truncated {
warn!(
text_len = result.assistant_text.len(),
tool_calls = result.tool_uses.len(),
"stream truncated — discarding incomplete tool calls"
);
}
// For truncated streams, discard possibly-incomplete tool_use blocks.
let tools = if stream_truncated {
&[][..]
} else {
effective_tools
};
self.record_assistant_message(
&result.assistant_text,
effective_tools,
tools,
&result.thinking_text,
result.thinking_signature.as_deref(),
result.server_blocks,
Expand All @@ -71,6 +88,23 @@ impl AgentLoopRunner {
return Ok(TurnOutput { output: last_text });
}

// Stream error (cancel or empty truncation) — record any partial
// text that was already streamed to the user, then exit.
if result.stream_error {
if !result.assistant_text.is_empty() {
let no_tools: &[(String, String, serde_json::Value)] = &[];
self.record_assistant_message(
&result.assistant_text,
no_tools,
&result.thinking_text,
result.thinking_signature.as_deref(),
result.server_blocks,
);
last_text.clone_from(&result.assistant_text);
}
return Ok(TurnOutput { output: last_text });
}

self.record_assistant_message(
&result.assistant_text,
&result.tool_uses,
Expand All @@ -82,13 +116,6 @@ impl AgentLoopRunner {
last_text.clone_from(&result.assistant_text);
}

if result.stream_error
&& result.tool_uses.is_empty()
&& result.assistant_text.is_empty()
{
return Ok(TurnOutput { output: last_text });
}

if result.tool_uses.is_empty() && result.stop_reason == StopReason::MaxTokens {
if continuation_count < self.params.harness.max_auto_continuations {
continuation_count += 1;
Expand All @@ -104,32 +131,14 @@ impl AgentLoopRunner {
}

if result.tool_uses.is_empty() {
// Stop hook: if any hook provides feedback, continue (bounded).
if stop_feedback_count < max_stop_feedback {
let stop_outputs = self
.params
.deps
.kernel
.hook_service()
.run_hooks(
loopal_config::HookEvent::Stop,
&loopal_hooks::HookContext {
stop_reason: Some("end_turn"),
..Default::default()
},
)
.await;
let feedback: Vec<&str> = stop_outputs
.iter()
.filter_map(|o| o.additional_context.as_deref())
.collect();
if !feedback.is_empty() {
stop_feedback_count += 1;
self.params
.store
.append_warnings_to_last_user(vec![feedback.join("\n")]);
continue;
}
if stop_feedback_count < max_stop_feedback
&& let Some(feedback) = self.run_stop_hooks().await
{
stop_feedback_count += 1;
self.params
.store
.append_warnings_to_last_user(vec![feedback]);
continue;
}
return Ok(TurnOutput { output: last_text });
}
Expand Down Expand Up @@ -158,12 +167,10 @@ impl AgentLoopRunner {
turn_ctx.metrics.tool_errors += stats.errors;
info!("tool exec complete");

// Append observer warnings after tool results (must follow ToolResult blocks).
let warnings = std::mem::take(&mut turn_ctx.pending_warnings);
self.params.store.append_warnings_to_last_user(warnings);

self.inject_pending_messages().await;
// Observer: on_after_tools with results from the last message
let result_blocks = self
.params
.store
Expand All @@ -178,27 +185,4 @@ impl AgentLoopRunner {
continuation_count = 0;
}
}

/// Dispatch the before-tools observers in order. Returns `true` when any
/// observer requests that the turn abort (the error has already been emitted).
///
/// `InjectWarning` actions are accumulated into `turn_ctx.pending_warnings`
/// so they can be appended after the tool results later in the loop.
pub(super) async fn run_before_tools(
    &mut self,
    turn_ctx: &mut TurnContext,
    tool_uses: &[(String, String, serde_json::Value)],
) -> Result<bool> {
    for observer in self.observers.iter_mut() {
        let action = observer.on_before_tools(turn_ctx, tool_uses);
        match action {
            ObserverAction::InjectWarning(warning) => {
                // Deferred: warnings must follow the ToolResult blocks.
                turn_ctx.pending_warnings.push(warning);
            }
            ObserverAction::AbortTurn(reason) => {
                warn!(%reason, "observer aborted turn");
                self.emit(AgentEventPayload::Error { message: reason })
                    .await?;
                return Ok(true);
            }
            ObserverAction::Continue => (),
        }
    }
    Ok(false)
}
}
64 changes: 64 additions & 0 deletions crates/loopal-runtime/src/agent_loop/turn_observer_dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
//! Stop hook and observer dispatch for the turn execution loop.
//!
//! Extracted from `turn_exec` — these are lifecycle extension points
//! with independent change reasons (hook config, observer API).

use loopal_error::Result;
use loopal_protocol::AgentEventPayload;
use tracing::warn;

use super::runner::AgentLoopRunner;
use super::turn_context::TurnContext;
use super::turn_observer::ObserverAction;

impl AgentLoopRunner {
    /// Dispatch the before-tools observers in order. Returns `true` when any
    /// observer requests that the turn abort (the error has already been emitted).
    ///
    /// `InjectWarning` actions are accumulated into `turn_ctx.pending_warnings`
    /// so they can be appended after the tool results later in the loop.
    pub(super) async fn run_before_tools(
        &mut self,
        turn_ctx: &mut TurnContext,
        tool_uses: &[(String, String, serde_json::Value)],
    ) -> Result<bool> {
        for observer in self.observers.iter_mut() {
            let action = observer.on_before_tools(turn_ctx, tool_uses);
            match action {
                ObserverAction::InjectWarning(warning) => {
                    // Deferred: warnings must follow the ToolResult blocks.
                    turn_ctx.pending_warnings.push(warning);
                }
                ObserverAction::AbortTurn(reason) => {
                    warn!(%reason, "observer aborted turn");
                    self.emit(AgentEventPayload::Error { message: reason })
                        .await?;
                    return Ok(true);
                }
                ObserverAction::Continue => (),
            }
        }
        Ok(false)
    }

    /// Execute the `Stop` lifecycle hooks and gather their feedback.
    ///
    /// Returns `Some(feedback)` (newline-joined `additional_context` from each
    /// hook that produced any) when the hooks want the agent to keep going, or
    /// `None` when no hook objected and the turn may end.
    pub(super) async fn run_stop_hooks(&self) -> Option<String> {
        let hook_ctx = loopal_hooks::HookContext {
            stop_reason: Some("end_turn"),
            ..Default::default()
        };
        let outputs = self
            .params
            .deps
            .kernel
            .hook_service()
            .run_hooks(loopal_config::HookEvent::Stop, &hook_ctx)
            .await;
        let feedback: Vec<&str> = outputs
            .iter()
            .filter_map(|out| out.additional_context.as_deref())
            .collect();
        // Any hook output at all — even an empty string — counts as feedback,
        // matching the "non-empty collection" check rather than a text check.
        (!feedback.is_empty()).then(|| feedback.join("\n"))
    }
}
7 changes: 5 additions & 2 deletions crates/loopal-runtime/tests/agent_loop/llm_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async fn test_stream_llm_error_in_stream() {

#[tokio::test]
async fn test_stream_llm_empty_stream() {
// Empty stream (no chunks at all) — tests the while loop body never executing
// Empty stream (no chunks at all) — stream EOF without Done = truncation.
let chunks = vec![];
let (mut runner, _event_rx, _input_tx, _ctrl_tx) = make_runner_with_mock_provider(chunks);

Expand All @@ -189,7 +189,10 @@ async fn test_stream_llm_empty_stream() {
let stream_error = result.stream_error;
assert!(text.is_empty());
assert!(tool_uses.is_empty());
assert!(!stream_error);
assert!(
stream_error,
"empty stream (no Done) should set stream_error"
);
}

#[test]
Expand Down
Loading
Loading