diff --git a/crates/surge-cli/src/cli.rs b/crates/surge-cli/src/cli.rs index 625624a..9a71b73 100644 --- a/crates/surge-cli/src/cli.rs +++ b/crates/surge-cli/src/cli.rs @@ -280,6 +280,13 @@ pub(crate) enum Commands { json: bool, }, + /// Inspect read-only status for nodes in a fleet + #[command(name = "fleet-status")] + FleetStatus { + #[command(subcommand)] + action: FleetStatusAction, + }, + /// Install packages using a selected transport method Install { /// Install method (defaults to backend) @@ -303,6 +310,55 @@ pub(crate) enum Commands { }, } +#[derive(Subcommand)] +pub(crate) enum FleetStatusAction { + /// Probe Surge runtime status over tailscale SSH + Tailscale(FleetStatusTailscaleOptions), +} + +#[derive(Args, Clone)] +pub(crate) struct FleetStatusTailscaleOptions { + /// Expected application ID + #[arg(long)] + pub(crate) app_id: String, + + /// Expected runtime identifier + #[arg(long)] + pub(crate) rid: String, + + /// Expected release channel + #[arg(long)] + pub(crate) channel: String, + + /// Expected release version + #[arg(long)] + pub(crate) version: String, + + /// Target node, repeatable (for example: operator@example-node) + #[arg(long)] + pub(crate) node: Vec, + + /// File containing one node per line + #[arg(long)] + pub(crate) nodes_file: Option, + + /// Node user account used when --node values omit a user + #[arg(long = "node-user", alias = "ssh-user")] + pub(crate) node_user: Option, + + /// Maximum number of concurrent node probes + #[arg(long, default_value_t = 16)] + pub(crate) concurrency: usize, + + /// Per-node probe timeout in seconds + #[arg(long, default_value_t = 20)] + pub(crate) timeout_seconds: u64, + + /// Emit JSON instead of a human-readable summary + #[arg(long)] + pub(crate) json: bool, +} + #[derive(Subcommand)] pub(crate) enum LockAction { /// Acquire a distributed lock @@ -458,7 +514,7 @@ pub(crate) struct InstallCompatibilityOptions { mod tests { use clap::Parser; - use super::{Cli, Commands}; + use super::{Cli, Commands, FleetStatusAction}; #[test] fn restore_package_file_requires_installers_flag() { @@ -564,4 +620,43 @@ mod tests { assert!(err.to_string().contains("--stage")); } + + #[test] + fn fleet_status_tailscale_parses_repeated_nodes() { + let cli = Cli::try_parse_from([ + "surge", + "fleet-status", + "tailscale", + "--app-id", + "sample-app", + "--rid", + "linux-x64", + "--channel", + "sample-channel", + "--version", + "1.2.3", + "--node", + "node-a", + "--node", + "operator@example-node", + "--concurrency", + "4", + "--timeout-seconds", + "8", + "--json", + ]) + .expect("fleet status command should parse"); + + let Commands::FleetStatus { + action: FleetStatusAction::Tailscale(options), + } = cli.command + else { + panic!("expected fleet status tailscale command"); + }; + + assert_eq!(options.node, ["node-a", "operator@example-node"]); + assert_eq!(options.concurrency, 4); + assert_eq!(options.timeout_seconds, 8); + assert!(options.json); + } } diff --git a/crates/surge-cli/src/commands/fleet_status.rs b/crates/surge-cli/src/commands/fleet_status.rs new file mode 100644 index 0000000..b8673c8 --- /dev/null +++ b/crates/surge-cli/src/commands/fleet_status.rs @@ -0,0 +1,4 @@ +mod probe; +mod tailscale; + +pub(crate) use tailscale::execute_tailscale; diff --git a/crates/surge-cli/src/commands/fleet_status/probe.rs b/crates/surge-cli/src/commands/fleet_status/probe.rs new file mode 100644 index 0000000..43de4fb --- /dev/null +++ b/crates/surge-cli/src/commands/fleet_status/probe.rs @@ -0,0 +1,338 @@ +use std::collections::BTreeMap; +use std::process::Stdio; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use tokio::process::Command; + +use crate::commands::install::shell_single_quote; + +const PROBE_BEGIN: &str = "__SURGE_FLEET_STATUS_PROBE_BEGIN__"; +const PROBE_END: &str = "__SURGE_FLEET_STATUS_PROBE_END__"; +const RUNTIME_BEGIN: &str = "__SURGE_FLEET_STATUS_RUNTIME_BEGIN__"; +const RUNTIME_END: &str = "__SURGE_FLEET_STATUS_RUNTIME_END__"; +const UPDATE_BEGIN: &str = "__SURGE_FLEET_STATUS_UPDATE_BEGIN__"; +const UPDATE_END: &str = "__SURGE_FLEET_STATUS_UPDATE_END__"; + +#[derive(Debug, Clone)] +pub(super) struct RemoteProbe { + pub(super) missing: bool, + pub(super) runtime_yaml: Option, + pub(super) update_status: Option, + pub(super) process: ProcessSummary, +} + +#[derive(Debug, Clone, Deserialize)] +pub(super) struct RemoteUpdateStatus { + #[serde(default)] + pub(super) state: String, + #[serde(default)] + pub(super) installed_version: String, + #[serde(default)] + pub(super) target_version: String, + #[serde(default)] + pub(super) channel: String, + #[serde(default)] + pub(super) app_id: String, + pub(super) current_phase: Option, + pub(super) last_progress_at_utc: Option, + pub(super) reason: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct ProcessSummary { + pub(super) proc_available: bool, + #[serde(flatten)] + pub(super) app: AppProcessSummary, + #[serde(flatten)] + pub(super) supervisor: SupervisorProcessSummary, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct AppProcessSummary { + pub(super) app_process_running: bool, + pub(super) target_app_process_running: bool, + pub(super) stale_app_process_running: bool, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SupervisorProcessSummary { + pub(super) supervisor_configured: bool, + pub(super) supervisor_process_running: bool, + #[serde(flatten)] + pub(super) handoff: SupervisorHandoffSummary, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SupervisorHandoffSummary { + pub(super) supervisor_waiting_for_previous_child: bool, + pub(super) stale_supervisor_process_running: bool, +} + +impl ProcessSummary { + pub(super) fn app_process_running(&self) -> bool { + self.app.app_process_running + } + + pub(super) fn target_app_process_running(&self) -> bool { + self.app.target_app_process_running + } + + pub(super) fn supervisor_configured(&self) -> bool { + self.supervisor.supervisor_configured + } + + pub(super) fn supervisor_process_running(&self) -> bool { + self.supervisor.supervisor_process_running + } + + pub(super) fn supervisor_waiting_for_previous_child(&self) -> bool { + self.supervisor.handoff.supervisor_waiting_for_previous_child + } + + pub(super) fn stale_supervisor_process_running(&self) -> bool { + self.supervisor.handoff.stale_supervisor_process_running + } +} + +pub(super) async fn run_tailscale_capture_timeout( + ssh_target: &str, + remote_command: &str, + timeout: Duration, +) -> std::result::Result { + let mut command = Command::new("tailscale"); + command + .args(["ssh", ssh_target, remote_command]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true); + + let output = match tokio::time::timeout(timeout, command.output()).await { + Ok(Ok(output)) => output, + Ok(Err(error)) => return Err(format!("failed to run tailscale command: {error}")), + Err(_) => { + return Err(format!( + "timed out after {}s running tailscale ssh {}", + timeout.as_secs(), + ssh_target + )); + } + }; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); + if stderr.is_empty() { + return Err(format!("tailscale ssh {ssh_target} exited with {}", output.status)); + } + return Err(stderr); + } + + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} + +pub(super) fn parse_remote_probe(output: &str) -> std::result::Result { + let probe = + section_between(output, PROBE_BEGIN, PROBE_END).ok_or_else(|| "missing probe section markers".to_string())?; + let values = parse_key_values(&probe); + let missing = parse_bool(values.get("missing").map(String::as_str)); + let runtime_yaml = section_between(output, RUNTIME_BEGIN, RUNTIME_END); + let update_status = parse_update_status(section_between(output, UPDATE_BEGIN, UPDATE_END).as_deref())?; + Ok(RemoteProbe { + missing, + runtime_yaml, + update_status, + process: ProcessSummary { + proc_available: parse_bool(values.get("proc_available").map(String::as_str)), + app: AppProcessSummary { + app_process_running: parse_bool(values.get("app_process_running").map(String::as_str)), + target_app_process_running: parse_bool(values.get("target_app_process_running").map(String::as_str)), + stale_app_process_running: parse_bool(values.get("stale_app_process_running").map(String::as_str)), + }, + supervisor: SupervisorProcessSummary { + supervisor_configured: parse_bool(values.get("supervisor_configured").map(String::as_str)), + supervisor_process_running: parse_bool(values.get("supervisor_process_running").map(String::as_str)), + handoff: SupervisorHandoffSummary { + supervisor_waiting_for_previous_child: parse_bool( + values.get("supervisor_waiting_for_previous_child").map(String::as_str), + ), + stale_supervisor_process_running: parse_bool( + values.get("stale_supervisor_process_running").map(String::as_str), + ), + }, + }, + }, + }) +} + +fn parse_update_status(raw: Option<&str>) -> std::result::Result, String> { + let Some(raw) = raw.map(str::trim).filter(|raw| !raw.is_empty()) else { + return Ok(None); + }; + serde_json::from_str(raw).map(Some).map_err(|error| error.to_string()) +} + +fn parse_key_values(raw: &str) -> BTreeMap { + raw.lines() + .filter_map(|line| { + let (key, value) = line.split_once('=')?; + Some((key.trim().to_string(), value.trim().to_string())) + }) + .collect() +} + +fn parse_bool(value: Option<&str>) -> bool { + matches!(value.map(str::trim), Some("true" | "1" | "yes")) +} + +fn section_between(output: &str, begin: &str, end: &str) -> Option { + let (_, after_begin) = output.split_once(begin)?; + let (section, _) = after_begin.split_once(end)?; + Some(section.trim_matches('\n').to_string()) +} + +pub(super) fn build_remote_probe_script(app_id: &str, target_version: &str) -> String { + let app_id = shell_single_quote(app_id); + let target_version = shell_single_quote(target_version); + format!( + r#"set -eu +app_id={app_id} +target_version={target_version} +base="$HOME/.local/share" +manifest="" +direct="$base/$app_id/app/.surge/runtime.yml" +if [ -f "$direct" ]; then + manifest="$direct" +else + for candidate in "$base"/*/app/.surge/runtime.yml; do + [ -f "$candidate" ] || continue + candidate_id="$(sed -n 's/^id:[[:space:]]*//p' "$candidate" | head -n1)" + if [ "$candidate_id" = "$app_id" ]; then + manifest="$candidate" + break + fi + done +fi +printf '{PROBE_BEGIN}\n' +if [ -z "$manifest" ]; then + printf 'missing=true\n' + printf 'proc_available=false\n' + printf 'app_process_running=false\n' + printf 'target_app_process_running=false\n' + printf 'stale_app_process_running=false\n' + printf 'supervisor_configured=false\n' + printf 'supervisor_process_running=false\n' + printf 'supervisor_waiting_for_previous_child=false\n' + printf 'stale_supervisor_process_running=false\n' + printf '{PROBE_END}\n' + exit 0 +fi +app_dir="${{manifest%/.surge/runtime.yml}}" +install_root="${{app_dir%/app}}" +status_file="$install_root/.surge-update-status.json" +supervisor_id="$(sed -n 's/^supervisorId:[[:space:]]*//p' "$manifest" | head -n1)" +proc_available=false +app_process_running=false +target_app_process_running=false +stale_app_process_running=false +supervisor_process_running=false +supervisor_waiting_for_previous_child=false +stale_supervisor_process_running=false +contains_target_first_run() {{ + cmd_tokens=" $1 " + case "$cmd_tokens" in + *" --surge-first-run $target_version "*|*" $target_version --surge-first-run "*) return 0 ;; + esac + return 1 +}} +watched_pid_is_running() {{ + case "$1" in + *" watch "*" --pid "*) + rest="${{1#* --pid }}" + watched_pid="${{rest%% *}}" + case "$watched_pid" in ""|*[!0-9]*) return 1 ;; esac + kill -0 "$watched_pid" 2>/dev/null + return $? + ;; + esac + return 1 +}} +if [ -d /proc ]; then + proc_available=true + for cmdline in /proc/[0-9]*/cmdline; do + [ -r "$cmdline" ] || continue + pid="${{cmdline%/cmdline}}" + pid="${{pid##*/}}" + case "$pid" in "$$"|"$PPID") continue ;; esac + cmd="$(tr '\0' ' ' < "$cmdline" 2>/dev/null || true)" + [ -n "$cmd" ] || continue + case "$cmd" in + *"$install_root/app/"*) + case "$cmd" in + *"surge-supervisor"*) ;; + *) + app_process_running=true + if contains_target_first_run "$cmd"; then + target_app_process_running=true + else + stale_app_process_running=true + fi + ;; + esac + ;; + esac + if [ -n "$supervisor_id" ]; then + case "$cmd" in + *"surge-supervisor"*"--id $supervisor_id"*) + supervisor_process_running=true + if watched_pid_is_running "$cmd"; then + supervisor_waiting_for_previous_child=true + fi + case " $cmd " in + *" --surge-first-run "*) if ! contains_target_first_run "$cmd"; then stale_supervisor_process_running=true; fi ;; + esac + ;; + esac + fi + done +fi +printf 'missing=false\n' +printf 'manifest=%s\n' "$manifest" +printf 'install_root=%s\n' "$install_root" +printf 'proc_available=%s\n' "$proc_available" +printf 'app_process_running=%s\n' "$app_process_running" +printf 'target_app_process_running=%s\n' "$target_app_process_running" +printf 'stale_app_process_running=%s\n' "$stale_app_process_running" +if [ -n "$supervisor_id" ]; then printf 'supervisor_configured=true\n'; else printf 'supervisor_configured=false\n'; fi +printf 'supervisor_process_running=%s\n' "$supervisor_process_running" +printf 'supervisor_waiting_for_previous_child=%s\n' "$supervisor_waiting_for_previous_child" +printf 'stale_supervisor_process_running=%s\n' "$stale_supervisor_process_running" +printf '{PROBE_END}\n' +printf '{RUNTIME_BEGIN}\n' +cat "$manifest" +printf '\n{RUNTIME_END}\n' +printf '{UPDATE_BEGIN}\n' +if [ -f "$status_file" ]; then + cat "$status_file" +fi +printf '\n{UPDATE_END}\n' +"# + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_remote_probe_reads_sections() { + let output = format!( + "{PROBE_BEGIN}\nmissing=false\nproc_available=true\napp_process_running=true\ntarget_app_process_running=true\nstale_app_process_running=false\nsupervisor_configured=false\nsupervisor_process_running=false\nsupervisor_waiting_for_previous_child=false\nstale_supervisor_process_running=false\n{PROBE_END}\n{RUNTIME_BEGIN}\nid: sample-app\nversion: 1.2.3\nchannel: sample-channel\ninstallDirectory: sample-app\n{RUNTIME_END}\n{UPDATE_BEGIN}\n{UPDATE_END}\n", + ); + + let probe = parse_remote_probe(&output).expect("probe should parse"); + + assert!(!probe.missing); + assert!(probe.process.app_process_running()); + assert!(probe.runtime_yaml.expect("runtime yaml").contains("sample-app")); + } +} diff --git a/crates/surge-cli/src/commands/fleet_status/tailscale.rs b/crates/surge-cli/src/commands/fleet_status/tailscale.rs new file mode 100644 index 0000000..4c94180 --- /dev/null +++ b/crates/surge-cli/src/commands/fleet_status/tailscale.rs @@ -0,0 +1,630 @@ +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use surge_core::error::{Result, SurgeError}; +use tokio::sync::Semaphore; +use tokio::task::JoinSet; + +#[cfg(test)] +use super::probe::{AppProcessSummary, SupervisorHandoffSummary, SupervisorProcessSummary}; +use super::probe::{ + ProcessSummary, RemoteUpdateStatus, build_remote_probe_script, parse_remote_probe, run_tailscale_capture_timeout, +}; +use crate::cli::FleetStatusTailscaleOptions; +use crate::commands::install::{resolve_tailscale_targets, shell_single_quote}; +use crate::logline; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct FleetStatusOutcome { + pub(crate) all_healthy: bool, +} + +#[derive(Debug, Clone)] +struct ProbeConfig { + app_id: String, + rid: String, + channel: String, + version: String, + timeout: Duration, +} + +#[derive(Debug, Clone)] +struct TargetNode { + index: usize, + ssh_target: String, + display_target: String, +} + +#[derive(Debug, Clone, Serialize)] +struct FleetStatusReport { + app_id: String, + rid: String, + channel: String, + version: String, + total: usize, + healthy: usize, + unhealthy: usize, + nodes: Vec, +} + +#[derive(Debug, Clone, Serialize)] +struct NodeStatus { + #[serde(skip)] + index: usize, + node: String, + status: NodeHealthStatus, + #[serde(skip_serializing_if = "Option::is_none")] + reason: Option, + #[serde(skip_serializing_if = "Option::is_none")] + installed: Option, + #[serde(skip_serializing_if = "Option::is_none")] + update_status: Option, + #[serde(skip_serializing_if = "Option::is_none")] + process: Option, + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +enum NodeHealthStatus { + Healthy, + Unreachable, + Missing, + WrongApp, + WrongChannel, + Stale, + Degraded, +} + +impl NodeHealthStatus { + fn as_str(self) -> &'static str { + match self { + Self::Healthy => "healthy", + Self::Unreachable => "unreachable", + Self::Missing => "missing", + Self::WrongApp => "wrong_app", + Self::WrongChannel => "wrong_channel", + Self::Stale => "stale", + Self::Degraded => "degraded", + } + } +} + +#[derive(Debug, Clone, Serialize)] +struct InstalledSummary { + app_id: String, + version: String, + channel: String, + install_directory: String, + #[serde(skip_serializing_if = "Option::is_none")] + supervisor_id: Option, +} + +#[derive(Debug, Clone, Serialize)] +struct UpdateStatusSummary { + state: String, + installed_version: String, + target_version: String, + channel: String, + app_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + current_phase: Option, + #[serde(skip_serializing_if = "Option::is_none")] + last_progress_at_utc: Option, + #[serde(skip_serializing_if = "Option::is_none")] + reason: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct RuntimeManifest { + id: String, + version: String, + channel: String, + #[serde(rename = "installDirectory", default)] + install_directory: String, + #[serde(rename = "supervisorId", default)] + supervisor_id: String, +} + +pub(crate) async fn execute_tailscale(options: FleetStatusTailscaleOptions) -> Result { + let config = ProbeConfig { + app_id: required_value(&options.app_id, "--app-id")?, + rid: required_value(&options.rid, "--rid")?, + channel: required_value(&options.channel, "--channel")?, + version: required_value(&options.version, "--version")?, + timeout: Duration::from_secs(validate_positive_u64(options.timeout_seconds, "--timeout-seconds")?), + }; + let concurrency = validate_positive_usize(options.concurrency, "--concurrency")?; + let nodes = collect_nodes(&options.node, options.nodes_file.as_deref())?; + if nodes.is_empty() { + return Err(SurgeError::Config( + "Provide at least one --node or a non-empty --nodes-file.".to_string(), + )); + } + ensure_tailscale_command_available()?; + + let targets = resolve_targets(&nodes, options.node_user.as_deref())?; + let report = probe_nodes(config, targets, concurrency).await?; + emit_report(&report, options.json)?; + + Ok(FleetStatusOutcome { + all_healthy: report.unhealthy == 0, + }) +} + +fn required_value(value: &str, option: &str) -> Result { + let value = value.trim(); + if value.is_empty() { + return Err(SurgeError::Config(format!("{option} cannot be empty."))); + } + Ok(value.to_string()) +} + +fn validate_positive_usize(value: usize, option: &str) -> Result { + if value == 0 { + return Err(SurgeError::Config(format!("{option} must be greater than zero."))); + } + Ok(value) +} + +fn validate_positive_u64(value: u64, option: &str) -> Result { + if value == 0 { + return Err(SurgeError::Config(format!("{option} must be greater than zero."))); + } + Ok(value) +} + +fn collect_nodes(inline_nodes: &[String], nodes_file: Option<&Path>) -> Result> { + let mut nodes = Vec::new(); + for node in inline_nodes { + push_node_value(&mut nodes, node); + } + + if let Some(path) = nodes_file { + let raw = std::fs::read_to_string(path) + .map_err(|e| SurgeError::Config(format!("Failed to read nodes file '{}': {e}", path.display())))?; + for line in raw.lines() { + push_node_value(&mut nodes, line); + } + } + + Ok(nodes) +} + +fn push_node_value(nodes: &mut Vec, value: &str) { + let trimmed = value.trim(); + if trimmed.is_empty() || trimmed.starts_with('#') { + return; + } + nodes.push(trimmed.to_string()); +} + +fn ensure_tailscale_command_available() -> Result<()> { + which::which("tailscale") + .map(|_| ()) + .map_err(|e| SurgeError::Config(format!("tailscale command is not available on PATH: {e}"))) +} + +fn resolve_targets(nodes: &[String], node_user: Option<&str>) -> Result> { + nodes + .iter() + .enumerate() + .map(|(index, node)| { + let (ssh_target, display_target) = resolve_tailscale_targets(node, node_user)?; + Ok(TargetNode { + index, + ssh_target, + display_target, + }) + }) + .collect() +} + +async fn probe_nodes(config: ProbeConfig, targets: Vec, concurrency: usize) -> Result { + let semaphore = Arc::new(Semaphore::new(concurrency)); + let config = Arc::new(config); + let mut tasks = JoinSet::new(); + + for target in targets { + let permit = semaphore + .clone() + .acquire_owned() + .await + .map_err(|e| SurgeError::Platform(format!("Failed to acquire probe concurrency permit: {e}")))?; + let config = Arc::clone(&config); + tasks.spawn(async move { + let _permit = permit; + probe_node(config, target).await + }); + } + + let mut nodes = Vec::new(); + while let Some(joined) = tasks.join_next().await { + let status = joined.map_err(|e| SurgeError::Platform(format!("Fleet status probe task failed: {e}")))?; + nodes.push(status); + } + nodes.sort_by_key(|node| node.index); + + let healthy = nodes + .iter() + .filter(|node| node.status == NodeHealthStatus::Healthy) + .count(); + let total = nodes.len(); + Ok(FleetStatusReport { + app_id: config.app_id.clone(), + rid: config.rid.clone(), + channel: config.channel.clone(), + version: config.version.clone(), + total, + healthy, + unhealthy: total.saturating_sub(healthy), + nodes, + }) +} + +async fn probe_node(config: Arc, target: TargetNode) -> NodeStatus { + let script = build_remote_probe_script(&config.app_id, &config.version); + let remote_command = format!("sh -lc {}", shell_single_quote(&script)); + match run_tailscale_capture_timeout(&target.ssh_target, &remote_command, config.timeout).await { + Ok(output) => classify_probe_output(&config, &target, &output), + Err(error) => NodeStatus { + index: target.index, + node: target.display_target, + status: NodeHealthStatus::Unreachable, + reason: Some("tailscale ssh probe failed".to_string()), + installed: None, + update_status: None, + process: None, + error: Some(error), + }, + } +} + +fn classify_probe_output(config: &ProbeConfig, target: &TargetNode, output: &str) -> NodeStatus { + let parsed = match parse_remote_probe(output) { + Ok(parsed) => parsed, + Err(error) => { + return NodeStatus { + index: target.index, + node: target.display_target.clone(), + status: NodeHealthStatus::Degraded, + reason: Some("remote probe returned an unreadable result".to_string()), + installed: None, + update_status: None, + process: None, + error: Some(error), + }; + } + }; + + if parsed.missing { + return NodeStatus { + index: target.index, + node: target.display_target.clone(), + status: NodeHealthStatus::Missing, + reason: Some(format!("no runtime manifest found for '{}'", config.app_id)), + installed: None, + update_status: None, + process: Some(parsed.process), + error: None, + }; + } + + let Some(runtime_yaml) = parsed.runtime_yaml.as_deref() else { + return NodeStatus { + index: target.index, + node: target.display_target.clone(), + status: NodeHealthStatus::Missing, + reason: Some("runtime manifest was not returned by the probe".to_string()), + installed: None, + update_status: None, + process: Some(parsed.process), + error: None, + }; + }; + + let manifest = match serde_yaml::from_str::(runtime_yaml) { + Ok(manifest) => manifest, + Err(error) => { + return NodeStatus { + index: target.index, + node: target.display_target.clone(), + status: NodeHealthStatus::Degraded, + reason: Some("runtime manifest could not be parsed".to_string()), + installed: None, + update_status: None, + process: Some(parsed.process), + error: Some(error.to_string()), + }; + } + }; + let update_status = parsed.update_status; + let (status, reason) = classify_node(config, &manifest, update_status.as_ref(), &parsed.process); + NodeStatus { + index: target.index, + node: target.display_target.clone(), + status, + reason, + installed: Some(installed_summary(&manifest)), + update_status: update_status.map(update_status_summary), + process: Some(parsed.process), + error: None, + } +} + +fn classify_node( + config: &ProbeConfig, + manifest: &RuntimeManifest, + update_status: Option<&RemoteUpdateStatus>, + process: &ProcessSummary, +) -> (NodeHealthStatus, Option) { + if manifest.id.trim() != config.app_id { + return ( + NodeHealthStatus::WrongApp, + Some(format!( + "expected app id '{}' but found '{}'", + config.app_id, + manifest.id.trim() + )), + ); + } + if manifest.channel.trim() != config.channel { + return ( + NodeHealthStatus::WrongChannel, + Some(format!( + "expected channel '{}' but found '{}'", + config.channel, + manifest.channel.trim() + )), + ); + } + if manifest.version.trim() != config.version { + return ( + NodeHealthStatus::Stale, + Some(format!( + "expected version '{}' but found '{}'", + config.version, + manifest.version.trim() + )), + ); + } + if process.app_process_running() && !process.target_app_process_running() { + return ( + NodeHealthStatus::Stale, + Some("app process is running without target-version first-run proof".to_string()), + ); + } + if process.stale_supervisor_process_running() { + return ( + NodeHealthStatus::Stale, + Some("supervisor process is running with stale first-run proof".to_string()), + ); + } + + let mut degraded_reasons = Vec::new(); + if !process.proc_available { + degraded_reasons.push("process table was not readable".to_string()); + } + if !process.app_process_running() { + degraded_reasons.push("app process was not found".to_string()); + } + if process.supervisor_configured() && !process.supervisor_process_running() { + degraded_reasons.push("supervisor process was not found".to_string()); + } + if process.supervisor_waiting_for_previous_child() { + degraded_reasons.push("supervisor is still waiting for a previous child process".to_string()); + } + if let Some(status) = update_status { + collect_update_status_degradation(config, status, &mut degraded_reasons); + } + + if degraded_reasons.is_empty() { + (NodeHealthStatus::Healthy, None) + } else { + (NodeHealthStatus::Degraded, Some(degraded_reasons.join("; "))) + } +} + +fn collect_update_status_degradation( + config: &ProbeConfig, + status: &RemoteUpdateStatus, + degraded_reasons: &mut Vec, +) { + let state = status.state.trim(); + if matches!(state, "failed" | "in_progress" | "pending_restart") { + degraded_reasons.push(format!("update status is {state}")); + } + if !status.app_id.trim().is_empty() && status.app_id.trim() != config.app_id { + degraded_reasons.push(format!( + "update status app id is '{}' instead of '{}'", + status.app_id.trim(), + config.app_id + )); + } + if !status.channel.trim().is_empty() && status.channel.trim() != config.channel { + degraded_reasons.push(format!( + "update status channel is '{}' instead of '{}'", + status.channel.trim(), + config.channel + )); + } + if !status.target_version.trim().is_empty() && status.target_version.trim() != config.version { + degraded_reasons.push(format!( + "update status target version is '{}' instead of '{}'", + status.target_version.trim(), + config.version + )); + } +} + +fn installed_summary(manifest: &RuntimeManifest) -> InstalledSummary { + InstalledSummary { + app_id: manifest.id.trim().to_string(), + version: manifest.version.trim().to_string(), + channel: manifest.channel.trim().to_string(), + install_directory: manifest.install_directory.trim().to_string(), + supervisor_id: non_empty_string(&manifest.supervisor_id), + } +} + +fn update_status_summary(status: RemoteUpdateStatus) -> UpdateStatusSummary { + UpdateStatusSummary { + state: status.state, + installed_version: status.installed_version, + target_version: status.target_version, + channel: status.channel, + app_id: status.app_id, + current_phase: status.current_phase, + last_progress_at_utc: status.last_progress_at_utc, + reason: status.reason, + } +} + +fn non_empty_string(value: &str) -> Option { + let value = value.trim(); + (!value.is_empty()).then(|| value.to_string()) +} + +fn emit_report(report: &FleetStatusReport, as_json: bool) -> Result<()> { + if as_json { + let json = serde_json::to_string_pretty(report) + .map_err(|e| SurgeError::Config(format!("Failed to encode fleet status as JSON: {e}")))?; + logline::emit_raw(&json); + return Ok(()); + } + + logline::info(&format!( + "Fleet status for '{}' v{} ({}, {}) across {} node(s)", + report.app_id, report.version, report.channel, report.rid, report.total + )); + for node in &report.nodes { + let message = if let Some(reason) = node.reason.as_deref() { + format!("{}: {} - {reason}", node.node, node.status.as_str()) + } else { + format!("{}: {}", node.node, node.status.as_str()) + }; + if node.status == NodeHealthStatus::Healthy { + logline::success(&message); + } else { + logline::warn(&message); + } + } + if report.unhealthy == 0 { + logline::success(&format!("All {} node(s) are healthy.", report.total)); + } else { + logline::warn(&format!( + "{} of {} node(s) are unhealthy or unreachable.", + report.unhealthy, report.total + )); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_config() -> ProbeConfig { + ProbeConfig { + app_id: "sample-app".to_string(), + rid: "linux-x64".to_string(), + channel: "sample-channel".to_string(), + version: "1.2.3".to_string(), + timeout: Duration::from_secs(20), + } + } + + fn sample_manifest() -> RuntimeManifest { + RuntimeManifest { + id: "sample-app".to_string(), + version: "1.2.3".to_string(), + channel: "sample-channel".to_string(), + install_directory: "sample-app".to_string(), + supervisor_id: "sample-supervisor".to_string(), + } + } + + fn healthy_process() -> ProcessSummary { + ProcessSummary { + proc_available: true, + app: AppProcessSummary { + app_process_running: true, + target_app_process_running: true, + stale_app_process_running: false, + }, + supervisor: SupervisorProcessSummary { + supervisor_configured: true, + supervisor_process_running: true, + handoff: SupervisorHandoffSummary { + supervisor_waiting_for_previous_child: false, + stale_supervisor_process_running: false, + }, + }, + } + } + + #[test] + fn collect_nodes_combines_inline_and_file_entries() { + let tmp = tempfile::NamedTempFile::new().expect("nodes file"); + std::fs::write(tmp.path(), "\n# ignored\nnode-b\noperator@example-node\n").expect("write nodes file"); + let nodes = collect_nodes(&["node-a".to_string()], Some(tmp.path())).expect("nodes should parse"); + + assert_eq!(nodes, ["node-a", "node-b", "operator@example-node"]); + } + + #[test] + fn classify_node_marks_matching_runtime_healthy() { + let (status, reason) = classify_node(&sample_config(), &sample_manifest(), None, &healthy_process()); + + assert_eq!(status, NodeHealthStatus::Healthy); + assert!(reason.is_none()); + } + + #[test] + fn classify_node_detects_wrong_app() { + let mut manifest = sample_manifest(); + manifest.id = "other-app".to_string(); + + let (status, reason) = classify_node(&sample_config(), &manifest, None, &healthy_process()); + + assert_eq!(status, NodeHealthStatus::WrongApp); + assert!(reason.expect("reason").contains("other-app")); + } + + #[test] + fn classify_node_detects_wrong_channel() { + let mut manifest = sample_manifest(); + manifest.channel = "other-channel".to_string(); + + let (status, reason) = classify_node(&sample_config(), &manifest, None, &healthy_process()); + + assert_eq!(status, NodeHealthStatus::WrongChannel); + assert!(reason.expect("reason").contains("other-channel")); + } + + #[test] + fn classify_node_detects_stale_version() { + let mut manifest = sample_manifest(); + manifest.version = "1.2.2".to_string(); + + let (status, reason) = classify_node(&sample_config(), &manifest, None, &healthy_process()); + + assert_eq!(status, NodeHealthStatus::Stale); + assert!(reason.expect("reason").contains("1.2.2")); + } + + #[test] + fn classify_node_detects_degraded_process_state() { + let mut process = healthy_process(); + process.app.app_process_running = false; + process.app.target_app_process_running = false; + + let (status, reason) = classify_node(&sample_config(), &sample_manifest(), None, &process); + + assert_eq!(status, NodeHealthStatus::Degraded); + assert!(reason.expect("reason").contains("app process was not found")); + } +} diff --git a/crates/surge-cli/src/commands/install/mod.rs b/crates/surge-cli/src/commands/install/mod.rs index 3e77e96..8c98981 100644 --- a/crates/surge-cli/src/commands/install/mod.rs +++ b/crates/surge-cli/src/commands/install/mod.rs @@ -20,6 +20,7 @@ use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt, BufRead use tokio::process::Command; pub(crate) use self::progress::{make_progress_bar, make_spinner, shell_single_quote}; +pub(crate) use self::remote::resolve_tailscale_targets; pub(crate) use self::resolution::selected_install_manifest_path; pub(crate) use self::runtime::{ auto_start_after_install, host_can_build_installer_locally, install_package_locally, release_install_profile, @@ -32,10 +33,7 @@ use self::profile::{ build_rid_candidates, derive_base_rid, detect_local_profile, warn_if_local_rid_looks_incompatible, }; use self::releases::{ArchiveAcquisition, download_release_archive, fetch_release_index, select_release}; -use self::remote::{ - ensure_supported_tailscale_rid, install_release_via_tailscale, resolve_tailscale_targets, - verify_remote_stage_readiness, -}; +use self::remote::{ensure_supported_tailscale_rid, install_release_via_tailscale, verify_remote_stage_readiness}; use self::resolution::{ build_storage_config_with_overrides, build_storage_config_without_manifest, load_install_manifest_if_available, resolve_install_app_id_without_manifest, resolve_tailscale_rid_without_manifest, diff --git a/crates/surge-cli/src/commands/mod.rs b/crates/surge-cli/src/commands/mod.rs index 28ade37..a6605b3 100644 --- a/crates/surge-cli/src/commands/mod.rs +++ b/crates/surge-cli/src/commands/mod.rs @@ -1,5 +1,6 @@ pub mod compact; pub mod demote; +pub mod fleet_status; pub mod init; pub mod install; pub mod list; diff --git a/crates/surge-cli/src/main.rs b/crates/surge-cli/src/main.rs index 36056d3..a7b30c4 100644 --- a/crates/surge-cli/src/main.rs +++ b/crates/surge-cli/src/main.rs @@ -15,7 +15,7 @@ mod logline; mod prompts; mod ui; -use cli::{Cli, Commands, InstallMethod, LockAction, TuneAction}; +use cli::{Cli, Commands, FleetStatusAction, InstallMethod, LockAction, TuneAction}; fn main() -> ExitCode { let started = Instant::now(); @@ -70,7 +70,7 @@ fn main() -> ExitCode { let result = rt.block_on(run(cli)); match result { - Ok(()) => ExitCode::SUCCESS, + Ok(exit_code) => exit_code, Err(e) => { logline::error_chain(&e); ExitCode::FAILURE @@ -78,10 +78,10 @@ fn main() -> ExitCode { } } -async fn run(cli: Cli) -> surge_core::error::Result<()> { +async fn run(cli: Cli) -> surge_core::error::Result { let manifest_path = cli.manifest_path; - match cli.command { + let result = match cli.command { Commands::Init { app_id, name, @@ -270,6 +270,20 @@ async fn run(cli: Cli) -> surge_core::error::Result<()> { Commands::Status { install_dir, json } => commands::status::execute(&install_dir, json), + Commands::FleetStatus { action } => { + return match action { + FleetStatusAction::Tailscale(options) => { + commands::fleet_status::execute_tailscale(options).await.map(|outcome| { + if outcome.all_healthy { + ExitCode::SUCCESS + } else { + ExitCode::from(2) + } + }) + } + }; + } + Commands::Install { method, target, @@ -341,5 +355,7 @@ async fn run(cli: Cli) -> surge_core::error::Result<()> { ) .await } - } + }; + + result.map(|()| ExitCode::SUCCESS) }