From 38232e5197153c26843184cab838005054c3de63 Mon Sep 17 00:00:00 2001 From: yishuiliunian Date: Sun, 5 Apr 2026 16:46:25 +0800 Subject: [PATCH] refactor: deliver messages directly to agent mailbox, remove TUI inbox routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TUI previously buffered messages in a local Inbox and used agent_idle as a routing decision: when agent was busy, messages were queued locally and an interrupt signal was used to wake the agent to drain the queue. This was TUI concept leakage into the agent runtime — agent internal state was being directly mutated by the display layer, and interrupt was overloaded with "cancel work" AND "new message arrived" semantics. The worst symptom: ephemeral agents exited immediately on user input because the yield_now idle check fired before the TUI's delayed inbox forward could complete the IPC round-trip. Architecture principles applied: - Agent owns its internal state (idle/busy); external actors interact only via mailbox delivery + explicit interrupt - TUI state (observable.status, is_idle) is derived solely from agent events, never set directly - InterruptSignal carries only cancel/shutdown semantics Key changes: - Remove agent_idle field; derive via AgentViewState::is_idle() from status - Remove Inbox struct entirely (dead after routing bypass) - TUI calls append_user_display() + route_message() directly — no buffering - Ephemeral idle check: drop yield_now(), drain_pending() is reliable now - drain_pending() returns Vec so Control commands are not silently dropped - Merge InputFromClient into AgentInput (Interrupt variant was vestigial) - Status transition rolls back on emit failure for consistency - Upgrade observation-channel try_send drops from debug to warn for prod visibility 49/49 tests pass, zero clippy warnings, rustfmt clean. --- crates/loopal-agent-hub/src/agent_io.rs | 2 +- .../src/agent_registry/completion.rs | 4 +- crates/loopal-agent-hub/src/routing.rs | 7 +- crates/loopal-agent-hub/src/spawn_manager.rs | 2 +- .../loopal-agent-server/src/hub_frontend.rs | 36 +++----- crates/loopal-agent-server/src/lib.rs | 5 +- .../src/session_forward.rs | 8 +- crates/loopal-agent-server/src/session_hub.rs | 12 +-- .../loopal-agent-server/src/session_start.rs | 5 +- crates/loopal-agent-server/tests/suite.rs | 2 + .../tests/suite/hub_drain_test.rs | 65 ++++++++++++++ .../tests/suite/hub_frontend_edge_test.rs | 12 +-- .../tests/suite/hub_frontend_test.rs | 5 +- .../tests/suite/hub_harness.rs | 8 +- crates/loopal-protocol/src/agent_state.rs | 5 ++ crates/loopal-protocol/src/interrupt.rs | 6 +- crates/loopal-runtime/src/agent_loop/input.rs | 16 +++- crates/loopal-runtime/src/agent_loop/run.rs | 5 +- .../loopal-runtime/src/agent_loop/runner.rs | 18 +++- .../src/agent_loop/tools_inject.rs | 40 +++++---- crates/loopal-runtime/src/frontend/traits.rs | 12 ++- crates/loopal-runtime/src/frontend/unified.rs | 16 ++-- crates/loopal-runtime/tests/suite.rs | 2 + .../tests/suite/drain_controls_test.rs | 70 +++++++++++++++ .../tests/suite/frontend_unified_test.rs | 15 +++- .../loopal-session/src/agent_conversation.rs | 1 - crates/loopal-session/src/agent_handler.rs | 18 ++-- crates/loopal-session/src/agent_lifecycle.rs | 21 ++--- crates/loopal-session/src/controller.rs | 26 ++++-- .../loopal-session/src/controller_control.rs | 2 - crates/loopal-session/src/controller_ops.rs | 7 -- crates/loopal-session/src/event_handler.rs | 9 +- crates/loopal-session/src/inbox.rs | 83 ------------------ crates/loopal-session/src/lib.rs | 1 - crates/loopal-session/src/session_display.rs | 9 +- crates/loopal-session/src/state.rs | 30 +++++-- crates/loopal-session/tests/suite.rs | 6 +- .../tests/suite/agent_routing_test.rs | 2 +- .../tests/suite/controller_async_test.rs | 32 ++++--- .../tests/suite/controller_edge_test.rs | 78 ----------------- .../tests/suite/controller_test.rs | 28 ++---- .../tests/suite/event_handler_test.rs | 40 +++------ .../loopal-session/tests/suite/inbox_test.rs | 61 ------------- .../tests/suite/is_idle_test.rs | 38 ++++++++ .../tests/suite/resume_display_test.rs | 2 +- .../tests/suite/user_display_test.rs | 87 +++++++++++++++++++ crates/loopal-tui/src/app/mod.rs | 13 --- crates/loopal-tui/src/command/mod.rs | 2 +- crates/loopal-tui/src/command/rewind_cmd.rs | 2 +- crates/loopal-tui/src/input/actions.rs | 2 +- crates/loopal-tui/src/input/editing.rs | 2 +- crates/loopal-tui/src/input/navigation.rs | 8 +- crates/loopal-tui/src/key_dispatch_ops.rs | 11 +-- crates/loopal-tui/src/render.rs | 6 +- crates/loopal-tui/src/tui_loop.rs | 4 +- crates/loopal-tui/src/views/input_view.rs | 27 +++--- crates/loopal-tui/src/views/unified_status.rs | 4 +- crates/loopal-tui/tests/suite/app_test.rs | 42 ++------- .../tests/suite/command_edge_test.rs | 12 ++- .../tests/suite/input_scroll_edge_test.rs | 10 ++- .../tests/suite/input_scroll_test.rs | 42 +++++++-- crates/loopal-tui/tests/suite/input_test.rs | 18 +++- 62 files changed, 619 insertions(+), 545 deletions(-) create mode 100644 crates/loopal-agent-server/tests/suite/hub_drain_test.rs create mode 100644 crates/loopal-runtime/tests/suite/drain_controls_test.rs delete mode 100644 crates/loopal-session/src/inbox.rs delete mode 100644 crates/loopal-session/tests/suite/controller_edge_test.rs delete mode 100644 crates/loopal-session/tests/suite/inbox_test.rs create mode 100644 crates/loopal-session/tests/suite/is_idle_test.rs create mode 100644 crates/loopal-session/tests/suite/user_display_test.rs diff --git a/crates/loopal-agent-hub/src/agent_io.rs b/crates/loopal-agent-hub/src/agent_io.rs index e1b73ada..bdfb6abe 100644 --- a/crates/loopal-agent-hub/src/agent_io.rs +++ b/crates/loopal-agent-hub/src/agent_io.rs @@ -48,7 +48,7 @@ pub async fn agent_io_loop( } let h = hub.lock().await; if h.registry.event_sender().try_send(event).is_err() { - tracing::debug!(agent = %agent_name, "event dropped (channel full)"); + tracing::warn!(agent = %agent_name, "event dropped (channel full)"); } } } diff --git a/crates/loopal-agent-hub/src/agent_registry/completion.rs b/crates/loopal-agent-hub/src/agent_registry/completion.rs index 169230db..aebc3f8e 100644 --- a/crates/loopal-agent-hub/src/agent_registry/completion.rs +++ b/crates/loopal-agent-hub/src/agent_registry/completion.rs @@ -27,7 +27,9 @@ impl AgentRegistry { let pending_delivery = self.prepare_parent_delivery(name, &text); let event = AgentEvent::named(name, AgentEventPayload::Finished); - let _ = self.event_tx.try_send(event); + if self.event_tx.try_send(event).is_err() { + tracing::warn!(agent = %name, "Finished event dropped (channel full)"); + } if let Some(tx) = self.completions.remove(name) { let _ = tx.send(Some(text)); diff --git a/crates/loopal-agent-hub/src/routing.rs b/crates/loopal-agent-hub/src/routing.rs index fa8e181b..3fadbb95 100644 --- a/crates/loopal-agent-hub/src/routing.rs +++ b/crates/loopal-agent-hub/src/routing.rs @@ -26,6 +26,11 @@ pub async fn route_to_agent( target: envelope.target.clone(), content_preview: envelope.content_preview().to_string(), }); - let _ = observation_tx.try_send(event); + if observation_tx.try_send(event).is_err() { + tracing::warn!( + target = %envelope.target, + "observation event dropped (channel full)" + ); + } Ok(()) } diff --git a/crates/loopal-agent-hub/src/spawn_manager.rs b/crates/loopal-agent-hub/src/spawn_manager.rs index a573e6ca..78bd7598 100644 --- a/crates/loopal-agent-hub/src/spawn_manager.rs +++ b/crates/loopal-agent-hub/src/spawn_manager.rs @@ -130,7 +130,7 @@ pub async fn register_agent_connection( session_id: session_id.map(String::from), }); if h.registry.event_sender().try_send(event).is_err() { - tracing::debug!(agent = %name, "SubAgentSpawned event dropped"); + tracing::warn!(agent = %name, "SubAgentSpawned event dropped (channel full)"); } } agent_id diff --git a/crates/loopal-agent-server/src/hub_frontend.rs b/crates/loopal-agent-server/src/hub_frontend.rs index 549d77db..8d635b79 100644 --- a/crates/loopal-agent-server/src/hub_frontend.rs +++ b/crates/loopal-agent-server/src/hub_frontend.rs @@ -14,18 +14,18 @@ use tracing::{debug, info}; use loopal_error::{LoopalError, Result}; use loopal_ipc::protocol::methods; -use loopal_protocol::{AgentEvent, AgentEventPayload, Envelope, Question, UserQuestionResponse}; +use loopal_protocol::{AgentEvent, AgentEventPayload, Question, UserQuestionResponse}; use loopal_runtime::agent_input::AgentInput; use loopal_runtime::frontend::traits::{AgentFrontend, EventEmitter}; use loopal_tool_api::PermissionDecision; use crate::hub_emitter::HubEventEmitter; -use crate::session_hub::{InputFromClient, SharedSession}; +use crate::session_hub::SharedSession; /// Frontend that multiplexes across all clients in a shared session. pub struct HubFrontend { session: tokio::sync::RwLock>, - input_rx: tokio::sync::Mutex>, + input_rx: tokio::sync::Mutex>, agent_name: Option, /// Watch channel for interrupt detection in recv_input. interrupt_rx: tokio::sync::Mutex>, @@ -34,7 +34,7 @@ pub struct HubFrontend { impl HubFrontend { pub fn new( session: Arc, - input_rx: tokio::sync::mpsc::Receiver, + input_rx: tokio::sync::mpsc::Receiver, agent_name: Option, interrupt_rx: tokio::sync::watch::Receiver, ) -> Self { @@ -95,18 +95,12 @@ impl AgentFrontend for HubFrontend { // has already been handled by TurnCancel. Without this, changed() // fires immediately on the old value and exits the agent loop. interrupt_rx.borrow_and_update(); - loop { - tokio::select! { - msg = rx.recv() => { - match msg? { - InputFromClient::Message(env) => return Some(AgentInput::Message(env)), - InputFromClient::Control(cmd) => return Some(AgentInput::Control(cmd)), - InputFromClient::Interrupt => continue, - } - } - _ = interrupt_rx.changed() => { - return None; // Interrupted — exit agent loop - } + tokio::select! { + msg = rx.recv() => { + return msg; + } + _ = interrupt_rx.changed() => { + return None; // Interrupted — exit agent loop } } } @@ -199,14 +193,12 @@ impl AgentFrontend for HubFrontend { true } - async fn drain_pending(&self) -> Vec { + async fn drain_pending(&self) -> Vec { let mut rx = self.input_rx.lock().await; - let mut envelopes = Vec::new(); + let mut inputs = Vec::new(); while let Ok(msg) = rx.try_recv() { - if let InputFromClient::Message(env) = msg { - envelopes.push(env); - } + inputs.push(msg); } - envelopes + inputs } } diff --git a/crates/loopal-agent-server/src/lib.rs b/crates/loopal-agent-server/src/lib.rs index 95dc992c..42f7a33f 100644 --- a/crates/loopal-agent-server/src/lib.rs +++ b/crates/loopal-agent-server/src/lib.rs @@ -48,7 +48,7 @@ pub fn ipc_frontend_for_test( #[doc(hidden)] pub fn hub_frontend_for_test( session: std::sync::Arc, - input_rx: tokio::sync::mpsc::Receiver, + input_rx: tokio::sync::mpsc::Receiver, interrupt_rx: tokio::sync::watch::Receiver, ) -> std::sync::Arc { std::sync::Arc::new(hub_frontend::HubFrontend::new( @@ -64,5 +64,6 @@ pub fn hub_frontend_for_test( pub mod testing { pub use crate::agent_setup::build_with_frontend; pub use crate::params::{StartParams, build_kernel_with_provider}; - pub use crate::session_hub::{InputFromClient, SharedSession}; + pub use crate::session_hub::SharedSession; + pub use loopal_runtime::agent_input::AgentInput; } diff --git a/crates/loopal-agent-server/src/session_forward.rs b/crates/loopal-agent-server/src/session_forward.rs index 2ca20f70..31397ac9 100644 --- a/crates/loopal-agent-server/src/session_forward.rs +++ b/crates/loopal-agent-server/src/session_forward.rs @@ -10,8 +10,8 @@ use loopal_ipc::connection::{Connection, Incoming}; use loopal_ipc::jsonrpc; use loopal_ipc::protocol::methods; use loopal_protocol::{ControlCommand, Envelope}; +use loopal_runtime::agent_input::AgentInput; -use crate::session_hub::InputFromClient; use crate::session_start::SessionHandle; /// Result of forward_loop — tells dispatch_loop what happened. @@ -74,7 +74,7 @@ pub(crate) async fn forward_loop( } else if method == methods::AGENT_MESSAGE.name { // Hub-injected message (e.g. sub-agent completion notification). if let Ok(env) = serde_json::from_value::(params) { - let _ = session.input_tx.send(InputFromClient::Message(env)).await; + let _ = session.input_tx.send(AgentInput::Message(env)).await; } } } @@ -98,7 +98,7 @@ async fn route_request( match method { m if m == methods::AGENT_MESSAGE.name => match serde_json::from_value::(params) { Ok(env) => { - let _ = session.input_tx.send(InputFromClient::Message(env)).await; + let _ = session.input_tx.send(AgentInput::Message(env)).await; let _ = connection .respond(id, serde_json::json!({"ok": true})) .await; @@ -112,7 +112,7 @@ async fn route_request( m if m == methods::AGENT_CONTROL.name => { match serde_json::from_value::(params) { Ok(cmd) => { - let _ = session.input_tx.send(InputFromClient::Control(cmd)).await; + let _ = session.input_tx.send(AgentInput::Control(cmd)).await; let _ = connection .respond(id, serde_json::json!({"ok": true})) .await; diff --git a/crates/loopal-agent-server/src/session_hub.rs b/crates/loopal-agent-server/src/session_hub.rs index 73a4f611..ae6863c6 100644 --- a/crates/loopal-agent-server/src/session_hub.rs +++ b/crates/loopal-agent-server/src/session_hub.rs @@ -8,6 +8,7 @@ use tokio::sync::Mutex; use loopal_ipc::connection::Connection; use loopal_protocol::InterruptSignal; +use loopal_runtime::agent_input::AgentInput; /// A connected client handle within a shared session. pub struct ClientHandle { @@ -22,19 +23,12 @@ pub struct SharedSession { pub session_id: String, pub clients: Mutex>, /// Channel to send input into the agent loop. - pub input_tx: tokio::sync::mpsc::Sender, + pub input_tx: tokio::sync::mpsc::Sender, /// Interrupt signal shared with the agent loop. pub interrupt: InterruptSignal, pub interrupt_tx: Arc>, } -/// Input forwarded from a client connection to the agent loop. -pub enum InputFromClient { - Message(loopal_protocol::Envelope), - Control(loopal_protocol::ControlCommand), - Interrupt, -} - /// Server-wide session registry. #[derive(Default)] pub struct SessionHub { @@ -108,7 +102,7 @@ impl SessionHub { impl SharedSession { /// Create a placeholder session (for bootstrapping before session_id is known). pub fn placeholder( - input_tx: tokio::sync::mpsc::Sender, + input_tx: tokio::sync::mpsc::Sender, interrupt: InterruptSignal, interrupt_tx: Arc>, ) -> Self { diff --git a/crates/loopal-agent-server/src/session_start.rs b/crates/loopal-agent-server/src/session_start.rs index 8ff6e915..138e9c66 100644 --- a/crates/loopal-agent-server/src/session_start.rs +++ b/crates/loopal-agent-server/src/session_start.rs @@ -10,12 +10,13 @@ use loopal_config::load_config; use loopal_error::AgentOutput; use loopal_ipc::connection::Connection; use loopal_protocol::InterruptSignal; +use loopal_runtime::agent_input::AgentInput; use loopal_runtime::agent_loop; use crate::agent_setup; use crate::hub_frontend::HubFrontend; use crate::params::StartParams; -use crate::session_hub::{InputFromClient, SessionHub, SharedSession}; +use crate::session_hub::{SessionHub, SharedSession}; /// Handle returned to the dispatch loop after starting a session. pub(crate) struct SessionHandle { @@ -77,7 +78,7 @@ pub(crate) async fn start_session( }; // Create session infrastructure - let (input_tx, input_rx) = tokio::sync::mpsc::channel::(16); + let (input_tx, input_rx) = tokio::sync::mpsc::channel::(16); let interrupt = InterruptSignal::new(); let (watch_tx, watch_rx) = tokio::sync::watch::channel(0u64); let interrupt_tx = Arc::new(watch_tx); diff --git a/crates/loopal-agent-server/tests/suite.rs b/crates/loopal-agent-server/tests/suite.rs index 23ebe2e5..2f8c3b2a 100644 --- a/crates/loopal-agent-server/tests/suite.rs +++ b/crates/loopal-agent-server/tests/suite.rs @@ -6,6 +6,8 @@ mod bridge_edge_test; mod bridge_helpers; #[path = "suite/dispatch_loop_test.rs"] mod dispatch_loop_test; +#[path = "suite/hub_drain_test.rs"] +mod hub_drain_test; #[path = "suite/hub_frontend_edge_test.rs"] mod hub_frontend_edge_test; #[path = "suite/hub_frontend_test.rs"] diff --git a/crates/loopal-agent-server/tests/suite/hub_drain_test.rs b/crates/loopal-agent-server/tests/suite/hub_drain_test.rs new file mode 100644 index 00000000..c0c7131f --- /dev/null +++ b/crates/loopal-agent-server/tests/suite/hub_drain_test.rs @@ -0,0 +1,65 @@ +//! Tests for HubFrontend::drain_pending() — message and control routing. + +use std::sync::Arc; + +use tokio::sync::Mutex; + +use loopal_protocol::{ControlCommand, Envelope, InterruptSignal, MessageSource}; +use loopal_runtime::agent_input::AgentInput; +use loopal_runtime::frontend::traits::AgentFrontend; + +use loopal_agent_server::hub_frontend::HubFrontend; +use loopal_agent_server::session_hub::SharedSession; + +fn make_session() -> ( + Arc, + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, + tokio::sync::watch::Receiver, +) { + let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); + let interrupt = InterruptSignal::new(); + let (watch_tx, watch_rx) = tokio::sync::watch::channel(0u64); + let session = Arc::new(SharedSession { + session_id: "test-session".into(), + clients: Mutex::new(Vec::new()), + input_tx: input_tx.clone(), + interrupt, + interrupt_tx: Arc::new(watch_tx), + }); + (session, input_tx, input_rx, watch_rx) +} + +#[tokio::test] +async fn test_hub_drain_pending_messages() { + let (session, input_tx, input_rx, watch_rx) = make_session(); + let frontend = HubFrontend::new(session, input_rx, None, watch_rx); + + let env = Envelope::new(MessageSource::Human, "main", "hello"); + input_tx.send(AgentInput::Message(env)).await.unwrap(); + + let pending = frontend.drain_pending().await; + assert_eq!(pending.len(), 1); + let AgentInput::Message(ref env) = pending[0] else { + panic!("expected AgentInput::Message"); + }; + assert_eq!(env.content.text, "hello"); +} + +#[tokio::test] +async fn test_hub_drain_pending_controls() { + let (session, input_tx, input_rx, watch_rx) = make_session(); + let frontend = HubFrontend::new(session, input_rx, None, watch_rx); + + input_tx + .send(AgentInput::Control(ControlCommand::Clear)) + .await + .unwrap(); + + let pending = frontend.drain_pending().await; + assert_eq!(pending.len(), 1); + assert!(matches!( + pending[0], + AgentInput::Control(ControlCommand::Clear) + )); +} diff --git a/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs b/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs index a25dbb77..e1fc1551 100644 --- a/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs +++ b/crates/loopal-agent-server/tests/suite/hub_frontend_edge_test.rs @@ -11,14 +11,14 @@ use loopal_runtime::agent_input::AgentInput; use loopal_runtime::frontend::traits::AgentFrontend; use loopal_agent_server::hub_frontend::HubFrontend; -use loopal_agent_server::session_hub::{InputFromClient, SharedSession}; +use loopal_agent_server::session_hub::SharedSession; const T: Duration = Duration::from_secs(5); fn make_session() -> ( Arc, - tokio::sync::mpsc::Sender, - tokio::sync::mpsc::Receiver, + tokio::sync::mpsc::Sender, + tokio::sync::mpsc::Receiver, tokio::sync::watch::Receiver, ) { let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); @@ -60,7 +60,7 @@ async fn stale_interrupt_does_not_exit_recv_input() { // Now send a real message — recv_input should return it. let env = Envelope::new(MessageSource::Human, "main", "hello after interrupt"); - input_tx.send(InputFromClient::Message(env)).await.unwrap(); + input_tx.send(AgentInput::Message(env)).await.unwrap(); let result = tokio::time::timeout(T, recv_task).await.unwrap().unwrap(); assert!( @@ -104,7 +104,7 @@ async fn interrupt_then_continue_cycle() { // Send a message — should be delivered. let env = Envelope::new(MessageSource::Human, "main", "continue working"); - input_tx.send(InputFromClient::Message(env)).await.unwrap(); + input_tx.send(AgentInput::Message(env)).await.unwrap(); let result2 = tokio::time::timeout(T, recv2).await.unwrap().unwrap(); assert!( @@ -136,7 +136,7 @@ async fn multiple_stale_interrupts_all_consumed() { ); let env = Envelope::new(MessageSource::Human, "main", "msg"); - input_tx.send(InputFromClient::Message(env)).await.unwrap(); + input_tx.send(AgentInput::Message(env)).await.unwrap(); let result = tokio::time::timeout(T, recv_task).await.unwrap().unwrap(); assert!(matches!(result, Some(AgentInput::Message(_)))); diff --git a/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs b/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs index 3330d52d..19fddf7a 100644 --- a/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs +++ b/crates/loopal-agent-server/tests/suite/hub_frontend_test.rs @@ -13,7 +13,8 @@ use loopal_ipc::transport::Transport; use loopal_protocol::{AgentEventPayload, InterruptSignal}; use loopal_runtime::frontend::traits::AgentFrontend; -use loopal_agent_server::session_hub::{InputFromClient, SharedSession}; +use loopal_agent_server::session_hub::SharedSession; +use loopal_runtime::agent_input::AgentInput; /// Create a bidirectional Connection pair (like a network socket pair). /// Returns (server_conn, client_conn, client_rx). @@ -41,7 +42,7 @@ fn conn_pair() -> ( fn make_session() -> ( Arc, - tokio::sync::mpsc::Receiver, + tokio::sync::mpsc::Receiver, tokio::sync::watch::Receiver, ) { let (input_tx, input_rx) = tokio::sync::mpsc::channel(16); diff --git a/crates/loopal-agent-server/tests/suite/hub_harness.rs b/crates/loopal-agent-server/tests/suite/hub_harness.rs index 70aae7c1..a3c84311 100644 --- a/crates/loopal-agent-server/tests/suite/hub_harness.rs +++ b/crates/loopal-agent-server/tests/suite/hub_harness.rs @@ -19,13 +19,13 @@ use loopal_test_support::mock_provider::MultiCallProvider; use loopal_test_support::scenarios::Calls; use loopal_agent_server::testing::{ - InputFromClient, SharedSession, StartParams, build_kernel_with_provider, + AgentInput, SharedSession, StartParams, build_kernel_with_provider, }; pub const T: Duration = Duration::from_secs(10); pub struct HubTestHarness { - pub input_tx: mpsc::Sender, + pub input_tx: mpsc::Sender, pub interrupt: InterruptSignal, pub interrupt_tx: Arc>, pub client_rx: mpsc::Receiver, @@ -52,7 +52,7 @@ impl HubTestHarness { pub async fn send_message(&self, text: &str) { let env = Envelope::new(MessageSource::Human, "main", text); self.input_tx - .send(InputFromClient::Message(env)) + .send(AgentInput::Message(env)) .await .expect("input channel open"); } @@ -101,7 +101,7 @@ pub async fn build_hub_harness_with( let provider = Arc::new(MultiCallProvider::new(calls)); let kernel = build_kernel_with_provider(provider).unwrap(); - let (input_tx, input_rx) = mpsc::channel::(16); + let (input_tx, input_rx) = mpsc::channel::(16); let interrupt = InterruptSignal::new(); let (watch_tx, watch_rx) = watch::channel(0u64); let interrupt_tx = Arc::new(watch_tx); diff --git a/crates/loopal-protocol/src/agent_state.rs b/crates/loopal-protocol/src/agent_state.rs index 3ac7e902..96371397 100644 --- a/crates/loopal-protocol/src/agent_state.rs +++ b/crates/loopal-protocol/src/agent_state.rs @@ -20,6 +20,11 @@ pub enum AgentStatus { /// /// Collected on the Observation Plane and consumed by the frontend to render /// per-agent status panels. All fields are cheap to clone. +/// +/// **Status contract**: `status` is derived solely from agent events +/// (`AwaitingInput`, `Finished`, `Error`, `Stream`, `ToolCall`, etc.). +/// The agent loop's internal status field is for local idempotency only; +/// this observable copy is the authoritative view for all external consumers. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObservableAgentState { /// Current lifecycle status. diff --git a/crates/loopal-protocol/src/interrupt.rs b/crates/loopal-protocol/src/interrupt.rs index 61661d09..33ff8449 100644 --- a/crates/loopal-protocol/src/interrupt.rs +++ b/crates/loopal-protocol/src/interrupt.rs @@ -1,6 +1,8 @@ //! Shared interrupt signal for cancelling in-progress agent work. //! -//! Used by both cancel-to-interrupt and send-message-while-busy flows. +//! Used exclusively for explicit cancel (ESC key) and shutdown flows. +//! Message arrival does NOT trigger interrupt — messages are deposited +//! directly into the agent's mailbox and processed at the agent's pace. //! The consumer calls `signal()`, the runtime polls `is_signaled()` at //! key checkpoints (per stream chunk, before tool execution). @@ -18,7 +20,7 @@ impl InterruptSignal { Self(Arc::new(AtomicBool::new(false))) } - /// Raise the interrupt flag (called by consumer on cancel or new message). + /// Raise the interrupt flag (called by consumer on explicit cancel or shutdown). pub fn signal(&self) { self.0.store(true, Ordering::Release); } diff --git a/crates/loopal-runtime/src/agent_loop/input.rs b/crates/loopal-runtime/src/agent_loop/input.rs index f5ed1fd6..b86f7b19 100644 --- a/crates/loopal-runtime/src/agent_loop/input.rs +++ b/crates/loopal-runtime/src/agent_loop/input.rs @@ -118,8 +118,22 @@ impl AgentLoopRunner { } /// Non-blocking drain of all pending input (frontend + scheduler + rewake). + /// + /// Returns pending message envelopes. Control commands are processed + /// inline (they affect agent config, not conversation history). pub(super) async fn drain_pending_input(&mut self) -> Vec { - let mut pending = self.params.deps.frontend.drain_pending().await; + let all_inputs = self.params.deps.frontend.drain_pending().await; + let mut pending = Vec::new(); + for input in all_inputs { + match input { + AgentInput::Message(env) => pending.push(env), + AgentInput::Control(cmd) => { + if let Err(e) = self.handle_control(cmd).await { + tracing::warn!(error = %e, "failed to handle drained control"); + } + } + } + } if let Some(ref mut rx) = self.trigger_rx { while let Ok(env) = rx.try_recv() { pending.push(env); diff --git a/crates/loopal-runtime/src/agent_loop/run.rs b/crates/loopal-runtime/src/agent_loop/run.rs index 372ae65f..135463ce 100644 --- a/crates/loopal-runtime/src/agent_loop/run.rs +++ b/crates/loopal-runtime/src/agent_loop/run.rs @@ -29,9 +29,8 @@ impl AgentLoopRunner { match self.params.config.lifecycle { LifecycleMode::Ephemeral => { - // Ephemeral agent: check for pending input. If nothing pending - // after a brief yield, work is done — exit. - tokio::task::yield_now().await; + // Messages are delivered directly to the agent mailbox. + // drain is reliable — no yield/timeout needed. let pending = self.drain_pending_input().await; if pending.is_empty() { info!("ephemeral agent idle, exiting"); diff --git a/crates/loopal-runtime/src/agent_loop/runner.rs b/crates/loopal-runtime/src/agent_loop/runner.rs index fc54b432..144eef03 100644 --- a/crates/loopal-runtime/src/agent_loop/runner.rs +++ b/crates/loopal-runtime/src/agent_loop/runner.rs @@ -28,7 +28,12 @@ pub struct AgentLoopRunner { pub trigger_rx: Option>, /// Async hook rewake channel — background hooks send Envelopes here. pub rewake_rx: Option>, - /// Explicit agent state — source of truth, propagated via events to Session layer. + /// Local status for idempotent `transition()` checks. + /// + /// This is NOT the authoritative status for external observers. The session + /// layer derives its `observable.status` solely from agent events + /// (`AwaitingInput`, `Finished`, `Error`, etc.). If `emit()` fails during + /// `transition()`, this field is rolled back so the event can be retried. pub status: AgentStatus, /// Plan file for the current session (created lazily on first plan mode entry). pub plan_file: PlanFile, @@ -175,18 +180,27 @@ impl AgentLoopRunner { } /// Transition to a new agent status. Skips if already in target (idempotent). + /// + /// If the event emission fails, the local status is rolled back so the + /// transition can be retried. This keeps `self.status` consistent with + /// what observers have actually seen. pub(super) async fn transition(&mut self, new_status: AgentStatus) -> Result<()> { if self.status == new_status { return Ok(()); } + let old = self.status; self.status = new_status; - match new_status { + let result = match new_status { AgentStatus::Starting => Ok(()), AgentStatus::Running => Ok(()), // Running is signaled implicitly by Stream/ToolCall events. AgentStatus::WaitingForInput => self.emit(AgentEventPayload::AwaitingInput).await, AgentStatus::Finished => self.emit(AgentEventPayload::Finished).await, AgentStatus::Error => Ok(()), // Error event carries a message; use transition_error(). + }; + if result.is_err() { + self.status = old; } + result } /// Transition to Error status with a message. diff --git a/crates/loopal-runtime/src/agent_loop/tools_inject.rs b/crates/loopal-runtime/src/agent_loop/tools_inject.rs index 77d06ff0..bdc9c5e7 100644 --- a/crates/loopal-runtime/src/agent_loop/tools_inject.rs +++ b/crates/loopal-runtime/src/agent_loop/tools_inject.rs @@ -60,24 +60,34 @@ impl AgentLoopRunner { Ok(()) } - /// Drain pending envelopes from the frontend and inject them as user messages. + /// Drain pending input from the frontend and inject messages into the store. + /// Control commands are processed inline. pub async fn inject_pending_messages(&mut self) { let pending = self.params.deps.frontend.drain_pending().await; - for env in pending { - let mut user_msg = build_user_message(&env); - info!( - text_len = env.content.text.len(), - "injecting pending message" - ); - if let Err(e) = self - .params - .deps - .session_manager - .save_message(&self.params.session.id, &mut user_msg) - { - error!(error = %e, "failed to persist injected message"); + for input in pending { + match input { + crate::agent_input::AgentInput::Message(env) => { + let mut user_msg = build_user_message(&env); + info!( + text_len = env.content.text.len(), + "injecting pending message" + ); + if let Err(e) = self + .params + .deps + .session_manager + .save_message(&self.params.session.id, &mut user_msg) + { + error!(error = %e, "failed to persist injected message"); + } + self.params.store.push_user(user_msg); + } + crate::agent_input::AgentInput::Control(cmd) => { + if let Err(e) = self.handle_control(cmd).await { + tracing::warn!(error = %e, "failed to handle drained control"); + } + } } - self.params.store.push_user(user_msg); } } } diff --git a/crates/loopal-runtime/src/frontend/traits.rs b/crates/loopal-runtime/src/frontend/traits.rs index 4fe349a0..64945009 100644 --- a/crates/loopal-runtime/src/frontend/traits.rs +++ b/crates/loopal-runtime/src/frontend/traits.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use crate::agent_input::AgentInput; use loopal_error::Result; use loopal_protocol::AgentEventPayload; -use loopal_protocol::{Envelope, Question}; +use loopal_protocol::Question; use loopal_tool_api::PermissionDecision; /// Outcome of a plan approval request. @@ -56,13 +56,11 @@ pub trait AgentFrontend: Send + Sync { /// Create a cloneable event emitter for use in `tokio::spawn` blocks. fn event_emitter(&self) -> Box; - /// Non-blocking drain of pending messages from the mailbox. + /// Non-blocking drain of all pending input from the mailbox. /// - /// Returns raw `Envelope`s preserving full source metadata. - /// Called between tool executions and before sub-agent exit to - /// prevent message loss. Default returns empty — root agent uses - /// consumer inbox instead. - async fn drain_pending(&self) -> Vec { + /// Returns both data messages and control commands so that ephemeral + /// agents can process controls before exiting. Default returns empty. + async fn drain_pending(&self) -> Vec { Vec::new() } diff --git a/crates/loopal-runtime/src/frontend/unified.rs b/crates/loopal-runtime/src/frontend/unified.rs index 2742045a..43095b90 100644 --- a/crates/loopal-runtime/src/frontend/unified.rs +++ b/crates/loopal-runtime/src/frontend/unified.rs @@ -120,13 +120,17 @@ impl AgentFrontend for UnifiedFrontend { )) } - async fn drain_pending(&self) -> Vec { - let mut rx = self.mailbox_rx.lock().await; - let mut envelopes = Vec::new(); - while let Ok(env) = rx.try_recv() { - envelopes.push(env); + async fn drain_pending(&self) -> Vec { + let mut mbox = self.mailbox_rx.lock().await; + let mut ctrl = self.control_rx.lock().await; + let mut inputs = Vec::new(); + while let Ok(env) = mbox.try_recv() { + inputs.push(AgentInput::Message(env)); + } + while let Ok(cmd) = ctrl.try_recv() { + inputs.push(AgentInput::Control(cmd)); } - envelopes + inputs } async fn ask_user(&self, questions: Vec) -> Vec { diff --git a/crates/loopal-runtime/tests/suite.rs b/crates/loopal-runtime/tests/suite.rs index d3075550..e88bb8bd 100644 --- a/crates/loopal-runtime/tests/suite.rs +++ b/crates/loopal-runtime/tests/suite.rs @@ -3,6 +3,8 @@ mod agent_loop; #[path = "suite/diff_tracker_test.rs"] mod diff_tracker_test; +#[path = "suite/drain_controls_test.rs"] +mod drain_controls_test; #[path = "suite/env_context_test.rs"] mod env_context_test; #[path = "suite/frontend_unified_edge_test.rs"] diff --git a/crates/loopal-runtime/tests/suite/drain_controls_test.rs b/crates/loopal-runtime/tests/suite/drain_controls_test.rs new file mode 100644 index 00000000..5355fd30 --- /dev/null +++ b/crates/loopal-runtime/tests/suite/drain_controls_test.rs @@ -0,0 +1,70 @@ +//! Tests for UnifiedFrontend::drain_pending() — control commands and mixed input. + +use loopal_protocol::ControlCommand; +use loopal_protocol::{AgentMode, Envelope, MessageSource}; +use loopal_runtime::agent_input::AgentInput; +use loopal_runtime::frontend::UnifiedFrontend; +use loopal_runtime::frontend::{AgentFrontend, AutoCancelQuestionHandler, AutoDenyHandler}; +use tokio::sync::mpsc; + +fn make_unified( + mailbox_rx: mpsc::Receiver, + control_rx: mpsc::Receiver, +) -> UnifiedFrontend { + let (event_tx, _event_rx) = mpsc::channel(16); + UnifiedFrontend::new( + Some("sub".into()), + event_tx, + mailbox_rx, + control_rx, + None, + Box::new(AutoDenyHandler), + Box::new(AutoCancelQuestionHandler), + ) +} + +#[tokio::test] +async fn test_unified_drain_pending_with_controls() { + let (_mb_tx, mb_rx) = mpsc::channel(16); + let (ctrl_tx, ctrl_rx) = mpsc::channel(16); + + let f = make_unified(mb_rx, ctrl_rx); + ctrl_tx + .send(ControlCommand::ModeSwitch(AgentMode::Plan)) + .await + .unwrap(); + + let pending = f.drain_pending().await; + assert_eq!(pending.len(), 1); + assert!(matches!( + pending[0], + AgentInput::Control(ControlCommand::ModeSwitch(AgentMode::Plan)) + )); +} + +#[tokio::test] +async fn test_unified_drain_pending_mixed() { + let (mb_tx, mb_rx) = mpsc::channel(16); + let (ctrl_tx, ctrl_rx) = mpsc::channel(16); + + let f = make_unified(mb_rx, ctrl_rx); + + mb_tx + .send(Envelope::new( + MessageSource::Agent("lead".into()), + "sub", + "do task", + )) + .await + .unwrap(); + ctrl_tx.send(ControlCommand::Clear).await.unwrap(); + + let pending = f.drain_pending().await; + assert_eq!(pending.len(), 2); + // Messages come first (mailbox drained before control channel) + assert!(matches!(pending[0], AgentInput::Message(_))); + assert!(matches!( + pending[1], + AgentInput::Control(ControlCommand::Clear) + )); +} diff --git a/crates/loopal-runtime/tests/suite/frontend_unified_test.rs b/crates/loopal-runtime/tests/suite/frontend_unified_test.rs index 7edca471..b6fc8f2f 100644 --- a/crates/loopal-runtime/tests/suite/frontend_unified_test.rs +++ b/crates/loopal-runtime/tests/suite/frontend_unified_test.rs @@ -243,10 +243,17 @@ async fn test_unified_drain_pending() { let pending = f.drain_pending().await; assert_eq!(pending.len(), 2); - assert_eq!(pending[0].source.label(), "lead"); - assert_eq!(pending[0].content.text, "task A"); - assert_eq!(pending[1].source.label(), "peer"); - assert_eq!(pending[1].content.text, "task B"); + // drain_pending returns AgentInput; extract Envelope via pattern match. + let AgentInput::Message(ref env0) = pending[0] else { + panic!("expected Message"); + }; + let AgentInput::Message(ref env1) = pending[1] else { + panic!("expected Message"); + }; + assert_eq!(env0.source.label(), "lead"); + assert_eq!(env0.content.text, "task A"); + assert_eq!(env1.source.label(), "peer"); + assert_eq!(env1.content.text, "task B"); } // --- permission: auto deny --- diff --git a/crates/loopal-session/src/agent_conversation.rs b/crates/loopal-session/src/agent_conversation.rs index dadb92a6..3a3d9d09 100644 --- a/crates/loopal-session/src/agent_conversation.rs +++ b/crates/loopal-session/src/agent_conversation.rs @@ -16,7 +16,6 @@ pub struct AgentConversation { pub streaming_text: String, pub streaming_thinking: String, pub thinking_active: bool, - pub agent_idle: bool, pub pending_permission: Option, pub pending_question: Option, /// Transient retry error banner. diff --git a/crates/loopal-session/src/agent_handler.rs b/crates/loopal-session/src/agent_handler.rs index 1bbbc780..6077fd0a 100644 --- a/crates/loopal-session/src/agent_handler.rs +++ b/crates/loopal-session/src/agent_handler.rs @@ -2,7 +2,7 @@ use std::time::Instant; -use loopal_protocol::{AgentEventPayload, AgentStatus, UserContent}; +use loopal_protocol::{AgentEventPayload, AgentStatus}; use crate::agent_lifecycle::{extract_key_param, handle_idle, post_event_cleanup}; use crate::conversation_display::{ @@ -17,11 +17,7 @@ use crate::tool_result_handler::{ use crate::types::{PendingPermission, PendingQuestion, SessionMessage}; /// Handle an agent event — writes both observable metrics and conversation state. -pub(crate) fn apply_agent_event( - state: &mut SessionState, - name: &str, - payload: AgentEventPayload, -) -> Option { +pub(crate) fn apply_agent_event(state: &mut SessionState, name: &str, payload: AgentEventPayload) { let agent = state.agents.entry(name.to_string()).or_default(); if agent.started_at.is_none() { agent.started_at = Some(Instant::now()); @@ -128,13 +124,16 @@ pub(crate) fn apply_agent_event( } AgentEventPayload::RetryCleared => conv.retry_banner = None, AgentEventPayload::AwaitingInput => { - return handle_idle(state, name, AgentStatus::WaitingForInput); + handle_idle(state, name, AgentStatus::WaitingForInput); + return; } AgentEventPayload::Finished => { - return handle_idle(state, name, AgentStatus::Finished); + handle_idle(state, name, AgentStatus::Finished); + return; } AgentEventPayload::Interrupted => { - return handle_idle(state, name, AgentStatus::WaitingForInput); + handle_idle(state, name, AgentStatus::WaitingForInput); + return; } AgentEventPayload::AutoContinuation { continuation, @@ -215,5 +214,4 @@ pub(crate) fn apply_agent_event( } } post_event_cleanup(state, name, sync_parent); - None } diff --git a/crates/loopal-session/src/agent_lifecycle.rs b/crates/loopal-session/src/agent_lifecycle.rs index 8a515ba6..19b6850e 100644 --- a/crates/loopal-session/src/agent_lifecycle.rs +++ b/crates/loopal-session/src/agent_lifecycle.rs @@ -1,24 +1,23 @@ //! Agent lifecycle: idle state transitions, error recovery, topology registration, display helpers. -use loopal_protocol::{AgentStatus, UserContent}; +use loopal_protocol::AgentStatus; -use crate::inbox::try_forward_inbox; use crate::state::{ROOT_AGENT, SessionState}; /// Shared idle handling for AwaitingInput / Finished / Interrupted. -pub(crate) fn handle_idle( - state: &mut SessionState, - name: &str, - status: AgentStatus, -) -> Option { - let agent = state.agents.get_mut(name)?; +/// +/// Updates display state only. Messages are delivered directly to the agent +/// mailbox. +pub(crate) fn handle_idle(state: &mut SessionState, name: &str, status: AgentStatus) { + let Some(agent) = state.agents.get_mut(name) else { + return; + }; agent.conversation.flush_streaming(); agent.conversation.end_turn(); if status != AgentStatus::Finished { agent.conversation.turn_count += 1; agent.observable.turn_count += 1; } - agent.conversation.agent_idle = true; agent.conversation.retry_banner = None; agent.observable.status = status; // Auto-return to root when the viewed agent finishes @@ -27,10 +26,6 @@ pub(crate) fn handle_idle( } // Truncate completed sub-agent conversation to bound memory growth truncate_if_done(state, name); - if state.active_view == name { - return try_forward_inbox(state); - } - None } /// Register a newly spawned agent with parent/child topology. diff --git a/crates/loopal-session/src/controller.rs b/crates/loopal-session/src/controller.rs index 1fca82cc..28ae354f 100644 --- a/crates/loopal-session/src/controller.rs +++ b/crates/loopal-session/src/controller.rs @@ -92,10 +92,26 @@ impl SessionController { self.backend.interrupt_target(name); } - pub fn enqueue_message(&self, content: UserContent) -> Option { + /// Optimistic display update: append a user message to the conversation view. + /// + /// This is a pure display operation — it does NOT route the message to the + /// agent. Use `route_message()` separately for delivery to the agent mailbox. + /// Agent state (idle/busy) is NOT modified; it is derived from agent events. + pub fn append_user_display(&self, content: &UserContent) { let mut state = self.lock(); - state.inbox.push(content); - crate::controller_ops::try_forward_from_inbox(&mut state) + let conv = state.active_conversation_mut(); + let image_count = content.images.len(); + let mut display_text = content.text.clone(); + if image_count > 0 { + display_text.push_str(&format!(" [+{image_count} image(s)]")); + } + conv.messages.push(crate::types::SessionMessage { + role: "user".to_string(), + content: display_text, + tool_calls: Vec::new(), + image_count, + skill_info: content.skill_info.clone(), + }); } pub async fn approve_permission(&self) { @@ -155,9 +171,9 @@ impl SessionController { // === Event handling === - pub fn handle_event(&self, event: AgentEvent) -> Option { + pub fn handle_event(&self, event: AgentEvent) { let mut state = self.lock(); - event_handler::apply_event(&mut state, event) + event_handler::apply_event(&mut state, event); } /// Set the root session ID (for sub-agent ref persistence). diff --git a/crates/loopal-session/src/controller_control.rs b/crates/loopal-session/src/controller_control.rs index cbb95fd2..84bed959 100644 --- a/crates/loopal-session/src/controller_control.rs +++ b/crates/loopal-session/src/controller_control.rs @@ -66,7 +66,6 @@ impl SessionController { conv.cache_read_tokens = 0; conv.retry_banner = None; conv.reset_timer(); - s.inbox.clear(); s.active_view.clone() }; self.backend @@ -106,7 +105,6 @@ impl SessionController { conv.cache_read_tokens = 0; conv.retry_banner = None; conv.reset_timer(); - s.inbox.clear(); s.root_session_id = Some(session_id.to_string()); s.active_view.clone() }; diff --git a/crates/loopal-session/src/controller_ops.rs b/crates/loopal-session/src/controller_ops.rs index 9043ba34..e073259c 100644 --- a/crates/loopal-session/src/controller_ops.rs +++ b/crates/loopal-session/src/controller_ops.rs @@ -4,8 +4,6 @@ use std::sync::Arc; use loopal_protocol::{ControlCommand, UserContent, UserQuestionResponse}; -use crate::inbox::try_forward_inbox; -use crate::state::SessionState; use loopal_agent_hub::{HubClient, LocalChannels}; /// Backend for session control operations. @@ -115,8 +113,3 @@ impl ControlBackend { } } } - -/// Forward a pending inbox message to the agent. -pub(crate) fn try_forward_from_inbox(state: &mut SessionState) -> Option { - try_forward_inbox(state) -} diff --git a/crates/loopal-session/src/event_handler.rs b/crates/loopal-session/src/event_handler.rs index 9d55abea..77c89276 100644 --- a/crates/loopal-session/src/event_handler.rs +++ b/crates/loopal-session/src/event_handler.rs @@ -1,12 +1,13 @@ //! AgentEvent → SessionState update logic. Unified routing for all agents. -use loopal_protocol::{AgentEvent, AgentEventPayload, UserContent}; +use loopal_protocol::{AgentEvent, AgentEventPayload}; use crate::message_log::record_message_routed; use crate::state::{ROOT_AGENT, SessionState}; -/// Handle an AgentEvent. Returns `Some(content)` if an inbox message should be forwarded. -pub fn apply_event(state: &mut SessionState, event: AgentEvent) -> Option { +/// Handle an AgentEvent. Updates display state only — messages are delivered +/// directly to the agent mailbox. +pub fn apply_event(state: &mut SessionState, event: AgentEvent) { // Global logging for inter-agent messages if let AgentEventPayload::MessageRouted { ref source, @@ -61,5 +62,5 @@ pub fn apply_event(state: &mut SessionState, event: AgentEvent) -> Option, -} - -impl Inbox { - pub fn new() -> Self { - Self { - queue: VecDeque::new(), - } - } - - pub fn push(&mut self, content: UserContent) { - self.queue.push_back(content); - } - - pub fn pop_front(&mut self) -> Option { - self.queue.pop_front() - } - - pub fn pop_back(&mut self) -> Option { - self.queue.pop_back() - } - - pub fn clear(&mut self) { - self.queue.clear(); - } - - pub fn is_empty(&self) -> bool { - self.queue.is_empty() - } - - pub fn len(&self) -> usize { - self.queue.len() - } - - pub fn iter(&self) -> impl Iterator { - self.queue.iter() - } -} - -impl Default for Inbox { - fn default() -> Self { - Self::new() - } -} - -/// Try forwarding a queued inbox message when the active-view agent is idle. -/// -/// The inbox is global (session-level): messages are always routed to the -/// currently active agent, regardless of which agent they were typed for. -pub(crate) fn try_forward_inbox(state: &mut crate::state::SessionState) -> Option { - let agent = state.agents.get_mut(&state.active_view)?; - if !agent.conversation.agent_idle { - tracing::debug!("inbox: agent busy, message queued"); - return None; - } - let content = state.inbox.pop_front()?; - tracing::debug!(text_len = content.text.len(), "inbox: forwarding message"); - let image_count = content.images.len(); - let mut display_text = content.text.clone(); - if image_count > 0 { - display_text.push_str(&format!(" [+{image_count} image(s)]")); - } - let skill_info = content.skill_info.clone(); - agent.conversation.agent_idle = false; - agent.conversation.begin_turn(); - agent - .conversation - .messages - .push(crate::types::SessionMessage { - role: "user".to_string(), - content: display_text, - tool_calls: Vec::new(), - image_count, - skill_info, - }); - Some(content) -} diff --git a/crates/loopal-session/src/lib.rs b/crates/loopal-session/src/lib.rs index b0614160..dfa15e24 100644 --- a/crates/loopal-session/src/lib.rs +++ b/crates/loopal-session/src/lib.rs @@ -7,7 +7,6 @@ mod controller_control; mod controller_ops; mod conversation_display; pub mod event_handler; -pub mod inbox; pub mod message_log; pub mod rewind; mod server_tool_display; diff --git a/crates/loopal-session/src/session_display.rs b/crates/loopal-session/src/session_display.rs index 929160e2..babb0640 100644 --- a/crates/loopal-session/src/session_display.rs +++ b/crates/loopal-session/src/session_display.rs @@ -1,6 +1,6 @@ -//! Session display state operations: messages, welcome, history, inbox. +//! Session display state operations: messages, welcome, history. -use loopal_protocol::{AgentStatus, ProjectedMessage, UserContent}; +use loopal_protocol::{AgentStatus, ProjectedMessage}; use crate::controller::SessionController; use crate::conversation_display::push_system_msg; @@ -8,10 +8,6 @@ use crate::state::ROOT_AGENT; use crate::types::{SessionMessage, SessionToolCall, ToolCallStatus}; impl SessionController { - pub fn pop_inbox_to_edit(&self) -> Option { - self.lock().inbox.pop_back() - } - pub fn push_system_message(&self, content: String) { let mut state = self.lock(); let conv = state.active_conversation_mut(); @@ -68,7 +64,6 @@ impl SessionController { agent.observable.model = m.to_string(); } agent.conversation.messages = display_msgs; - agent.conversation.agent_idle = true; agent.observable.status = AgentStatus::Finished; if let Some(parent_name) = parent && let Some(parent_agent) = state.agents.get_mut(parent_name) diff --git a/crates/loopal-session/src/state.rs b/crates/loopal-session/src/state.rs index 7e779a21..532bdaaa 100644 --- a/crates/loopal-session/src/state.rs +++ b/crates/loopal-session/src/state.rs @@ -6,6 +6,7 @@ use std::time::Instant; use indexmap::IndexMap; +use loopal_protocol::AgentStatus; /// Name of the root agent in the agents map. pub const ROOT_AGENT: &str = "main"; @@ -13,7 +14,6 @@ pub const ROOT_AGENT: &str = "main"; use loopal_protocol::ObservableAgentState; use crate::agent_conversation::AgentConversation; -use crate::inbox::Inbox; use crate::message_log::{MessageFeed, MessageLogEntry}; /// Enhanced agent view state with full observability. @@ -53,7 +53,6 @@ pub struct SessionState { // === Observation plane === pub message_feed: MessageFeed, // === Interaction state === - pub inbox: Inbox, /// Pending sub-agent refs to be persisted (drained by caller). pub pending_sub_agent_refs: Vec, } @@ -71,9 +70,10 @@ impl SessionState { pub fn new(model: String, mode: String) -> Self { let mut agents = IndexMap::new(); // Root agent "main" is a regular entry — no special treatment. - let mut main_agent = AgentViewState::default(); - main_agent.conversation.agent_idle = false; - main_agent.started_at = Some(Instant::now()); + let main_agent = AgentViewState { + started_at: Some(Instant::now()), + ..Default::default() + }; agents.insert(ROOT_AGENT.to_string(), main_agent); Self { @@ -84,11 +84,18 @@ impl SessionState { thinking_config: "auto".to_string(), root_session_id: None, message_feed: MessageFeed::new(200), - inbox: Inbox::new(), pending_sub_agent_refs: Vec::new(), } } + /// Whether the currently viewed agent is idle (derived from observable status). + pub fn is_active_agent_idle(&self) -> bool { + self.agents + .get(&self.active_view) + .map(|a| a.is_idle()) + .unwrap_or(true) + } + // === Active conversation projection (zero branching) === /// Conversation of the currently viewed agent. @@ -117,6 +124,17 @@ impl SessionState { } impl AgentViewState { + /// Whether the agent is idle — derived solely from `observable.status`. + /// + /// Agent state is internal; external consumers must derive it from events, + /// never set it directly. This replaces the former `agent_idle` flag. + pub fn is_idle(&self) -> bool { + matches!( + self.observable.status, + AgentStatus::WaitingForInput | AgentStatus::Finished | AgentStatus::Error + ) + } + /// Elapsed time since the agent was first observed. pub fn elapsed(&self) -> std::time::Duration { self.started_at diff --git a/crates/loopal-session/tests/suite.rs b/crates/loopal-session/tests/suite.rs index df8312b2..5d39bfe1 100644 --- a/crates/loopal-session/tests/suite.rs +++ b/crates/loopal-session/tests/suite.rs @@ -15,8 +15,8 @@ mod controller_test; mod event_handler_edge_test; #[path = "suite/event_handler_test.rs"] mod event_handler_test; -#[path = "suite/inbox_test.rs"] -mod inbox_test; +#[path = "suite/is_idle_test.rs"] +mod is_idle_test; #[path = "suite/message_log_test.rs"] mod message_log_test; #[path = "suite/projection_convert_test.rs"] @@ -31,3 +31,5 @@ mod retry_banner_test; mod rewind_test; #[path = "suite/topology_test.rs"] mod topology_test; +#[path = "suite/user_display_test.rs"] +mod user_display_test; diff --git a/crates/loopal-session/tests/suite/agent_routing_test.rs b/crates/loopal-session/tests/suite/agent_routing_test.rs index 4fd1c750..e2cc1633 100644 --- a/crates/loopal-session/tests/suite/agent_routing_test.rs +++ b/crates/loopal-session/tests/suite/agent_routing_test.rs @@ -79,7 +79,7 @@ fn root_events_route_to_main_agent() { apply_event(&mut state, AgentEvent::root(AgentEventPayload::Finished)); // "main" exists (created at init), root events route there assert!(state.agents.contains_key("main")); - assert!(state.agents["main"].conversation.agent_idle); + assert!(state.agents["main"].is_idle()); } /// SubAgentSpawned event creates an AgentViewState entry with topology info. diff --git a/crates/loopal-session/tests/suite/controller_async_test.rs b/crates/loopal-session/tests/suite/controller_async_test.rs index cf8ef465..8354b88b 100644 --- a/crates/loopal-session/tests/suite/controller_async_test.rs +++ b/crates/loopal-session/tests/suite/controller_async_test.rs @@ -1,7 +1,7 @@ //! Async tests for SessionController interaction methods (channels). -use loopal_protocol::ControlCommand; use loopal_protocol::{AgentEvent, AgentEventPayload, UserQuestionResponse}; +use loopal_protocol::{AgentStatus, ControlCommand}; use loopal_session::SessionController; use tokio::sync::mpsc; @@ -64,21 +64,30 @@ async fn test_deny_permission() { } #[tokio::test] -async fn test_enqueue_message_forwards_when_idle() { +async fn test_append_user_display() { let (ctrl, _, _) = make_controller(); - ctrl.lock().active_conversation_mut().agent_idle = true; - let result = ctrl.enqueue_message("hello".into()); - assert_eq!(result.map(|c| c.text), Some("hello".to_string())); + let content = loopal_protocol::UserContent::from("hello"); + ctrl.append_user_display(&content); + let state = ctrl.lock(); + let conv = state.active_conversation(); + assert_eq!(conv.messages.last().unwrap().role, "user"); + assert_eq!(conv.messages.last().unwrap().content, "hello"); } #[tokio::test] -async fn test_enqueue_message_queues_when_busy() { +async fn test_append_user_display_does_not_change_status() { let (ctrl, _, _) = make_controller(); - ctrl.lock().active_conversation_mut().agent_idle = false; - - let result = ctrl.enqueue_message("queued".into()); - assert!(result.is_none()); - assert_eq!(ctrl.lock().inbox.len(), 1); + // Agent state is internal — TUI display ops never change it. + ctrl.lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; + let content = loopal_protocol::UserContent::from("queued"); + ctrl.append_user_display(&content); + // Status unchanged — only agent events drive status transitions. + assert!(!ctrl.lock().is_active_agent_idle()); } #[tokio::test] @@ -136,7 +145,6 @@ async fn test_clear() { let state = ctrl.lock(); let conv = state.active_conversation(); assert!(conv.messages.is_empty()); - assert!(state.inbox.is_empty()); assert!(conv.streaming_text.is_empty()); assert_eq!(conv.turn_count, 0); assert_eq!(conv.input_tokens, 0); diff --git a/crates/loopal-session/tests/suite/controller_edge_test.rs b/crates/loopal-session/tests/suite/controller_edge_test.rs deleted file mode 100644 index 9de6da52..00000000 --- a/crates/loopal-session/tests/suite/controller_edge_test.rs +++ /dev/null @@ -1,78 +0,0 @@ -//! Edge case tests for SessionController: token usage, mode, errors, inbox. - -use loopal_protocol::{AgentEvent, AgentEventPayload}; - -use super::controller_test::make_controller; - -#[test] -fn test_token_usage() { - let (ctrl, _, _) = make_controller(); - ctrl.handle_event(AgentEvent::root(AgentEventPayload::TokenUsage { - input_tokens: 100, - output_tokens: 50, - context_window: 200_000, - cache_creation_input_tokens: 0, - cache_read_input_tokens: 0, - thinking_tokens: 0, - })); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert_eq!(conv.input_tokens, 100); - assert_eq!(conv.output_tokens, 50); - assert_eq!(conv.context_window, 200_000); - assert_eq!(conv.token_count(), 150); -} - -#[test] -fn test_mode_changed() { - let (ctrl, _, _) = make_controller(); - ctrl.handle_event(AgentEvent::root(AgentEventPayload::ModeChanged { - mode: "plan".to_string(), - })); - // ModeChanged updates observable.mode on the agent - assert_eq!(ctrl.lock().agents["main"].observable.mode, "plan"); -} - -#[test] -fn test_error_event() { - let (ctrl, _, _) = make_controller(); - ctrl.handle_event(AgentEvent::root(AgentEventPayload::Error { - message: "bad".to_string(), - })); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert_eq!(conv.messages.len(), 1); - assert_eq!(conv.messages[0].role, "error"); -} - -#[test] -fn test_push_system_message() { - let (ctrl, _, _) = make_controller(); - ctrl.push_system_message("hello".to_string()); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert_eq!(conv.messages.len(), 1); - assert_eq!(conv.messages[0].role, "system"); - assert_eq!(conv.messages[0].content, "hello"); -} - -#[test] -fn test_pop_inbox_to_edit() { - let (ctrl, _, _) = make_controller(); - ctrl.lock().inbox.push("first".into()); - ctrl.lock().inbox.push("second".into()); - - assert_eq!( - ctrl.pop_inbox_to_edit().map(|c| c.text), - Some("second".to_string()) - ); - assert_eq!(ctrl.lock().inbox.len(), 1); - assert_eq!( - ctrl.pop_inbox_to_edit().map(|c| c.text), - Some("first".to_string()) - ); - assert!(ctrl.pop_inbox_to_edit().is_none()); -} diff --git a/crates/loopal-session/tests/suite/controller_test.rs b/crates/loopal-session/tests/suite/controller_test.rs index 2d45b012..3847d5df 100644 --- a/crates/loopal-session/tests/suite/controller_test.rs +++ b/crates/loopal-session/tests/suite/controller_test.rs @@ -36,11 +36,10 @@ fn test_initial_state() { let conv = state.active_conversation(); assert!(conv.messages.is_empty()); assert!(conv.streaming_text.is_empty()); - assert!(!conv.agent_idle); + assert!(!state.is_active_agent_idle()); assert_eq!(conv.turn_count, 0); assert_eq!(conv.token_count(), 0); assert!(conv.pending_permission.is_none()); - assert!(state.inbox.is_empty()); } #[test] @@ -75,31 +74,14 @@ fn test_awaiting_input_flushes_streaming() { assert_eq!(conv.messages[0].role, "assistant"); assert_eq!(conv.messages[0].content, "response"); assert_eq!(conv.turn_count, 1); - assert!(conv.agent_idle); + assert!(state.is_active_agent_idle()); } #[test] -fn test_awaiting_input_forwards_inbox() { +fn test_awaiting_input_stays_idle_with_no_messages() { let (ctrl, _, _) = make_controller(); - ctrl.lock().inbox.push("queued msg".into()); - - let forwarded = ctrl.handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert_eq!(forwarded.map(|c| c.text), Some("queued msg".to_string())); - - let state = ctrl.lock(); - let conv = state.active_conversation(); - assert!(!conv.agent_idle); - assert!(state.inbox.is_empty()); - assert_eq!(conv.messages.last().unwrap().role, "user"); - assert_eq!(conv.messages.last().unwrap().content, "queued msg"); -} - -#[test] -fn test_awaiting_input_no_inbox_stays_idle() { - let (ctrl, _, _) = make_controller(); - let forwarded = ctrl.handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert!(forwarded.is_none()); - assert!(ctrl.lock().active_conversation().agent_idle); + ctrl.handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); + assert!(ctrl.lock().is_active_agent_idle()); } #[test] diff --git a/crates/loopal-session/tests/suite/event_handler_test.rs b/crates/loopal-session/tests/suite/event_handler_test.rs index 2af07ac8..d3f753de 100644 --- a/crates/loopal-session/tests/suite/event_handler_test.rs +++ b/crates/loopal-session/tests/suite/event_handler_test.rs @@ -1,6 +1,6 @@ //! Tests for event_handler: apply_event, unified routing, MessageRouted recording. -use loopal_protocol::{AgentEvent, AgentEventPayload, ImageAttachment, UserContent}; +use loopal_protocol::{AgentEvent, AgentEventPayload}; use loopal_session::event_handler::apply_event; use loopal_session::state::SessionState; @@ -83,26 +83,23 @@ fn test_apply_event_records_message_routed_to_agent_logs() { } #[test] -fn test_awaiting_input_forwards_inbox() { +fn test_awaiting_input_sets_idle() { let mut state = make_state(); - state.inbox.push("queued msg".into()); - let forward = apply_event( + apply_event( &mut state, AgentEvent::root(AgentEventPayload::AwaitingInput), ); - assert_eq!(forward.map(|c| c.text), Some("queued msg".to_string())); - assert!(!conv!(state).agent_idle); // Immediately busy again + assert!(state.agents["main"].is_idle()); } #[test] fn test_awaiting_input_no_inbox_stays_idle() { let mut state = make_state(); - let forward = apply_event( + apply_event( &mut state, AgentEvent::root(AgentEventPayload::AwaitingInput), ); - assert!(forward.is_none()); - assert!(conv!(state).agent_idle); + assert!(state.agents["main"].is_idle()); } #[test] @@ -135,7 +132,7 @@ fn test_error_flushes_streaming() { fn test_finished_marks_idle() { let mut state = make_state(); apply_event(&mut state, AgentEvent::root(AgentEventPayload::Finished)); - assert!(conv!(state).agent_idle); + assert!(state.agents["main"].is_idle()); } #[test] @@ -171,28 +168,11 @@ fn test_mode_changed_updates_mode() { } #[test] -fn test_try_forward_inbox_with_images() { +fn test_idle_event_does_not_forward_messages() { let mut state = make_state(); - let content = UserContent { - text: "look at this".to_string(), - images: vec![ImageAttachment { - media_type: "image/png".to_string(), - data: "iVBORw0KGgo=".to_string(), - }], - skill_info: None, - }; - state.inbox.push(content); - let forward = apply_event( + apply_event( &mut state, AgentEvent::root(AgentEventPayload::AwaitingInput), ); - let forwarded = forward.expect("should forward inbox content"); - assert_eq!(forwarded.text, "look at this"); - assert_eq!(forwarded.images.len(), 1); - assert_eq!(forwarded.images[0].media_type, "image/png"); - let display = conv!(state).messages.last().unwrap(); - assert_eq!(display.role, "user"); - assert!(display.content.contains("[+1 image(s)]")); - assert_eq!(display.image_count, 1); - assert!(!conv!(state).agent_idle); + assert!(state.agents["main"].is_idle()); } diff --git a/crates/loopal-session/tests/suite/inbox_test.rs b/crates/loopal-session/tests/suite/inbox_test.rs deleted file mode 100644 index 4847667f..00000000 --- a/crates/loopal-session/tests/suite/inbox_test.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Tests for Inbox queue. - -use loopal_session::inbox::Inbox; - -#[test] -fn test_inbox_new_is_empty() { - let inbox = Inbox::new(); - assert!(inbox.is_empty()); - assert_eq!(inbox.len(), 0); -} - -#[test] -fn test_push_and_pop_front() { - let mut inbox = Inbox::new(); - inbox.push("first".into()); - inbox.push("second".into()); - assert_eq!(inbox.len(), 2); - - assert_eq!(inbox.pop_front().map(|c| c.text), Some("first".to_string())); - assert_eq!( - inbox.pop_front().map(|c| c.text), - Some("second".to_string()) - ); - assert!(inbox.is_empty()); -} - -#[test] -fn test_pop_back() { - let mut inbox = Inbox::new(); - inbox.push("first".into()); - inbox.push("second".into()); - - assert_eq!(inbox.pop_back().map(|c| c.text), Some("second".to_string())); - assert_eq!(inbox.len(), 1); -} - -#[test] -fn test_clear() { - let mut inbox = Inbox::new(); - inbox.push("a".into()); - inbox.push("b".into()); - inbox.clear(); - assert!(inbox.is_empty()); -} - -#[test] -fn test_iter() { - let mut inbox = Inbox::new(); - inbox.push("a".into()); - inbox.push("b".into()); - - let items: Vec<&str> = inbox.iter().map(|c| c.text.as_str()).collect(); - assert_eq!(items, vec!["a", "b"]); -} - -#[test] -fn test_pop_empty() { - let mut inbox = Inbox::new(); - assert!(inbox.pop_front().is_none()); - assert!(inbox.pop_back().is_none()); -} diff --git a/crates/loopal-session/tests/suite/is_idle_test.rs b/crates/loopal-session/tests/suite/is_idle_test.rs new file mode 100644 index 00000000..4a9ccabf --- /dev/null +++ b/crates/loopal-session/tests/suite/is_idle_test.rs @@ -0,0 +1,38 @@ +//! Tests for AgentViewState::is_idle() correctness mapping. +//! +//! Exhaustively verifies each AgentStatus variant maps to the correct +//! idle/busy classification. is_idle() replaces the former `agent_idle` flag. + +use loopal_protocol::AgentStatus; +use loopal_session::state::AgentViewState; + +fn view_with_status(status: AgentStatus) -> AgentViewState { + let mut view = AgentViewState::default(); + view.observable.status = status; + view +} + +#[test] +fn test_is_idle_returns_true_for_waiting_for_input() { + assert!(view_with_status(AgentStatus::WaitingForInput).is_idle()); +} + +#[test] +fn test_is_idle_returns_true_for_finished() { + assert!(view_with_status(AgentStatus::Finished).is_idle()); +} + +#[test] +fn test_is_idle_returns_true_for_error() { + assert!(view_with_status(AgentStatus::Error).is_idle()); +} + +#[test] +fn test_is_idle_returns_false_for_starting() { + assert!(!view_with_status(AgentStatus::Starting).is_idle()); +} + +#[test] +fn test_is_idle_returns_false_for_running() { + assert!(!view_with_status(AgentStatus::Running).is_idle()); +} diff --git a/crates/loopal-session/tests/suite/resume_display_test.rs b/crates/loopal-session/tests/suite/resume_display_test.rs index d1d160bb..0513e193 100644 --- a/crates/loopal-session/tests/suite/resume_display_test.rs +++ b/crates/loopal-session/tests/suite/resume_display_test.rs @@ -41,7 +41,7 @@ fn test_load_sub_agent_history_creates_agent_entry() { assert_eq!(agent.session_id.as_deref(), Some("sub-sid")); assert_eq!(agent.observable.model, "gpt-4"); assert_eq!(agent.observable.status, AgentStatus::Finished); - assert!(agent.conversation.agent_idle); + assert!(agent.is_idle()); assert_eq!(agent.conversation.messages.len(), 1); assert_eq!(agent.conversation.messages[0].content, "sub-agent response"); } diff --git a/crates/loopal-session/tests/suite/user_display_test.rs b/crates/loopal-session/tests/suite/user_display_test.rs new file mode 100644 index 00000000..86e13aa8 --- /dev/null +++ b/crates/loopal-session/tests/suite/user_display_test.rs @@ -0,0 +1,87 @@ +//! Tests for append_user_display() — status preservation and image handling. + +use loopal_protocol::UserQuestionResponse; +use loopal_protocol::{AgentStatus, ControlCommand, ImageAttachment, UserContent}; +use loopal_session::SessionController; +use tokio::sync::mpsc; + +fn make_controller() -> SessionController { + let (control_tx, _control_rx) = mpsc::channel::(16); + let (perm_tx, _perm_rx) = mpsc::channel::(16); + let (question_tx, _question_rx) = mpsc::channel::(16); + SessionController::new( + "test-model".to_string(), + "act".to_string(), + control_tx, + perm_tx, + question_tx, + Default::default(), + std::sync::Arc::new(tokio::sync::watch::channel(0u64).0), + ) +} + +#[tokio::test] +async fn test_append_user_display_preserves_idle_status() { + let ctrl = make_controller(); + ctrl.lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; + + ctrl.append_user_display(&UserContent::from("hello")); + assert!(ctrl.lock().is_active_agent_idle()); +} + +#[tokio::test] +async fn test_append_user_display_preserves_running_status() { + let ctrl = make_controller(); + ctrl.lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; + + ctrl.append_user_display(&UserContent::from("queued")); + assert!(!ctrl.lock().is_active_agent_idle()); +} + +#[tokio::test] +async fn test_append_user_display_with_images() { + let ctrl = make_controller(); + let content = UserContent { + text: "check this".to_string(), + images: vec![ + ImageAttachment { + media_type: "image/png".to_string(), + data: "AAAA".to_string(), + }, + ImageAttachment { + media_type: "image/jpeg".to_string(), + data: "BBBB".to_string(), + }, + ], + skill_info: None, + }; + ctrl.append_user_display(&content); + + let state = ctrl.lock(); + let conv = state.active_conversation(); + let msg = conv.messages.last().unwrap(); + assert!(msg.content.contains("[+2 image(s)]")); + assert_eq!(msg.image_count, 2); +} + +#[tokio::test] +async fn test_append_user_display_no_images() { + let ctrl = make_controller(); + ctrl.append_user_display(&UserContent::from("just text")); + + let state = ctrl.lock(); + let conv = state.active_conversation(); + let msg = conv.messages.last().unwrap(); + assert!(!msg.content.contains("image")); + assert_eq!(msg.image_count, 0); +} diff --git a/crates/loopal-tui/src/app/mod.rs b/crates/loopal-tui/src/app/mod.rs index a29063e3..40092e25 100644 --- a/crates/loopal-tui/src/app/mod.rs +++ b/crates/loopal-tui/src/app/mod.rs @@ -127,19 +127,6 @@ impl App { }) } - /// Pop the last Inbox message back into the input field for editing. - /// Returns true if a message was popped. - pub fn pop_inbox_to_input(&mut self) -> bool { - if let Some(content) = self.session.pop_inbox_to_edit() { - self.input = content.text; - self.pending_images = content.images; - self.input_cursor = self.input.len(); - true - } else { - false - } - } - /// Attach an image to the current pending input. pub fn attach_image(&mut self, attachment: ImageAttachment) { self.pending_images.push(attachment); diff --git a/crates/loopal-tui/src/command/mod.rs b/crates/loopal-tui/src/command/mod.rs index 684de7ef..8787db0c 100644 --- a/crates/loopal-tui/src/command/mod.rs +++ b/crates/loopal-tui/src/command/mod.rs @@ -23,7 +23,7 @@ use crate::app::App; pub enum CommandEffect { /// Command completed all work internally (e.g. push_system_message, open SubPage). Done, - /// Push expanded content into the inbox for the agent. + /// Push expanded content to the agent for processing. InboxPush(UserContent), /// Switch agent mode (plan / act). ModeSwitch(AgentMode), diff --git a/crates/loopal-tui/src/command/rewind_cmd.rs b/crates/loopal-tui/src/command/rewind_cmd.rs index e4385d19..d1245c30 100644 --- a/crates/loopal-tui/src/command/rewind_cmd.rs +++ b/crates/loopal-tui/src/command/rewind_cmd.rs @@ -24,7 +24,7 @@ impl CommandHandler for RewindCmd { fn open_rewind_picker(app: &mut App) { let state = app.session.lock(); let conv = state.active_conversation(); - if !conv.agent_idle { + if !state.is_active_agent_idle() { drop(state); app.session .push_system_message("Cannot rewind while the agent is busy.".into()); diff --git a/crates/loopal-tui/src/input/actions.rs b/crates/loopal-tui/src/input/actions.rs index 301e4e22..4af61dc8 100644 --- a/crates/loopal-tui/src/input/actions.rs +++ b/crates/loopal-tui/src/input/actions.rs @@ -19,7 +19,7 @@ pub enum SubPageResult { pub enum InputAction { /// No action needed None, - /// User message queued into Inbox for forwarding to agent + /// User message ready for forwarding to agent InboxPush(UserContent), /// User wants to quit Quit, diff --git a/crates/loopal-tui/src/input/editing.rs b/crates/loopal-tui/src/input/editing.rs index 89db5b32..69dd87bc 100644 --- a/crates/loopal-tui/src/input/editing.rs +++ b/crates/loopal-tui/src/input/editing.rs @@ -64,7 +64,7 @@ pub(super) fn handle_ctrl_c(app: &mut App) -> InputAction { } else if app.focused_agent.is_some() { app.focused_agent = None; InputAction::None - } else if !app.session.lock().active_conversation().agent_idle { + } else if !app.session.lock().is_active_agent_idle() { InputAction::Interrupt } else { InputAction::None diff --git a/crates/loopal-tui/src/input/navigation.rs b/crates/loopal-tui/src/input/navigation.rs index 694c3e38..61ca93a4 100644 --- a/crates/loopal-tui/src/input/navigation.rs +++ b/crates/loopal-tui/src/input/navigation.rs @@ -39,10 +39,8 @@ pub(super) fn handle_up(app: &mut App) -> InputAction { app.input_cursor = new_cursor; return InputAction::None; } - // Fall back to inbox pop / history - if app.pop_inbox_to_input() { - // Popped last Inbox message back into input for editing - } else if !app.input_history.is_empty() { + // Fall back to history browse + if !app.input_history.is_empty() { let idx = match app.history_index { None => app.input_history.len() - 1, Some(i) if i > 0 => i - 1, @@ -91,7 +89,7 @@ pub(super) fn handle_esc(app: &mut App) -> InputAction { tracing::info!(view = %active_view, "ESC: exit agent view (not root)"); return InputAction::ExitAgentView; } - let is_idle = app.session.lock().active_conversation().agent_idle; + let is_idle = app.session.lock().is_active_agent_idle(); if !is_idle { tracing::info!("ESC: agent busy, sending interrupt"); return InputAction::Interrupt; diff --git a/crates/loopal-tui/src/key_dispatch_ops.rs b/crates/loopal-tui/src/key_dispatch_ops.rs index d5094f9c..06a910d3 100644 --- a/crates/loopal-tui/src/key_dispatch_ops.rs +++ b/crates/loopal-tui/src/key_dispatch_ops.rs @@ -20,13 +20,10 @@ pub(crate) async fn push_to_inbox(app: &mut App, content: UserContent) { }; app.input_history.push(history_text); app.history_index = None; - if let Some(msg) = app.session.enqueue_message(content) { - tracing::debug!("TUI: message forwarded to agent"); - app.session.route_message(msg).await; - } else { - tracing::debug!("TUI: agent busy, message queued + interrupt sent"); - app.session.interrupt(); - } + // Optimistic display update, then deliver directly to agent mailbox. + // Agent state (idle/busy) is derived from agent events, not set here. + app.session.append_user_display(&content); + app.session.route_message(content).await; } pub(crate) async fn handle_effect(app: &mut App, effect: CommandEffect) -> bool { diff --git a/crates/loopal-tui/src/render.rs b/crates/loopal-tui/src/render.rs index cef88cc0..6d431636 100644 --- a/crates/loopal-tui/src/render.rs +++ b/crates/loopal-tui/src/render.rs @@ -12,8 +12,7 @@ pub fn draw(f: &mut Frame, app: &mut App) { let size = f.area(); let state = app.session.lock(); - let inbox_count = state.inbox.len(); - let pw = input_view::prefix_width(inbox_count, app.pending_image_count()); + let pw = input_view::prefix_width(app.pending_image_count()); let input_h = input_view::input_height(&app.input, size.width, pw); let conv = state.active_conversation(); let banner_h = views::retry_banner::banner_height(&conv.retry_banner); @@ -125,7 +124,7 @@ pub fn draw(f: &mut Frame, app: &mut App) { let pending_question = conv.pending_question.clone(); let topology_data = if app.show_topology { use loopal_protocol::AgentStatus; - let root_status = if conv.agent_idle { + let root_status = if state.is_active_agent_idle() { AgentStatus::WaitingForInput } else { AgentStatus::Running @@ -146,7 +145,6 @@ pub fn draw(f: &mut Frame, app: &mut App) { f, &app.input, app.input_cursor, - inbox_count, image_count, app.input_scroll, layout.input, diff --git a/crates/loopal-tui/src/tui_loop.rs b/crates/loopal-tui/src/tui_loop.rs index 8c8fc23c..8fee6ac2 100644 --- a/crates/loopal-tui/src/tui_loop.rs +++ b/crates/loopal-tui/src/tui_loop.rs @@ -77,9 +77,7 @@ where { load_resumed_display(app, session_id); } - if let Some(content) = app.session.handle_event(agent_event) { - app.session.route_message(content).await; - } + app.session.handle_event(agent_event); } AppEvent::Paste(result) => { paste::apply_paste_result(app, result); diff --git a/crates/loopal-tui/src/views/input_view.rs b/crates/loopal-tui/src/views/input_view.rs index 7ae9dbaa..33ce1677 100644 --- a/crates/loopal-tui/src/views/input_view.rs +++ b/crates/loopal-tui/src/views/input_view.rs @@ -19,7 +19,6 @@ pub fn render_input( f: &mut Frame, input: &str, cursor: usize, - inbox_count: usize, image_count: usize, input_scroll: usize, area: Rect, @@ -28,7 +27,7 @@ pub fn render_input( return; } - let prefix = build_prefix(inbox_count, image_count); + let prefix = build_prefix(image_count); let prefix_width = display_width(&prefix); let content_width = (area.width as usize).saturating_sub(prefix_width); @@ -71,8 +70,8 @@ pub fn input_height(input: &str, area_width: u16, prefix_width: usize) -> u16 { } /// Calculate the prefix display width (exported for layout computation). -pub fn prefix_width(inbox_count: usize, image_count: usize) -> usize { - display_width(&build_prefix(inbox_count, image_count)) +pub fn prefix_width(image_count: usize) -> usize { + display_width(&build_prefix(image_count)) } // --- Internal helpers --- @@ -138,12 +137,11 @@ fn set_cursor( f.set_cursor_position((x, y)); } -fn build_prefix(inbox_count: usize, image_count: usize) -> String { - match (inbox_count > 0, image_count > 0) { - (true, true) => format!("> [img:{image_count}] ({inbox_count} queued) "), - (true, false) => format!("> ({inbox_count} queued) "), - (false, true) => format!("> [img:{image_count}] "), - (false, false) => "> ".to_string(), +fn build_prefix(image_count: usize) -> String { + if image_count > 0 { + format!("> [img:{image_count}] ") + } else { + "> ".to_string() } } @@ -184,14 +182,13 @@ mod tests { #[test] fn test_prefix_variants() { - assert_eq!(build_prefix(0, 0), "> "); - assert_eq!(build_prefix(2, 0), "> (2 queued) "); - assert_eq!(build_prefix(0, 1), "> [img:1] "); - assert_eq!(build_prefix(3, 2), "> [img:2] (3 queued) "); + assert_eq!(build_prefix(0), "> "); + assert_eq!(build_prefix(1), "> [img:1] "); + assert_eq!(build_prefix(2), "> [img:2] "); } #[test] fn test_prefix_width_fn() { - assert_eq!(prefix_width(0, 0), 2); + assert_eq!(prefix_width(0), 2); } } diff --git a/crates/loopal-tui/src/views/unified_status.rs b/crates/loopal-tui/src/views/unified_status.rs index 3ee4c1a4..e43dde18 100644 --- a/crates/loopal-tui/src/views/unified_status.rs +++ b/crates/loopal-tui/src/views/unified_status.rs @@ -100,7 +100,7 @@ fn status_icon_and_label( Style::default().fg(Color::Yellow), "Waiting", ) - } else if !conv.agent_idle { + } else if !state.is_active_agent_idle() { let frame = spinner_frame(elapsed); ( frame.to_string(), @@ -131,7 +131,7 @@ pub fn spinner_frame(elapsed: std::time::Duration) -> &'static str { fn is_agent_active(state: &SessionState) -> bool { let conv = state.active_conversation(); - !conv.agent_idle + !state.is_active_agent_idle() || !conv.streaming_text.is_empty() || conv.thinking_active || has_live_subagents(state) diff --git a/crates/loopal-tui/tests/suite/app_test.rs b/crates/loopal-tui/tests/suite/app_test.rs index 336e7a06..56864ea2 100644 --- a/crates/loopal-tui/tests/suite/app_test.rs +++ b/crates/loopal-tui/tests/suite/app_test.rs @@ -65,50 +65,20 @@ fn test_submit_input_returns_text_and_resets() { #[test] fn test_awaiting_input_sets_idle() { let (app, _, _) = make_app(); - assert!(!app.session.lock().active_conversation().agent_idle); + assert!(!app.session.lock().is_active_agent_idle()); app.session .handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert!(app.session.lock().active_conversation().agent_idle); + assert!(app.session.lock().is_active_agent_idle()); } #[test] -fn test_awaiting_input_forwards_inbox_message() { +fn test_awaiting_input_does_not_auto_forward() { let (app, _, _) = make_app(); - { - let mut state = app.session.lock(); - state.inbox.push("queued".into()); - } - // AwaitingInput sets idle=true then tries forward - let forwarded = app - .session + // AwaitingInput no longer auto-forwards — messages go directly to agent mailbox. + app.session .handle_event(AgentEvent::root(AgentEventPayload::AwaitingInput)); - assert_eq!(forwarded.map(|c| c.text), Some("queued".to_string())); let state = app.session.lock(); - let conv = state.active_conversation(); - assert!(!conv.agent_idle); // forwarding clears idle - assert!(state.inbox.is_empty()); - assert_eq!(conv.messages.last().unwrap().role, "user"); - assert_eq!(conv.messages.last().unwrap().content, "queued"); -} - -#[test] -fn test_pop_inbox_to_input() { - let (mut app, _, _) = make_app(); - { - let mut state = app.session.lock(); - state.inbox.push("first".into()); - state.inbox.push("second".into()); - } - assert!(app.pop_inbox_to_input()); - assert_eq!(app.input, "second"); - assert_eq!(app.input_cursor, 6); - assert_eq!(app.session.lock().inbox.len(), 1); -} - -#[test] -fn test_pop_inbox_empty_returns_false() { - let (mut app, _, _) = make_app(); - assert!(!app.pop_inbox_to_input()); + assert!(state.is_active_agent_idle()); } fn sample_image(label: &str) -> ImageAttachment { diff --git a/crates/loopal-tui/tests/suite/command_edge_test.rs b/crates/loopal-tui/tests/suite/command_edge_test.rs index 637a6e26..88fba1f4 100644 --- a/crates/loopal-tui/tests/suite/command_edge_test.rs +++ b/crates/loopal-tui/tests/suite/command_edge_test.rs @@ -1,6 +1,6 @@ /// Command edge cases: handler effects, skill expansion, sub-page open. use loopal_config::Skill; -use loopal_protocol::{AgentMode, ControlCommand, UserQuestionResponse}; +use loopal_protocol::{AgentMode, AgentStatus, ControlCommand, UserQuestionResponse}; use loopal_session::SessionController; use loopal_tui::app::App; @@ -97,7 +97,7 @@ async fn test_rewind_on_idle_opens_sub_page() { let mut app = make_app(); { let mut state = app.session.lock(); - state.active_conversation_mut().agent_idle = true; + state.agents.get_mut("main").unwrap().observable.status = AgentStatus::WaitingForInput; state .active_conversation_mut() .messages @@ -118,7 +118,13 @@ async fn test_rewind_on_idle_opens_sub_page() { async fn test_rewind_on_busy_agent_shows_error() { let mut app = make_app(); { - app.session.lock().active_conversation_mut().agent_idle = false; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; } let handler = app.command_registry.find("/rewind").unwrap(); handler.execute(&mut app, None).await; diff --git a/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs b/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs index 5e4e571d..c7033f4f 100644 --- a/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs +++ b/crates/loopal-tui/tests/suite/input_scroll_edge_test.rs @@ -2,8 +2,8 @@ /// Ctrl+P/N scroll reset, and multiline boundary fall-through to history. use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use loopal_protocol::ControlCommand; use loopal_protocol::UserQuestionResponse; +use loopal_protocol::{AgentStatus, ControlCommand}; use loopal_session::SessionController; use loopal_tui::app::App; use loopal_tui::input::handle_key; @@ -78,7 +78,13 @@ fn test_esc_preserves_scroll_offset() { #[test] fn test_up_navigates_history_when_agent_busy() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = false; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; app.input_history.push("prev".into()); handle_key(&mut app, key(KeyCode::Up)); assert_eq!( diff --git a/crates/loopal-tui/tests/suite/input_scroll_test.rs b/crates/loopal-tui/tests/suite/input_scroll_test.rs index a7eef8ca..8a7382f7 100644 --- a/crates/loopal-tui/tests/suite/input_scroll_test.rs +++ b/crates/loopal-tui/tests/suite/input_scroll_test.rs @@ -10,8 +10,8 @@ /// Ctrl+P/N always navigate history regardless of scroll state. use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use loopal_protocol::ControlCommand; use loopal_protocol::UserQuestionResponse; +use loopal_protocol::{AgentStatus, ControlCommand}; use loopal_session::SessionController; use loopal_tui::app::App; use loopal_tui::input::{InputAction, handle_key}; @@ -83,7 +83,13 @@ fn test_down_resets_scroll_and_navigates_history() { #[test] fn test_up_navigates_history_when_idle() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("older".into()); app.input_history.push("recent".into()); handle_key(&mut app, key(KeyCode::Up)); @@ -94,7 +100,13 @@ fn test_up_navigates_history_when_idle() { #[test] fn test_down_navigates_history_forward() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("first".into()); app.input_history.push("second".into()); handle_key(&mut app, key(KeyCode::Up)); @@ -107,7 +119,13 @@ fn test_down_navigates_history_forward() { #[test] fn test_up_navigates_history_when_content_fits() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("previous command".into()); let action = handle_key(&mut app, key(KeyCode::Up)); assert!(matches!(action, InputAction::None)); @@ -120,7 +138,13 @@ fn test_up_navigates_history_when_content_fits() { #[test] fn test_ctrl_p_navigates_history() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("first".into()); app.input_history.push("second".into()); handle_key(&mut app, ctrl('p')); @@ -131,7 +155,13 @@ fn test_ctrl_p_navigates_history() { #[test] fn test_ctrl_n_navigates_history_forward() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; app.input_history.push("first".into()); app.input_history.push("second".into()); handle_key(&mut app, ctrl('p')); diff --git a/crates/loopal-tui/tests/suite/input_test.rs b/crates/loopal-tui/tests/suite/input_test.rs index bcb86b2a..0c041a9a 100644 --- a/crates/loopal-tui/tests/suite/input_test.rs +++ b/crates/loopal-tui/tests/suite/input_test.rs @@ -1,7 +1,7 @@ /// Input handling tests: key routing priority chain + basic interactions. use crossterm::event::{KeyCode, KeyEvent, KeyModifiers}; -use loopal_protocol::{ControlCommand, UserQuestionResponse}; +use loopal_protocol::{AgentStatus, ControlCommand, UserQuestionResponse}; use loopal_session::SessionController; use loopal_tui::app::App; @@ -50,7 +50,13 @@ fn test_ctrl_c_clears_input_when_non_empty() { #[test] fn test_ctrl_c_interrupts_when_agent_busy() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = false; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::Running; let action = handle_key(&mut app, ctrl('c')); assert!(matches!(action, InputAction::Interrupt)); } @@ -58,7 +64,13 @@ fn test_ctrl_c_interrupts_when_agent_busy() { #[test] fn test_ctrl_c_noop_when_idle_and_empty() { let mut app = make_app(); - app.session.lock().active_conversation_mut().agent_idle = true; + app.session + .lock() + .agents + .get_mut("main") + .unwrap() + .observable + .status = AgentStatus::WaitingForInput; let action = handle_key(&mut app, ctrl('c')); assert!(matches!(action, InputAction::None)); }