diff --git a/crates/loopal-agent-hub/src/agent_io.rs b/crates/loopal-agent-hub/src/agent_io.rs index e1b73ada..bdfb6abe 100644 --- a/crates/loopal-agent-hub/src/agent_io.rs +++ b/crates/loopal-agent-hub/src/agent_io.rs @@ -48,7 +48,7 @@ pub async fn agent_io_loop( } let h = hub.lock().await; if h.registry.event_sender().try_send(event).is_err() { - tracing::debug!(agent = %agent_name, "event dropped (channel full)"); + tracing::warn!(agent = %agent_name, "event dropped (channel full)"); } } } diff --git a/crates/loopal-agent-hub/src/agent_registry/completion.rs b/crates/loopal-agent-hub/src/agent_registry/completion.rs index 169230db..aebc3f8e 100644 --- a/crates/loopal-agent-hub/src/agent_registry/completion.rs +++ b/crates/loopal-agent-hub/src/agent_registry/completion.rs @@ -27,7 +27,9 @@ impl AgentRegistry { let pending_delivery = self.prepare_parent_delivery(name, &text); let event = AgentEvent::named(name, AgentEventPayload::Finished); - let _ = self.event_tx.try_send(event); + if self.event_tx.try_send(event).is_err() { + tracing::warn!(agent = %name, "Finished event dropped (channel full)"); + } if let Some(tx) = self.completions.remove(name) { let _ = tx.send(Some(text)); diff --git a/crates/loopal-agent-hub/src/routing.rs b/crates/loopal-agent-hub/src/routing.rs index fa8e181b..3fadbb95 100644 --- a/crates/loopal-agent-hub/src/routing.rs +++ b/crates/loopal-agent-hub/src/routing.rs @@ -26,6 +26,11 @@ pub async fn route_to_agent( target: envelope.target.clone(), content_preview: envelope.content_preview().to_string(), }); - let _ = observation_tx.try_send(event); + if observation_tx.try_send(event).is_err() { + tracing::warn!( + target = %envelope.target, + "observation event dropped (channel full)" + ); + } Ok(()) } diff --git a/crates/loopal-agent-hub/src/spawn_manager.rs b/crates/loopal-agent-hub/src/spawn_manager.rs index a573e6ca..78bd7598 100644 --- a/crates/loopal-agent-hub/src/spawn_manager.rs +++ b/crates/loopal-agent-hub/src/spawn_manager.rs @@ -130,7 +130,7 @@ pub async fn register_agent_connection( session_id: session_id.map(String::from), }); if h.registry.event_sender().try_send(event).is_err() { - tracing::debug!(agent = %name, "SubAgentSpawned event dropped"); + tracing::warn!(agent = %name, "SubAgentSpawned event dropped (channel full)"); } } agent_id diff --git a/crates/loopal-agent-server/src/hub_frontend.rs b/crates/loopal-agent-server/src/hub_frontend.rs index 549d77db..8d635b79 100644 --- a/crates/loopal-agent-server/src/hub_frontend.rs +++ b/crates/loopal-agent-server/src/hub_frontend.rs @@ -14,18 +14,18 @@ use tracing::{debug, info}; use loopal_error::{LoopalError, Result}; use loopal_ipc::protocol::methods; -use loopal_protocol::{AgentEvent, AgentEventPayload, Envelope, Question, UserQuestionResponse}; +use loopal_protocol::{AgentEvent, AgentEventPayload, Question, UserQuestionResponse}; use loopal_runtime::agent_input::AgentInput; use loopal_runtime::frontend::traits::{AgentFrontend, EventEmitter}; use loopal_tool_api::PermissionDecision; use crate::hub_emitter::HubEventEmitter; -use crate::session_hub::{InputFromClient, SharedSession}; +use crate::session_hub::SharedSession; /// Frontend that multiplexes across all clients in a shared session. pub struct HubFrontend { session: tokio::sync::RwLock>, - input_rx: tokio::sync::Mutex>, + input_rx: tokio::sync::Mutex>, agent_name: Option, /// Watch channel for interrupt detection in recv_input. interrupt_rx: tokio::sync::Mutex>, @@ -34,7 +34,7 @@ pub struct HubFrontend { impl HubFrontend { pub fn new( session: Arc, - input_rx: tokio::sync::mpsc::Receiver, + input_rx: tokio::sync::mpsc::Receiver, agent_name: Option, interrupt_rx: tokio::sync::watch::Receiver, ) -> Self { @@ -95,18 +95,12 @@ impl AgentFrontend for HubFrontend { // has already been handled by TurnCancel. Without this, changed() // fires immediately on the old value and exits the agent loop. interrupt_rx.borrow_and_update(); - loop { - tokio::select! { - msg = rx.recv() => { - match msg? { - InputFromClient::Message(env) => return Some(AgentInput::Message(env)), - InputFromClient::Control(cmd) => return Some(AgentInput::Control(cmd)), - InputFromClient::Interrupt => continue, - } - } - _ = interrupt_rx.changed() => { - return None; // Interrupted — exit agent loop - } + tokio::select! { + msg = rx.recv() => { + return msg; + } + _ = interrupt_rx.changed() => { + return None; // Interrupted — exit agent loop } } } @@ -199,14 +193,12 @@ impl AgentFrontend for HubFrontend { true } - async fn drain_pending(&self) -> Vec { + async fn drain_pending(&self) -> Vec { let mut rx = self.input_rx.lock().await; - let mut envelopes = Vec::new(); + let mut inputs = Vec::new(); while let Ok(msg) = rx.try_recv() { - if let InputFromClient::Message(env) = msg { - envelopes.push(env); - } + inputs.push(msg); } - envelopes + inputs } } diff --git a/crates/loopal-agent-server/src/lib.rs b/crates/loopal-agent-server/src/lib.rs index 95dc992c..42f7a33f 100644 --- a/crates/loopal-agent-server/src/lib.rs +++ b/crates/loopal-agent-server/src/lib.rs @@ -48,7 +48,7 @@ pub fn ipc_frontend_for_test( #[doc(hidden)] pub fn hub_frontend_for_test( session: std::sync::Arc, - input_rx: tokio::sync::mpsc::Receiver, + input_rx: tokio::sync::mpsc::Receiver, interrupt_rx: tokio::sync::watch::Receiver, ) -> std::sync::Arc { std::sync::Arc::new(hub_frontend::HubFrontend::new( @@ -64,5 +64,6 @@ pub fn hub_frontend_for_test( pub mod testing { pub use crate::agent_setup::build_with_frontend; pub use crate::params::{StartParams, build_kernel_with_provider}; - pub use crate::session_hub::{InputFromClient, SharedSession}; + pub use crate::session_hub::SharedSession; + pub use loopal_runtime::agent_input::AgentInput; } diff --git a/crates/loopal-agent-server/src/session_forward.rs b/crates/loopal-agent-server/src/session_forward.rs index 2ca20f70..31397ac9 100644 --- a/crates/loopal-agent-server/src/session_forward.rs +++ b/crates/loopal-agent-server/src/session_forward.rs @@ -10,8 +10,8 @@ use loopal_ipc::connection::{Connection, Incoming}; use loopal_ipc::jsonrpc; use loopal_ipc::protocol::methods; use loopal_protocol::{ControlCommand, Envelope}; +use loopal_runtime::agent_input::AgentInput; -use crate::session_hub::InputFromClient; use crate::session_start::SessionHandle; /// Result of forward_loop — tells dispatch_loop what happened. @@ -74,7 +74,7 @@ pub(crate) async fn forward_loop( } else if method == methods::AGENT_MESSAGE.name { // Hub-injected message (e.g. sub-agent completion notification). if let Ok(env) = serde_json::from_value::(params) { - let _ = session.input_tx.send(InputFromClient::Message(env)).await; + let _ = session.input_tx.send(AgentInput::Message(env)).await; } } } @@ -98,7 +98,7 @@ async fn route_request( match method { m if m == methods::AGENT_MESSAGE.name => match serde_json::from_value::(params) { Ok(env) => { - let _ = session.input_tx.send(InputFromClient::Message(env)).await; + let _ = session.input_tx.send(AgentInput::Message(env)).await; let _ = connection .respond(id, serde_json::json!({"ok": true})) .await; @@ -112,7 +112,7 @@ async fn route_request( m if m == methods::AGENT_CONTROL.name => { match serde_json::from_value::(params) { Ok(cmd) => { - let _ = session.input_tx.send(InputFromClient::Control(cmd)).await; + let _ = session.input_tx.send(AgentInput::Control(cmd)).await; let _ = connection .respond(id, serde_json::json!({"ok": true})) .await; diff --git a/crates/loopal-agent-server/src/session_hub.rs b/crates/loopal-agent-server/src/session_hub.rs index 73a4f611..ae6863c6 100644 --- a/crates/loopal-agent-server/src/session_hub.rs +++ b/crates/loopal-agent-server/src/session_hub.rs @@ -8,6 +8,7 @@ use tokio::sync::Mutex; use loopal_ipc::connection::Connection; use loopal_protocol::InterruptSignal; +use loopal_runtime::agent_input::AgentInput; /// A connected client handle within a shared session. pub struct ClientHandle { @@ -22,19 +23,12 @@ pub struct SharedSession { pub session_id: String, pub clients: Mutex>, /// Channel to send input into the agent loop. - pub input_tx: tokio::sync::mpsc::Sender, + pub input_tx: tokio::sync::mpsc::Sender, /// Interrupt signal shared with the agent loop. pub interrupt: InterruptSignal, pub interrupt_tx: Arc>, } -/// Input forwarded from a client connection to the agent loop. -pub enum InputFromClient { - Message(loopal_protocol::Envelope), - Control(loopal_protocol::ControlCommand), - Interrupt, -} - /// Server-wide session registry. #[derive(Default)] pub struct SessionHub { @@ -108,7 +102,7 @@ impl SessionHub { impl SharedSession { /// Create a placeholder session (for bootstrapping before session_id is known). pub fn placeholder( - input_tx: tokio::sync::mpsc::Sender, + input_tx: tokio::sync::mpsc::Sender, interrupt: InterruptSignal, interrupt_tx: Arc>, ) -> Self { diff --git a/crates/loopal-agent-server/src/session_start.rs b/crates/loopal-agent-server/src/session_start.rs index 8ff6e915..138e9c66 100644 --- a/crates/loopal-agent-server/src/session_start.rs +++ b/crates/loopal-agent-server/src/session_start.rs @@ -10,12 +10,13 @@ use loopal_config::load_config; use loopal_error::AgentOutput; use loopal_ipc::connection::Connection; use loopal_protocol::InterruptSignal; +use loopal_runtime::agent_input::AgentInput; use loopal_runtime::agent_loop; use crate::agent_setup; use crate::hub_frontend::HubFrontend; use crate::params::StartParams; -use crate::session_hub::{InputFromClient, SessionHub, SharedSession}; +use crate::session_hub::{SessionHub, SharedSession}; /// Handle returned to the dispatch loop after starting a session. pub(crate) struct SessionHandle { @@ -77,7 +78,7 @@ pub(crate) async fn start_session( }; // Create session infrastructure - let (input_tx, input_rx) = tokio::sync::mpsc::channel::(16); + let (input_tx, input_rx) = tokio::sync::mpsc::channel::(16); let interrupt = InterruptSignal::new(); let (watch_tx, watch_rx) = tokio::sync::watch::channel(0u64); let interrupt_tx = Arc::new(watch_tx); diff --git a/crates/loopal-agent-server/tests/suite.rs b/crates/loopal-agent-server/tests/suite.rs index 23ebe2e5..2f8c3b2a 100644 --- a/crates/loopal-agent-server/tests/suite.rs +++ b/crates/loopal-agent-server/tests/suite.rs @@ -6,6 +6,8 @@ mod bridge_edge_test; mod bridge_helpers; #[path = "suite/dispatch_loop_test.rs"] mod dispatch_loop_test; +#[path = "suite/hub_drain_test.rs"] +mod hub_drain_test; #[path = "suite/hub_frontend_edge_test.rs"] mod hub_frontend_edge_test; #[path = "suite/hub_frontend_test.rs"] diff --git a/crates/loopal-agent-server/tests/suite/hub_drain_test.rs b/crates/loopal-agent-server/tests/suite/hub_drain_test.rs new file mode 100644 index 00000000..c0c7131f --- /dev/null +++ b/crates/loopal-agent-server/tests/suite/hub_drain_test.rs @@ -0,0 +1,65 @@ +//! Tests for HubFrontend::drain_pending() — message and control routing. + +use std::sync::Arc; + +use tokio::sync::Mutex; + +use loopal_protocol::{ControlCommand, Envelope, InterruptSignal, MessageSource}; +use loopal_runtime::agent_input::AgentInput; +use loopal_runtime::frontend::traits::AgentFrontend; + +use loopal_agent_server::hub_frontend::HubFrontend; +use loopal_agent_server::session_hub::SharedSession; + +fn make_session() -> ( + Arc, + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, + tokio::sync::watch::Receiver, +) { + let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); + let interrupt = InterruptSignal::new(); + let (watch_tx, watch_rx) = tokio::sync::watch::channel(0u64); + let session = Arc::new(SharedSession { + session_id: "test-session".into(), + clients: Mutex::new(Vec::new()), + input_tx: input_tx.clone(), + interrupt, + interrupt_tx: Arc::new(watch_tx), + }); + (session, input_tx, input_rx, watch_rx) +} + +#[tokio::test] +async fn test_hub_drain_pending_messages() { + let (session, input_tx, input_rx, watch_rx) = make_session(); + let frontend = HubFrontend::new(session, input_rx, None, watch_rx); + + let env = Envelope::new(MessageSource::Human, "main", "hello"); + input_tx.send(AgentInput::Message(env)).await.unwrap(); + + let pending = frontend.drain_pending().await; + assert_eq!(pending.len(), 1); + let AgentInput::Message(ref env) = pending[0] else { + panic!("expected AgentInput::Message"); + }; + assert_eq!(env.content.text, "hello"); +} + +#[tokio::test] +async fn test_hub_drain_pending_controls() { + let (session, input_tx, input_rx, watch_rx) = make_session(); + let frontend = HubFrontend::new(session, input_rx, None, watch_rx); + + input_tx + .send(AgentInput::Control(ControlCommand::Clear)) + .await + .unwrap(); + + let pending = frontend.drain_pending().await; + assert_eq!(pending.len(), 1); + assert!(matches!( + pending[0], + AgentInput::Control(ControlCommand::Clear) + )); +} diff --git a/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs b/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs index a25dbb77..e1fc1551 100644 --- a/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs +++ b/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs @@ -11,14 +11,14 @@ use loopal_runtime::agent_input::AgentInput; use loopal_runtime::frontend::traits::AgentFrontend; use loopal_agent_server::hub_frontend::HubFrontend; -use loopal_agent_server::session_hub::{InputFromClient, SharedSession}; +use loopal_agent_server::session_hub::SharedSession; const T: Duration = Duration::from_secs(5); fn make_session() -> ( Arc, - tokio::sync::mpsc::Sender, - tokio::sync::mpsc::Receiver, + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, tokio::sync::watch::Receiver, ) { let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); @@ -60,7 +60,7 @@ async fn stale_interrupt_does_not_exit_recv_input() { // Now send a real message — recv_input should return it. let env = Envelope::new(MessageSource::Human, "main", "hello after interrupt"); - input_tx.send(InputFromClient::Message(env)).await.unwrap(); + input_tx.send(AgentInput::Message(env)).await.unwrap(); let result = tokio::time::timeout(T, recv_task).await.unwrap().unwrap(); assert!( @@ -104,7 +104,7 @@ async fn interrupt_then_continue_cycle() { // Send a message — should be delivered. let env = Envelope::new(MessageSource::Human, "main", "continue working"); - input_tx.send(InputFromClient::Message(env)).await.unwrap(); + input_tx.send(AgentInput::Message(env)).await.unwrap(); let result2 = tokio::time::timeout(T, recv2).await.unwrap().unwrap(); assert!( @@ -136,7 +136,7 @@ async fn multiple_stale_interrupts_all_consumed() { ); let env = Envelope::new(MessageSource::Human, "main", "msg"); - input_tx.send(InputFromClient::Message(env)).await.unwrap(); + input_tx.send(AgentInput::Message(env)).await.unwrap(); let result = tokio::time::timeout(T, recv_task).await.unwrap().unwrap(); assert!(matches!(result, Some(AgentInput::Message(_)))); diff --git a/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs b/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs index 3330d52d..19fddf7a 100644 --- a/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs +++ b/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs @@ -13,7 +13,8 @@ use loopal_ipc::transport::Transport; use loopal_protocol::{AgentEventPayload, InterruptSignal}; use loopal_runtime::frontend::traits::AgentFrontend; -use loopal_agent_server::session_hub::{InputFromClient, SharedSession}; +use loopal_agent_server::session_hub::SharedSession; +use loopal_runtime::agent_input::AgentInput; /// Create a bidirectional Connection pair (like a network socket pair). /// Returns (server_conn, client_conn, client_rx). @@ -41,7 +42,7 @@ fn conn_pair() -> ( fn make_session() -> ( Arc, - tokio::sync::mpsc::Receiver, + tokio::sync::mpsc::Receiver, tokio::sync::watch::Receiver, ) { let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); diff --git a/crates/loopal-agent-server/tests/suite/hub_harness.rs b/crates/loopal-agent-server/tests/suite/hub_harness.rs index 70aae7c1..a3c84311 100644 --- a/crates/loopal-agent-server/tests/suite/hub_harness.rs +++ b/crates/loopal-agent-server/tests/suite/hub_harness.rs @@ -19,13 +19,13 @@ use loopal_test_support::mock_provider::MultiCallProvider; use loopal_test_support::scenarios::Calls; use loopal_agent_server::testing::{ - InputFromClient, SharedSession, StartParams, build_kernel_with_provider, + AgentInput, SharedSession, StartParams, build_kernel_with_provider, }; pub const T: Duration = Duration::from_secs(10); pub struct HubTestHarness { - pub input_tx: mpsc::Sender, + pub input_tx: mpsc::Sender, pub interrupt: InterruptSignal, pub interrupt_tx: Arc>, pub client_rx: mpsc::Receiver, @@ -52,7 +52,7 @@ impl HubTestHarness { pub async fn send_message(&self, text: &str) { let env = Envelope::new(MessageSource::Human, "main", text); self.input_tx - .send(InputFromClient::Message(env)) + .send(AgentInput::Message(env)) .await .expect("input channel open"); } @@ -101,7 +101,7 @@ pub async fn build_hub_harness_with( let provider = Arc::new(MultiCallProvider::new(calls)); let kernel = build_kernel_with_provider(provider).unwrap(); - let (input_tx, input_rx) = mpsc::channel::(16); + let (input_tx, input_rx) = mpsc::channel::(16); let interrupt = InterruptSignal::new(); let (watch_tx, watch_rx) = watch::channel(0u64); let interrupt_tx = Arc::new(watch_tx); diff --git a/crates/loopal-protocol/src/agent_state.rs b/crates/loopal-protocol/src/agent_state.rs index 3ac7e902..96371397 100644 --- a/crates/loopal-protocol/src/agent_state.rs +++ b/crates/loopal-protocol/src/agent_state.rs @@ -20,6 +20,11 @@ pub enum AgentStatus { /// /// Collected on the Observation Plane and consumed by the frontend to render /// per-agent status panels. All fields are cheap to clone. +/// +/// **Status contract**: `status` is derived solely from agent events +/// (`AwaitingInput`, `Finished`, `Error`, `Stream`, `ToolCall`, etc.). +/// The agent loop's internal status field is for local idempotency only; +/// this observable copy is the authoritative view for all external consumers. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObservableAgentState { /// Current lifecycle status. diff --git a/crates/loopal-protocol/src/interrupt.rs b/crates/loopal-protocol/src/interrupt.rs index 61661d09..33ff8449 100644 --- a/crates/loopal-protocol/src/interrupt.rs +++ b/crates/loopal-protocol/src/interrupt.rs @@ -1,6 +1,8 @@ //! Shared interrupt signal for cancelling in-progress agent work. //! -//! Used by both cancel-to-interrupt and send-message-while-busy flows. +//! Used exclusively for explicit cancel (ESC key) and shutdown flows. +//! Message arrival does NOT trigger interrupt — messages are deposited +//! directly into the agent's mailbox and processed at the agent's pace. //! The consumer calls `signal()`, the runtime polls `is_signaled()` at //! key checkpoints (per stream chunk, before tool execution). @@ -18,7 +20,7 @@ impl InterruptSignal { Self(Arc::new(AtomicBool::new(false))) } - /// Raise the interrupt flag (called by consumer on cancel or new message). + /// Raise the interrupt flag (called by consumer on explicit cancel or shutdown). pub fn signal(&self) { self.0.store(true, Ordering::Release); } diff --git a/crates/loopal-runtime/src/agent_loop/input.rs b/crates/loopal-runtime/src/agent_loop/input.rs index f5ed1fd6..b86f7b19 100644 --- a/crates/loopal-runtime/src/agent_loop/input.rs +++ b/crates/loopal-runtime/src/agent_loop/input.rs @@ -118,8 +118,22 @@ impl AgentLoopRunner { } /// Non-blocking drain of all pending input (frontend + scheduler + rewake). + /// + /// Returns pending message envelopes. Control commands are processed + /// inline (they affect agent config, not conversation history). pub(super) async fn drain_pending_input(&mut self) -> Vec { - let mut pending = self.params.deps.frontend.drain_pending().await; + let all_inputs = self.params.deps.frontend.drain_pending().await; + let mut pending = Vec::new(); + for input in all_inputs { + match input { + AgentInput::Message(env) => pending.push(env), + AgentInput::Control(cmd) => { + if let Err(e) = self.handle_control(cmd).await { + tracing::warn!(error = %e, "failed to handle drained control"); + } + } + } + } if let Some(ref mut rx) = self.trigger_rx { while let Ok(env) = rx.try_recv() { pending.push(env); diff --git a/crates/loopal-runtime/src/agent_loop/run.rs b/crates/loopal-runtime/src/agent_loop/run.rs index 372ae65f..135463ce 100644 --- a/crates/loopal-runtime/src/agent_loop/run.rs +++ b/crates/loopal-runtime/src/agent_loop/run.rs @@ -29,9 +29,8 @@ impl AgentLoopRunner { match self.params.config.lifecycle { LifecycleMode::Ephemeral => { - // Ephemeral agent: check for pending input. If nothing pending - // after a brief yield, work is done — exit. - tokio::task::yield_now().await; + // Messages are delivered directly to the agent mailbox. + // drain is reliable — no yield/timeout needed. let pending = self.drain_pending_input().await; if pending.is_empty() { info!("ephemeral agent idle, exiting"); diff --git a/crates/loopal-runtime/src/agent_loop/runner.rs b/crates/loopal-runtime/src/agent_loop/runner.rs index fc54b432..144eef03 100644 --- a/crates/loopal-runtime/src/agent_loop/runner.rs +++ b/crates/loopal-runtime/src/agent_loop/runner.rs @@ -28,7 +28,12 @@ pub struct AgentLoopRunner { pub trigger_rx: Option>, /// Async hook rewake channel — background hooks send Envelopes here. pub rewake_rx: Option>, - /// Explicit agent state — source of truth, propagated via events to Session layer. + /// Local status for idempotent `transition()` checks. + /// + /// This is NOT the authoritative status for external observers. The session + /// layer derives its `observable.status` solely from agent events + /// (`AwaitingInput`, `Finished`, `Error`, etc.). If `emit()` fails during + /// `transition()`, this field is rolled back so the event can be retried. pub status: AgentStatus, /// Plan file for the current session (created lazily on first plan mode entry). pub plan_file: PlanFile, @@ -175,18 +180,27 @@ impl AgentLoopRunner { } /// Transition to a new agent status. Skips if already in target (idempotent). + /// + /// If the event emission fails, the local status is rolled back so the + /// transition can be retried. This keeps `self.status` consistent with + /// what observers have actually seen. pub(super) async fn transition(&mut self, new_status: AgentStatus) -> Result<()> { if self.status == new_status { return Ok(()); } + let old = self.status; self.status = new_status; - match new_status { + let result = match new_status { AgentStatus::Starting => Ok(()), AgentStatus::Running => Ok(()), // Running is signaled implicitly by Stream/ToolCall events. AgentStatus::WaitingForInput => self.emit(AgentEventPayload::AwaitingInput).await, AgentStatus::Finished => self.emit(AgentEventPayload::Finished).await, AgentStatus::Error => Ok(()), // Error event carries a message; use transition_error(). + }; + if result.is_err() { + self.status = old; } + result } /// Transition to Error status with a message. diff --git a/crates/loopal-runtime/src/agent_loop/tools_inject.rs b/crates/loopal-runtime/src/agent_loop/tools_inject.rs index 77d06ff0..bdc9c5e7 100644 --- a/crates/loopal-runtime/src/agent_loop/tools_inject.rs +++ b/crates/loopal-runtime/src/agent_loop/tools_inject.rs @@ -60,24 +60,34 @@ impl AgentLoopRunner { Ok(()) } - /// Drain pending envelopes from the frontend and inject them as user messages. + /// Drain pending input from the frontend and inject messages into the store. + /// Control commands are processed inline. pub async fn inject_pending_messages(&mut self) { let pending = self.params.deps.frontend.drain_pending().await; - for env in pending { - let mut user_msg = build_user_message(&env); - info!( - text_len = env.content.text.len(), - "injecting pending message" - ); - if let Err(e) = self - .params - .deps - .session_manager - .save_message(&self.params.session.id, &mut user_msg) - { - error!(error = %e, "failed to persist injected message"); + for input in pending { + match input { + crate::agent_input::AgentInput::Message(env) => { + let mut user_msg = build_user_message(&env); + info!( + text_len = env.content.text.len(), + "injecting pending message" + ); + if let Err(e) = self + .params + .deps + .session_manager + .save_message(&self.params.session.id, &mut user_msg) + { + error!(error = %e, "failed to persist injected message"); + } + self.params.store.push_user(user_msg); + } + crate::agent_input::AgentInput::Control(cmd) => { + if let Err(e) = self.handle_control(cmd).await { + tracing::warn!(error = %e, "failed to handle drained control"); + } + } } - self.params.store.push_user(user_msg); } } } diff --git a/crates/loopal-runtime/src/frontend/traits.rs b/crates/loopal-runtime/src/frontend/traits.rs index 4fe349a0..64945009 100644 --- a/crates/loopal-runtime/src/frontend/traits.rs +++ b/crates/loopal-runtime/src/frontend/traits.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use crate::agent_input::AgentInput; use loopal_error::Result; use loopal_protocol::AgentEventPayload; -use loopal_protocol::{Envelope, Question}; +use loopal_protocol::Question; use loopal_tool_api::PermissionDecision; /// Outcome of a plan approval request. @@ -56,13 +56,11 @@ pub trait AgentFrontend: Send + Sync { /// Create a cloneable event emitter for use in `tokio::spawn` blocks. fn event_emitter(&self) -> Box; - /// Non-blocking drain of pending messages from the mailbox. + /// Non-blocking drain of all pending input from the mailbox. /// - /// Returns raw `Envelope`s preserving full source metadata. - /// Called between tool executions and before sub-agent exit to - /// prevent message loss. Default returns empty — root agent uses - /// consumer inbox instead. - async fn drain_pending(&self) -> Vec { + /// Returns both data messages and control commands so that ephemeral + /// agents can process controls before exiting. Default returns empty. + async fn drain_pending(&self) -> Vec { Vec::new() } diff --git a/crates/loopal-runtime/src/frontend/unified.rs b/crates/loopal-runtime/src/frontend/unified.rs index 2742045a..43095b90 100644 --- a/crates/loopal-runtime/src/frontend/unified.rs +++ b/crates/loopal-runtime/src/frontend/unified.rs @@ -120,13 +120,17 @@ impl AgentFrontend for UnifiedFrontend { )) } - async fn drain_pending(&self) -> Vec { - let mut rx = self.mailbox_rx.lock().await; - let mut envelopes = Vec::new(); - while let Ok(env) = rx.try_recv() { - envelopes.push(env); + async fn drain_pending(&self) -> Vec { + let mut mbox = self.mailbox_rx.lock().await; + let mut ctrl = self.control_rx.lock().await; + let mut inputs = Vec::new(); + while let Ok(env) = mbox.try_recv() { + inputs.push(AgentInput::Message(env)); + } + while let Ok(cmd) = ctrl.try_recv() { + inputs.push(AgentInput::Control(cmd)); } - envelopes + inputs } async fn ask_user(&self, questions: Vec) -> Vec { diff --git a/crates/loopal-runtime/tests/suite.rs b/crates/loopal-runtime/tests/suite.rs index d3075550..e88bb8bd 100644 --- a/crates/loopal-runtime/tests/suite.rs +++ b/crates/loopal-runtime/tests/suite.rs @@ -3,6 +3,8 @@ mod agent_loop; #[path = "suite/diff_tracker_test.rs"] mod diff_tracker_test; +#[path = "suite/drain_controls_test.rs"] +mod drain_controls_test; #[path = "suite/env_context_test.rs"] mod env_context_test; #[path = "suite/frontend_unified_edge_test.rs"] diff --git a/crates/loopal-runtime/tests/suite/drain_controls_test.rs b/crates/loopal-runtime/tests/suite/drain_controls_test.rs new file mode 100644 index 00000000..5355fd30 --- /dev/null +++ b/crates/loopal-runtime/tests/suite/drain_controls_test.rs @@ -0,0 +1,70 @@ +//! Tests for UnifiedFrontend::drain_pending() — control commands and mixed input. + +use loopal_protocol::ControlCommand; +use loopal_protocol::{AgentMode, Envelope, MessageSource}; +use loopal_runtime::agent_input::AgentInput; +use loopal_runtime::frontend::UnifiedFrontend; +use loopal_runtime::frontend::{AgentFrontend, AutoCancelQuestionHandler, AutoDenyHandler}; +use tokio::sync::mpsc; + +fn make_unified( + mailbox_rx: mpsc::Receiver, + control_rx: mpsc::Receiver, +) -> UnifiedFrontend { + let (event_tx, _event_rx) = mpsc::channel(16); + UnifiedFrontend::new( + Some("sub".into()), + event_tx, + mailbox_rx, + control_rx, + None, + Box::new(AutoDenyHandler), + Box::new(AutoCancelQuestionHandler), + ) +} + +#[tokio::test] +async fn test_unified_drain_pending_with_controls() { + let (_mb_tx, mb_rx) = mpsc::channel(16); + let (ctrl_tx, ctrl_rx) = mpsc::channel(16); + + let f = make_unified(mb_rx, ctrl_rx); + ctrl_tx + .send(ControlCommand::ModeSwitch(AgentMode::Plan)) + .await + .unwrap(); + + let pending = f.drain_pending().await; + assert_eq!(pending.len(), 1); + assert!(matches!( + pending[0], + AgentInput::Control(ControlCommand::ModeSwitch(AgentMode::Plan)) + )); +} + +#[tokio::test] +async fn test_unified_drain_pending_mixed() { + let (mb_tx, mb_rx) = mpsc::channel(16); + let (ctrl_tx, ctrl_rx) = mpsc::channel(16); + + let f = make_unified(mb_rx, ctrl_rx); + + mb_tx + .send(Envelope::new( + MessageSource::Agent("lead".into()), + "sub", + "do task", + )) + .await + .unwrap(); + ctrl_tx.send(ControlCommand::Clear).await.unwrap(); + + let pending = f.drain_pending().await; + assert_eq!(pending.len(), 2); + // Messages come first (mailbox drained before control channel) + assert!(matches!(pending[0], AgentInput::Message(_))); + assert!(matches!( + pending[1], + AgentInput::Control(ControlCommand::Clear) + )); +} diff --git a/crates/loopal-runtime/tests/suite/frontend_unified_test.rs b/crates/loopal-runtime/tests/suite/frontend_unified_test.rs index 7edca471..b6fc8f2f 100644 --- a/crates/loopal-runtime/tests/suite/frontend_unified_test.rs +++ b/crates/loopal-runtime/tests/suite/frontend_unified_test.rs @@ -243,10 +243,17 @@ async fn test_unified_drain_pending() { let pending = f.drain_pending().await; assert_eq!(pending.len(), 2); - assert_eq!(pending[0].source.label(), "lead"); - assert_eq!(pending[0].content.text, "task A"); - assert_eq!(pending[1].source.label(), "peer"); - assert_eq!(pending[1].content.text, "task B"); + // drain_pending returns AgentInput; extract Envelope via pattern match. + let AgentInput::Message(ref env0) = pending[0] else { + panic!("expected Message"); + }; + let AgentInput::Message(ref env1) = pending[1] else { + panic!("expected Message"); + }; + assert_eq!(env0.source.label(), "lead"); + assert_eq!(env0.content.text, "task A"); + assert_eq!(env1.source.label(), "peer"); + assert_eq!(env1.content.text, "task B"); } // --- permission: auto deny --- diff --git a/crates/loopal-session/src/agent_conversation.rs b/crates/loopal-session/src/agent_conversation.rs index dadb92a6..3a3d9d09 100644 --- a/crates/loopal-session/src/agent_conversation.rs +++ b/crates/loopal-session/src/agent_conversation.rs @@ -16,7 +16,6 @@ pub struct AgentConversation { pub streaming_text: String, pub streaming_thinking: String, pub thinking_active: bool, - pub agent_idle: bool, pub pending_permission: Option, pub pending_question: Option, /// Transient retry error banner. diff --git a/crates/loopal-session/src/agent_handler.rs b/crates/loopal-session/src/agent_handler.rs index 1bbbc780..6077fd0a 100644 --- a/crates/loopal-session/src/agent_handler.rs +++ b/crates/loopal-session/src/agent_handler.rs @@ -2,7 +2,7 @@ use std::time::Instant; -use loopal_protocol::{AgentEventPayload, AgentStatus, UserContent}; +use loopal_protocol::{AgentEventPayload, AgentStatus}; use crate::agent_lifecycle::{extract_key_param, handle_idle, post_event_cleanup}; use crate::conversation_display::{ @@ -17,11 +17,7 @@ use crate::tool_result_handler::{ use crate::types::{PendingPermission, PendingQuestion, SessionMessage}; /// Handle an agent event — writes both observable metrics and conversation state. -pub(crate) fn apply_agent_event( - state: &mut SessionState, - name: &str, - payload: AgentEventPayload, -) -> Option { +pub(crate) fn apply_agent_event(state: &mut SessionState, name: &str, payload: AgentEventPayload) { let agent = state.agents.entry(name.to_string()).or_default(); if agent.started_at.is_none() { agent.started_at = Some(Instant::now()); @@ -128,13 +124,16 @@ pub(crate) fn apply_agent_event( } AgentEventPayload::RetryCleared => conv.retry_banner = None, AgentEventPayload::AwaitingInput => { - return handle_idle(state, name, AgentStatus::WaitingForInput); + handle_idle(state, name, AgentStatus::WaitingForInput); + return; } AgentEventPayload::Finished => { - return handle_idle(state, name, AgentStatus::Finished); + handle_idle(state, name, AgentStatus::Finished); + return; } AgentEventPayload::Interrupted => { - return handle_idle(state, name, AgentStatus::WaitingForInput); + handle_idle(state, name, AgentStatus::WaitingForInput); + return; } AgentEventPayload::AutoContinuation { continuation, @@ -215,5 +214,4 @@ pub(crate) fn apply_agent_event( } } post_event_cleanup(state, name, sync_parent); - None } diff --git a/crates/loopal-session/src/agent_lifecycle.rs b/crates/loopal-session/src/agent_lifecycle.rs index 8a515ba6..19b6850e 100644 --- a/crates/loopal-session/src/agent_lifecycle.rs +++ b/crates/loopal-session/src/agent_lifecycle.rs @@ -1,24 +1,23 @@ //! Agent lifecycle: idle state transitions, error recovery, topology registration, display helpers. -use loopal_protocol::{AgentStatus, UserContent}; +use loopal_protocol::AgentStatus; -use crate::inbox::try_forward_inbox; use crate::state::{ROOT_AGENT, SessionState}; /// Shared idle handling for AwaitingInput / Finished / Interrupted. -pub(crate) fn handle_idle( - state: &mut SessionState, - name: &str, - status: AgentStatus, -) -> Option { - let agent = state.agents.get_mut(name)?; +/// +/// Updates display state only. Messages are delivered directly to the agent +/// mailbox. +pub(crate) fn handle_idle(state: &mut SessionState, name: &str, status: AgentStatus) { + let Some(agent) = state.agents.get_mut(name) else { + return; + }; agent.conversation.flush_streaming(); agent.conversation.end_turn(); if status != AgentStatus::Finished { agent.conversation.turn_count += 1; agent.observable.turn_count += 1; } - agent.conversation.agent_idle = true; agent.conversation.retry_banner = None; agent.observable.status = status; // Auto-return to root when the viewed agent finishes @@ -27,10 +26,6 @@ pub(crate) fn handle_idle( } // Truncate completed sub-agent conversation to bound memory growth truncate_if_done(state, name); - if state.active_view == name { - return try_forward_inbox(state); - } - None } /// Register a newly spawned agent with parent/child topology. diff --git a/crates/loopal-session/src/controller.rs b/crates/loopal-session/src/controller.rs index 1fca82cc..28ae354f 100644 --- a/crates/loopal-session/src/controller.rs +++ b/crates/loopal-session/src/controller.rs @@ -92,10 +92,26 @@ impl SessionController { self.backend.interrupt_target(name); } - pub fn enqueue_message(&self, content: UserContent) -> Option { + /// Optimistic display update: append a user message to the conversation view. + /// + /// This is a pure display operation — it does NOT route the message to the + /// agent. Use `route_message()` separately for delivery to the agent mailbox. + /// Agent state (idle/busy) is NOT modified; it is derived from agent events. + pub fn append_user_display(&self, content: &UserContent) { let mut state = self.lock(); - state.inbox.push(content); - crate::controller_ops::try_forward_from_inbox(&mut state) + let conv = state.active_conversation_mut(); + let image_count = content.images.len(); + let mut display_text = content.text.clone(); + if image_count > 0 { + display_text.push_str(&format!(" [+{image_count} image(s)]")); + } + conv.messages.push(crate::types::SessionMessage { + role: "user".to_string(), + content: display_text, + tool_calls: Vec::new(), + image_count, + skill_info: content.skill_info.clone(), + }); } pub async fn approve_permission(&self) { @@ -155,9 +171,9 @@ impl SessionController { // === Event handling === - pub fn handle_event(&self, event: AgentEvent) -> Option { + pub fn handle_event(&self, event: AgentEvent) { let mut state = self.lock(); - event_handler::apply_event(&mut state, event) + event_handler::apply_event(&mut state, event); } /// Set the root session ID (for sub-agent ref persistence). diff --git a/crates/loopal-session/src/controller_control.rs b/crates/loopal-session/src/controller_control.rs index cbb95fd2..84bed959 100644 --- a/crates/loopal-session/src/controller_control.rs +++ b/crates/loopal-session/src/controller_control.rs @@ -66,7 +66,6 @@ impl SessionController { conv.cache_read_tokens = 0; conv.retry_banner = None; conv.reset_timer(); - s.inbox.clear(); s.active_view.clone() }; self.backend @@ -106,7 +105,6 @@ impl SessionController { conv.cache_read_tokens = 0; conv.retry_banner = None; conv.reset_timer(); - s.inbox.clear(); s.root_session_id = Some(session_id.to_string()); s.active_view.clone() }; diff --git a/crates/loopal-session/src/controller_ops.rs b/crates/loopal-session/src/controller_ops.rs index 9043ba34..e073259c 100644 --- a/crates/loopal-session/src/controller_ops.rs +++ b/crates/loopal-session/src/controller_ops.rs @@ -4,8 +4,6 @@ use std::sync::Arc; use loopal_protocol::{ControlCommand, UserContent, UserQuestionResponse}; -use crate::inbox::try_forward_inbox; -use crate::state::SessionState; use loopal_agent_hub::{HubClient, LocalChannels}; /// Backend for session control operations. @@ -115,8 +113,3 @@ impl ControlBackend { } } } - -/// Forward a pending inbox message to the agent. -pub(crate) fn try_forward_from_inbox(state: &mut SessionState) -> Option { - try_forward_inbox(state) -} diff --git a/crates/loopal-session/src/event_handler.rs b/crates/loopal-session/src/event_handler.rs index 9d55abea..77c89276 100644 --- a/crates/loopal-session/src/event_handler.rs +++ b/crates/loopal-session/src/event_handler.rs @@ -1,12 +1,13 @@ //! AgentEvent → SessionState update logic. Unified routing for all agents. -use loopal_protocol::{AgentEvent, AgentEventPayload, UserContent}; +use loopal_protocol::{AgentEvent, AgentEventPayload}; use crate::message_log::record_message_routed; use crate::state::{ROOT_AGENT, SessionState}; -/// Handle an AgentEvent. Returns `Some(content)` if an inbox message should be forwarded. -pub fn apply_event(state: &mut SessionState, event: AgentEvent) -> Option { +/// Handle an AgentEvent. Updates display state only — messages are delivered +/// directly to the agent mailbox. +pub fn apply_event(state: &mut SessionState, event: AgentEvent) { // Global logging for inter-agent messages if let AgentEventPayload::MessageRouted { ref source, @@ -61,5 +62,5 @@ pub fn apply_event(state: &mut SessionState, event: AgentEvent) -> Option, -} - -impl Inbox { - pub fn new() -> Self { - Self { - queue: VecDeque::new(), - } - } - - pub fn push(&mut self, content: UserContent) { - self.queue.push_back(content); - } - - pub fn pop_front(&mut self) -> Option { - self.queue.pop_front() - } - - pub fn pop_back(&mut self) -> Option { - self.queue.pop_back() - } - - pub fn clear(&mut self) { - self.queue.clear(); - } - - pub fn is_empty(&self) -> bool { - self.queue.is_empty() - } - - pub fn len(&self) -> usize { - self.queue.len() - } - - pub fn iter(&self) -> impl Iterator { - self.queue.iter() - } -} - -impl Default for Inbox { - fn default() -> Self { - Self::new() - } -} - -/// Try forwarding a queued inbox message when the active-view agent is idle. -/// -/// The inbox is global (session-level): messages are always routed to the -/// currently active agent, regardless of which agent they were typed for. -pub(crate) fn try_forward_inbox(state: &mut crate::state::SessionState) -> Option { - let agent = state.agents.get_mut(&state.active_view)?; - if !agent.conversation.agent_idle { - tracing::debug!("inbox: agent busy, message queued"); - return None; - } - let content = state.inbox.pop_front()?; - tracing::debug!(text_len = content.text.len(), "inbox: forwarding message"); - let image_count = content.images.len(); - let mut display_text = content.text.clone(); - if image_count > 0 { - display_text.push_str(&format!(" [+{image_count} image(s)]")); - } - let skill_info = content.skill_info.clone(); - agent.conversation.agent_idle = false; - agent.conversation.begin_turn(); - agent - .conversation - .messages - .push(crate::types::SessionMessage { - role: "user".to_string(), - content: display_text, - tool_calls: Vec::new(), - image_count, - skill_info, - }); - Some(content) -} diff --git a/crates/loopal-session/src/lib.rs b/crates/loopal-session/src/lib.rs index b0614160..dfa15e24 100644 --- a/crates/loopal-session/src/lib.rs +++ b/crates/loopal-session/src/lib.rs @@ -7,7 +7,6 @@ mod controller_control; mod controller_ops; mod conversation_display; pub mod event_handler; -pub mod inbox; pub mod message_log; pub mod rewind; mod server_tool_display; diff --git a/crates/loopal-session/src/session_display.rs b/crates/loopal-session/src/session_display.rs index 929160e2..babb0640 100644 --- a/crates/loopal-session/src/session_display.rs +++ b/crates/loopal-session/src/session_display.rs @@ -1,6 +1,6 @@ -//! Session display state operations: messages, welcome, history, inbox. +//! Session display state operations: messages, welcome, history. -use loopal_protocol::{AgentStatus, ProjectedMessage, UserContent}; +use loopal_protocol::{AgentStatus, ProjectedMessage}; use crate::controller::SessionController; use crate::conversation_display::push_system_msg; @@ -8,10 +8,6 @@ use crate::state::ROOT_AGENT; use crate::types::{SessionMessage, SessionToolCall, ToolCallStatus}; impl SessionController { - pub fn pop_inbox_to_edit(&self) -> Option { - self.lock().inbox.pop_back() - } - pub fn push_system_message(&self, content: String) { let mut state = self.lock(); let conv = state.active_conversation_mut(); @@ -68,7 +64,6 @@ impl SessionController { agent.observable.model = m.to_string(); } agent.conversation.messages = display_msgs; - agent.conversation.agent_idle = true; agent.observable.status = AgentStatus::Finished; if let Some(parent_name) = parent && let Some(parent_agent) = state.agents.get_mut(parent_name) diff --git a/crates/loopal-session/src/state.rs b/crates/loopal-session/src/state.rs index 7e779a21..532bdaaa 100644 --- a/crates/loopal-session/src/state.rs +++ b/crates/loopal-session/src/state.rs @@ -6,6 +6,7 @@ use std::time::Instant; use indexmap::IndexMap; +use loopal_protocol::AgentStatus; /// Name of the root agent in the agents map. pub const ROOT_AGENT: &str = "main"; @@ -13,7 +14,6 @@ pub const ROOT_AGENT: &str = "main"; use loopal_protocol::ObservableAgentState; use crate::agent_conversation::AgentConversation; -use crate::inbox::Inbox; use crate::message_log::{MessageFeed, MessageLogEntry}; /// Enhanced agent view state with full observability. @@ -53,7 +53,6 @@ pub struct SessionState { // === Observation plane === pub message_feed: MessageFeed, // === Interaction state === - pub inbox: Inbox, /// Pending sub-agent refs to be persisted (drained by caller). pub pending_sub_agent_refs: Vec, } @@ -71,9 +70,10 @@ impl SessionState { pub fn new(model: String, mode: String) -> Self { let mut agents = IndexMap::new(); // Root agent "main" is a regular entry — no special treatment. - let mut main_agent = AgentViewState::default(); - main_agent.conversation.agent_idle = false; - main_agent.started_at = Some(Instant::now()); + let main_agent = AgentViewState { + started_at: Some(Instant::now()), + ..Default::default() + }; agents.insert(ROOT_AGENT.to_string(), main_agent); Self { @@ -84,11 +84,18 @@ impl SessionState { thinking_config: "auto".to_string(), root_session_id: None, message_feed: MessageFeed::new(200), - inbox: Inbox::new(), pending_sub_agent_refs: Vec::new(), } } + /// Whether the currently viewed agent is idle (derived from observable status). + pub fn is_active_agent_idle(&self) -> bool { + self.agents + .get(&self.active_view) + .map(|a| a.is_idle()) + .unwrap_or(true) + } + // === Active conversation projection (zero branching) === /// Conversation of the currently viewed agent. @@ -117,6 +124,17 @@ impl SessionState { } impl AgentViewState { + /// Whether the agent is idle — derived solely from `observable.status`. + /// + /// Agent state is internal; external consumers must derive it from events, + /// never set it directly. This replaces the former `agent_idle` flag. + pub fn is_idle(&self) -> bool { + matches!( + self.observable.status, + AgentStatus::WaitingForInput | AgentStatus::Finished | AgentStatus::Error + ) + } + /// Elapsed time since the agent was first observed. pub fn elapsed(&self) -> std::time::Duration { self.started_at diff --git a/crates/loopal-session/tests/suite.rs b/crates/loopal-session/tests/suite.rs index df8312b2..5d39bfe1 100644 --- a/crates/loopal-session/tests/suite.rs +++ b/crates/loopal-session/tests/suite.rs @@ -15,8 +15,8 @@ mod controller_test; mod event_handler_edge_test; #[path = "suite/event_handler_test.rs"] mod event_handler_test; -#[path = "suite/inbox_test.rs"] -mod inbox_test; +#[path = "suite/is_idle_test.rs"] +mod is_idle_test; #[path = "suite/message_log_test.rs"] mod message_log_test; #[path = "suite/projection_convert_test.rs"] @@ -31,3 +31,5 @@ mod retry_banner_test; mod rewind_test; #[path = "suite/topology_test.rs"] mod topology_test; +#[path = "suite/user_display_test.rs"] +mod user_display_test; diff --git a/crates/loopal-session/tests/suite/agent_routing_test.rs b/crates/loopal-session/tests/suite/agent_routing_test.rs index 4fd1c750..e2cc1633 100644 --- a/crates/loopal-session/tests/suite/agent_routing_test.rs +++ b/crates/loopal-session/tests/suite/agent_routing_test.rs @@ -79,7 +79,7 @@ fn root_events_route_to_main_agent() { apply_event(&mut state, AgentEvent::root(AgentEventPayload::Finished)); // "main" exists (created at init), root events route there assert!(state.agents.contains_key("main")); - assert!(state.agents["main"].conversation.agent_idle); + assert!(state.agents["main"].is_idle()); } /// SubAgentSpawned event creates an AgentViewState entry with topology info. diff --git a/crates/loopal-session/tests/suite/controller_async_test.rs b/crates/loopal-session/tests/suite/controller_async_test.rs index cf8ef465..8354b88b 100644 --- a/crates/loopal-session/tests/suite/controller_async_test.rs +++ b/crates/loopal-session/tests/suite/controller_async_test.rs @@ -1,7 +1,7 @@ //! Async tests for SessionController interaction methods (channels). -use loopal_protocol::ControlCommand; use loopal_protocol::{AgentEvent, AgentEventPayload, UserQuestionResponse}; +use loopal_protocol::{AgentStatus, ControlCommand}; use loopal_session::SessionController; use tokio::sync::mpsc; @@ -64,21 +64,30 @@ async fn test_deny_permission() { } #[tokio::test] -async fn test_enqueue_message_forwards_when_idle() { +async fn test_append_user_display() { let (ctrl, _, _) = make_controller(); - ctrl.lock().active_conversation_mut().agent_idle = true; - let result = ctrl.enqueue_message("hello".into()); - assert_eq!(result.map(|c| c.text), Some("hello".to_string())); + let content = loopal_protocol::UserContent::from("hello"); + ctrl.append_user_display(&content); + let state = ctrl.lock(); + let conv = state.active_conversation(); + assert_eq!(conv.messages.last().unwrap().role, "user"); + assert_eq!(conv.messages.last().unwrap().content, "hello"); } #[tokio::test] -async fn test_enqueue_message_queues_when_busy() { +async fn test_append_user_display_does_not_change_status() { let (ctrl, _, _) = make_controller(); - ctrl.lock().active_conversation_mut().agent_idle = false; - - let result = ctrl.enqueue_message("queued".into()); - assert!(result.is_none()); - assert_eq!(ctrl.lock().inbox.len(), 1); + // Agent state is internal — TUI display ops never change it. + ctrl.lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; + let content = loopal_protocol::UserContent::from("queued"); + ctrl.append_user_display(&content); + // Status unchanged — only agent events drive status transitions. + assert!(!ctrl.lock().is_active_agent_idle()); } #[tokio::test] @@ -136,7 +145,6 @@ async fn test_clear() { let state = ctrl.lock(); let conv = state.active_conversation(); assert!(conv.messages.is_empty()); - assert!(state.inbox.is_empty()); assert!(conv.streaming_text.is_empty()); assert_eq!(conv.turn_count, 0); assert_eq!(conv.input_tokens, 0); diff --git a/crates/loopal-session/tests/suite/controller_edge_test.rs b/crates/loopal-session/tests/suite/controller_edge_test.rs deleted file mode 100644 index 9de6da52..00000000 --- a/crates/loopal-session/tests/suite/controller_edge_test.rs +++ /dev/null @@ -1,78 +0,0 @@ -//! Edge case tests for SessionController: token usage, mode, errors, inbox. - -use loopal_protocol::{AgentEvent, AgentEventPayload}; - -use super::controller_test::make_controller; - -#[test] -fn test_token_usage() { - let (ctrl, _, _) = make_controller(); - ctrl.handle_event(AgentEvent::root(AgentEventPayload::TokenUsage { - input_tokens: 100, - output_tokens: 50, - context_window: 200_000, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - thinking_tokens: 0, - })); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert_eq!(conv.input_tokens, 100); - assert_eq!(conv.output_tokens, 50); - assert_eq!(conv.context_window, 200_000); - assert_eq!(conv.token_count(), 150); -} - -#[test] -fn test_mode_changed() { - let (ctrl, _, _) = make_controller(); - ctrl.handle_event(AgentEvent::root(AgentEventPayload::ModeChanged { - mode: "plan".to_string(), - })); - // ModeChanged updates observable.mode on the agent - assert_eq!(ctrl.lock().agents["main"].observable.mode, "plan"); -} - -#[test] -fn test_error_event() { - let (ctrl, _, _) = make_controller(); - ctrl.handle_event(AgentEvent::root(AgentEventPayload::Error { - message: "bad".to_string(), - })); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert_eq!(conv.messages.len(), 1); - assert_eq!(conv.messages[0].role, "error"); -} - -#[test] -fn test_push_system_message() { - let (ctrl, _, _) = make_controller(); - ctrl.push_system_message("hello".to_string()); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert_eq!(conv.messages.len(), 1); - assert_eq!(conv.messages[0].role, "system"); - assert_eq!(conv.messages[0].content, "hello"); -} - -#[test] -fn test_pop_inbox_to_edit() { - let (ctrl, _, _) = make_controller(); - ctrl.lock().inbox.push("first".into()); - ctrl.lock().inbox.push("second".into()); - - assert_eq!( - ctrl.pop_inbox_to_edit().map(|c| c.text), - Some("second".to_string()) - ); - assert_eq!(ctrl.lock().inbox.len(), 1); - assert_eq!( - ctrl.pop_inbox_to_edit().map(|c| c.text), - Some("first".to_string()) - ); - assert!(ctrl.pop_inbox_to_edit().is_none()); -} diff --git a/crates/loopal-session/tests/suite/controller_test.rs b/crates/loopal-session/tests/suite/controller_test.rs index 2d45b012..3847d5df 100644 --- a/crates/loopal-session/tests/suite/controller_test.rs +++ b/crates/loopal-session/tests/suite/controller_test.rs @@ -36,11 +36,10 @@ fn test_initial_state() { let conv = state.active_conversation(); assert!(conv.messages.is_empty()); assert!(conv.streaming_text.is_empty()); - assert!(!conv.agent_idle); + assert!(!state.is_active_agent_idle()); assert_eq!(conv.turn_count, 0); assert_eq!(conv.token_count(), 0); assert!(conv.pending_permission.is_none()); - assert!(state.inbox.is_empty()); } #[test] @@ -75,31 +74,14 @@ fn test_awaiting_input_flushes_streaming() { assert_eq!(conv.messages[0].role, "assistant"); assert_eq!(conv.messages[0].content, "response"); assert_eq!(conv.turn_count, 1); - assert!(conv.agent_idle); + assert!(state.is_active_agent_idle()); } #[test] -fn test_awaiting_input_forwards_inbox() { +fn test_awaiting_input_stays_idle_with_no_messages() { let (ctrl, _, _) = make_controller(); - ctrl.lock().inbox.push("queued msg".into()); - - let forwarded = ctrl.handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert_eq!(forwarded.map(|c| c.text), Some("queued msg".to_string())); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert!(!conv.agent_idle); - assert!(state.inbox.is_empty()); - assert_eq!(conv.messages.last().unwrap().role, "user"); - assert_eq!(conv.messages.last().unwrap().content, "queued msg"); -} - -#[test] -fn test_awaiting_input_no_inbox_stays_idle() { - let (ctrl, _, _) = make_controller(); - let forwarded = ctrl.handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert!(forwarded.is_none()); - assert!(ctrl.lock().active_conversation().agent_idle); + ctrl.handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); + assert!(ctrl.lock().is_active_agent_idle()); } #[test] diff --git a/crates/loopal-session/tests/suite/event_handler_test.rs b/crates/loopal-session/tests/suite/event_handler_test.rs index 2af07ac8..d3f753de 100644 --- a/crates/loopal-session/tests/suite/event_handler_test.rs +++ b/crates/loopal-session/tests/suite/event_handler_test.rs @@ -1,6 +1,6 @@ //! Tests for event_handler: apply_event, unified routing, MessageRouted recording. -use loopal_protocol::{AgentEvent, AgentEventPayload, ImageAttachment, UserContent}; +use loopal_protocol::{AgentEvent, AgentEventPayload}; use loopal_session::event_handler::apply_event; use loopal_session::state::SessionState; @@ -83,26 +83,23 @@ fn test_apply_event_records_message_routed_to_agent_logs() { } #[test] -fn test_awaiting_input_forwards_inbox() { +fn test_awaiting_input_sets_idle() { let mut state = make_state(); - state.inbox.push("queued msg".into()); - let forward = apply_event( + apply_event( &mut state, AgentEvent::root(AgentEventPayload::AwaitingInput), ); - assert_eq!(forward.map(|c| c.text), Some("queued msg".to_string())); - assert!(!conv!(state).agent_idle); // Immediately busy again + assert!(state.agents["main"].is_idle()); } #[test] fn test_awaiting_input_no_inbox_stays_idle() { let mut state = make_state(); - let forward = apply_event( + apply_event( &mut state, AgentEvent::root(AgentEventPayload::AwaitingInput), ); - assert!(forward.is_none()); - assert!(conv!(state).agent_idle); + assert!(state.agents["main"].is_idle()); } #[test] @@ -135,7 +132,7 @@ fn test_error_flushes_streaming() { fn test_finished_marks_idle() { let mut state = make_state(); apply_event(&mut state, AgentEvent::root(AgentEventPayload::Finished)); - assert!(conv!(state).agent_idle); + assert!(state.agents["main"].is_idle()); } #[test] @@ -171,28 +168,11 @@ fn test_mode_changed_updates_mode() { } #[test] -fn test_try_forward_inbox_with_images() { +fn test_idle_event_does_not_forward_messages() { let mut state = make_state(); - let content = UserContent { - text: "look at this".to_string(), - images: vec![ImageAttachment { - media_type: "image/png".to_string(), - data: "iVBORw0KGgo=".to_string(), - }], - skill_info: None, - }; - state.inbox.push(content); - let forward = apply_event( + apply_event( &mut state, AgentEvent::root(AgentEventPayload::AwaitingInput), ); - let forwarded = forward.expect("should forward inbox content"); - assert_eq!(forwarded.text, "look at this"); - assert_eq!(forwarded.images.len(), 1); - assert_eq!(forwarded.images[0].media_type, "image/png"); - let display = conv!(state).messages.last().unwrap(); - assert_eq!(display.role, "user"); - assert!(display.content.contains("[+1 image(s)]")); - assert_eq!(display.image_count, 1); - assert!(!conv!(state).agent_idle); + assert!(state.agents["main"].is_idle()); } diff --git a/crates/loopal-session/tests/suite/inbox_test.rs b/crates/loopal-session/tests/suite/inbox_test.rs deleted file mode 100644 index 4847667f..00000000 --- a/crates/loopal-session/tests/suite/inbox_test.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Tests for Inbox queue. - -use loopal_session::inbox::Inbox; - -#[test] -fn test_inbox_new_is_empty() { - let inbox = Inbox::new(); - assert!(inbox.is_empty()); - assert_eq!(inbox.len(), 0); -} - -#[test] -fn test_push_and_pop_front() { - let mut inbox = Inbox::new(); - inbox.push("first".into()); - inbox.push("second".into()); - assert_eq!(inbox.len(), 2); - - assert_eq!(inbox.pop_front().map(|c| c.text), Some("first".to_string())); - assert_eq!( - inbox.pop_front().map(|c| c.text), - Some("second".to_string()) - ); - assert!(inbox.is_empty()); -} - -#[test] -fn test_pop_back() { - let mut inbox = Inbox::new(); - inbox.push("first".into()); - inbox.push("second".into()); - - assert_eq!(inbox.pop_back().map(|c| c.text), Some("second".to_string())); - assert_eq!(inbox.len(), 1); -} - -#[test] -fn test_clear() { - let mut inbox = Inbox::new(); - inbox.push("a".into()); - inbox.push("b".into()); - inbox.clear(); - assert!(inbox.is_empty()); -} - -#[test] -fn test_iter() { - let mut inbox = Inbox::new(); - inbox.push("a".into()); - inbox.push("b".into()); - - let items: Vec<&str> = inbox.iter().map(|c| c.text.as_str()).collect(); - assert_eq!(items, vec!["a", "b"]); -} - -#[test] -fn test_pop_empty() { - let mut inbox = Inbox::new(); - assert!(inbox.pop_front().is_none()); - assert!(inbox.pop_back().is_none()); -} diff --git a/crates/loopal-session/tests/suite/is_idle_test.rs b/crates/loopal-session/tests/suite/is_idle_test.rs new file mode 100644 index 00000000..4a9ccabf --- /dev/null +++ b/crates/loopal-session/tests/suite/is_idle_test.rs @@ -0,0 +1,38 @@ +//! Tests for AgentViewState::is_idle() correctness mapping. +//! +//! Exhaustively verifies each AgentStatus variant maps to the correct +//! idle/busy classification. is_idle() replaces the former `agent_idle` flag. + +use loopal_protocol::AgentStatus; +use loopal_session::state::AgentViewState; + +fn view_with_status(status: AgentStatus) -> AgentViewState { + let mut view = AgentViewState::default(); + view.observable.status = status; + view +} + +#[test] +fn test_is_idle_returns_true_for_waiting_for_input() { + assert!(view_with_status(AgentStatus::WaitingForInput).is_idle()); +} + +#[test] +fn test_is_idle_returns_true_for_finished() { + assert!(view_with_status(AgentStatus::Finished).is_idle()); +} + +#[test] +fn test_is_idle_returns_true_for_error() { + assert!(view_with_status(AgentStatus::Error).is_idle()); +} + +#[test] +fn test_is_idle_returns_false_for_starting() { + assert!(!view_with_status(AgentStatus::Starting).is_idle()); +} + +#[test] +fn test_is_idle_returns_false_for_running() { + assert!(!view_with_status(AgentStatus::Running).is_idle()); +} diff --git a/crates/loopal-session/tests/suite/resume_display_test.rs b/crates/loopal-session/tests/suite/resume_display_test.rs index d1d160bb..0513e193 100644 --- a/crates/loopal-session/tests/suite/resume_display_test.rs +++ b/crates/loopal-session/tests/suite/resume_display_test.rs @@ -41,7 +41,7 @@ fn test_load_sub_agent_history_creates_agent_entry() { assert_eq!(agent.session_id.as_deref(), Some("sub-sid")); assert_eq!(agent.observable.model, "gpt-4"); assert_eq!(agent.observable.status, AgentStatus::Finished); - assert!(agent.conversation.agent_idle); + assert!(agent.is_idle()); assert_eq!(agent.conversation.messages.len(), 1); assert_eq!(agent.conversation.messages[0].content, "sub-agent response"); } diff --git a/crates/loopal-session/tests/suite/user_display_test.rs b/crates/loopal-session/tests/suite/user_display_test.rs new file mode 100644 index 00000000..86e13aa8 --- /dev/null +++ b/crates/loopal-session/tests/suite/user_display_test.rs @@ -0,0 +1,87 @@ +//! Tests for append_user_display() — status preservation and image handling. + +use loopal_protocol::UserQuestionResponse; +use loopal_protocol::{AgentStatus, ControlCommand, ImageAttachment, UserContent}; +use loopal_session::SessionController; +use tokio::sync::mpsc; + +fn make_controller() -> SessionController { + let (control_tx, _control_rx) = mpsc::channel::(16); + let (perm_tx, _perm_rx) = mpsc::channel::(16); + let (question_tx, _question_rx) = mpsc::channel::(16); + SessionController::new( + "test-model".to_string(), + "act".to_string(), + control_tx, + perm_tx, + question_tx, + Default::default(), + std::sync::Arc::new(tokio::sync::watch::channel(0u64).0), + ) +} + +#[tokio::test] +async fn test_append_user_display_preserves_idle_status() { + let ctrl = make_controller(); + ctrl.lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; + + ctrl.append_user_display(&UserContent::from("hello")); + assert!(ctrl.lock().is_active_agent_idle()); +} + +#[tokio::test] +async fn test_append_user_display_preserves_running_status() { + let ctrl = make_controller(); + ctrl.lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; + + ctrl.append_user_display(&UserContent::from("queued")); + assert!(!ctrl.lock().is_active_agent_idle()); +} + +#[tokio::test] +async fn test_append_user_display_with_images() { + let ctrl = make_controller(); + let content = UserContent { + text: "check this".to_string(), + images: vec![ + ImageAttachment { + media_type: "image/png".to_string(), + data: "AAAA".to_string(), + }, + ImageAttachment { + media_type: "image/jpeg".to_string(), + data: "BBBB".to_string(), + }, + ], + skill_info: None, + }; + ctrl.append_user_display(&content); + + let state = ctrl.lock(); + let conv = state.active_conversation(); + let msg = conv.messages.last().unwrap(); + assert!(msg.content.contains("[+2 image(s)]")); + assert_eq!(msg.image_count, 2); +} + +#[tokio::test] +async fn test_append_user_display_no_images() { + let ctrl = make_controller(); + ctrl.append_user_display(&UserContent::from("just text")); + + let state = ctrl.lock(); + let conv = state.active_conversation(); + let msg = conv.messages.last().unwrap(); + assert!(!msg.content.contains("image")); + assert_eq!(msg.image_count, 0); +} diff --git a/crates/loopal-tui/src/app/mod.rs b/crates/loopal-tui/src/app/mod.rs index a29063e3..40092e25 100644 --- a/crates/loopal-tui/src/app/mod.rs +++ b/crates/loopal-tui/src/app/mod.rs @@ -127,19 +127,6 @@ impl App { }) } - /// Pop the last Inbox message back into the input field for editing. - /// Returns true if a message was popped. - pub fn pop_inbox_to_input(&mut self) -> bool { - if let Some(content) = self.session.pop_inbox_to_edit() { - self.input = content.text; - self.pending_images = content.images; - self.input_cursor = self.input.len(); - true - } else { - false - } - } - /// Attach an image to the current pending input. pub fn attach_image(&mut self, attachment: ImageAttachment) { self.pending_images.push(attachment); diff --git a/crates/loopal-tui/src/command/mod.rs b/crates/loopal-tui/src/command/mod.rs index 684de7ef..8787db0c 100644 --- a/crates/loopal-tui/src/command/mod.rs +++ b/crates/loopal-tui/src/command/mod.rs @@ -23,7 +23,7 @@ use crate::app::App; pub enum CommandEffect { /// Command completed all work internally (e.g. push_system_message, open SubPage). Done, - /// Push expanded content into the inbox for the agent. + /// Push expanded content to the agent for processing. InboxPush(UserContent), /// Switch agent mode (plan / act). ModeSwitch(AgentMode), diff --git a/crates/loopal-tui/src/command/rewind_cmd.rs b/crates/loopal-tui/src/command/rewind_cmd.rs index e4385d19..d1245c30 100644 --- a/crates/loopal-tui/src/command/rewind_cmd.rs +++ b/crates/loopal-tui/src/command/rewind_cmd.rs @@ -24,7 +24,7 @@ impl CommandHandler for RewindCmd { fn open_rewind_picker(app: &mut App) { let state = app.session.lock(); let conv = state.active_conversation(); - if !conv.agent_idle { + if !state.is_active_agent_idle() { drop(state); app.session .push_system_message("Cannot rewind while the agent is busy.".into()); diff --git a/crates/loopal-tui/src/input/actions.rs b/crates/loopal-tui/src/input/actions.rs index 301e4e22..4af61dc8 100644 --- a/crates/loopal-tui/src/input/actions.rs +++ b/crates/loopal-tui/src/input/actions.rs @@ -19,7 +19,7 @@ pub enum SubPageResult { pub enum InputAction { /// No action needed None, - /// User message queued into Inbox for forwarding to agent + /// User message ready for forwarding to agent InboxPush(UserContent), /// User wants to quit Quit, diff --git a/crates/loopal-tui/src/input/editing.rs b/crates/loopal-tui/src/input/editing.rs index 89db5b32..69dd87bc 100644 --- a/crates/loopal-tui/src/input/editing.rs +++ b/crates/loopal-tui/src/input/editing.rs @@ -64,7 +64,7 @@ pub(super) fn handle_ctrl_c(app: &mut App) -> InputAction { } else if app.focused_agent.is_some() { app.focused_agent = None; InputAction::None - } else if !app.session.lock().active_conversation().agent_idle { + } else if !app.session.lock().is_active_agent_idle() { InputAction::Interrupt } else { InputAction::None diff --git a/crates/loopal-tui/src/input/navigation.rs b/crates/loopal-tui/src/input/navigation.rs index 694c3e38..61ca93a4 100644 --- a/crates/loopal-tui/src/input/navigation.rs +++ b/crates/loopal-tui/src/input/navigation.rs @@ -39,10 +39,8 @@ pub(super) fn handle_up(app: &mut App) -> InputAction { app.input_cursor = new_cursor; return InputAction::None; } - // Fall back to inbox pop / history - if app.pop_inbox_to_input() { - // Popped last Inbox message back into input for editing - } else if !app.input_history.is_empty() { + // Fall back to history browse + if !app.input_history.is_empty() { let idx = match app.history_index { None => app.input_history.len() - 1, Some(i) if i > 0 => i - 1, @@ -91,7 +89,7 @@ pub(super) fn handle_esc(app: &mut App) -> InputAction { tracing::info!(view = %active_view, "ESC: exit agent view (not root)"); return InputAction::ExitAgentView; } - let is_idle = app.session.lock().active_conversation().agent_idle; + let is_idle = app.session.lock().is_active_agent_idle(); if !is_idle { tracing::info!("ESC: agent busy, sending interrupt"); return InputAction::Interrupt; diff --git a/crates/loopal-tui/src/key_dispatch_ops.rs b/crates/loopal-tui/src/key_dispatch_ops.rs index d5094f9c..06a910d3 100644 --- a/crates/loopal-tui/src/key_dispatch_ops.rs +++ b/crates/loopal-tui/src/key_dispatch_ops.rs @@ -20,13 +20,10 @@ pub(crate) async fn push_to_inbox(app: &mut App, content: UserContent) { }; app.input_history.push(history_text); app.history_index = None; - if let Some(msg) = app.session.enqueue_message(content) { - tracing::debug!("TUI: message forwarded to agent"); - app.session.route_message(msg).await; - } else { - tracing::debug!("TUI: agent busy, message queued + interrupt sent"); - app.session.interrupt(); - } + // Optimistic display update, then deliver directly to agent mailbox. + // Agent state (idle/busy) is derived from agent events, not set here. + app.session.append_user_display(&content); + app.session.route_message(content).await; } pub(crate) async fn handle_effect(app: &mut App, effect: CommandEffect) -> bool { diff --git a/crates/loopal-tui/src/render.rs b/crates/loopal-tui/src/render.rs index cef88cc0..6d431636 100644 --- a/crates/loopal-tui/src/render.rs +++ b/crates/loopal-tui/src/render.rs @@ -12,8 +12,7 @@ pub fn draw(f: &mut Frame, app: &mut App) { let size = f.area(); let state = app.session.lock(); - let inbox_count = state.inbox.len(); - let pw = input_view::prefix_width(inbox_count, app.pending_image_count()); + let pw = input_view::prefix_width(app.pending_image_count()); let input_h = input_view::input_height(&app.input, size.width, pw); let conv = state.active_conversation(); let banner_h = views::retry_banner::banner_height(&conv.retry_banner); @@ -125,7 +124,7 @@ pub fn draw(f: &mut Frame, app: &mut App) { let pending_question = conv.pending_question.clone(); let topology_data = if app.show_topology { use loopal_protocol::AgentStatus; - let root_status = if conv.agent_idle { + let root_status = if state.is_active_agent_idle() { AgentStatus::WaitingForInput } else { AgentStatus::Running @@ -146,7 +145,6 @@ pub fn draw(f: &mut Frame, app: &mut App) { f, &app.input, app.input_cursor, - inbox_count, image_count, app.input_scroll, layout.input, diff --git a/crates/loopal-tui/src/tui_loop.rs b/crates/loopal-tui/src/tui_loop.rs index 8c8fc23c..8fee6ac2 100644 --- a/crates/loopal-tui/src/tui_loop.rs +++ b/crates/loopal-tui/src/tui_loop.rs @@ -77,9 +77,7 @@ where { load_resumed_display(app, session_id); } - if let Some(content) = app.session.handle_event(agent_event) { - app.session.route_message(content).await; - } + app.session.handle_event(agent_event); } AppEvent::Paste(result) => { paste::apply_paste_result(app, result); diff --git a/crates/loopal-tui/src/views/input_view.rs b/crates/loopal-tui/src/views/input_view.rs index 7ae9dbaa..33ce1677 100644 --- a/crates/loopal-tui/src/views/input_view.rs +++ b/crates/loopal-tui/src/views/input_view.rs @@ -19,7 +19,6 @@ pub fn render_input( f: &mut Frame, input: &str, cursor: usize, - inbox_count: usize, image_count: usize, input_scroll: usize, area: Rect, @@ -28,7 +27,7 @@ pub fn render_input( return; } - let prefix = build_prefix(inbox_count, image_count); + let prefix = build_prefix(image_count); let prefix_width = display_width(&prefix); let content_width = (area.width as usize).saturating_sub(prefix_width); @@ -71,8 +70,8 @@ pub fn input_height(input: &str, area_width: u16, prefix_width: usize) -> u16 { } /// Calculate the prefix display width (exported for layout computation). -pub fn prefix_width(inbox_count: usize, image_count: usize) -> usize { - display_width(&build_prefix(inbox_count, image_count)) +pub fn prefix_width(image_count: usize) -> usize { + display_width(&build_prefix(image_count)) } // --- Internal helpers --- @@ -138,12 +137,11 @@ fn set_cursor( f.set_cursor_position((x, y)); } -fn build_prefix(inbox_count: usize, image_count: usize) -> String { - match (inbox_count > 0, image_count > 0) { - (true, true) => format!("> [img:{image_count}] ({inbox_count} queued) "), - (true, false) => format!("> ({inbox_count} queued) "), - (false, true) => format!("> [img:{image_count}] "), - (false, false) => "> ".to_string(), +fn build_prefix(image_count: usize) -> String { + if image_count > 0 { + format!("> [img:{image_count}] ") + } else { + "> ".to_string() } } @@ -184,14 +182,13 @@ mod tests { #[test] fn test_prefix_variants() { - assert_eq!(build_prefix(0, 0), "> "); - assert_eq!(build_prefix(2, 0), "> (2 queued) "); - assert_eq!(build_prefix(0, 1), "> [img:1] "); - assert_eq!(build_prefix(3, 2), "> [img:2] (3 queued) "); + assert_eq!(build_prefix(0), "> "); + assert_eq!(build_prefix(1), "> [img:1] "); + assert_eq!(build_prefix(2), "> [img:2] "); } #[test] fn test_prefix_width_fn() { - assert_eq!(prefix_width(0, 0), 2); + assert_eq!(prefix_width(0), 2); } } diff --git a/crates/loopal-tui/src/views/unified_status.rs b/crates/loopal-tui/src/views/unified_status.rs index 3ee4c1a4..e43dde18 100644 --- a/crates/loopal-tui/src/views/unified_status.rs +++ b/crates/loopal-tui/src/views/unified_status.rs @@ -100,7 +100,7 @@ fn status_icon_and_label( Style::default().fg(Color::Yellow), "Waiting", ) - } else if !conv.agent_idle { + } else if !state.is_active_agent_idle() { let frame = spinner_frame(elapsed); ( frame.to_string(), @@ -131,7 +131,7 @@ pub fn spinner_frame(elapsed: std::time::Duration) -> &'static str { fn is_agent_active(state: &SessionState) -> bool { let conv = state.active_conversation(); - !conv.agent_idle + !state.is_active_agent_idle() || !conv.streaming_text.is_empty() || conv.thinking_active || has_live_subagents(state) diff --git a/crates/loopal-tui/tests/suite/app_test.rs b/crates/loopal-tui/tests/suite/app_test.rs index 336e7a06..56864ea2 100644 --- a/crates/loopal-tui/tests/suite/app_test.rs +++ b/crates/loopal-tui/tests/suite/app_test.rs @@ -65,50 +65,20 @@ fn test_submit_input_returns_text_and_resets() { #[test] fn test_awaiting_input_sets_idle() { let (app, _, _) = make_app(); - assert!(!app.session.lock().active_conversation().agent_idle); + assert!(!app.session.lock().is_active_agent_idle()); app.session .handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert!(app.session.lock().active_conversation().agent_idle); + assert!(app.session.lock().is_active_agent_idle()); } #[test] -fn test_awaiting_input_forwards_inbox_message() { +fn test_awaiting_input_does_not_auto_forward() { let (app, _, _) = make_app(); - { - let mut state = app.session.lock(); - state.inbox.push("queued".into()); - } - // AwaitingInput sets idle=true then tries forward - let forwarded = app - .session + // AwaitingInput no longer auto-forwards — messages go directly to agent mailbox. + app.session .handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert_eq!(forwarded.map(|c| c.text), Some("queued".to_string())); let state = app.session.lock(); - let conv = state.active_conversation(); - assert!(!conv.agent_idle); // forwarding clears idle - assert!(state.inbox.is_empty()); - assert_eq!(conv.messages.last().unwrap().role, "user"); - assert_eq!(conv.messages.last().unwrap().content, "queued"); -} - -#[test] -fn test_pop_inbox_to_input() { - let (mut app, _, _) = make_app(); - { - let mut state = app.session.lock(); - state.inbox.push("first".into()); - state.inbox.push("second".into()); - } - assert!(app.pop_inbox_to_input()); - assert_eq!(app.input, "second"); - assert_eq!(app.input_cursor, 6); - assert_eq!(app.session.lock().inbox.len(), 1); -} - -#[test] -fn test_pop_inbox_empty_returns_false() { - let (mut app, _, _) = make_app(); - assert!(!app.pop_inbox_to_input()); + assert!(state.is_active_agent_idle()); } fn sample_image(label: &str) -> ImageAttachment { diff --git a/crates/loopal-tui/tests/suite/command_edge_test.rs b/crates/loopal-tui/tests/suite/command_edge_test.rs index 637a6e26..88fba1f4 100644 --- a/crates/loopal-tui/tests/suite/command_edge_test.rs +++ b/crates/loopal-tui/tests/suite/command_edge_test.rs @@ -1,6 +1,6 @@ /// Command edge cases: handler effects, skill expansion, sub-page open. use loopal_config::Skill; -use loopal_protocol::{AgentMode, ControlCommand, UserQuestionResponse}; +use loopal_protocol::{AgentMode, AgentStatus, ControlCommand, UserQuestionResponse}; use loopal_session::SessionController; use loopal_tui::app::App; @@ -97,7 +97,7 @@ async fn test_rewind_on_idle_opens_sub_page() { let mut app = make_app(); { let mut state = app.session.lock(); - state.active_conversation_mut().agent_idle = true; + state.agents.get_mut("main").unwrap().observable.status = AgentStatus::WaitingForInput; state .active_conversation_mut() .messages @@ -118,7 +118,13 @@ async fn test_rewind_on_idle_opens_sub_page() { async fn test_rewind_on_busy_agent_shows_error() { let mut app = make_app(); { - app.session.lock().active_conversation_mut().agent_idle = false; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; } let handler = app.command_registry.find("/rewind").unwrap(); handler.execute(&mut app, None).await; diff --git a/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs b/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs index 5e4e571d..c7033f4f 100644 --- a/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs +++ b/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs @@ -2,8 +2,8 @@ /// Ctrl+P/N scroll reset, and multiline boundary fall-through to history. use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use loopal_protocol::ControlCommand; use loopal_protocol::UserQuestionResponse; +use loopal_protocol::{AgentStatus, ControlCommand}; use loopal_session::SessionController; use loopal_tui::app::App; use loopal_tui::input::handle_key; @@ -78,7 +78,13 @@ fn test_esc_preserves_scroll_offset() { #[test] fn test_up_navigates_history_when_agent_busy() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = false; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; app.input_history.push("prev".into()); handle_key(&mut app, key(KeyCode::Up)); assert_eq!( diff --git a/crates/loopal-tui/tests/suite/input_scroll_test.rs b/crates/loopal-tui/tests/suite/input_scroll_test.rs index a7eef8ca..8a7382f7 100644 --- a/crates/loopal-tui/tests/suite/input_scroll_test.rs +++ b/crates/loopal-tui/tests/suite/input_scroll_test.rs @@ -10,8 +10,8 @@ /// Ctrl+P/N always navigate history regardless of scroll state. use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use loopal_protocol::ControlCommand; use loopal_protocol::UserQuestionResponse; +use loopal_protocol::{AgentStatus, ControlCommand}; use loopal_session::SessionController; use loopal_tui::app::App; use loopal_tui::input::{InputAction, handle_key}; @@ -83,7 +83,13 @@ fn test_down_resets_scroll_and_navigates_history() { #[test] fn test_up_navigates_history_when_idle() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("older".into()); app.input_history.push("recent".into()); handle_key(&mut app, key(KeyCode::Up)); @@ -94,7 +100,13 @@ fn test_up_navigates_history_when_idle() { #[test] fn test_down_navigates_history_forward() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("first".into()); app.input_history.push("second".into()); handle_key(&mut app, key(KeyCode::Up)); @@ -107,7 +119,13 @@ fn test_down_navigates_history_forward() { #[test] fn test_up_navigates_history_when_content_fits() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("previous command".into()); let action = handle_key(&mut app, key(KeyCode::Up)); assert!(matches!(action, InputAction::None)); @@ -120,7 +138,13 @@ fn test_up_navigates_history_when_content_fits() { #[test] fn test_ctrl_p_navigates_history() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("first".into()); app.input_history.push("second".into()); handle_key(&mut app, ctrl('p')); @@ -131,7 +155,13 @@ fn test_ctrl_p_navigates_history() { #[test] fn test_ctrl_n_navigates_history_forward() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("first".into()); app.input_history.push("second".into()); handle_key(&mut app, ctrl('p')); diff --git a/crates/loopal-tui/tests/suite/input_test.rs b/crates/loopal-tui/tests/suite/input_test.rs index bcb86b2a..0c041a9a 100644 --- a/crates/loopal-tui/tests/suite/input_test.rs +++ b/crates/loopal-tui/tests/suite/input_test.rs @@ -1,7 +1,7 @@ /// Input handling tests: key routing priority chain + basic interactions. use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use loopal_protocol::{ControlCommand, UserQuestionResponse}; +use loopal_protocol::{AgentStatus, ControlCommand, UserQuestionResponse}; use loopal_session::SessionController; use loopal_tui::app::App; @@ -50,7 +50,13 @@ fn test_ctrl_c_clears_input_when_non_empty() { #[test] fn test_ctrl_c_interrupts_when_agent_busy() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = false; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; let action = handle_key(&mut app, ctrl('c')); assert!(matches!(action, InputAction::Interrupt)); } @@ -58,7 +64,13 @@ fn test_ctrl_c_interrupts_when_agent_busy() { #[test] fn test_ctrl_c_noop_when_idle_and_empty() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; let action = handle_key(&mut app, ctrl('c')); assert!(matches!(action, InputAction::None)); }