Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/staged/src-tauri/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,7 @@ pub fn run() {
// Check compatibility *before* creating the store.
let compat = store::check_db_compatibility(&db_path)
.map_err(|e| format!("Cannot check database: {e}"))?;
let session_registry = Arc::new(session_runner::SessionRegistry::new());

let (store_slot, reset_info) = match compat {
store::DbCompatibility::Ok => {
Expand All @@ -1611,6 +1612,7 @@ pub fn run() {
// owned by other live Staged instances untouched.
session_runner::recover_dead_sessions(
Arc::clone(&store_arc),
Arc::clone(&session_registry),
app.handle().clone(),
);
// Clean up images left in "pending" state from compose
Expand Down Expand Up @@ -1651,7 +1653,7 @@ pub fn run() {
};

app.manage(store_slot);
app.manage(Arc::new(session_runner::SessionRegistry::new()));
app.manage(session_registry);
app.manage(Arc::new(actions::ActionExecutor::new()));
app.manage(Arc::new(actions::ActionRegistry::new()));
app.manage(ShutdownState::default());
Expand Down
20 changes: 20 additions & 0 deletions apps/staged/src-tauri/src/session_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,27 @@ pub async fn drain_queued_sessions(
provider: Option<String>,
) -> Result<bool, String> {
let store = get_store(&store)?;
drain_queued_sessions_for_branch(
store,
Arc::clone(&registry),
app_handle,
branch_id,
provider,
)
.await
}

/// Start the oldest queued branch session if one exists and the branch is idle.
///
/// This is shared by the Tauri command and backend lifecycle hooks so queue
/// progression remains owned by the backend.
pub async fn drain_queued_sessions_for_branch(
store: Arc<Store>,
registry: Arc<session_runner::SessionRegistry>,
app_handle: tauri::AppHandle,
branch_id: String,
provider: Option<String>,
) -> Result<bool, String> {
// Bail out if the branch already has a running session to prevent
// concurrent sessions on the same branch.
if store
Expand Down
102 changes: 77 additions & 25 deletions apps/staged/src-tauri/src/session_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,41 +402,59 @@ pub fn start_session(
error_msg,
Some(&completion_reason),
);
}

// Trigger auto review when a commit session completes successfully,
// but only if there are no queued sessions waiting for this branch.
// Queued sessions take priority — the next one will be drained instead.
if let Some(branch_id) = committed_branch_id {
let has_queued = store_for_status
.get_queued_sessions_for_branch(&branch_id)
.map(|q| !q.is_empty())
.unwrap_or(false);
let branch_id = store_for_status
.get_branch_id_for_session(&session_id_for_status)
.ok()
.flatten();
let auto_review_branch_id = committed_branch_id.clone();

if !has_queued {
let store_for_auto = Arc::clone(&store_for_status);
let registry_for_auto = Arc::clone(&registry);
let app_handle_for_auto = app_handle.clone();
if let Some(branch_id) = branch_id {
let store_for_follow_up = Arc::clone(&store_for_status);
let registry_for_follow_up = Arc::clone(&registry);
let app_handle_for_follow_up = app_handle.clone();
tauri::async_runtime::spawn(async move {
match crate::session_commands::trigger_auto_review(
store_for_auto,
registry_for_auto,
app_handle_for_auto,
match crate::session_commands::drain_queued_sessions_for_branch(
Arc::clone(&store_for_follow_up),
Arc::clone(&registry_for_follow_up),
app_handle_for_follow_up.clone(),
branch_id.clone(),
None,
)
.await
{
Ok(resp) => {
log::info!(
"Auto review triggered for branch {branch_id}: session={}, review={}",
resp.session_id,
resp.artifact_id,
);
Ok(true) => {
log::info!("Drained next queued session for branch {branch_id}");
}
Ok(false) => {
if let Some(auto_review_branch_id) = auto_review_branch_id {
match crate::session_commands::trigger_auto_review(
store_for_follow_up,
registry_for_follow_up,
app_handle_for_follow_up,
auto_review_branch_id.clone(),
None,
)
.await
{
Ok(resp) => {
log::info!(
"Auto review triggered for branch {auto_review_branch_id}: session={}, review={}",
resp.session_id,
resp.artifact_id,
);
}
Err(e) => {
log::error!(
"Failed to trigger auto review for branch {auto_review_branch_id}: {e}"
);
}
}
}
}
Err(e) => {
log::error!(
"Failed to trigger auto review for branch {branch_id}: {e}"
"Failed to drain queued sessions for branch {branch_id}: {e}"
);
}
}
Expand All @@ -462,7 +480,11 @@ pub fn start_session(
/// - `owner_pid` is dead (or NULL for pre-migration rows) → transition to
/// error with `AppQuit` reason and emit `session-status-changed` so the
/// frontend learns the outcome.
pub fn recover_dead_sessions(store: Arc<Store>, app_handle: AppHandle) {
pub fn recover_dead_sessions(
store: Arc<Store>,
registry: Arc<SessionRegistry>,
app_handle: AppHandle,
) {
let sessions = match store.get_running_sessions() {
Ok(s) => s,
Err(e) => {
Expand Down Expand Up @@ -496,6 +518,36 @@ pub fn recover_dead_sessions(store: Arc<Store>, app_handle: AppHandle) {
None,
Some(&CompletionReason::AppQuit),
);

let branch_id = store.get_branch_id_for_session(&session.id).ok().flatten();
if let Some(branch_id) = branch_id {
let store_for_follow_up = Arc::clone(&store);
let registry_for_follow_up = Arc::clone(&registry);
let app_handle_for_follow_up = app_handle.clone();
tauri::async_runtime::spawn(async move {
match crate::session_commands::drain_queued_sessions_for_branch(
store_for_follow_up,
registry_for_follow_up,
app_handle_for_follow_up,
branch_id.clone(),
None,
)
.await
{
Ok(true) => {
log::info!(
"Drained next queued session after orphan recovery for branch {branch_id}"
);
}
Ok(false) => {}
Err(e) => {
log::error!(
"Failed to drain queued sessions after orphan recovery for branch {branch_id}: {e}"
);
}
}
});
}
}
}
}
Expand Down
26 changes: 26 additions & 0 deletions apps/staged/src-tauri/src/store/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,32 @@ impl Store {
Ok(count > 0)
}

/// Resolve the branch that owns a session through its linked artifact.
///
/// Project-note sessions do not belong to a branch and therefore return `None`.
/// This assumes all branch-linked artifacts for a session point at the same
/// branch; if a session somehow links artifacts across multiple branches,
/// the first row returned by SQLite wins.
pub fn get_branch_id_for_session(
&self,
session_id: &str,
) -> Result<Option<String>, StoreError> {
let conn = self.conn.lock().unwrap();
conn.query_row(
"SELECT branch_id FROM (
SELECT branch_id FROM commits WHERE session_id = ?1
UNION ALL
SELECT branch_id FROM notes WHERE session_id = ?1
UNION ALL
SELECT branch_id FROM reviews WHERE session_id = ?1
) LIMIT 1",
params![session_id],
|row| row.get(0),
)
.optional()
.map_err(Into::into)
}

pub fn delete_session(&self, id: &str) -> Result<(), StoreError> {
let conn = self.conn.lock().unwrap();
conn.execute("DELETE FROM sessions WHERE id = ?1", params![id])?;
Expand Down
7 changes: 0 additions & 7 deletions apps/staged/src/lib/listeners/sessionStatusListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,6 @@ async function handleSessionEnd(sessionId: string, status: string) {

// Clean up the session from the unified registry (single point of cleanup).
sessionRegistry.unregister(sessionId);

// Drain queued sessions for this branch so the next one starts automatically.
if (branchId) {
commands.drainQueuedSessions(branchId).catch((e) => {
console.error('[sessionStatusListener] Failed to drain queued sessions:', e);
});
}
}

async function handlePrCompletion(sessionId: string, branchId: string, status: string) {
Expand Down