diff --git a/gitbooks/developing/architecture/desktop-companion.md b/gitbooks/developing/architecture/desktop-companion.md new file mode 100644 index 0000000000..1ead03aca3 --- /dev/null +++ b/gitbooks/developing/architecture/desktop-companion.md @@ -0,0 +1,129 @@ +--- +description: Desktop companion domain — Clicky-style interaction loop tying hotkey, voice, screen intelligence, LLM, TTS, and visual pointing into a single product experience. +icon: robot +--- + +# Desktop Companion (`src/openhuman/desktop_companion/`) + +The desktop companion orchestrates a Clicky-style interaction loop: hotkey activation, microphone capture, screen context, LLM reasoning, speech synthesis, and visual pointing. It reuses existing building blocks rather than reimplementing them. + +## Building blocks + +| Module | What it provides | Path | +|--------|-----------------|------| +| **screen_intelligence** | Permission-gated capture sessions, `capture_now()`, `VisionSummary`, `AppContextInfo` | `src/openhuman/screen_intelligence/` | +| **voice** | Hotkey listener (push/tap), audio capture, cloud STT (Whisper), TTS (`reply_speech`) | `src/openhuman/voice/` | +| **meet_agent** | LLM orchestration pattern (STT -> LLM -> TTS), WAV packing | `src/openhuman/meet_agent/` | +| **overlay** | Floating UI surface, attention events, typewriter bubbles | `src/openhuman/overlay/` | +| **provider_surfaces** | Connected-app event queue (`ingest_event`, `list_queue`) | `src/openhuman/provider_surfaces/` | +| **accessibility** | Foreground app context (`foreground_context()`) | `src/openhuman/accessibility/` | + +## Module layout + +```text +src/openhuman/desktop_companion/ + mod.rs — module exports (light) + types.rs — CompanionState enum, CompanionConfig, ConversationTurn, session param/result types + session.rs — singleton session lifecycle, state machine, TTL, conversation history + pipeline.rs — STT -> screen context -> LLM -> TTS -> pointing orchestration + pointing.rs — [POINT:x,y:label:screenN] tag parser, multi-monitor coordinate mapping + handoff.rs — provider-surface queue matching for connected-app actions + bus.rs — broadcast channel for CompanionStateChangedEvent + schemas.rs — RPC controllers (companion_start_session, companion_stop_session, etc.) +``` + +## State machine + +```text +Idle -> Listening -> Thinking -> Speaking -> Pointing -> Idle + | | + v v + Listening Listening (interrupt) + +Any state -> Error -> Idle (reset) +``` + +Valid transitions are enforced by `session::is_valid_transition()`. Key paths: + +- **Happy path**: Idle -> Listening -> Thinking -> Speaking -> Pointing -> Idle +- **No pointing**: Thinking -> Speaking -> Idle (no POINT tags in response) +- **Interrupt**: Speaking/Pointing -> Listening (user re-activates hotkey) +- **Cancel**: Thinking -> Idle (user cancels mid-think) +- **Error recovery**: Any -> Error -> Idle + +## Interaction pipeline + +`pipeline.rs` orchestrates a single turn: + +1. **Activation** — state transitions to Listening (will be driven by Tauri shell hotkey bridge in PR 2) +2. **STT** — audio samples transcribed via `voice::cloud_transcribe` (Whisper) +3. **Screen context** — `accessibility::foreground_context()` for app name + window title +4. **LLM** — chat-completions via `BackendOAuthClient` with system prompt, screen context, and rolling conversation history (last 20 turns as context) +5. **Parse response** — extract `[POINT:x,y:label:screenN]` tags via `pointing::parse_and_map()` +6. **Handoff check** — scan response for provider keywords, match against `provider_surfaces` queue +7. **TTS** — synthesize speech via `voice::reply_speech` (ElevenLabs) +8. **Pointing** — emit pointing targets for overlay animation +9. **Return to Idle** + +The pipeline supports cancellation via `CancellationToken` — the Tauri shell can cancel at any checkpoint (between STT, LLM, TTS stages). + +Text input is also supported via `run_text_turn()` which skips STT. + +## Session lifecycle + +- **One session at a time** — enforced by a process-global `Mutex>` +- **Consent required** — `start_session` rejects `consent=false` +- **TTL enforcement** — sessions auto-expire when `status()` detects elapsed TTL +- **Conversation history** — capped at 50 turns, oldest drained on overflow + +## RPC surface + +Namespace: `companion`. All methods go through the standard controller registry. + +| Method | Description | +|--------|-------------| +| `companion_start_session` | Start a session with explicit consent + optional TTL | +| `companion_stop_session` | End the active session | +| `companion_status` | Current state, session info, remaining TTL | +| `companion_config_get` | Read companion configuration | +| `companion_config_set` | Update companion configuration | + +## Event bus + +`CompanionStateChangedEvent` is broadcast via a `tokio::sync::broadcast` channel (same pattern as `overlay::bus`). Three `DomainEvent` variants route to the `"companion"` domain: + +- `CompanionSessionStarted { session_id }` +- `CompanionStateChanged { session_id, state, previous_state }` +- `CompanionSessionEnded { session_id, reason }` + +## Pointing system + +LLM responses can embed `[POINT:x,y:label:screenN]` tags. `pointing.rs`: + +- Parses tags via regex +- Maps screen-relative coordinates to absolute desktop coordinates using `ScreenGeometry` +- Clamps coordinates to screen bounds +- Falls back to screen 0 when the index is out of range +- Strips tags from display text + +## Provider-surface handoff + +`handoff.rs` scans the clean LLM response text for provider keywords (slack, discord, telegram, etc.) and matches them against items in the `provider_surfaces` queue. When matches are found, `HandoffEvent`s are included in `TurnResult` for the Tauri shell / overlay to surface. + +## Platform scope + +- **macOS**: Full support — hotkey, screen capture, pointing, TTS, overlay +- **Windows/Linux**: Partial — hotkey works (rdev), screen context stubbed, no pointing + +Platform-specific code is gated with `#[cfg(target_os = "macos")]`. + +## Testing + +| File | Coverage | +|------|----------| +| `session_tests.rs` | Session CRUD, state machine transitions, TTL, consent, conversation history | +| `pipeline_tests.rs` | Turn orchestration, cancellation, input validation, system prompt | +| `pointing_tests.rs` | Tag parsing, coordinate mapping, multi-monitor, edge cases | +| `handoff.rs` (inline) | Keyword matching, empty queue, provider coverage | +| `schemas.rs` (inline) | Controller count, schema field validation | +| `tests/json_rpc_e2e.rs` | Full RPC round-trip: start -> status -> config -> stop | diff --git a/src/core/all.rs b/src/core/all.rs index fd85224ac3..995e79c7d4 100644 --- a/src/core/all.rs +++ b/src/core/all.rs @@ -235,6 +235,10 @@ fn build_registered_controllers() -> Vec { controllers.extend(crate::openhuman::meet::all_meet_registered_controllers()); // Live meet-agent loop: STT/LLM/TTS over the open call's audio. controllers.extend(crate::openhuman::meet_agent::all_meet_agent_registered_controllers()); + // Desktop companion — Clicky-style interaction loop. + controllers.extend( + crate::openhuman::desktop_companion::all_desktop_companion_registered_controllers(), + ); // Structured WhatsApp Web data — agent-facing read-only controllers (list/search). // The write-path ingest controller is registered separately in build_internal_only_controllers. controllers.extend(crate::openhuman::whatsapp_data::all_whatsapp_data_registered_controllers()); @@ -330,6 +334,8 @@ fn build_declared_controller_schemas() -> Vec { schemas.extend(crate::openhuman::meet::all_meet_controller_schemas()); // Live meet-agent listening + speaking loop schemas.extend(crate::openhuman::meet_agent::all_meet_agent_controller_schemas()); + // Desktop companion — Clicky-style interaction loop. + schemas.extend(crate::openhuman::desktop_companion::all_desktop_companion_controller_schemas()); // Structured WhatsApp Web data — local SQLite store, agent-queryable schemas.extend(crate::openhuman::whatsapp_data::all_whatsapp_data_controller_schemas()); schemas @@ -438,6 +444,9 @@ pub fn namespace_description(namespace: &str) -> Option<&'static str> { "whatsapp_data" => Some( "Structured WhatsApp conversation and message store — list chats, read messages, and search across WhatsApp Web data.", ), + "companion" => Some( + "Desktop companion — Clicky-style hotkey-driven interaction loop with STT, LLM, TTS, and visual pointing.", + ), _ => None, } } diff --git a/src/core/event_bus/events.rs b/src/core/event_bus/events.rs index 76c009f951..18087217eb 100644 --- a/src/core/event_bus/events.rs +++ b/src/core/event_bus/events.rs @@ -415,6 +415,22 @@ pub enum DomainEvent { rebuilt_at: f64, }, + // ── Desktop Companion ────────────────────────────────────────────── + /// A desktop companion session was started. + CompanionSessionStarted { session_id: String, ttl_secs: u64 }, + /// The companion transitioned to a new state. + CompanionStateChanged { + session_id: String, + state: String, + previous_state: String, + }, + /// A desktop companion session ended. + CompanionSessionEnded { + session_id: String, + reason: String, + turn_count: usize, + }, + // ── System lifecycle ──────────────────────────────────────────────── /// A system component started up. SystemStartup { component: String }, @@ -511,6 +527,10 @@ impl DomainEvent { Self::NotificationIngested { .. } | Self::NotificationTriaged { .. } => "notification", + Self::CompanionSessionStarted { .. } + | Self::CompanionStateChanged { .. } + | Self::CompanionSessionEnded { .. } => "companion", + Self::SystemStartup { .. } | Self::SystemShutdown { .. } | Self::SystemRestartRequested { .. } diff --git a/src/openhuman/about_app/catalog.rs b/src/openhuman/about_app/catalog.rs index a04c8a52f5..b1feaa79a0 100644 --- a/src/openhuman/about_app/catalog.rs +++ b/src/openhuman/about_app/catalog.rs @@ -1103,6 +1103,30 @@ const CAPABILITIES: &[Capability] = &[ status: CapabilityStatus::Beta, privacy: GITHUB_RELEASES_METADATA, }, + // ── Desktop Companion ──────────────────────────────────────────── + Capability { + id: "companion.session", + name: "Desktop Companion Session", + domain: "desktop_companion", + category: CapabilityCategory::ScreenIntelligence, + description: "Start a Clicky-style companion session that ties hotkey activation, \ + microphone capture, screen context, LLM reasoning, speech synthesis, \ + and visual pointing into a single interaction loop.", + how_to: "Settings > Companion, or activate via the configured hotkey.", + status: CapabilityStatus::Beta, + privacy: DERIVED_TO_BACKEND, + }, + Capability { + id: "companion.pointing", + name: "Visual Pointing", + domain: "desktop_companion", + category: CapabilityCategory::ScreenIntelligence, + description: "The companion LLM can embed [POINT:x,y:label:screenN] tags to \ + visually point at UI elements on screen via the overlay.", + how_to: "Automatic during companion sessions when the LLM identifies a UI target.", + status: CapabilityStatus::Beta, + privacy: None, + }, ]; static VALIDATED: OnceLock<()> = OnceLock::new(); diff --git a/src/openhuman/desktop_companion/bus.rs b/src/openhuman/desktop_companion/bus.rs new file mode 100644 index 0000000000..a946f78c82 --- /dev/null +++ b/src/openhuman/desktop_companion/bus.rs @@ -0,0 +1,81 @@ +//! Broadcast bus for companion state change events. +//! +//! Follows the same pattern as `overlay::bus`: a process-global +//! `tokio::sync::broadcast` channel so any module can subscribe. +//! The Socket.IO bridge (PR 2) will forward these to the overlay +//! as `companion:state_changed` events. + +use once_cell::sync::Lazy; +use tokio::sync::broadcast; + +use super::types::CompanionStateChangedEvent; + +const LOG_PREFIX: &str = "[desktop_companion]"; + +static STATE_BUS: Lazy> = Lazy::new(|| { + let (tx, _rx) = broadcast::channel(64); + tx +}); + +/// Subscribe to companion state change events. +pub fn subscribe_state_changed() -> broadcast::Receiver { + STATE_BUS.subscribe() +} + +/// Publish a state change event. +/// +/// Fire-and-forget: if nobody is subscribed the event is dropped. +pub fn publish_state_changed(event: CompanionStateChangedEvent) -> usize { + log::debug!( + "{LOG_PREFIX} state_changed session={} {} -> {}", + event.session_id, + event.previous_state, + event.state, + ); + match STATE_BUS.send(event) { + Ok(n) => n, + Err(_) => { + log::debug!("{LOG_PREFIX} no subscribers — state change dropped"); + 0 + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::openhuman::desktop_companion::types::CompanionState; + + #[tokio::test] + async fn publish_is_received_by_subscriber() { + // STATE_BUS is process-global — other tests may publish events. + // We filter by session_id to avoid flakiness. + let mut rx = subscribe_state_changed(); + let delivered = publish_state_changed(CompanionStateChangedEvent { + session_id: "bus-test-unique".into(), + state: CompanionState::Listening, + previous_state: CompanionState::Idle, + message: None, + }); + assert!(delivered >= 1); + // Drain until we find our specific event (others may have been published concurrently). + loop { + let event = rx.recv().await.expect("event delivered"); + if event.session_id == "bus-test-unique" { + assert_eq!(event.state, CompanionState::Listening); + assert_eq!(event.previous_state, CompanionState::Idle); + break; + } + } + } + + #[test] + fn publish_with_no_subscribers_is_safe() { + let _ = publish_state_changed(CompanionStateChangedEvent { + session_id: "test".into(), + state: CompanionState::Idle, + previous_state: CompanionState::Error, + message: Some("recovered".into()), + }); + } +} diff --git a/src/openhuman/desktop_companion/handoff.rs b/src/openhuman/desktop_companion/handoff.rs new file mode 100644 index 0000000000..9ac482cb9c --- /dev/null +++ b/src/openhuman/desktop_companion/handoff.rs @@ -0,0 +1,219 @@ +//! Provider-surface handoff — routes companion-identified connected-app +//! actions to the provider_surfaces queue. +//! +//! When the companion LLM response mentions a specific connected app +//! (e.g. "reply to the Slack message"), this module checks whether a +//! matching [`RespondQueueItem`] exists in the provider_surfaces queue +//! and emits a [`HandoffEvent`] through the companion event bus. +//! +//! This is intentionally light-touch: the provider_surfaces domain is +//! scaffolding-complete but behaviorally incomplete — we wire the +//! plumbing so it works when the surface is ready. + +use log::debug; +use serde::{Deserialize, Serialize}; + +use crate::openhuman::provider_surfaces::store; +use crate::openhuman::provider_surfaces::types::RespondQueueItem; + +const LOG_PREFIX: &str = "[companion_handoff]"; + +/// Known provider keywords the LLM might reference. +const PROVIDER_KEYWORDS: &[(&str, &str)] = &[ + ("slack", "slack"), + ("discord", "discord"), + ("telegram", "telegram"), + ("whatsapp", "whatsapp"), + ("imessage", "imessage"), + ("email", "gmail"), + ("gmail", "gmail"), + ("google meet", "google-meet"), +]; + +/// A handoff event emitted when the companion identifies a provider action. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HandoffEvent { + /// The provider name that was matched (e.g. "slack"). + pub provider: String, + /// Queue items from the provider_surfaces queue that match. + pub matching_items: Vec, + /// The original LLM response text that triggered the handoff. + pub response_text: String, +} + +/// Check the LLM response for provider references and match against the +/// provider_surfaces queue. Returns a list of handoff events (usually 0 or 1). +pub fn check_handoff(response_text: &str) -> Vec { + if response_text.is_empty() { + return Vec::new(); + } + + let queue_items = store::list_queue_items(); + + if queue_items.is_empty() { + debug!("{LOG_PREFIX} no items in provider queue, skipping handoff check"); + return Vec::new(); + } + + check_handoff_with_items(response_text, &queue_items) +} + +/// Pure matching logic: match provider keywords against response text and the +/// given queue items. Extracted so tests can exercise the positive path without +/// depending on global store state. +pub(crate) fn check_handoff_with_items( + response_text: &str, + queue_items: &[RespondQueueItem], +) -> Vec { + let lower = response_text.to_lowercase(); + let mut events = Vec::new(); + + // Split response into tokens once for word-boundary matching. + let tokens: Vec<&str> = lower + .split(|c: char| !c.is_alphanumeric() && c != '-') + .filter(|s| !s.is_empty()) + .collect(); + + for &(keyword, provider_id) in PROVIDER_KEYWORDS { + // Token-aware match: single-word keywords use exact token match to avoid + // substring false positives (e.g. "slacking" won't match "slack"). + // Multi-word keywords (like "google meet") fall back to substring match. + let matched = if keyword.contains(' ') { + lower.contains(keyword) + } else { + tokens.iter().any(|t| *t == keyword) + }; + if !matched { + continue; + } + + // Deduplicate: skip if we already emitted an event for this provider + // (e.g. "email" and "gmail" both map to provider_id "gmail"). + if events + .iter() + .any(|e: &HandoffEvent| e.provider == provider_id) + { + debug!("{LOG_PREFIX} skipping duplicate provider={provider_id} (already matched)"); + continue; + } + + let matching: Vec = queue_items + .iter() + .filter(|item| item.provider.to_lowercase() == provider_id) + .cloned() + .collect(); + + if matching.is_empty() { + debug!("{LOG_PREFIX} keyword '{keyword}' found but no queue items for provider={provider_id}"); + continue; + } + + debug!( + "{LOG_PREFIX} handoff: keyword='{keyword}' provider={provider_id} matches={}", + matching.len() + ); + + events.push(HandoffEvent { + provider: provider_id.to_string(), + matching_items: matching, + response_text: response_text.to_string(), + }); + } + + events +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_queue_item(provider: &str) -> RespondQueueItem { + RespondQueueItem { + id: "test-id".into(), + provider: provider.into(), + account_id: "acct".into(), + event_kind: "message".into(), + entity_id: "ent".into(), + thread_id: None, + title: None, + snippet: None, + sender_name: None, + sender_handle: None, + timestamp: "2026-01-01T00:00:00Z".into(), + deep_link: None, + requires_attention: true, + status: String::new(), + } + } + + #[test] + fn check_handoff_empty_response() { + assert!(check_handoff("").is_empty()); + } + + #[test] + fn check_handoff_no_keywords() { + let events = check_handoff("Please click the save button."); + assert!(events.is_empty()); + } + + #[test] + fn check_handoff_keyword_but_empty_queue() { + // Queue is empty by default in tests. + let events = check_handoff("Reply to the Slack message from Alice."); + assert!(events.is_empty()); + } + + #[test] + fn check_handoff_with_items_emits_event() { + let items = vec![make_queue_item("slack")]; + let events = check_handoff_with_items("Reply to the Slack message", &items); + assert_eq!(events.len(), 1); + assert_eq!(events[0].provider, "slack"); + assert_eq!(events[0].matching_items.len(), 1); + } + + #[test] + fn check_handoff_with_items_deduplicates_gmail() { + // "email" and "gmail" both map to provider_id "gmail" — should emit once. + let items = vec![make_queue_item("gmail")]; + let events = check_handoff_with_items("Forward the email from Gmail to the team", &items); + assert_eq!(events.len(), 1); + assert_eq!(events[0].provider, "gmail"); + } + + #[test] + fn check_handoff_with_items_no_substring_false_positive() { + // "slacking" should NOT match "slack". + let items = vec![make_queue_item("slack")]; + let events = check_handoff_with_items("Stop slacking off", &items); + assert!(events.is_empty()); + } + + #[test] + fn check_handoff_with_items_multi_word_keyword() { + let items = vec![make_queue_item("google-meet")]; + let events = check_handoff_with_items("Join the Google Meet call", &items); + assert_eq!(events.len(), 1); + assert_eq!(events[0].provider, "google-meet"); + } + + #[test] + fn provider_keywords_cover_known_providers() { + let providers: Vec<&str> = PROVIDER_KEYWORDS.iter().map(|(_, p)| *p).collect(); + assert!(providers.contains(&"slack")); + assert!(providers.contains(&"discord")); + assert!(providers.contains(&"telegram")); + assert!(providers.contains(&"whatsapp")); + assert!(providers.contains(&"gmail")); + } + + #[test] + fn provider_keywords_case_insensitive_match() { + // Route through production matcher to verify case handling. + let items = vec![make_queue_item("slack")]; + let events = check_handoff_with_items("Check your SLACK messages", &items); + assert_eq!(events.len(), 1); + assert_eq!(events[0].provider, "slack"); + } +} diff --git a/src/openhuman/desktop_companion/mod.rs b/src/openhuman/desktop_companion/mod.rs new file mode 100644 index 0000000000..821c96070b --- /dev/null +++ b/src/openhuman/desktop_companion/mod.rs @@ -0,0 +1,26 @@ +//! Desktop companion domain — Clicky-style interaction loop. +//! +//! Ties hotkey activation, microphone capture, screen context, LLM +//! reasoning, speech synthesis, and visual pointing into a single +//! product experience. Orchestrates existing building blocks: +//! +//! - `screen_intelligence` — permission-gated capture sessions +//! - `voice` — hotkey, STT, TTS pipelines +//! - `meet_agent` — LLM orchestration patterns +//! - `overlay` — floating UI surface +//! - `provider_surfaces` — connected-app event queues +//! +//! This module is export-focused. Operational code lives in `session.rs`, +//! `pipeline.rs`, and `pointing.rs`. + +pub mod bus; +pub mod handoff; +pub mod pipeline; +pub mod pointing; +pub mod schemas; +pub mod session; +pub mod types; + +pub use schemas::{ + all_desktop_companion_controller_schemas, all_desktop_companion_registered_controllers, +}; diff --git a/src/openhuman/desktop_companion/pipeline.rs b/src/openhuman/desktop_companion/pipeline.rs new file mode 100644 index 0000000000..eac88202cb --- /dev/null +++ b/src/openhuman/desktop_companion/pipeline.rs @@ -0,0 +1,415 @@ +//! Companion interaction pipeline: STT → screen context → LLM → TTS → pointing. +//! +//! Orchestrates a single interaction turn for the desktop companion. Reuses +//! the same cloud STT, LLM, and TTS backends that `meet_agent::brain` uses, +//! but adds screenshot + foreground-app context and POINT-tag parsing for +//! visual pointing. +//! +//! The pipeline is cancellable via [`tokio_util::sync::CancellationToken`] so +//! the Tauri shell can interrupt mid-turn (e.g. user presses hotkey again +//! during Speaking). + +use base64::{engine::general_purpose::STANDARD as B64, Engine as _}; +use log::{debug, info, warn}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use tokio_util::sync::CancellationToken; + +use super::handoff::{self, HandoffEvent}; +use super::pointing::{self, PointTarget, PointingParseResult, ScreenGeometry}; +use super::session; +use super::types::*; + +const LOG_PREFIX: &str = "[companion_pipeline]"; + +/// Maximum tokens for the companion LLM reply. Longer than meet_agent (220) +/// because the companion can give richer answers when not constrained to +/// live-meeting brevity. +const REPLY_MAX_TOKENS: u32 = 512; + +/// Rolling conversation context window (number of turns). +const CONTEXT_WINDOW: usize = 20; + +/// ElevenLabs TTS model for companion speech. +const TTS_MODEL_ID: &str = "eleven_turbo_v2_5"; + +/// Result of a single companion interaction turn. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TurnResult { + /// The user's transcribed speech (or typed input). + pub transcript: String, + /// The LLM's response text (POINT tags stripped). + pub response_text: String, + /// Parsed pointing targets from the LLM response. + pub targets: Vec, + /// Whether TTS audio was synthesized and enqueued. + pub tts_synthesized: bool, + /// Provider-surface handoff events detected in the response. + pub handoff_events: Vec, + /// Whether the turn was cancelled before completion. + pub cancelled: bool, +} + +/// Run a text-input companion turn (no STT needed — the user typed their query). +/// +/// **Precondition**: the session must already be in `Listening` state. The +/// caller (e.g. Tauri hotkey bridge or `companion_activate` RPC) is +/// responsible for the `Idle → Listening` transition before invoking this. +/// +/// Transitions: Listening → Thinking → Speaking/Pointing → Idle. +pub async fn run_text_turn( + text: &str, + screens: &[ScreenGeometry], + cancel: CancellationToken, +) -> Result { + let trimmed = text.trim(); + if trimmed.is_empty() { + return Err("empty input".into()); + } + + info!("{LOG_PREFIX} text turn start chars={}", trimmed.len()); + + // Transition to Thinking. + session::transition_state(CompanionState::Thinking, None)?; + + if cancel.is_cancelled() { + return Ok(cancelled_result(trimmed)); + } + + // Gather conversation history. + let history = session::conversation_history(); + let history_window = tail_history(&history, CONTEXT_WINDOW); + + // Screen context (best-effort — skip if unavailable). + let screen_context = gather_screen_context().await; + + if cancel.is_cancelled() { + return Ok(cancelled_result(trimmed)); + } + + // LLM call. + let raw_reply = match llm_companion(trimmed, &history_window, screen_context.as_deref()).await { + Ok(reply) => reply, + Err(err) => { + warn!("{LOG_PREFIX} LLM failed err={err}"); + session::transition_state(CompanionState::Error, Some(format!("LLM failure: {err}")))?; + return Err(format!("LLM failure: {err}")); + } + }; + + if cancel.is_cancelled() { + return Ok(cancelled_result(trimmed)); + } + + // Parse POINT tags. + let PointingParseResult { + targets, + clean_text, + } = pointing::parse_and_map(&raw_reply, screens); + + debug!( + "{LOG_PREFIX} LLM reply chars={} targets={}", + clean_text.len(), + targets.len() + ); + + // Check for provider-surface handoff opportunities. + let handoff_events = handoff::check_handoff(&clean_text); + if !handoff_events.is_empty() { + debug!("{LOG_PREFIX} handoff events={}", handoff_events.len()); + } + + // Record conversation turns. + let now_ms = chrono::Utc::now().timestamp_millis(); + let _ = session::push_conversation_turn(ConversationTurn { + role: "user".into(), + content: trimmed.to_string(), + timestamp_ms: now_ms, + }); + if !clean_text.is_empty() { + let _ = session::push_conversation_turn(ConversationTurn { + role: "assistant".into(), + content: clean_text.clone(), + timestamp_ms: now_ms, + }); + } + + // TTS (skip if response is empty). + let tts_ok = if !clean_text.is_empty() && !cancel.is_cancelled() { + session::transition_state(CompanionState::Speaking, None)?; + match tts(&clean_text).await { + Ok(_samples) => { + debug!("{LOG_PREFIX} TTS synthesized samples={}", _samples.len()); + true + } + Err(err) => { + warn!("{LOG_PREFIX} TTS failed err={err} (continuing without audio)"); + false + } + } + } else { + false + }; + + if cancel.is_cancelled() { + return Ok(cancelled_result(trimmed)); + } + + // Pointing phase. + if !targets.is_empty() { + let _ = session::transition_state(CompanionState::Pointing, None); + } + + // Back to idle. + let _ = session::transition_state(CompanionState::Idle, None); + + let result = TurnResult { + transcript: trimmed.to_string(), + response_text: clean_text, + targets, + tts_synthesized: tts_ok, + handoff_events, + cancelled: false, + }; + + info!("{LOG_PREFIX} text turn done"); + Ok(result) +} + +/// Run a full audio-input companion turn: STT → screen context → LLM → TTS → pointing. +/// +/// **Precondition**: the session must already be in `Listening` state. The +/// caller (e.g. Tauri hotkey bridge or `companion_activate` RPC) is +/// responsible for the `Idle → Listening` transition before invoking this. +/// +/// Transitions: Listening → Thinking → Speaking/Pointing → Idle. +pub async fn run_audio_turn( + audio_samples: &[i16], + sample_rate: u32, + screens: &[ScreenGeometry], + cancel: CancellationToken, +) -> Result { + if audio_samples.is_empty() { + return Err("no audio samples".into()); + } + + info!( + "{LOG_PREFIX} audio turn start samples={} rate={sample_rate}", + audio_samples.len() + ); + + // Check cancellation before expensive STT call. + if cancel.is_cancelled() { + return Ok(cancelled_result("")); + } + + // STT. + let transcript = match stt(audio_samples, sample_rate).await { + Ok(text) if text.trim().is_empty() => { + info!("{LOG_PREFIX} STT returned empty transcript, skipping turn"); + let _ = session::transition_state(CompanionState::Idle, None); + return Ok(TurnResult { + transcript: String::new(), + response_text: String::new(), + targets: Vec::new(), + tts_synthesized: false, + handoff_events: Vec::new(), + cancelled: false, + }); + } + Ok(text) => text, + Err(err) => { + warn!("{LOG_PREFIX} STT failed err={err}"); + session::transition_state(CompanionState::Error, Some(format!("STT failure: {err}")))?; + return Err(format!("STT failure: {err}")); + } + }; + + debug!("{LOG_PREFIX} STT transcript chars={}", transcript.len()); + + // Hand off to the text pipeline for the rest. + run_text_turn(&transcript, screens, cancel).await +} + +// ─── Real adapters ────────────────────────────────────────────────── + +/// Transcribe audio samples to text via cloud STT. +async fn stt(samples: &[i16], sample_rate: u32) -> Result { + use crate::openhuman::voice::cloud_transcribe::{transcribe_cloud, CloudTranscribeOptions}; + + let config = crate::openhuman::config::ops::load_config_with_timeout().await?; + let wav_bytes = crate::openhuman::meet_agent::wav::pack_pcm16le_mono_wav(samples, sample_rate); + let audio_b64 = B64.encode(&wav_bytes); + let opts = CloudTranscribeOptions { + mime_type: Some("audio/wav".to_string()), + file_name: Some("companion.wav".to_string()), + ..Default::default() + }; + let outcome = transcribe_cloud(&config, &audio_b64, &opts).await?; + Ok(outcome.value.text.clone()) +} + +/// System prompt for the desktop companion LLM. +const COMPANION_SYSTEM_PROMPT: &str = "\ +You are OpenHuman, a helpful desktop AI companion. The user is talking to you \ +via voice or text while using their computer. You can see their screen \ +(a screenshot and foreground app info may be provided).\n\ +\n\ +Guidelines:\n\ +- Be concise and conversational. 1-3 sentences unless the question demands more.\n\ +- When you want to point the user to a UI element on screen, embed a \ +`[POINT:x,y:label:screenN]` tag in your response where x,y are pixel \ +coordinates relative to the screen, label describes the element, and N is \ +the zero-based screen index.\n\ +- Do not use markdown formatting (no asterisks, backticks, or bullet markers) — \ +your response will be spoken aloud via TTS.\n\ +- If screen context is provided, reference what you see when relevant.\n\ +- If you don't know or can't help, say so briefly.\n\ +"; + +/// Build a chat-completions request with screen context and conversation +/// history, then return the assistant's reply. +async fn llm_companion( + prompt: &str, + history: &[&ConversationTurn], + screen_context: Option<&str>, +) -> Result { + use crate::api::config::effective_backend_api_url; + use crate::api::jwt::get_session_token; + use crate::api::BackendOAuthClient; + use reqwest::Method; + + let config = crate::openhuman::config::ops::load_config_with_timeout().await?; + let token = get_session_token(&config) + .map_err(|e| e.to_string())? + .filter(|t| !t.trim().is_empty()) + .ok_or_else(|| "no backend session token".to_string())?; + + let api_url = effective_backend_api_url(&config.api_url); + let client = BackendOAuthClient::new(&api_url).map_err(|e| e.to_string())?; + + let mut messages: Vec = Vec::with_capacity(history.len() + 3); + + // System prompt with optional screen context. + let system = if let Some(ctx) = screen_context { + format!( + "{COMPANION_SYSTEM_PROMPT}\n\ + Current screen context:\n{ctx}" + ) + } else { + COMPANION_SYSTEM_PROMPT.to_string() + }; + messages.push(json!({ "role": "system", "content": system })); + + // Rolling conversation history. + for turn in history { + messages.push(json!({ "role": turn.role, "content": turn.content })); + } + + // Current user message. + messages.push(json!({ "role": "user", "content": prompt })); + + let body = json!({ + "model": "agentic-v1", + "temperature": 0.5, + "max_tokens": REPLY_MAX_TOKENS, + "messages": messages, + }); + + let raw = client + .authed_json( + &token, + Method::POST, + "/openai/v1/chat/completions", + Some(body), + ) + .await + .map_err(|e| e.to_string())?; + + extract_chat_completion_text(&raw) + .ok_or_else(|| format!("unexpected chat completions response: {raw}")) +} + +/// Synthesize speech from text via cloud TTS. Returns raw PCM16LE samples. +async fn tts(text: &str) -> Result, String> { + use crate::openhuman::voice::reply_speech::{synthesize_reply, ReplySpeechOptions}; + + let config = crate::openhuman::config::ops::load_config_with_timeout().await?; + let voice_settings = json!({ + "stability": 0.4, + "similarity_boost": 0.75, + "style": 0.35, + "use_speaker_boost": true, + }); + let opts = ReplySpeechOptions { + output_format: Some("pcm_16000".to_string()), + model_id: Some(TTS_MODEL_ID.to_string()), + voice_settings: Some(voice_settings), + ..Default::default() + }; + let outcome = synthesize_reply(&config, text, &opts).await?; + let pcm_bytes = B64 + .decode(outcome.value.audio_base64.as_bytes()) + .map_err(|e| format!("decode tts base64: {e}"))?; + if pcm_bytes.len() % 2 != 0 { + return Err(format!("odd byte length from tts: {}", pcm_bytes.len())); + } + Ok(pcm_bytes + .chunks_exact(2) + .map(|c| i16::from_le_bytes([c[0], c[1]])) + .collect()) +} + +/// Gather screen context (foreground app info) as a text summary. +/// Returns `None` if screen intelligence is unavailable. +async fn gather_screen_context() -> Option { + #[cfg(target_os = "macos")] + { + let context = crate::openhuman::accessibility::foreground_context(); + context.map(|ctx| { + format!( + "App: {} | Window: {}", + ctx.app_name.as_deref().unwrap_or("unknown"), + ctx.window_title.as_deref().unwrap_or("unknown"), + ) + }) + } + #[cfg(not(target_os = "macos"))] + { + None + } +} + +fn extract_chat_completion_text(raw: &Value) -> Option { + raw.get("choices") + .and_then(|c| c.as_array()) + .and_then(|arr| arr.first()) + .and_then(|first| first.get("message")) + .and_then(|m| m.get("content")) + .and_then(|s| s.as_str()) + .map(|s| s.trim().to_string()) +} + +/// Take the last `n` turns from conversation history. +fn tail_history(history: &[ConversationTurn], n: usize) -> Vec<&ConversationTurn> { + let start = history.len().saturating_sub(n); + history[start..].iter().collect() +} + +fn cancelled_result(transcript: &str) -> TurnResult { + // Restore session to Idle so it doesn't stay stuck in Thinking/Speaking. + let _ = session::transition_state(CompanionState::Idle, None); + info!("{LOG_PREFIX} turn cancelled, restored session to Idle"); + TurnResult { + transcript: transcript.to_string(), + response_text: String::new(), + targets: Vec::new(), + tts_synthesized: false, + handoff_events: Vec::new(), + cancelled: true, + } +} + +#[cfg(test)] +#[path = "pipeline_tests.rs"] +mod tests; diff --git a/src/openhuman/desktop_companion/pipeline_tests.rs b/src/openhuman/desktop_companion/pipeline_tests.rs new file mode 100644 index 0000000000..3770011f9a --- /dev/null +++ b/src/openhuman/desktop_companion/pipeline_tests.rs @@ -0,0 +1,177 @@ +//! Tests for the companion interaction pipeline. +//! +//! These tests exercise the pipeline's orchestration logic — state +//! transitions, cancellation, conversation history, and POINT-tag +//! integration. Real STT/LLM/TTS calls are not made; the pipeline +//! falls back to stubs in a test environment (no backend token). + +use super::*; +use crate::openhuman::desktop_companion::pointing::ScreenGeometry; +use crate::openhuman::desktop_companion::session; +use crate::openhuman::desktop_companion::types::*; + +use std::sync::Mutex as StdMutex; + +/// Serialize tests that touch the process-global session state. +static TEST_MUTEX: StdMutex<()> = StdMutex::new(()); + +fn lock_and_reset() -> std::sync::MutexGuard<'static, ()> { + let guard = TEST_MUTEX.lock().unwrap_or_else(|p| p.into_inner()); + session::reset_for_test(); + session::start_session(&StartCompanionSessionParams { + consent: true, + ttl_secs: Some(3600), + }) + .expect("session should start"); + guard +} + +fn single_screen() -> Vec { + vec![ScreenGeometry { + index: 0, + x: 0.0, + y: 0.0, + width: 1920.0, + height: 1080.0, + }] +} + +// ── Helper tests ───────────────────────────────────────────────────── + +#[test] +fn tail_history_returns_last_n() { + let turns: Vec = (0..10) + .map(|i| ConversationTurn { + role: "user".into(), + content: format!("turn {i}"), + timestamp_ms: i, + }) + .collect(); + let tail = tail_history(&turns, 3); + assert_eq!(tail.len(), 3); + assert_eq!(tail[0].content, "turn 7"); + assert_eq!(tail[2].content, "turn 9"); +} + +#[test] +fn tail_history_handles_small_history() { + let turns = vec![ConversationTurn { + role: "user".into(), + content: "only".into(), + timestamp_ms: 0, + }]; + let tail = tail_history(&turns, 10); + assert_eq!(tail.len(), 1); +} + +#[test] +fn tail_history_empty() { + let turns: Vec = Vec::new(); + let tail = tail_history(&turns, 5); + assert!(tail.is_empty()); +} + +#[test] +fn cancelled_result_has_correct_fields() { + let r = cancelled_result("hello"); + assert_eq!(r.transcript, "hello"); + assert!(r.response_text.is_empty()); + assert!(r.targets.is_empty()); + assert!(!r.tts_synthesized); + assert!(r.handoff_events.is_empty()); + assert!(r.cancelled); +} + +#[test] +fn extract_chat_completion_text_valid() { + let raw = json!({ + "choices": [{ "message": { "content": " Hello! " } }] + }); + assert_eq!( + extract_chat_completion_text(&raw), + Some("Hello!".to_string()) + ); +} + +#[test] +fn extract_chat_completion_text_empty_choices() { + assert_eq!( + extract_chat_completion_text(&json!({ "choices": [] })), + None + ); +} + +#[test] +fn extract_chat_completion_text_malformed() { + assert_eq!(extract_chat_completion_text(&json!({})), None); + assert_eq!(extract_chat_completion_text(&json!(42)), None); +} + +// ── Text turn tests ────────────────────────────────────────────────── + +#[tokio::test] +async fn text_turn_rejects_empty_input() { + let _guard = lock_and_reset(); + let cancel = CancellationToken::new(); + let result = run_text_turn("", &single_screen(), cancel).await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("empty")); + session::reset_for_test(); +} + +#[tokio::test] +async fn text_turn_rejects_whitespace_only() { + let _guard = lock_and_reset(); + let cancel = CancellationToken::new(); + let result = run_text_turn(" \n ", &single_screen(), cancel).await; + assert!(result.is_err()); + session::reset_for_test(); +} + +#[tokio::test] +async fn text_turn_cancellation_returns_cancelled() { + let _guard = lock_and_reset(); + let cancel = CancellationToken::new(); + cancel.cancel(); + // Transition to Listening first so Thinking is a valid transition. + session::transition_state(CompanionState::Listening, None).unwrap(); + let result = run_text_turn("hello", &single_screen(), cancel).await; + let turn = result.unwrap(); + assert!(turn.cancelled); + assert!(turn.response_text.is_empty()); + session::reset_for_test(); +} + +// ── Audio turn tests ───────────────────────────────────────────────── + +#[tokio::test] +async fn audio_turn_rejects_empty_samples() { + let _guard = lock_and_reset(); + let cancel = CancellationToken::new(); + let result = run_audio_turn(&[], 16_000, &single_screen(), cancel).await; + assert!(result.is_err()); + assert!(result.unwrap_err().contains("no audio")); + session::reset_for_test(); +} + +// ── Screen context ─────────────────────────────────────────────────── + +#[tokio::test] +async fn gather_screen_context_returns_option() { + let ctx = gather_screen_context().await; + // Just verify it doesn't panic — value depends on platform. + let _ = ctx; +} + +// ── System prompt ──────────────────────────────────────────────────── + +#[test] +fn companion_system_prompt_mentions_point_tags() { + assert!(COMPANION_SYSTEM_PROMPT.contains("[POINT:")); + assert!(COMPANION_SYSTEM_PROMPT.contains("screenN")); +} + +#[test] +fn companion_system_prompt_discourages_markdown() { + assert!(COMPANION_SYSTEM_PROMPT.contains("markdown")); +} diff --git a/src/openhuman/desktop_companion/pointing.rs b/src/openhuman/desktop_companion/pointing.rs new file mode 100644 index 0000000000..ab491cf81b --- /dev/null +++ b/src/openhuman/desktop_companion/pointing.rs @@ -0,0 +1,125 @@ +//! POINT tag parser and multi-monitor coordinate mapping. +//! +//! The companion LLM embeds `[POINT:x,y:label:screenN]` tags in its +//! response text (Clicky convention). This module extracts those tags, +//! maps screen-relative coordinates to absolute desktop coordinates +//! using monitor geometry, and strips the tags from the display text. + +use log::debug; +use serde::{Deserialize, Serialize}; + +const LOG_PREFIX: &str = "[companion_pointing]"; + +/// A parsed point target from the LLM response. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct PointTarget { + /// Screen-relative X coordinate (as emitted by the LLM). + pub x: f64, + /// Screen-relative Y coordinate. + pub y: f64, + /// Human-readable label for the target element. + pub label: String, + /// Zero-based screen index. + pub screen_index: usize, + /// Absolute desktop X after multi-monitor mapping. + pub absolute_x: f64, + /// Absolute desktop Y after multi-monitor mapping. + pub absolute_y: f64, +} + +/// Monitor geometry used for coordinate mapping. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScreenGeometry { + /// Zero-based index. + pub index: usize, + /// Left edge in absolute desktop coordinates. + pub x: f64, + /// Top edge in absolute desktop coordinates. + pub y: f64, + /// Width in points. + pub width: f64, + /// Height in points. + pub height: f64, +} + +/// Result of parsing POINT tags from an LLM response. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PointingParseResult { + /// Extracted point targets with mapped coordinates. + pub targets: Vec, + /// The response text with POINT tags stripped out. + pub clean_text: String, +} + +/// Lazily compiled POINT-tag regex. +fn point_tag_regex() -> &'static regex::Regex { + static RE: std::sync::OnceLock = std::sync::OnceLock::new(); + RE.get_or_init(|| { + regex::Regex::new(r"\[POINT:(-?\d+(?:\.\d+)?),(-?\d+(?:\.\d+)?):([^:\]]+):screen(\d+)\]") + .expect("companion POINT tag regex is static and valid") + }) +} + +/// Parse `[POINT:x,y:label:screenN]` tags from LLM response text and map +/// coordinates to absolute desktop positions using the given screen geometry. +pub fn parse_and_map(text: &str, screens: &[ScreenGeometry]) -> PointingParseResult { + let re = point_tag_regex(); + + let mut targets = Vec::new(); + let clean_text = re + .replace_all(text, |caps: ®ex::Captures| { + let x: f64 = caps[1].parse().unwrap_or(0.0); + let y: f64 = caps[2].parse().unwrap_or(0.0); + let label = caps[3].trim().to_string(); + let screen_index: usize = caps[4].parse().unwrap_or(0); + + let (abs_x, abs_y) = map_to_absolute(x, y, screen_index, screens); + + debug!( + "{LOG_PREFIX} parsed target: ({x},{y}) label=\"{label}\" screen{screen_index} -> abs({abs_x},{abs_y})" + ); + + targets.push(PointTarget { + x, + y, + label, + screen_index, + absolute_x: abs_x, + absolute_y: abs_y, + }); + + // Replace the tag with empty string in the display text. + String::new() + }) + .to_string(); + + PointingParseResult { + targets, + clean_text: clean_text.trim().to_string(), + } +} + +/// Map screen-relative coordinates to absolute desktop coordinates. +/// +/// If the screen index is out of range, falls back to screen 0 (primary). +/// Coordinates are clamped to screen bounds. +fn map_to_absolute(x: f64, y: f64, screen_index: usize, screens: &[ScreenGeometry]) -> (f64, f64) { + if screens.is_empty() { + return (x, y); + } + + let screen = screens + .iter() + .find(|s| s.index == screen_index) + .or_else(|| screens.first()) + .unwrap(); + + let clamped_x = x.clamp(0.0, screen.width); + let clamped_y = y.clamp(0.0, screen.height); + + (screen.x + clamped_x, screen.y + clamped_y) +} + +#[cfg(test)] +#[path = "pointing_tests.rs"] +mod tests; diff --git a/src/openhuman/desktop_companion/pointing_tests.rs b/src/openhuman/desktop_companion/pointing_tests.rs new file mode 100644 index 0000000000..3f5c93f972 --- /dev/null +++ b/src/openhuman/desktop_companion/pointing_tests.rs @@ -0,0 +1,170 @@ +//! Tests for POINT tag parsing and coordinate mapping. + +use super::*; + +fn single_screen() -> Vec { + vec![ScreenGeometry { + index: 0, + x: 0.0, + y: 0.0, + width: 1920.0, + height: 1080.0, + }] +} + +fn dual_screens() -> Vec { + vec![ + ScreenGeometry { + index: 0, + x: 0.0, + y: 0.0, + width: 1920.0, + height: 1080.0, + }, + ScreenGeometry { + index: 1, + x: 1920.0, + y: 0.0, + width: 2560.0, + height: 1440.0, + }, + ] +} + +// ── Basic parsing ───────────────────────────────────────────────────── + +#[test] +fn parse_single_point_tag() { + let text = "Click the button [POINT:100,200:Save Button:screen0] to save."; + let result = parse_and_map(text, &single_screen()); + + assert_eq!(result.targets.len(), 1); + assert_eq!(result.targets[0].x, 100.0); + assert_eq!(result.targets[0].y, 200.0); + assert_eq!(result.targets[0].label, "Save Button"); + assert_eq!(result.targets[0].screen_index, 0); + assert_eq!(result.clean_text, "Click the button to save."); +} + +#[test] +fn parse_multiple_point_tags() { + let text = "First [POINT:10,20:A:screen0] then [POINT:30,40:B:screen1] done."; + let result = parse_and_map(text, &dual_screens()); + + assert_eq!(result.targets.len(), 2); + assert_eq!(result.targets[0].label, "A"); + assert_eq!(result.targets[1].label, "B"); +} + +#[test] +fn parse_no_point_tags() { + let text = "No pointing needed here."; + let result = parse_and_map(text, &single_screen()); + + assert!(result.targets.is_empty()); + assert_eq!(result.clean_text, "No pointing needed here."); +} + +#[test] +fn parse_decimal_coordinates() { + let text = "[POINT:100.5,200.75:Pin:screen0]"; + let result = parse_and_map(text, &single_screen()); + + assert_eq!(result.targets[0].x, 100.5); + assert_eq!(result.targets[0].y, 200.75); +} + +#[test] +fn parse_negative_coordinates_clamped() { + let text = "[POINT:-50,-100:Off-screen:screen0]"; + let result = parse_and_map(text, &single_screen()); + + assert_eq!(result.targets[0].absolute_x, 0.0); + assert_eq!(result.targets[0].absolute_y, 0.0); +} + +// ── Multi-monitor mapping ───────────────────────────────────────────── + +#[test] +fn map_to_primary_screen() { + let text = "[POINT:500,300:Target:screen0]"; + let result = parse_and_map(text, &dual_screens()); + + assert_eq!(result.targets[0].absolute_x, 500.0); + assert_eq!(result.targets[0].absolute_y, 300.0); +} + +#[test] +fn map_to_secondary_screen() { + let text = "[POINT:500,300:Target:screen1]"; + let result = parse_and_map(text, &dual_screens()); + + // screen1 starts at x=1920 + assert_eq!(result.targets[0].absolute_x, 2420.0); + assert_eq!(result.targets[0].absolute_y, 300.0); +} + +#[test] +fn screen_index_out_of_range_falls_back_to_primary() { + let text = "[POINT:100,200:Target:screen5]"; + let result = parse_and_map(text, &single_screen()); + + // Falls back to screen 0 + assert_eq!(result.targets[0].absolute_x, 100.0); + assert_eq!(result.targets[0].absolute_y, 200.0); +} + +#[test] +fn coordinates_clamped_to_screen_bounds() { + let text = "[POINT:5000,3000:Far:screen0]"; + let result = parse_and_map(text, &single_screen()); + + assert_eq!(result.targets[0].absolute_x, 1920.0); + assert_eq!(result.targets[0].absolute_y, 1080.0); +} + +#[test] +fn empty_screens_returns_raw_coordinates() { + let text = "[POINT:100,200:Target:screen0]"; + let result = parse_and_map(text, &[]); + + assert_eq!(result.targets[0].absolute_x, 100.0); + assert_eq!(result.targets[0].absolute_y, 200.0); +} + +// ── Malformed tags ──────────────────────────────────────────────────── + +#[test] +fn malformed_tag_not_parsed() { + let text = "[POINT:abc,def:Bad:screen0] and [POINT:100:Missing:screen0]"; + let result = parse_and_map(text, &single_screen()); + + // Neither matches the regex + assert!(result.targets.is_empty()); +} + +#[test] +fn partial_tag_not_parsed() { + let text = "[POINT:100,200:No Screen] and POINT:100,200:bare:screen0]"; + let result = parse_and_map(text, &single_screen()); + + assert!(result.targets.is_empty()); +} + +// ── Clean text ──────────────────────────────────────────────────────── + +#[test] +fn clean_text_strips_all_tags() { + let text = "Start [POINT:0,0:A:screen0] middle [POINT:0,0:B:screen0] end"; + let result = parse_and_map(text, &single_screen()); + + assert_eq!(result.clean_text, "Start middle end"); +} + +#[test] +fn clean_text_trims_whitespace() { + let text = " [POINT:0,0:A:screen0] "; + let result = parse_and_map(text, &single_screen()); + + assert_eq!(result.clean_text, ""); +} diff --git a/src/openhuman/desktop_companion/schemas.rs b/src/openhuman/desktop_companion/schemas.rs new file mode 100644 index 0000000000..9ca6e3cc43 --- /dev/null +++ b/src/openhuman/desktop_companion/schemas.rs @@ -0,0 +1,267 @@ +//! Controller registry for `desktop_companion`. +//! +//! Exposes the companion session lifecycle over JSON-RPC so the Tauri +//! shell and frontend can drive the desktop companion loop. + +use log::{debug, warn}; +use serde::de::DeserializeOwned; +use serde_json::{Map, Value}; + +use crate::core::all::RegisteredController; +use crate::core::{ControllerSchema, FieldSchema, TypeSchema}; +use crate::openhuman::memory::EmptyRequest; + +use super::session; +use super::types::*; + +const LOG_PREFIX: &str = "[companion_rpc]"; + +pub fn all_desktop_companion_controller_schemas() -> Vec { + vec![ + schemas("start_session"), + schemas("stop_session"), + schemas("status"), + schemas("config_get"), + schemas("config_set"), + ] +} + +pub fn all_desktop_companion_registered_controllers() -> Vec { + vec![ + RegisteredController { + schema: schemas("start_session"), + handler: handle_start_session, + }, + RegisteredController { + schema: schemas("stop_session"), + handler: handle_stop_session, + }, + RegisteredController { + schema: schemas("status"), + handler: handle_status, + }, + RegisteredController { + schema: schemas("config_get"), + handler: handle_config_get, + }, + RegisteredController { + schema: schemas("config_set"), + handler: handle_config_set, + }, + ] +} + +pub fn schemas(function: &str) -> ControllerSchema { + match function { + "start_session" => ControllerSchema { + namespace: "companion", + function: "start_session", + description: "Start a desktop companion session with explicit consent.", + inputs: vec![ + field( + "consent", + TypeSchema::Bool, + "User consent for screen monitoring and audio capture.", + ), + optional( + "ttl_secs", + TypeSchema::U64, + "Session time-to-live in seconds. 0 = no expiry.", + ), + ], + outputs: vec![json_output( + "result", + "Session start result with session_id and state.", + )], + }, + "stop_session" => ControllerSchema { + namespace: "companion", + function: "stop_session", + description: "Stop the active desktop companion session.", + inputs: vec![optional( + "reason", + TypeSchema::String, + "Optional reason for stopping.", + )], + outputs: vec![json_output("result", "Session stop result.")], + }, + "status" => ControllerSchema { + namespace: "companion", + function: "status", + description: "Get the current desktop companion session status.", + inputs: vec![], + outputs: vec![json_output( + "result", + "Current session status including state and TTL.", + )], + }, + "config_get" => ControllerSchema { + namespace: "companion", + function: "config_get", + description: "Get the current desktop companion configuration.", + inputs: vec![], + outputs: vec![json_output("result", "Current companion configuration.")], + }, + "config_set" => ControllerSchema { + namespace: "companion", + function: "config_set", + description: "Update desktop companion configuration.", + inputs: vec![ + optional( + "hotkey", + TypeSchema::String, + "Hotkey string for activation.", + ), + optional( + "activation_mode", + TypeSchema::String, + "Activation mode: push or tap.", + ), + optional( + "ttl_secs", + TypeSchema::U64, + "Default session TTL in seconds.", + ), + optional( + "capture_screen", + TypeSchema::Bool, + "Whether to capture screenshots.", + ), + optional( + "include_app_context", + TypeSchema::Bool, + "Whether to include foreground app info.", + ), + ], + outputs: vec![json_output("result", "Updated companion configuration.")], + }, + _ => ControllerSchema { + namespace: "companion", + function: "unknown", + description: "Unknown companion controller.", + inputs: vec![], + outputs: vec![field("error", TypeSchema::String, "Lookup error details.")], + }, + } +} + +// ── Handlers ────────────────────────────────────────────────────────── + +fn handle_start_session(params: Map) -> crate::core::all::ControllerFuture { + Box::pin(async move { + debug!("{LOG_PREFIX} start_session entry"); + let req: StartCompanionSessionParams = parse_params(params)?; + let result = session::start_session(&req).map_err(|e| { + warn!("{LOG_PREFIX} start_session failed: {e}"); + e + })?; + debug!( + "{LOG_PREFIX} start_session done session_id={}", + result.session_id + ); + serde_json::to_value(result).map_err(|e| format!("serialize error: {e}")) + }) +} + +fn handle_stop_session(params: Map) -> crate::core::all::ControllerFuture { + Box::pin(async move { + debug!("{LOG_PREFIX} stop_session entry"); + let req: StopCompanionSessionParams = parse_params(params)?; + let result = session::stop_session(&req).map_err(|e| { + warn!("{LOG_PREFIX} stop_session failed: {e}"); + e + })?; + debug!("{LOG_PREFIX} stop_session done stopped={}", result.stopped); + serde_json::to_value(result).map_err(|e| format!("serialize error: {e}")) + }) +} + +fn handle_status(params: Map) -> crate::core::all::ControllerFuture { + Box::pin(async move { + debug!("{LOG_PREFIX} status entry"); + let _: EmptyRequest = parse_params(params)?; + let result = session::session_status(); + debug!("{LOG_PREFIX} status done active={}", result.active); + serde_json::to_value(result).map_err(|e| format!("serialize error: {e}")) + }) +} + +fn handle_config_get(params: Map) -> crate::core::all::ControllerFuture { + Box::pin(async move { + debug!("{LOG_PREFIX} config_get entry"); + let _: EmptyRequest = parse_params(params)?; + let config = CompanionConfig::default(); + debug!("{LOG_PREFIX} config_get done"); + serde_json::to_value(config).map_err(|e| format!("serialize error: {e}")) + }) +} + +fn handle_config_set(_params: Map) -> crate::core::all::ControllerFuture { + Box::pin(async move { + warn!("{LOG_PREFIX} config_set called but persistence is not yet implemented"); + Err("companion config_set is not yet persisted — changes are not saved".to_string()) + }) +} + +// ── Helpers ─────────────────────────────────────────────────────────── + +fn parse_params(params: Map) -> Result { + serde_json::from_value(Value::Object(params)).map_err(|e| format!("invalid params: {e}")) +} + +fn field(name: &'static str, ty: TypeSchema, comment: &'static str) -> FieldSchema { + FieldSchema { + name, + ty, + comment, + required: true, + } +} + +fn optional(name: &'static str, ty: TypeSchema, comment: &'static str) -> FieldSchema { + FieldSchema { + name, + ty: TypeSchema::Option(Box::new(ty)), + comment, + required: false, + } +} + +fn json_output(name: &'static str, comment: &'static str) -> FieldSchema { + FieldSchema { + name, + ty: TypeSchema::Json, + comment, + required: true, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn all_schemas_count() { + assert_eq!(all_desktop_companion_controller_schemas().len(), 5); + } + + #[test] + fn all_controllers_count() { + assert_eq!(all_desktop_companion_registered_controllers().len(), 5); + } + + #[test] + fn status_schema_has_no_inputs() { + let schema = schemas("status"); + assert!(schema.inputs.is_empty()); + assert_eq!(schema.namespace, "companion"); + } + + #[test] + fn start_session_schema_requires_consent() { + let schema = schemas("start_session"); + let consent_field = schema.inputs.iter().find(|f| f.name == "consent"); + assert!(consent_field.is_some()); + assert!(consent_field.unwrap().required); + } +} diff --git a/src/openhuman/desktop_companion/session.rs b/src/openhuman/desktop_companion/session.rs new file mode 100644 index 0000000000..ebd720022c --- /dev/null +++ b/src/openhuman/desktop_companion/session.rs @@ -0,0 +1,343 @@ +//! Companion session lifecycle and state machine. +//! +//! A companion session represents a single period of desktop companion +//! activity. It owns the state machine (idle → listening → thinking → +//! speaking → pointing → idle), TTL enforcement, and conversation history. +//! +//! Only one session may be active at a time. Sessions are created with +//! explicit user consent and can be stopped manually or via TTL expiry. + +use log::{debug, info, warn}; +use parking_lot::Mutex; + +use super::bus; +use super::types::*; + +const LOG_PREFIX: &str = "[desktop_companion]"; + +/// Maximum number of conversation turns retained per session. +const MAX_CONVERSATION_TURNS: usize = 50; + +/// Process-global singleton for the active companion session. +/// The `Mutex` serializes all session operations — no separate lock needed. +static ACTIVE_SESSION: Mutex> = Mutex::new(None); + +/// Internal mutable session state (not serialized directly). +struct CompanionSessionInner { + id: String, + state: CompanionState, + started_at_ms: i64, + expires_at_ms: Option, + ttl_secs: u64, + conversation: Vec, + last_error: Option, +} + +/// Start a new companion session. +/// +/// Returns an error if consent is not granted or a session is already active. +pub fn start_session( + params: &StartCompanionSessionParams, +) -> Result { + if !params.consent { + warn!("{LOG_PREFIX} start_session denied — consent=false"); + return Err("user consent is required to start a companion session".into()); + } + + let mut guard = ACTIVE_SESSION.lock(); + if let Some(ref inner) = *guard { + // Auto-expire stale sessions so callers don't have to poll status() first. + let now_ms = chrono::Utc::now().timestamp_millis(); + let expired = inner + .expires_at_ms + .map(|exp| now_ms >= exp) + .unwrap_or(false); + if expired { + let stale_id = inner.id.clone(); + info!("{LOG_PREFIX} auto-expiring stale session id={stale_id} during start_session"); + let _ = guard.take(); + } else { + return Err("a companion session is already active — stop it first".into()); + } + } + + let now_ms = chrono::Utc::now().timestamp_millis(); + let ttl_secs = params + .ttl_secs + .unwrap_or(CompanionConfig::default().ttl_secs); + // Guard against overflow: cap so that now_ms + ttl_ms never exceeds i64::MAX. + let max_ttl_ms = (i64::MAX - now_ms) as u64; + let ttl_ms = ttl_secs.saturating_mul(1000).min(max_ttl_ms); + let expires_at_ms = if ttl_secs > 0 { + Some(now_ms + ttl_ms as i64) + } else { + None + }; + let session_id = uuid::Uuid::new_v4().to_string(); + + info!( + "{LOG_PREFIX} starting session id={} ttl_secs={} expires_at_ms={:?}", + session_id, ttl_secs, expires_at_ms + ); + + let inner = CompanionSessionInner { + id: session_id.clone(), + state: CompanionState::Idle, + started_at_ms: now_ms, + expires_at_ms, + ttl_secs, + conversation: Vec::new(), + last_error: None, + }; + *guard = Some(inner); + drop(guard); + + // Publish session-started event for Socket.IO bridge / subscribers. + let _ = crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::CompanionSessionStarted { + session_id: session_id.clone(), + ttl_secs, + }, + ); + + Ok(StartCompanionSessionResult { + session_id, + state: CompanionState::Idle, + expires_at_ms, + }) +} + +/// Stop the active companion session. +pub fn stop_session( + params: &StopCompanionSessionParams, +) -> Result { + let mut guard = ACTIVE_SESSION.lock(); + match guard.take() { + Some(inner) => { + let reason = params + .reason + .clone() + .unwrap_or_else(|| "user_requested".into()); + let session_id = inner.id.clone(); + let turn_count = inner.conversation.len(); + info!( + "{LOG_PREFIX} stopping session id={session_id} reason={reason} turns={turn_count}", + ); + drop(guard); + + let _ = crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::CompanionSessionEnded { + session_id, + reason: reason.clone(), + turn_count, + }, + ); + + Ok(StopCompanionSessionResult { + stopped: true, + reason: Some(reason), + }) + } + None => { + debug!("{LOG_PREFIX} stop_session called with no active session"); + Ok(StopCompanionSessionResult { + stopped: false, + reason: Some("no active session".into()), + }) + } + } +} + +/// Get the current session status. +pub fn session_status() -> CompanionSessionStatus { + let mut guard = ACTIVE_SESSION.lock(); + match guard.as_ref() { + Some(inner) => { + let now_ms = chrono::Utc::now().timestamp_millis(); + let remaining_ms = inner.expires_at_ms.map(|exp| (exp - now_ms).max(0)); + + // Auto-expire if TTL exceeded. + // Clear inline (guard.take) instead of calling stop_session() to + // avoid a TOCTOU race where another thread starts a new session + // between drop(guard) and the stop_session() call. + if let Some(remaining) = remaining_ms { + if remaining == 0 { + let stale = guard.take().expect("checked is_some"); + let stale_id = stale.id.clone(); + let turn_count = stale.conversation.len(); + drop(stale); + drop(guard); + info!( + "{LOG_PREFIX} auto-expiring stale session id={stale_id} turns={turn_count}" + ); + let _ = crate::core::event_bus::publish_global( + crate::core::event_bus::DomainEvent::CompanionSessionEnded { + session_id: stale_id, + reason: "ttl_expired".into(), + turn_count, + }, + ); + return CompanionSessionStatus { + active: false, + state: CompanionState::Idle, + session_id: None, + started_at_ms: None, + expires_at_ms: None, + remaining_ms: None, + turn_count: 0, + last_error: Some("session expired".into()), + }; + } + } + + CompanionSessionStatus { + active: true, + state: inner.state, + session_id: Some(inner.id.clone()), + started_at_ms: Some(inner.started_at_ms), + expires_at_ms: inner.expires_at_ms, + remaining_ms, + turn_count: inner.conversation.len(), + last_error: inner.last_error.clone(), + } + } + None => CompanionSessionStatus { + active: false, + state: CompanionState::Idle, + session_id: None, + started_at_ms: None, + expires_at_ms: None, + remaining_ms: None, + turn_count: 0, + last_error: None, + }, + } +} + +/// Transition the companion to a new state. +/// +/// Returns the previous state, or an error if no session is active or the +/// transition is invalid. +pub fn transition_state( + new_state: CompanionState, + message: Option, +) -> Result { + let mut guard = ACTIVE_SESSION.lock(); + let inner = guard.as_mut().ok_or_else(|| { + warn!("{LOG_PREFIX} transition_state called with no active session target={new_state}"); + "no active companion session".to_string() + })?; + + let previous = inner.state; + + // Validate transitions. + if !is_valid_transition(previous, new_state) { + warn!( + "{LOG_PREFIX} rejected state transition: {} -> {} session={}", + previous, new_state, inner.id + ); + return Err(format!( + "invalid companion state transition: {} -> {}", + previous, new_state + )); + } + + debug!( + "{LOG_PREFIX} state transition: {} -> {} session={}", + previous, new_state, inner.id + ); + + inner.state = new_state; + + if new_state == CompanionState::Error { + inner.last_error = message.clone(); + } + + // Publish the state change event. + let session_id = inner.id.clone(); + drop(guard); + + bus::publish_state_changed(CompanionStateChangedEvent { + session_id, + state: new_state, + previous_state: previous, + message, + }); + + Ok(previous) +} + +/// Add a conversation turn to the session history. +pub fn push_conversation_turn(turn: ConversationTurn) -> Result<(), String> { + let mut guard = ACTIVE_SESSION.lock(); + let inner = guard.as_mut().ok_or("no active companion session")?; + + inner.conversation.push(turn); + + // Cap the history to prevent unbounded growth. + if inner.conversation.len() > MAX_CONVERSATION_TURNS { + let drain_count = inner.conversation.len() - MAX_CONVERSATION_TURNS; + inner.conversation.drain(..drain_count); + } + + Ok(()) +} + +/// Get a snapshot of the conversation history for LLM context. +pub fn conversation_history() -> Vec { + let guard = ACTIVE_SESSION.lock(); + match guard.as_ref() { + Some(inner) => inner.conversation.clone(), + None => Vec::new(), + } +} + +/// Check whether a state transition is valid. +/// +/// Valid transitions: +/// - Idle → Listening (activation) +/// - Listening → Thinking (transcript received) +/// - Listening → Idle (cancelled / released) +/// - Thinking → Speaking (response ready) +/// - Thinking → Pointing (response has point targets, no TTS) +/// - Thinking → Idle (cancelled) +/// - Speaking → Pointing (TTS done, point targets present) +/// - Speaking → Idle (TTS done, no pointing) +/// - Speaking → Listening (interrupted — new turn) +/// - Pointing → Idle (animation done) +/// - Pointing → Listening (interrupted — new turn) +/// - Error → Idle (reset) +/// - Any → Error (error from any state) +fn is_valid_transition(from: CompanionState, to: CompanionState) -> bool { + // Any state can transition to Error. + if to == CompanionState::Error { + return true; + } + + matches!( + (from, to), + (CompanionState::Idle, CompanionState::Listening) + | (CompanionState::Listening, CompanionState::Thinking) + | (CompanionState::Listening, CompanionState::Idle) + | (CompanionState::Thinking, CompanionState::Speaking) + | (CompanionState::Thinking, CompanionState::Pointing) + | (CompanionState::Thinking, CompanionState::Idle) + | (CompanionState::Speaking, CompanionState::Pointing) + | (CompanionState::Speaking, CompanionState::Idle) + | (CompanionState::Speaking, CompanionState::Listening) + | (CompanionState::Pointing, CompanionState::Idle) + | (CompanionState::Pointing, CompanionState::Listening) + | (CompanionState::Error, CompanionState::Idle) + ) +} + +/// Reset the global session state. Used only in tests. +#[cfg(test)] +pub(crate) fn reset_for_test() { + let mut guard = ACTIVE_SESSION.lock(); + *guard = None; +} + +#[cfg(test)] +#[path = "session_tests.rs"] +mod tests; diff --git a/src/openhuman/desktop_companion/session_tests.rs b/src/openhuman/desktop_companion/session_tests.rs new file mode 100644 index 0000000000..51d6e8dbda --- /dev/null +++ b/src/openhuman/desktop_companion/session_tests.rs @@ -0,0 +1,381 @@ +//! Tests for companion session lifecycle and state machine. + +use super::*; +use crate::openhuman::desktop_companion::types::*; + +use std::sync::Mutex as StdMutex; + +/// Serialize tests that mutate the process-global session state. +static TEST_MUTEX: StdMutex<()> = StdMutex::new(()); + +fn with_clean_session(f: F) { + let _lock = TEST_MUTEX.lock().unwrap_or_else(|p| p.into_inner()); + reset_for_test(); + f(); + reset_for_test(); +} + +fn start_default_session() -> StartCompanionSessionResult { + start_session(&StartCompanionSessionParams { + consent: true, + ttl_secs: Some(3600), + }) + .expect("session should start") +} + +// ── Session creation ────────────────────────────────────────────────── + +#[test] +fn start_session_requires_consent() { + with_clean_session(|| { + let result = start_session(&StartCompanionSessionParams { + consent: false, + ttl_secs: None, + }); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("consent")); + }); +} + +#[test] +fn start_session_succeeds_with_consent() { + with_clean_session(|| { + let result = start_default_session(); + assert!(!result.session_id.is_empty()); + assert_eq!(result.state, CompanionState::Idle); + assert!(result.expires_at_ms.is_some()); + }); +} + +#[test] +fn start_session_rejects_duplicate() { + with_clean_session(|| { + let _first = start_default_session(); + let second = start_session(&StartCompanionSessionParams { + consent: true, + ttl_secs: None, + }); + assert!(second.is_err()); + assert!(second.unwrap_err().contains("already active")); + }); +} + +#[test] +fn start_session_zero_ttl_means_no_expiry() { + with_clean_session(|| { + let result = start_session(&StartCompanionSessionParams { + consent: true, + ttl_secs: Some(0), + }) + .unwrap(); + assert!(result.expires_at_ms.is_none()); + }); +} + +// ── Session stop ────────────────────────────────────────────────────── + +#[test] +fn stop_session_succeeds() { + with_clean_session(|| { + let _s = start_default_session(); + let result = stop_session(&StopCompanionSessionParams { + reason: Some("test".into()), + }) + .unwrap(); + assert!(result.stopped); + assert_eq!(result.reason.as_deref(), Some("test")); + }); +} + +#[test] +fn stop_session_with_no_active_session() { + with_clean_session(|| { + let result = stop_session(&StopCompanionSessionParams { reason: None }).unwrap(); + assert!(!result.stopped); + assert!(result.reason.unwrap().contains("no active")); + }); +} + +#[test] +fn stop_allows_new_session() { + with_clean_session(|| { + let _s = start_default_session(); + let _ = stop_session(&StopCompanionSessionParams { reason: None }); + let second = start_default_session(); + assert!(!second.session_id.is_empty()); + }); +} + +// ── Session status ──────────────────────────────────────────────────── + +#[test] +fn status_inactive_by_default() { + with_clean_session(|| { + let status = session_status(); + assert!(!status.active); + assert_eq!(status.state, CompanionState::Idle); + assert!(status.session_id.is_none()); + }); +} + +#[test] +fn status_reflects_active_session() { + with_clean_session(|| { + let s = start_default_session(); + let status = session_status(); + assert!(status.active); + assert_eq!(status.session_id.as_deref(), Some(s.session_id.as_str())); + assert!(status.remaining_ms.is_some()); + }); +} + +// ── State transitions ───────────────────────────────────────────────── + +#[test] +fn transition_idle_to_listening() { + with_clean_session(|| { + let _s = start_default_session(); + let prev = transition_state(CompanionState::Listening, None).unwrap(); + assert_eq!(prev, CompanionState::Idle); + assert_eq!(session_status().state, CompanionState::Listening); + }); +} + +#[test] +fn transition_listening_to_thinking() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Listening, None).unwrap(); + let prev = transition_state(CompanionState::Thinking, None).unwrap(); + assert_eq!(prev, CompanionState::Listening); + }); +} + +#[test] +fn transition_thinking_to_speaking() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Listening, None).unwrap(); + transition_state(CompanionState::Thinking, None).unwrap(); + let prev = transition_state(CompanionState::Speaking, None).unwrap(); + assert_eq!(prev, CompanionState::Thinking); + }); +} + +#[test] +fn transition_speaking_to_pointing() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Listening, None).unwrap(); + transition_state(CompanionState::Thinking, None).unwrap(); + transition_state(CompanionState::Speaking, None).unwrap(); + let prev = transition_state(CompanionState::Pointing, None).unwrap(); + assert_eq!(prev, CompanionState::Speaking); + }); +} + +#[test] +fn transition_pointing_to_idle() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Listening, None).unwrap(); + transition_state(CompanionState::Thinking, None).unwrap(); + transition_state(CompanionState::Pointing, None).unwrap(); + let prev = transition_state(CompanionState::Idle, None).unwrap(); + assert_eq!(prev, CompanionState::Pointing); + }); +} + +#[test] +fn transition_full_happy_path() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Listening, None).unwrap(); + transition_state(CompanionState::Thinking, None).unwrap(); + transition_state(CompanionState::Speaking, None).unwrap(); + transition_state(CompanionState::Pointing, None).unwrap(); + transition_state(CompanionState::Idle, None).unwrap(); + assert_eq!(session_status().state, CompanionState::Idle); + }); +} + +#[test] +fn transition_any_to_error() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Listening, None).unwrap(); + let prev = transition_state(CompanionState::Error, Some("mic failure".into())).unwrap(); + assert_eq!(prev, CompanionState::Listening); + + let status = session_status(); + assert_eq!(status.state, CompanionState::Error); + assert_eq!(status.last_error.as_deref(), Some("mic failure")); + }); +} + +#[test] +fn transition_error_to_idle() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Error, Some("oops".into())).unwrap(); + let prev = transition_state(CompanionState::Idle, None).unwrap(); + assert_eq!(prev, CompanionState::Error); + }); +} + +#[test] +fn transition_speaking_to_listening_interrupt() { + with_clean_session(|| { + let _s = start_default_session(); + transition_state(CompanionState::Listening, None).unwrap(); + transition_state(CompanionState::Thinking, None).unwrap(); + transition_state(CompanionState::Speaking, None).unwrap(); + let prev = transition_state(CompanionState::Listening, None).unwrap(); + assert_eq!(prev, CompanionState::Speaking); + }); +} + +#[test] +fn transition_invalid_idle_to_speaking() { + with_clean_session(|| { + let _s = start_default_session(); + let result = transition_state(CompanionState::Speaking, None); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("invalid")); + }); +} + +#[test] +fn transition_invalid_idle_to_thinking() { + with_clean_session(|| { + let _s = start_default_session(); + let result = transition_state(CompanionState::Thinking, None); + assert!(result.is_err()); + }); +} + +#[test] +fn transition_requires_active_session() { + with_clean_session(|| { + let result = transition_state(CompanionState::Listening, None); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("no active")); + }); +} + +// ── Conversation history ────────────────────────────────────────────── + +#[test] +fn push_conversation_turn_succeeds() { + with_clean_session(|| { + let _s = start_default_session(); + push_conversation_turn(ConversationTurn { + role: "user".into(), + content: "hello".into(), + timestamp_ms: 1000, + }) + .unwrap(); + let history = conversation_history(); + assert_eq!(history.len(), 1); + assert_eq!(history[0].content, "hello"); + }); +} + +#[test] +fn conversation_history_capped() { + with_clean_session(|| { + let _s = start_default_session(); + for i in 0..60 { + push_conversation_turn(ConversationTurn { + role: "user".into(), + content: format!("turn {i}"), + timestamp_ms: i as i64, + }) + .unwrap(); + } + let history = conversation_history(); + assert_eq!(history.len(), MAX_CONVERSATION_TURNS); + // Oldest turns should have been drained. + assert_eq!(history[0].content, "turn 10"); + }); +} + +#[test] +fn conversation_history_empty_without_session() { + with_clean_session(|| { + assert!(conversation_history().is_empty()); + }); +} + +#[test] +fn push_turn_fails_without_session() { + with_clean_session(|| { + let result = push_conversation_turn(ConversationTurn { + role: "user".into(), + content: "hello".into(), + timestamp_ms: 1000, + }); + assert!(result.is_err()); + }); +} + +// ── Auto-expire and TTL edge cases ─────────────────────────────────── + +#[test] +fn start_session_auto_expires_stale_session() { + with_clean_session(|| { + // Start a session with a 1-second TTL. + let first = start_session(&StartCompanionSessionParams { + consent: true, + ttl_secs: Some(1), + }) + .unwrap(); + assert!(first.expires_at_ms.is_some()); + + // Sleep past the TTL so the session becomes stale. + std::thread::sleep(std::time::Duration::from_millis(1100)); + + // Starting a new session should succeed — the stale one is auto-expired. + let second = start_session(&StartCompanionSessionParams { + consent: true, + ttl_secs: Some(3600), + }) + .unwrap(); + assert_ne!(first.session_id, second.session_id); + assert_eq!(second.state, CompanionState::Idle); + }); +} + +#[test] +fn start_session_ttl_overflow_guard() { + with_clean_session(|| { + // u64::MAX would overflow i64 when multiplied by 1000 — the guard caps it. + let result = start_session(&StartCompanionSessionParams { + consent: true, + ttl_secs: Some(u64::MAX), + }) + .unwrap(); + // Session created without panic. + assert!(!result.session_id.is_empty()); + // expires_at_ms should be set (non-zero TTL) and positive (not overflowed). + let expires = result + .expires_at_ms + .expect("should have expiry with non-zero TTL"); + assert!( + expires > 0, + "expires_at_ms should be positive, got {expires}" + ); + }); +} + +// ── is_active helper ────────────────────────────────────────────────── + +#[test] +fn companion_state_is_active() { + assert!(!CompanionState::Idle.is_active()); + assert!(CompanionState::Listening.is_active()); + assert!(CompanionState::Thinking.is_active()); + assert!(CompanionState::Speaking.is_active()); + assert!(CompanionState::Pointing.is_active()); + assert!(!CompanionState::Error.is_active()); +} diff --git a/src/openhuman/desktop_companion/types.rs b/src/openhuman/desktop_companion/types.rs new file mode 100644 index 0000000000..8e8fb76bc0 --- /dev/null +++ b/src/openhuman/desktop_companion/types.rs @@ -0,0 +1,155 @@ +//! Shared types for the desktop companion session. + +use serde::{Deserialize, Serialize}; + +/// Visual state of the companion surface, broadcast to the overlay window. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum CompanionState { + /// No interaction in progress; mascot idles. + #[default] + Idle, + /// Microphone is live — capturing user speech. + Listening, + /// Transcript + screen context sent to LLM; awaiting response. + Thinking, + /// TTS is playing the response audio. + Speaking, + /// Visual pointer is animating toward a UI target. + Pointing, + /// An unrecoverable error occurred in the current turn. + Error, +} + +impl CompanionState { + /// Returns `true` for states that represent an active interaction turn. + pub fn is_active(self) -> bool { + !matches!(self, Self::Idle | Self::Error) + } +} + +impl std::fmt::Display for CompanionState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Idle => write!(f, "idle"), + Self::Listening => write!(f, "listening"), + Self::Thinking => write!(f, "thinking"), + Self::Speaking => write!(f, "speaking"), + Self::Pointing => write!(f, "pointing"), + Self::Error => write!(f, "error"), + } + } +} + +/// A single conversation turn in the companion session history. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConversationTurn { + /// Who spoke — `"user"` or `"assistant"`. + pub role: String, + /// The text content of this turn. + pub content: String, + /// Epoch milliseconds when this turn was recorded. + pub timestamp_ms: i64, +} + +/// Persistent configuration for the desktop companion. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompanionConfig { + /// Hotkey string for activation (e.g. `"ctrl+space"`). + #[serde(default = "default_hotkey")] + pub hotkey: String, + /// Activation mode: `"push"` (hold-to-talk) or `"tap"` (toggle). + #[serde(default = "default_activation_mode")] + pub activation_mode: String, + /// Session TTL in seconds. `0` means no automatic expiry. + #[serde(default = "default_ttl_secs")] + pub ttl_secs: u64, + /// Whether to capture a screenshot on each activation. + #[serde(default = "default_true")] + pub capture_screen: bool, + /// Whether to include the foreground app context. + #[serde(default = "default_true")] + pub include_app_context: bool, +} + +impl Default for CompanionConfig { + fn default() -> Self { + Self { + hotkey: default_hotkey(), + activation_mode: default_activation_mode(), + ttl_secs: default_ttl_secs(), + capture_screen: true, + include_app_context: true, + } + } +} + +fn default_hotkey() -> String { + "ctrl+space".into() +} +fn default_activation_mode() -> String { + "push".into() +} +fn default_ttl_secs() -> u64 { + 3600 +} +fn default_true() -> bool { + true +} + +/// Parameters for starting a companion session. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StartCompanionSessionParams { + /// Explicit user consent to screen monitoring and audio capture. + pub consent: bool, + /// Optional TTL override in seconds. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub ttl_secs: Option, +} + +/// Parameters for stopping a companion session. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StopCompanionSessionParams { + /// Optional reason for stopping (shown in logs). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +/// Snapshot of the current companion session status. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompanionSessionStatus { + pub active: bool, + pub state: CompanionState, + pub session_id: Option, + pub started_at_ms: Option, + pub expires_at_ms: Option, + pub remaining_ms: Option, + pub turn_count: usize, + pub last_error: Option, +} + +/// Result of starting a companion session. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StartCompanionSessionResult { + pub session_id: String, + pub state: CompanionState, + pub expires_at_ms: Option, +} + +/// Result of stopping a companion session. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StopCompanionSessionResult { + pub stopped: bool, + pub reason: Option, +} + +/// Event emitted when companion state changes (for Socket.IO bridge). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CompanionStateChangedEvent { + pub session_id: String, + pub state: CompanionState, + pub previous_state: CompanionState, + /// Optional human-readable message (e.g. error details, response text). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub message: Option, +} diff --git a/src/openhuman/mod.rs b/src/openhuman/mod.rs index 99dca02a79..71fe52b9c1 100644 --- a/src/openhuman/mod.rs +++ b/src/openhuman/mod.rs @@ -30,6 +30,7 @@ pub mod context; pub mod cost; pub mod credentials; pub mod cron; +pub mod desktop_companion; pub mod dev_paths; pub mod doctor; pub mod embeddings; diff --git a/tests/json_rpc_e2e.rs b/tests/json_rpc_e2e.rs index d1a82d1058..8ca24a0783 100644 --- a/tests/json_rpc_e2e.rs +++ b/tests/json_rpc_e2e.rs @@ -5889,3 +5889,140 @@ async fn whatsapp_data_agent_tools_e2e_1341() { .description() .contains("WhatsApp")); } + +// --------------------------------------------------------------------------- +// Desktop companion session lifecycle (RPC round-trip) +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn companion_session_lifecycle_over_rpc() { + let _env_lock = json_rpc_e2e_env_lock(); + let tmp = tempdir().expect("tempdir"); + let home = tmp.path(); + let openhuman_home = home.join(".openhuman"); + + let _home_guard = EnvVarGuard::set_to_path("HOME", home); + let _workspace_guard = EnvVarGuard::unset("OPENHUMAN_WORKSPACE"); + let _backend_url_guard = EnvVarGuard::unset("BACKEND_URL"); + let _vite_backend_guard = EnvVarGuard::unset("VITE_BACKEND_URL"); + + let (mock_addr, mock_join) = serve_on_ephemeral(mock_upstream_router()).await; + let mock_origin = format!("http://{}", mock_addr); + write_min_config(&openhuman_home, &mock_origin); + + let (rpc_addr, rpc_join) = serve_on_ephemeral(build_core_http_router(false)).await; + let rpc_base = format!("http://{}", rpc_addr); + tokio::time::sleep(Duration::from_millis(100)).await; + + // Reset any lingering session from other tests. + let _ = post_json_rpc( + &rpc_base, + 100, + "openhuman.companion_stop_session", + json!({ "reason": "test_reset" }), + ) + .await; + + // ── 1. Status before any session ── + let status = post_json_rpc(&rpc_base, 101, "openhuman.companion_status", json!({})).await; + let status_r = assert_no_jsonrpc_error(&status, "companion_status (initial)"); + let result_body = status_r.get("result").unwrap_or(status_r); + assert_eq!( + result_body.get("active"), + Some(&json!(false)), + "no session should be active initially: {result_body}" + ); + + // ── 2. Start without consent → error ── + let no_consent = post_json_rpc( + &rpc_base, + 102, + "openhuman.companion_start_session", + json!({ "consent": false }), + ) + .await; + assert_jsonrpc_error(&no_consent, "companion_start_session (no consent)"); + + // ── 3. Start with consent → success ── + let start = post_json_rpc( + &rpc_base, + 103, + "openhuman.companion_start_session", + json!({ "consent": true, "ttl_secs": 3600 }), + ) + .await; + let start_r = assert_no_jsonrpc_error(&start, "companion_start_session"); + let start_body = start_r.get("result").unwrap_or(start_r); + assert!( + start_body.get("session_id").is_some(), + "start should return session_id: {start_body}" + ); + + // ── 4. Status reflects active session ── + let status2 = post_json_rpc(&rpc_base, 104, "openhuman.companion_status", json!({})).await; + let status2_r = assert_no_jsonrpc_error(&status2, "companion_status (active)"); + let result2_body = status2_r.get("result").unwrap_or(status2_r); + assert_eq!( + result2_body.get("active"), + Some(&json!(true)), + "session should be active: {result2_body}" + ); + + // ── 5. Duplicate start → error ── + let dup = post_json_rpc( + &rpc_base, + 105, + "openhuman.companion_start_session", + json!({ "consent": true }), + ) + .await; + assert_jsonrpc_error(&dup, "companion_start_session (duplicate)"); + + // ── 6. Config get ── + let config = post_json_rpc(&rpc_base, 106, "openhuman.companion_config_get", json!({})).await; + let config_r = assert_no_jsonrpc_error(&config, "companion_config_get"); + let config_body = config_r.get("result").unwrap_or(config_r); + assert!( + config_body.get("hotkey").is_some(), + "config should have hotkey: {config_body}" + ); + + // ── 6b. Config set → error (not yet persisted) ── + let config_set = post_json_rpc( + &rpc_base, + 116, + "openhuman.companion_config_set", + json!({ "hotkey": "CmdOrCtrl+Shift+H" }), + ) + .await; + assert_jsonrpc_error(&config_set, "companion_config_set (not persisted)"); + + // ── 7. Stop session ── + let stop = post_json_rpc( + &rpc_base, + 107, + "openhuman.companion_stop_session", + json!({ "reason": "test_done" }), + ) + .await; + let stop_r = assert_no_jsonrpc_error(&stop, "companion_stop_session"); + let stop_body = stop_r.get("result").unwrap_or(stop_r); + assert_eq!( + stop_body.get("stopped"), + Some(&json!(true)), + "session should be stopped: {stop_body}" + ); + + // ── 8. Status after stop ── + let status3 = post_json_rpc(&rpc_base, 108, "openhuman.companion_status", json!({})).await; + let status3_r = assert_no_jsonrpc_error(&status3, "companion_status (after stop)"); + let result3_body = status3_r.get("result").unwrap_or(status3_r); + assert_eq!( + result3_body.get("active"), + Some(&json!(false)), + "session should be inactive after stop: {result3_body}" + ); + + mock_join.abort(); + rpc_join.abort(); +}