From 01f934aae9eacbfad9c5fd37d2bcf85aa367f75e Mon Sep 17 00:00:00 2001 From: Vim <121349594+sammuti@users.noreply.github.com> Date: Thu, 5 Feb 2026 09:14:28 -0700 Subject: [PATCH 1/3] [TOW-1342] Timeout auto cleanup for subprocess (#195) * [TOW-1342] Timeout auto cleanup for subprocess * [TOW-1342] Timeout auto cleanup for subprocess * adjust polling interval --- crates/tower-runtime/src/auto_cleanup.rs | 325 +++++++++++++++++++++++ crates/tower-runtime/src/lib.rs | 8 + crates/tower-runtime/src/subprocess.rs | 64 +++-- 3 files changed, 371 insertions(+), 26 deletions(-) create mode 100644 crates/tower-runtime/src/auto_cleanup.rs diff --git a/crates/tower-runtime/src/auto_cleanup.rs b/crates/tower-runtime/src/auto_cleanup.rs new file mode 100644 index 00000000..992fb23f --- /dev/null +++ b/crates/tower-runtime/src/auto_cleanup.rs @@ -0,0 +1,325 @@ +//! Automatic cleanup timer for subprocess executions +//! +//! This module exists to handle the case where the control plane +//! disconnects and never sends a cleanup call to the runner. Under normal circumstances, +//! the control plane should always call cleanup after a run finishes. +//! ref: TOW-1342 + +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tmpdir::TmpDir; +use tokio::sync::Mutex; + +use crate::App; + +/// How often to poll the app status to check if it has reached terminal state +const STATUS_POLL_INTERVAL: Duration = Duration::from_secs(5); + +/// Spawns a background task that monitors an app and performs automatic cleanup +/// after a timeout if explicit cleanup hasn't been called. +/// +/// This task: +/// 1. Polls the app status every STATUS_POLL_INTERVAL +/// 2. When the app reaches a terminal state, waits for cleanup_timeout +/// 3. If cleanup_called flag is still false, performs cleanup and logs a warning +pub fn spawn_cleanup_monitor( + run_id: String, + app: Arc>, + package_tmp_dir: Arc>>, + uv_temp_dir: Arc>>, + cleanup_called: Arc, + cleanup_timeout: Duration, +) { + tokio::spawn(async move { + use tower_telemetry::{info, warn}; + + // Wait for terminal state + loop { + tokio::time::sleep(STATUS_POLL_INTERVAL).await; + let status = app.lock().await.status().await; + if matches!(status, Ok(s) if s.is_terminal()) { + info!( + "Run {} finished, starting {}s automatic cleanup timer", + run_id, + cleanup_timeout.as_secs() + ); + break; + } + } + + // Wait for cleanup timeout + tokio::time::sleep(cleanup_timeout).await; + + // Check if explicit cleanup was called + if cleanup_called.load(Ordering::Relaxed) { + return; + } + + // Perform automatic cleanup + warn!( + "Automatic cleanup triggered for run {} after {}s (control plane cleanup not received)", + run_id, + cleanup_timeout.as_secs() + ); + + if let Some(temp_dir) = uv_temp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + } + + if let Some(tmp_dir) = package_tmp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(tmp_dir.to_path_buf()).await; + } + + cleanup_called.store(true, Ordering::Relaxed); + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{errors::Error, StartOptions, Status}; + + /// Mock LocalApp for testing that allows controlled status transitions + struct MockLocalApp { + status: Arc>, + } + + impl MockLocalApp { + fn new(initial_status: Status, transition_to_terminal_after: Duration) -> Self { + let status = Arc::new(Mutex::new(initial_status)); + let app = Self { + status: status.clone(), + }; + + // Spawn background task to transition to terminal state after delay + tokio::spawn(async move { + tokio::time::sleep(transition_to_terminal_after).await; + *status.lock().await = Status::Exited; + }); + + app + } + } + + impl crate::App for MockLocalApp { + async fn start(_opts: StartOptions) -> Result { + unimplemented!("MockLocalApp doesn't support start") + } + + async fn terminate(&mut self) -> Result<(), Error> { + Ok(()) + } + + async fn status(&self) -> Result { + Ok(*self.status.lock().await) + } + } + + /// Helper to create temp directories for testing + async fn create_test_dirs() -> (TmpDir, PathBuf) { + let package_tmp = TmpDir::new("test-package") + .await + .expect("Failed to create package temp dir"); + + // Use timestamp for uniqueness + let unique_id = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let uv_temp_path = std::env::temp_dir().join(format!("test-uv-{}", unique_id)); + tokio::fs::create_dir_all(&uv_temp_path) + .await + .expect("Failed to create uv temp dir"); + + (package_tmp, uv_temp_path) + } + + #[tokio::test] + async fn test_automatic_cleanup_triggers_after_timeout() { + // Create a mock app that transitions to Exited after 100ms + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_millis(100), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Verify directories exist before cleanup + assert!(tokio::fs::metadata(uv_temp_path.clone()).await.is_ok()); + + let cleanup_timeout = Duration::from_secs(1); + + // Spawn cleanup monitor + spawn_cleanup_monitor( + "test-run-1".to_string(), + app, + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + cleanup_timeout, + ); + + // Wait for: + // - App to transition (100ms) + // - Polling to detect terminal state (up to STATUS_POLL_INTERVAL) + // - Cleanup timeout + // - Buffer (1000ms) + let wait_time = Duration::from_millis(100) + + STATUS_POLL_INTERVAL + + cleanup_timeout + + Duration::from_secs(1); + tokio::time::sleep(wait_time).await; + + // Verify cleanup happened + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup flag should be set" + ); + + // Verify directories were removed + assert!( + uv_temp_dir.lock().await.is_none(), + "UV temp dir should be taken" + ); + assert!( + package_tmp_dir.lock().await.is_none(), + "Package temp dir should be taken" + ); + + // Verify actual filesystem cleanup + assert!( + tokio::fs::metadata(uv_temp_path).await.is_err(), + "UV temp directory should be deleted from filesystem" + ); + } + + #[tokio::test] + async fn test_explicit_cleanup_prevents_automatic_cleanup() { + // Create a mock app that transitions to Exited after 100ms + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_millis(100), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + let cleanup_timeout = Duration::from_secs(1); + + // Spawn cleanup monitor + spawn_cleanup_monitor( + "test-run-2".to_string(), + app, + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + cleanup_timeout, + ); + + // Wait for app to transition + polling to detect it + let wait_before_cleanup = + Duration::from_millis(100) + STATUS_POLL_INTERVAL + Duration::from_millis(100); + tokio::time::sleep(wait_before_cleanup).await; + + // Simulate explicit cleanup call before timeout expires + cleanup_called.store(true, Ordering::Relaxed); + + // Manually clean up directories (simulating explicit cleanup) + if let Some(temp_dir) = uv_temp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + } + if let Some(tmp_dir) = package_tmp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(tmp_dir.to_path_buf()).await; + } + + // Wait past the cleanup timeout to ensure automatic cleanup would have triggered + tokio::time::sleep(cleanup_timeout + Duration::from_secs(1)).await; + + // Verify cleanup flag is still true + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup flag should remain set" + ); + + // Verify directories were already cleaned up + assert!( + uv_temp_dir.lock().await.is_none(), + "UV temp dir should already be taken" + ); + assert!( + package_tmp_dir.lock().await.is_none(), + "Package temp dir should already be taken" + ); + } + + #[tokio::test] + async fn test_cleanup_waits_for_terminal_state() { + // Create a mock app that takes longer to transition (6s) + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_secs(6), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + let cleanup_timeout = Duration::from_millis(500); + + // Spawn cleanup monitor + spawn_cleanup_monitor( + "test-run-3".to_string(), + app.clone(), + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + cleanup_timeout, + ); + + // Check status well before transition + tokio::time::sleep(Duration::from_secs(2)).await; + + // Cleanup should NOT have happened yet because app is still Running + assert!( + !cleanup_called.load(Ordering::Relaxed), + "Cleanup should not trigger while app is still running" + ); + + // Verify directories still exist + assert!( + uv_temp_dir.lock().await.is_some(), + "UV temp dir should still exist" + ); + + // Wait for: + // - Rest of transition (4s more) + // - Polling to detect terminal state (up to STATUS_POLL_INTERVAL) + // - Cleanup timeout + // - Buffer (1s) + let remaining_wait = Duration::from_secs(4) + + STATUS_POLL_INTERVAL + + cleanup_timeout + + Duration::from_secs(1); + tokio::time::sleep(remaining_wait).await; + + // Now cleanup should have happened + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup should trigger after app reaches terminal state" + ); + + // Cleanup the temp directory manually if test failed to clean it up + let _ = tokio::fs::remove_dir_all(uv_temp_path).await; + } +} diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 74091edc..b4f56591 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -6,6 +6,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tower_package::Package; +pub mod auto_cleanup; pub mod errors; pub mod execution; pub mod local; @@ -44,6 +45,13 @@ pub enum Status { Crashed { code: i32 }, } +impl Status { + /// Returns true if this status represents a terminal state (run is finished) + pub fn is_terminal(&self) -> bool { + matches!(self, Status::Exited | Status::Crashed { .. }) + } +} + pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs index 15603f5b..decdb1b0 100644 --- a/crates/tower-runtime/src/subprocess.rs +++ b/crates/tower-runtime/src/subprocess.rs @@ -1,5 +1,6 @@ //! Subprocess execution backend +use crate::auto_cleanup; use crate::errors::Error; use crate::execution::{ BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, @@ -10,6 +11,7 @@ use crate::{App, OutputReceiver, StartOptions, Status}; use async_trait::async_trait; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tmpdir::TmpDir; use tokio::fs::File; @@ -18,6 +20,9 @@ use tokio::sync::Mutex; use tokio::time::Duration; use tower_package::Package; +/// Cleanup timeout after a run finishes (5 minutes) +const CLEANUP_TIMEOUT: Duration = Duration::from_secs(5 * 60); + /// SubprocessBackend executes apps as a subprocess pub struct SubprocessBackend { /// Optional default cache directory to use @@ -146,14 +151,29 @@ impl ExecutionBackend for SubprocessBackend { }; // Start the LocalApp - let app = LocalApp::start(opts).await?; + let app = Arc::new(Mutex::new(LocalApp::start(opts).await?)); + + let package_tmp_dir = Arc::new(Mutex::new(package_tmp_dir)); + let uv_temp_dir = Arc::new(Mutex::new(uv_temp_dir)); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Spawn automatic cleanup monitor (temporary workaround for disconnected control plane) + auto_cleanup::spawn_cleanup_monitor( + spec.id.clone(), + Arc::clone(&app), + Arc::clone(&package_tmp_dir), + Arc::clone(&uv_temp_dir), + Arc::clone(&cleanup_called), + CLEANUP_TIMEOUT, + ); Ok(SubprocessHandle { id: spec.id, - app: Arc::new(Mutex::new(app)), + app, output_receiver: Arc::new(Mutex::new(output_receiver)), package_tmp_dir, uv_temp_dir, + cleanup_called, }) } @@ -181,22 +201,9 @@ pub struct SubprocessHandle { id: String, app: Arc>, output_receiver: Arc>, - package_tmp_dir: Option, // Track package temp directory for cleanup - uv_temp_dir: Option, // Track UV's temp directory for cleanup -} - -impl Drop for SubprocessHandle { - fn drop(&mut self) { - // Best-effort cleanup of UV temp directory when handle is dropped - if let Some(temp_dir) = self.uv_temp_dir.take() { - let _ = std::fs::remove_dir_all(&temp_dir); - } - - // Best-effort cleanup of package temp directory when handle is dropped - if let Some(tmp_dir) = self.package_tmp_dir.take() { - let _ = std::fs::remove_dir_all(tmp_dir.to_path_buf()); - } - } + package_tmp_dir: Arc>>, + uv_temp_dir: Arc>>, + cleanup_called: Arc, } #[async_trait] @@ -256,23 +263,28 @@ impl ExecutionHandle for SubprocessHandle { } async fn cleanup(&mut self) -> Result<(), Error> { + use tower_telemetry::{debug, info}; + + info!("Explicit cleanup called for run {}", self.id); + + // Mark cleanup as called (prevents timer from running) + self.cleanup_called.store(true, Ordering::Relaxed); + // Ensure the app is terminated self.terminate().await?; - // Clean up uv's temp directory if it was created - if let Some(ref temp_dir) = self.uv_temp_dir { - if let Err(e) = tokio::fs::remove_dir_all(temp_dir).await { - // Log but don't fail - cleanup is best-effort - tower_telemetry::debug!("Failed to clean up uv temp directory: {:?}", e); + // Clean up uv's temp directory + if let Some(temp_dir) = self.uv_temp_dir.lock().await.take() { + if let Err(e) = tokio::fs::remove_dir_all(&temp_dir).await { + debug!("Failed to clean up uv temp directory: {:?}", e); } } // Clean up package temp directory - if let Some(tmp_dir) = self.package_tmp_dir.take() { + if let Some(tmp_dir) = self.package_tmp_dir.lock().await.take() { let path = tmp_dir.to_path_buf(); if let Err(e) = tokio::fs::remove_dir_all(&path).await { - // Log but don't fail - cleanup is best-effort - tower_telemetry::debug!("Failed to clean up package temp directory: {:?}", e); + debug!("Failed to clean up package temp directory: {:?}", e); } } From 71809cb5efb08dcdec7b2e3783fb946a06f21c62 Mon Sep 17 00:00:00 2001 From: Vim Wickramasinghe Date: Fri, 6 Feb 2026 11:20:00 +0100 Subject: [PATCH 2/3] Version bump to 0.3.48 --- Cargo.lock | 22 +++++++++++----------- Cargo.toml | 2 +- pyproject.toml | 2 +- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a924570d..d0da63a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -491,7 +491,7 @@ dependencies = [ [[package]] name = "config" -version = "0.3.47" +version = "0.3.48" dependencies = [ "base64", "chrono", @@ -598,7 +598,7 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] name = "crypto" -version = "0.3.47" +version = "0.3.48" dependencies = [ "aes-gcm", "base64", @@ -3252,7 +3252,7 @@ dependencies = [ [[package]] name = "testutils" -version = "0.3.47" +version = "0.3.48" dependencies = [ "pem", "rsa", @@ -3522,7 +3522,7 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] name = "tower" -version = "0.3.47" +version = "0.3.48" dependencies = [ "tokio", "tower-api", @@ -3547,7 +3547,7 @@ dependencies = [ [[package]] name = "tower-api" -version = "0.3.47" +version = "0.3.48" dependencies = [ "reqwest", "serde", @@ -3559,7 +3559,7 @@ dependencies = [ [[package]] name = "tower-cmd" -version = "0.3.47" +version = "0.3.48" dependencies = [ "axum", "bytes", @@ -3629,7 +3629,7 @@ checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-package" -version = "0.3.47" +version = "0.3.48" dependencies = [ "async-compression", "config", @@ -3647,7 +3647,7 @@ dependencies = [ [[package]] name = "tower-runtime" -version = "0.3.47" +version = "0.3.48" dependencies = [ "async-trait", "chrono", @@ -3670,7 +3670,7 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tower-telemetry" -version = "0.3.47" +version = "0.3.48" dependencies = [ "tracing", "tracing-appender", @@ -3679,7 +3679,7 @@ dependencies = [ [[package]] name = "tower-uv" -version = "0.3.47" +version = "0.3.48" dependencies = [ "async-compression", "async_zip", @@ -3697,7 +3697,7 @@ dependencies = [ [[package]] name = "tower-version" -version = "0.3.47" +version = "0.3.48" dependencies = [ "anyhow", "chrono", diff --git a/Cargo.toml b/Cargo.toml index c581771a..01167576 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ resolver = "2" [workspace.package] edition = "2021" -version = "0.3.47" +version = "0.3.48" description = "Tower is the best way to host Python data apps in production" rust-version = "1.81" authors = ["Brad Heller "] diff --git a/pyproject.toml b/pyproject.toml index c66354af..4fbb5c07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "maturin" [project] name = "tower" -version = "0.3.47" +version = "0.3.48" description = "Tower CLI and runtime environment for Tower." authors = [{ name = "Tower Computing Inc.", email = "brad@tower.dev" }] readme = "README.md" From f9db30df7cd59214afe6e2acbd48a2578dd207c5 Mon Sep 17 00:00:00 2001 From: Vim Wickramasinghe Date: Fri, 6 Feb 2026 11:20:52 +0100 Subject: [PATCH 3/3] uv lock --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index d5d4dd75..9c8797e1 100644 --- a/uv.lock +++ b/uv.lock @@ -2744,7 +2744,7 @@ wheels = [ [[package]] name = "tower" -version = "0.3.47" +version = "0.3.48" source = { editable = "." } dependencies = [ { name = "attrs" },