Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions crates/surge-cli/src/commands/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,17 +1146,27 @@ mod tests {
);

assert!(probe.contains("active_exe=\"$install_root/app/$main_exe\""));
assert!(probe.contains("status_file=\"$install_root/.surge-update-status.json\""));
assert!(probe.contains("status_converged=0"));
assert!(probe.contains("contains_target_first_run()"));
assert!(probe.contains("watched_pid_is_running()"));
assert!(probe.contains("contains_target_version_arg()"));
assert!(probe.contains("process_exe_matches_active()"));
assert!(probe.contains("extract_watched_pid()"));
assert!(probe.contains("*\" --surge-first-run $version \"*|*\" $version --surge-first-run \"*"));
assert!(probe.contains("*'\"state\":\"converged\"'*"));
assert!(probe.contains("*'\"installed_version\":\"'\"$version\"'\"'*"));
assert!(probe.contains("[ \"$status_converged\" -eq 1 ] && process_exe_matches_active \"$pid\""));
assert!(probe.contains("target_app_pids"));
assert!(probe.contains("target_supervisor_seen"));
assert!(probe.contains("surge-supervisor"));
assert!(probe.contains("--id $supervisor_id"));
assert!(probe.contains("app process for $active_exe was not found"));
assert!(probe.contains("app process for $active_exe is running without --surge-first-run proof for $version"));
assert!(probe.contains("app process for $active_exe is running without target proof for $version"));
assert!(probe.contains("stale app process for $active_exe is still running without target proof for $version"));
assert!(probe.contains("supervisor process '$supervisor_id' is still waiting for the previous child"));
assert!(probe.contains("supervisor process '$supervisor_id' is running with stale first-run proof"));
assert!(probe.contains("supervisor process '$supervisor_id' was not found"));
assert!(probe.contains("supervisor process '$supervisor_id' is not watching target app process for $version"));
}

