Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/tower-cmd/src/apps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ const FOLLOW_BACKOFF_MAX: Duration = Duration::from_secs(5);
const LOG_DRAIN_DURATION: Duration = Duration::from_secs(5);
const RUN_START_POLL_INTERVAL: Duration = Duration::from_millis(500);
const RUN_START_MESSAGE_DELAY: Duration = Duration::from_secs(3);
const RUN_START_TIMEOUT: Duration = Duration::from_secs(30);

async fn follow_logs(config: Config, name: String, seq: i64) {
let enable_ctrl_c = !output::get_output_mode().is_mcp();
Expand Down Expand Up @@ -265,6 +266,12 @@ async fn follow_logs(config: Config, name: String, seq: i64) {
let mut notified = false;
loop {
sleep(RUN_START_POLL_INTERVAL).await;

if wait_started.elapsed() > RUN_START_TIMEOUT {
output::error("Timed out waiting for run to start. The runner may be unavailable.");
return;
}

// Avoid blank output on slow starts while keeping fast starts quiet.
if should_notify_run_wait(notified, wait_started.elapsed()) {
output::write("Waiting for run to start...\n");
Expand Down
3 changes: 3 additions & 0 deletions crates/tower-cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ pub enum Error {
#[snafu(display("Run was cancelled"))]
RunCancelled,

#[snafu(display("Timed out waiting for run to start. The runner may be unavailable."))]
RunStartTimeout,

#[snafu(display("App crashed during local execution"))]
AppCrashed,

Expand Down
21 changes: 12 additions & 9 deletions crates/tower-cmd/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,21 +724,24 @@ fn create_pyiceberg_catalog_property_name(catalog_name: &str, property_name: &st
format!("PYICEBERG_CATALOG__{}__{}", catalog_name, property_name)
}

const RUN_START_TIMEOUT: Duration = Duration::from_secs(30);

/// wait_for_run_start waits for the run to enter a "running" state. It polls the API every 500ms to see
/// if it's started yet.
async fn wait_for_run_start(config: &Config, run: &Run) -> Result<(), Error> {
loop {
let res = api::describe_run(config, &run.app_name, run.number).await?;
timeout(RUN_START_TIMEOUT, async {
loop {
let res = api::describe_run(config, &run.app_name, run.number).await?;

if is_run_started(&res.run)? {
return Ok(());
}

if is_run_started(&res.run)? {
break;
} else {
// Wait half a second to to try again.
sleep(Duration::from_millis(500)).await;
}
}

Ok(())
})
.await
.map_err(|_| Error::RunStartTimeout)?
}

/// wait_for_run_completion waits for the run to enter an terminal state. It polls the API every
Expand Down
Loading