#[cfg(unix)]
Expand Down
27 changes: 21 additions & 6 deletions crates/surge-cli/src/commands/install/remote/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,32 +245,47 @@ pub(crate) fn build_remote_process_verification_probe(
) -> String {
format!(
r#"install_root={}; main_exe={}; supervisor_id={}; version={};
active_exe="$install_root/app/$main_exe"; app_seen=0; target_app_seen=0; stale_app_seen=0; supervisor_seen=0; stale_supervisor_seen=0; waiting_supervisor_seen=0;
active_exe="$install_root/app/$main_exe"; status_file="$install_root/.surge-update-status.json"; app_seen=0; target_app_seen=0; stale_app_seen=0; supervisor_seen=0; stale_supervisor_seen=0; waiting_supervisor_seen=0; target_supervisor_seen=0; target_app_pids=""; watched_pids=""; status_converged=0;
if [ -r "$status_file" ]; then
status_compact="$(tr -d '[:space:]' < "$status_file" 2>/dev/null || true)";
status_has_state=0; status_has_installed=0; status_has_target=0;
case "$status_compact" in *'"state":"converged"'*) status_has_state=1 ;; esac;
case "$status_compact" in *'"installed_version":"'"$version"'"'*) status_has_installed=1 ;; esac;
case "$status_compact" in *'"target_version":"'"$version"'"'*) status_has_target=1 ;; esac;
if [ "$status_has_state" -eq 1 ] && [ "$status_has_installed" -eq 1 ] && [ "$status_has_target" -eq 1 ]; then status_converged=1; fi;
fi;
contains_target_first_run() {{ cmd_tokens=" $1 "; case "$cmd_tokens" in *" --surge-first-run $version "*|*" $version --surge-first-run "*) return 0 ;; esac; return 1; }}
watched_pid_is_running() {{ case "$1" in *" watch "*" --pid "*) rest="${{1#* --pid }}"; watched_pid="${{rest%% *}}"; case "$watched_pid" in ""|*[!0-9]*) return 1 ;; esac; kill -0 "$watched_pid" 2>/dev/null ;; esac; return 1; }}
contains_target_version_arg() {{ cmd_tokens=" $1 "; case "$cmd_tokens" in *" $version "*) return 0 ;; esac; return 1; }}
contains_target_proof() {{ contains_target_first_run "$1" || contains_target_version_arg "$1"; }}
process_exe_matches_active() {{ actual="$(readlink "/proc/$1/exe" 2>/dev/null || true)"; [ "$actual" = "$active_exe" ]; }}
extract_watched_pid() {{ case "$1" in *" watch "*" --pid "*) rest="${{1#* --pid }}"; watched_pid="${{rest%% *}}"; case "$watched_pid" in ""|*[!0-9]*) return 1 ;; esac; printf '%s\n' "$watched_pid"; return 0 ;; esac; return 1; }}
for cmdline in /proc/[0-9]*/cmdline; do
[ -r "$cmdline" ] || continue;
pid="${{cmdline%/cmdline}}"; pid="${{pid##*/}}";
case "$pid" in "$$"|"$PPID") continue ;; esac;
cmd="$(tr '\0' ' ' < "$cmdline" 2>/dev/null || true)";
[ -n "$cmd" ] || continue;
case "$cmd" in *"surge-supervisor"*) ;; *"$active_exe"*) app_seen=1; if contains_target_first_run "$cmd"; then target_app_seen=1; else stale_app_seen=1; fi ;; esac;
case "$cmd" in *"surge-supervisor"*) ;; *"$active_exe"*) app_seen=1; if contains_target_proof "$cmd" || {{ [ "$status_converged" -eq 1 ] && process_exe_matches_active "$pid"; }}; then target_app_seen=1; target_app_pids="${{target_app_pids}}${{pid}} "; else stale_app_seen=1; fi ;; esac;
if [ -n "$supervisor_id" ]; then
case "$cmd" in *"surge-supervisor"*"--id $supervisor_id"*)
supervisor_seen=1;
if watched_pid_is_running "$cmd"; then waiting_supervisor_seen=1; fi;
case " $cmd " in *" --surge-first-run "*) if ! contains_target_first_run "$cmd"; then stale_supervisor_seen=1; fi ;; esac
if watched_pid="$(extract_watched_pid "$cmd")"; then watched_pids="${{watched_pids}}${{watched_pid}} "; fi;
case " $cmd " in *" --surge-first-run "*) if contains_target_first_run "$cmd"; then target_supervisor_seen=1; else stale_supervisor_seen=1; fi ;; esac
;; esac;
fi;
done;
for watched_pid in $watched_pids; do
case " $target_app_pids " in *" $watched_pid "*) target_supervisor_seen=1 ;; *) if kill -0 "$watched_pid" 2>/dev/null; then waiting_supervisor_seen=1; fi ;; esac;
done;
if [ "$target_app_seen" -ne 1 ]; then
if [ "$app_seen" -eq 1 ]; then echo "app process for $active_exe is running without --surge-first-run proof for $version"; else echo "app process for $active_exe was not found"; fi;
if [ "$app_seen" -eq 1 ]; then echo "app process for $active_exe is running without target proof for $version"; else echo "app process for $active_exe was not found"; fi;
exit 0;
fi;
if [ "$stale_app_seen" -eq 1 ]; then echo "stale app process for $active_exe is still running without target proof for $version"; exit 0; fi;
if [ -n "$supervisor_id" ] && [ "$waiting_supervisor_seen" -eq 1 ]; then echo "supervisor process '$supervisor_id' is still waiting for the previous child"; exit 0; fi;
if [ -n "$supervisor_id" ] && [ "$stale_supervisor_seen" -eq 1 ]; then echo "supervisor process '$supervisor_id' is running with stale first-run proof"; exit 0; fi;
if [ -n "$supervisor_id" ] && [ "$supervisor_seen" -ne 1 ]; then echo "supervisor process '$supervisor_id' was not found"; exit 0; fi;
if [ -n "$supervisor_id" ] && [ "$target_supervisor_seen" -ne 1 ]; then echo "supervisor process '$supervisor_id' is not watching target app process for $version"; exit 0; fi;
echo ready"#,
shell_single_quote(&install_root.to_string_lossy()),
shell_single_quote(main_exe),
Expand Down
19 changes: 17 additions & 2 deletions crates/surge-core/src/install/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use self::launch::{auto_start_after_install, auto_start_after_install_sequen
pub use self::persistent_assets::{copy_persistent_assets, validate_relative_persistent_asset_path};
pub use self::runtime_manifest::{
LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH, RUNTIME_MANIFEST_RELATIVE_PATH, RuntimeManifestMetadata,
storage_provider_manifest_name, write_runtime_manifest,
read_runtime_manifest_version, storage_provider_manifest_name, write_runtime_manifest,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -111,7 +111,8 @@ mod tests {

use super::{
InstallProfile, LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH, RUNTIME_MANIFEST_RELATIVE_PATH, RuntimeManifestMetadata,
install_package_locally_at_root, storage_provider_manifest_name, write_runtime_manifest,
install_package_locally_at_root, read_runtime_manifest_version, storage_provider_manifest_name,
write_runtime_manifest,
};

#[test]
Expand Down Expand Up @@ -169,6 +170,20 @@ mod tests {
assert_eq!(raw, legacy_raw);
}

/// When both the current and the legacy manifest exist, the version stored in the
/// current manifest is the one reported.
#[test]
fn read_runtime_manifest_version_prefers_current_manifest() {
    let root = tempfile::tempdir().expect("temp dir should exist");
    let current_manifest = root.path().join(RUNTIME_MANIFEST_RELATIVE_PATH);
    let legacy_manifest = root.path().join(LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH);

    // Only the directory of the current manifest is created explicitly.
    let manifest_dir = current_manifest.parent().unwrap();
    std::fs::create_dir_all(manifest_dir).unwrap();

    // Seed both locations with different versions so the lookup order is observable.
    let fixtures = [
        (&current_manifest, "id: demo\nversion: 2.0.0\nchannel: test\n"),
        (&legacy_manifest, "id: demo\nversion: 1.0.0\nchannel: test\n"),
    ];
    for (manifest_path, contents) in fixtures {
        std::fs::write(manifest_path, contents).unwrap();
    }

    let resolved = read_runtime_manifest_version(root.path()).unwrap();

    assert_eq!(resolved, Some(String::from("2.0.0")));
}

#[test]
fn install_package_locally_preserves_declared_persistent_assets_and_prunes_snapshots() {
let tmp = tempfile::tempdir().expect("temp dir should exist");
Expand Down
26 changes: 25 additions & 1 deletion crates/surge-core/src/install/runtime_manifest.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::{Path, PathBuf};

use serde::Serialize;
use serde::{Deserialize, Serialize};

use crate::context::StorageProvider;
use crate::error::{Result, SurgeError};
Expand Down Expand Up @@ -58,6 +58,11 @@ struct RuntimeManifestFile<'a> {
endpoint: &'a str,
}

/// Minimal deserialization target for a runtime manifest that captures only its
/// `version` key; no other manifest key is represented by this type.
#[derive(Debug, Clone, Deserialize)]
struct RuntimeManifestVersion {
/// Version string exactly as stored in the manifest (not trimmed here).
version: String,
}

#[derive(Debug, Clone, Default)]
pub(super) struct RuntimeManifestSnapshot {
runtime_manifest: Option<Vec<u8>>,
Expand Down Expand Up @@ -176,3 +181,22 @@ pub fn write_runtime_manifest(
std::fs::write(&legacy_manifest_path, yaml)?;
Ok(runtime_manifest_path)
}

/// Returns the trimmed `version` recorded in the runtime manifest under `active_app_dir`.
///
/// The current manifest location is checked first, followed by the legacy location.
/// A candidate that is not a regular file, or whose version is empty after trimming,
/// is skipped in favour of the next one. `Ok(None)` means no candidate yielded a
/// non-empty version.
///
/// # Errors
///
/// Propagates the I/O error if an existing manifest cannot be read, and returns
/// `SurgeError::Config` if an existing manifest cannot be parsed. In both cases the
/// remaining candidates are not consulted.
pub fn read_runtime_manifest_version(active_app_dir: &Path) -> Result<Option<String>> {
    let candidates = [RUNTIME_MANIFEST_RELATIVE_PATH, LEGACY_RUNTIME_MANIFEST_RELATIVE_PATH];

    for candidate in candidates {
        let manifest_path = active_app_dir.join(candidate);
        if manifest_path.is_file() {
            let bytes = std::fs::read(&manifest_path)?;
            let parsed = match serde_yaml::from_slice::<RuntimeManifestVersion>(&bytes) {
                Ok(parsed) => parsed,
                Err(e) => {
                    return Err(SurgeError::Config(format!(
                        "Failed to parse runtime manifest '{}': {e}",
                        manifest_path.display()
                    )));
                }
            };

            // A blank version is treated as "not present" so the next candidate gets a chance.
            let trimmed = parsed.version.trim();
            if !trimmed.is_empty() {
                return Ok(Some(trimmed.to_owned()));
            }
        }
    }

    Ok(None)
}
131 changes: 130 additions & 1 deletion crates/surge-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::ptr;

use surge_core::lock::mutex::DistributedMutex;
use surge_core::supervisor::state::{supervisor_pid_file, supervisor_stop_file, write_restart_args};
use surge_core::update::status::{UpdateConvergenceState, mark_restart_handoff_converged, read_update_status};

pub use crate::context::{
surge_config_set_lock_server, surge_config_set_resource_budget, surge_config_set_storage, surge_context_create,
Expand Down Expand Up @@ -272,7 +273,10 @@ pub unsafe extern "C" fn surge_supervisor_start(
Some(install_dir),
&BTreeMap::new(),
) {
Ok(_) => SURGE_OK,
Ok(_) => {
mark_self_supervised_runtime_converged(install_dir, exe.parent().unwrap_or(install_dir));
SURGE_OK
}
Err(e) => {
tracing::error!("supervisor_start failed: {e}");
SURGE_ERROR
Expand All @@ -281,6 +285,63 @@ pub unsafe extern "C" fn surge_supervisor_start(
}))
}

/// Records restart-handoff convergence for a self-supervised runtime that is already
/// running the target version.
///
/// The update status under `install_dir` is handed to `mark_restart_handoff_converged`
/// only when all of the following hold:
/// - a status record exists and its state is `InProgress` or `PendingRestart`;
/// - its installed and target versions are equal after trimming;
/// - the runtime manifest under `active_app_dir` reports that same version.
///
/// This is best effort: read or write failures are logged as warnings and the function
/// returns without reporting anything to the caller.
fn mark_self_supervised_runtime_converged(install_dir: &Path, active_app_dir: &Path) {
    let status = match read_update_status(install_dir) {
        Ok(status) => status,
        Err(e) => {
            tracing::warn!(
                install_root = %install_dir.display(),
                reason = %e,
                "Failed reading update status before self-supervisor convergence proof"
            );
            return;
        }
    };
    // No status record at all: there is no pending update to converge.
    let Some(record) = status else {
        return;
    };

    let target_version = record.target_version.trim();
    let restart_outstanding = matches!(
        record.state,
        UpdateConvergenceState::InProgress | UpdateConvergenceState::PendingRestart
    );
    if !restart_outstanding || record.installed_version.trim() != target_version {
        return;
    }

    let manifest_version = match surge_core::install::read_runtime_manifest_version(active_app_dir) {
        Ok(version) => version,
        Err(e) => {
            tracing::warn!(
                install_root = %install_dir.display(),
                app_dir = %active_app_dir.display(),
                reason = %e,
                "Failed reading runtime manifest before self-supervisor convergence proof"
            );
            return;
        }
    };
    // A missing manifest version and a mismatching one both mean "no proof".
    if manifest_version.as_deref().map(str::trim) != Some(target_version) {
        return;
    }

    match mark_restart_handoff_converged(install_dir, &record.target_version) {
        Err(e) => {
            tracing::warn!(
                install_root = %install_dir.display(),
                version = record.target_version,
                reason = %e,
                "Failed to record self-supervisor convergence proof"
            );
        }
        Ok(Some(_)) => {
            tracing::info!(
                version = record.target_version,
                "Restart handoff converged after self-supervisor accepted the target runtime"
            );
        }
        // Nothing was returned by the status update, so there is nothing to log.
        Ok(None) => {}
    }
}

/// Stop a supervised process watcher.
#[unsafe(no_mangle)]
pub unsafe extern "C" fn surge_supervisor_stop(install_dir: *const c_char, supervisor_id: *const c_char) -> i32 {
Expand Down Expand Up @@ -437,3 +498,71 @@ pub unsafe extern "C" fn surge_free_cstring(ptr: *mut c_char) {
// documented as `free()`-owned, or is null.
unsafe { crate::shared::libc_free(ptr.cast::<c_void>()) };
}

#[cfg(test)]
mod tests {
    use std::path::{Path, PathBuf};

    use surge_core::update::status as update_status;

    /// Uniquely named directory under the system temp dir that is removed
    /// (best effort) when the value is dropped.
    struct TestInstallDir {
        path: PathBuf,
    }

    impl TestInstallDir {
        /// Creates `<temp>/<name>-<pid>-<nanos since epoch>`; panics if creation fails.
        fn new(name: &str) -> Self {
            let pid = std::process::id();
            let nanos_since_epoch = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let dir_name = format!("{name}-{}-{}", pid, nanos_since_epoch);
            let path = std::env::temp_dir().join(dir_name);
            std::fs::create_dir(&path).unwrap();
            Self { path }
        }

        /// Root of the temporary install directory.
        fn path(&self) -> &Path {
            &self.path
        }
    }

    impl Drop for TestInstallDir {
        fn drop(&mut self) {
            // Cleanup failures are deliberately ignored.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    /// A pending-restart status whose installed and target versions both match the
    /// runtime manifest is promoted to `Converged` with its failure details cleared.
    #[test]
    fn self_supervisor_converges_pending_status_when_runtime_manifest_matches() {
        let install = TestInstallDir::new("surge-ffi-self-supervisor-converges");
        let install_root = install.path();
        let app_dir = install_root.join("app");

        // Runtime manifest advertising the target version.
        let manifest_path = app_dir.join(surge_core::install::RUNTIME_MANIFEST_RELATIVE_PATH);
        let manifest_dir = manifest_path.parent().unwrap();
        std::fs::create_dir_all(manifest_dir).unwrap();
        std::fs::write(&manifest_path, "id: demo-app\nversion: 2.0.0\nchannel: test\n").unwrap();

        // Status record still waiting on the restart handoff.
        let pending = update_status::UpdateStatusRecord::pending_restart_with_failure_phase(
            "demo-app",
            "2.0.0",
            "2.0.0",
            "test",
            "2026-05-21T10:00:00Z".to_string(),
            "2026-05-21T10:00:01Z".to_string(),
            "waiting for old child",
            update_status::RESTART_HANDOFF_WAITING_FOR_OLD_CHILD_PHASE,
        );
        update_status::write_update_status(install_root, &pending).unwrap();

        super::mark_self_supervised_runtime_converged(install_root, &app_dir);

        let converged = update_status::read_update_status(install_root)
            .unwrap()
            .expect("status should remain present");
        assert_eq!(converged.state, update_status::UpdateConvergenceState::Converged);
        assert!(converged.supervisor_restart_confirmed);
        assert_eq!(converged.failure_phase, None);
        assert_eq!(converged.reason, None);
    }
}
Loading
Loading