diff --git a/docs/plans/completed/codex-sandbox-state-meta-migration.md b/docs/plans/completed/codex-sandbox-state-meta-migration.md index 83c8ebb..bc0bff6 100644 --- a/docs/plans/completed/codex-sandbox-state-meta-migration.md +++ b/docs/plans/completed/codex-sandbox-state-meta-migration.md @@ -10,7 +10,7 @@ ## Status - State: completed -- Last updated: 2026-04-18 +- Last updated: 2026-04-19 - Current phase: completed ## Design Intent @@ -69,8 +69,13 @@ - Ordered sandbox plans still validate earlier operations before later mode resets; later-wins resolution does not silently discard earlier invalid CLI/config ops. - `--debug-repl --sandbox inherit` remains locally usable by bootstrapping one inherited snapshot from the current default sandbox state before the first worker spawn. - `repl_reset` derives inherited sandbox state from the current tool call's `_meta["codex/sandbox-state-meta"]`. -- Non-empty `repl` calls derive inherited sandbox state from the current tool call's `_meta["codex/sandbox-state-meta"]` before executing fresh code. -- Empty-input `repl` polls ignore per-call sandbox metadata when they can be answered from existing state, but they still apply the current tool call's metadata before spawning a worker to answer an idle call on a fresh session. +- Non-empty `repl` calls resolve stale timeout markers before deciding whether they still belong to a prior timed-out request. +- Bare `Ctrl-C` is the one non-empty follow-up that remains a local recovery control and does not force a sandbox-driven restart. +- Every other non-empty `repl` call requires valid current `_meta["codex/sandbox-state-meta"]`. +- If current metadata changes the effective inherited sandbox, `mcp-repl` restarts the worker before handling that non-empty call and includes a reply notice naming the new sandbox policy. 
+- Control-prefixed tails such as `Ctrl-C` and `Ctrl-D` run in the restarted session when the sandbox changed; the control prefix itself is not replayed into the fresh worker. +- While the pager is active, pure pager navigation remains local UI state and ignores sandbox metadata until a later tool call actually interacts with the worker again. +- Empty-input `repl` polls ignore per-call sandbox metadata when they can be answered from existing state, but they still apply the current tool call's metadata before any spawn or respawn needed to answer the call, including after draining a session-ended request. - When a prior timed-out request has already settled, `mcp-repl` resolves the stale timeout marker before deciding whether a new non-empty `repl` call is still just a busy follow-up. - Missing or malformed metadata fails closed with the existing inherit error path. - Explicit non-`inherit` sandbox modes ignore Codex metadata. @@ -88,7 +93,7 @@ - Current Codex source and live traces both showed the old async update protocol was obsolete for the current release line. - The migration stayed intentionally single-path: no compatibility layer for older Codex builds. -- Follow-up review fixes tightened the runtime sequencing so sandbox metadata is applied only for fresh execution or worker spawn, not for empty-input polls that are only draining prior output or using an already-running idle session. +- Follow-up review fixes and final contract clarification tightened the runtime sequencing so sandbox changes now define worker-session boundaries for non-empty calls: empty polls keep draining, bare interrupts stay local, and other non-empty interactions restart into the current inherited sandbox when it changed. ## Decision Log @@ -97,3 +102,4 @@ - 2026-04-17: Chose per-tool-call `_meta["codex/sandbox-state-meta"]` as the source of truth after inspecting current Codex source and live traces. 
- 2026-04-17: Completed the repo migration and verification against the real current Codex integration tests. - 2026-04-18: Clarified the shipped contract for `repl`: empty-input polls ignore per-call sandbox metadata only when they can be answered from existing state, while fresh non-empty calls resolve stale timeout markers and then apply the current call's sandbox metadata before executing new code. +- 2026-04-19: Replaced the earlier local-follow-up exception set with a simpler restart-on-change contract: empty polls keep draining, bare interrupts remain local, and other non-empty interactions use current metadata and restart the worker when the inherited sandbox changed. diff --git a/docs/plans/completed/inherit-sandbox-restart-contract.md b/docs/plans/completed/inherit-sandbox-restart-contract.md new file mode 100644 index 0000000..073ff20 --- /dev/null +++ b/docs/plans/completed/inherit-sandbox-restart-contract.md @@ -0,0 +1,51 @@ +# Inherit Sandbox Restart Contract + +## Summary + +- Change `--sandbox inherit` so new per-tool-call sandbox metadata takes effect by restarting the worker at the next non-poll, non-bare-interrupt interaction. +- Keep empty-input polls and bare `Ctrl-C` as the two cases that do not force a restart on sandbox change. +- Keep explicit non-`inherit` sandbox modes authoritative and unchanged. + +## Status + +- State: completed +- Last updated: 2026-04-19 +- Current phase: completed + +## Current Direction + +- Treat sandbox changes as worker-session boundaries. +- If current tool-call metadata changes the effective inherited sandbox, restart the worker before handling any tool call that would send input to the worker, explicitly restart it, or otherwise interact with it statefully. +- Empty-input polls keep draining existing state without forcing a restart. +- A bare `Ctrl-C` remains a local recovery control and does not force a restart just because sandbox metadata changed. 
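The restart-on-change contract in the bullets above can be summarized as a small decision function. This is an illustrative sketch only: `CallKind`, `Action`, and `decide` are hypothetical names for this example, not part of mcp-repl's actual API.

```rust
// Hypothetical model of the restart-on-change contract described above.
#[derive(Debug, PartialEq)]
enum CallKind {
    EmptyPoll,     // empty-input `repl` poll
    BareInterrupt, // a lone Ctrl-C with no trailing input
    Other,         // any other non-empty `repl` call
}

#[derive(Debug, PartialEq)]
enum Action {
    DrainLocally,     // keep draining existing output; no restart
    StayLocal,        // local recovery control; no restart
    ServeAsIs,        // sandbox unchanged: reuse the running worker
    RestartThenServe, // sandbox changed: restart, then handle the input
}

fn decide(kind: CallKind, sandbox_changed: bool) -> Action {
    match kind {
        // Empty polls keep draining existing state without forcing a restart.
        CallKind::EmptyPoll => Action::DrainLocally,
        // A bare Ctrl-C remains a local recovery control.
        CallKind::BareInterrupt => Action::StayLocal,
        // Every other non-empty call restarts when the inherited sandbox changed.
        CallKind::Other if sandbox_changed => Action::RestartThenServe,
        CallKind::Other => Action::ServeAsIs,
    }
}
```

Note the two escape hatches are exactly the ones the plan locks in: polls and bare interrupts never trigger a sandbox-driven restart on their own.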
+ +## Long-Term Direction + +- The inherit contract should stay simple enough to explain in one paragraph: + fresh worker interaction uses the current metadata, and sandbox changes reset the session before that interaction happens. +- Review-driven exceptions should be minimized; only the explicit poll and bare interrupt escape hatches should remain. + +## Phase Status + +- Phase 0: completed + - Identified that the current fail-closed follow-up split is not the desired product behavior. +- Phase 1: completed + - Reworked runtime sequencing around restart-on-change semantics. +- Phase 2: completed + - Refreshed tests, docs, and final verification. + +## Locked Decisions + +- Do not revert to the obsolete async sandbox update protocol. +- Do not let explicit non-`inherit` CLI sandbox modes depend on Codex metadata. +- If the inherited sandbox changes, the next non-poll, non-bare-interrupt interaction should reset the worker instead of trying to preserve the old request/session. + +## Open Questions + +- What exact restart notice text best communicates both the restart cause and the new effective sandbox policy without creating brittle snapshots? +- Should a bare `Ctrl-C` with no live worker remain a no-op control reply or continue to surface the existing idle/session behavior? + +## Decision Log + +- 2026-04-19: Replaced the earlier fail-closed control-tail contract with a restart-on-change contract for non-poll, non-bare-interrupt interactions. +- 2026-04-19: Landed the runtime, docs, and regression updates; restart notices are informational and initial inherit-mode spawns do not pretend they were restarts. diff --git a/docs/sandbox.md b/docs/sandbox.md index 86af22e..e34de93 100644 --- a/docs/sandbox.md +++ b/docs/sandbox.md @@ -21,14 +21,45 @@ metadata channel in that mode, `mcp-repl --debug-repl --sandbox inherit` bootstraps one local inherited snapshot from the current default sandbox state before the first worker spawn. 
-For `repl`, empty-input polls ignore per-call sandbox metadata when they can be -answered from existing state, such as draining a timed-out request or returning an -idle prompt from an already-running worker. If an empty-input call must spawn a -worker to answer the call, `mcp-repl` applies the current tool call's sandbox -metadata before that spawn. Non-empty `repl` calls resolve any stale timeout -marker first, then apply the current call's sandbox metadata before executing -fresh code. If a timed-out request is still genuinely in flight, follow-up calls -continue servicing that request instead of switching sandboxes mid-flight. +For `repl`, inherited sandbox metadata controls the worker session that handles +the call. When a non-empty tool call would use the worker and the effective +inherited sandbox changed, `mcp-repl` restarts the worker before serving that +call and includes a restart notice that names the new sandbox policy. + +More specifically: + +- Empty-input polls ignore per-call sandbox metadata while they are only + draining existing pending or settled output, or returning an idle prompt from + an already-running worker. +- If an empty-input poll needs to spawn or respawn a worker to finish answering + the call, `mcp-repl` applies the current tool call's metadata before that + spawn. If a poll can first answer by draining a session-ended request, it + returns that local drain without respawning; the next spawn-needed call must + provide valid current metadata. +- While the pager is active, pure pager navigation is local UI state, not a + worker interaction. Pager-local commands such as `:q` or empty-string page + advance ignore sandbox metadata until a later tool call actually interacts + with the worker again. Bare `Ctrl-D` is not pager navigation; it remains an + explicit restart even when the pager is active. +- Bare `Ctrl-C` is the one non-empty `repl` follow-up that stays local and does + not force a sandbox-driven restart. 
+- Every other non-empty `repl` call must have valid current + `_meta["codex/sandbox-state-meta"]`. +- A non-empty retry after the memory guardrail aborts a worker is an ordinary + non-empty call. It must have valid current metadata before `mcp-repl` resets + or retries under `--sandbox inherit`. +- Non-empty `repl` calls resolve stale timeout markers before deciding whether + they are still looking at a live worker request. +- If current metadata changes the effective inherited sandbox, `mcp-repl` + restarts the worker at that call before handling the input. +- Control-prefixed tails such as `Ctrl-C` and `Ctrl-D` run in the + restarted session when the sandbox changed; the control prefix itself is not + replayed into the fresh worker. +- Explicit restarts discard preserved detached output from aborted prior + requests instead of carrying it into later unrelated replies. +- Sandbox metadata is enforced again at the next tool call that actually + interacts with the worker after pager navigation ends. +- Missing or malformed metadata still fails closed on calls that need it. The worker also gets a per-session temp directory, exported as: diff --git a/src/debug_repl.rs b/src/debug_repl.rs index 1c3339e..abf55fc 100644 --- a/src/debug_repl.rs +++ b/src/debug_repl.rs @@ -70,7 +70,7 @@ pub(crate) fn run( while let Some(line) = read_line(&mut stdin)? 
{ if is_exact_command(&line, "INTERRUPT") { - let reply = worker.interrupt(DEFAULT_WRITE_STDIN_TIMEOUT); + let reply = worker.interrupt(DEFAULT_WRITE_STDIN_TIMEOUT, None, false); render_visible_reply( response.as_mut(), reply, diff --git a/src/sandbox_cli.rs b/src/sandbox_cli.rs index f974c3d..b92a088 100644 --- a/src/sandbox_cli.rs +++ b/src/sandbox_cli.rs @@ -197,6 +197,13 @@ pub fn resolve_effective_sandbox_state( resolve_effective_sandbox_state_with_defaults(plan, inherited, &defaults) } +pub fn validate_sandbox_plan_with_defaults( + plan: &SandboxCliPlan, + defaults: &SandboxState, +) -> Result<(), String> { + validate_sandbox_plan_operations(plan, None, defaults) +} + pub fn resolve_effective_sandbox_state_with_defaults( plan: &SandboxCliPlan, inherited: Option<&SandboxState>, diff --git a/src/server.rs b/src/server.rs index 4acf307..90e2575 100644 --- a/src/server.rs +++ b/src/server.rs @@ -35,7 +35,6 @@ use crate::worker_process::{ WorkerError, WorkerManager, WriteStdinControlAction, WriteStdinOptions, is_prechecked_follow_up_requires_meta, split_write_stdin_control_prefix, }; -use crate::worker_protocol::{WorkerContent, WorkerReply}; const BUSY_FOLLOW_UP_RECHECK_WAIT: Duration = Duration::from_millis(25); @@ -160,15 +159,14 @@ impl SharedServer { fn apply_tool_call_sandbox_state( state: &mut ServerState, update: Option, - ) -> Result<(), WorkerError> { + ) -> Result<bool, WorkerError> { let Some(update) = update else { - return Ok(()); + return Ok(false); }; state .worker - .update_sandbox_state(update, SANDBOX_UPDATE_TIMEOUT)?; - Ok(()) + .update_sandbox_state(update, SANDBOX_UPDATE_TIMEOUT) } fn stage_tool_call_sandbox_state_for_reset( @@ -194,12 +192,10 @@ impl SharedServer { let server_timeout = apply_safety_margin(timeout); let accepts_sandbox_state_meta = self.accepts_sandbox_state_meta(); self.run_state(move |state| { - let timeout_bundle_reuse = timeout_bundle_reuse_for_input(&input); - let raw_input = input; + let mut raw_input = input; let 
use_inline_pager_materialization = matches!(state.oversized_output, OversizedOutputMode::Pager); state.worker.refresh_timeout_marker(); - let mut control_input_on_meta_error = None; let parse_tool_call_sandbox_state = || { SharedServer::sandbox_state_update_for_tool_call_meta( accepts_sandbox_state_meta, @@ -212,142 +208,166 @@ impl SharedServer { &meta, ) }; - let sandbox_state_result = if raw_input.is_empty() { - // Empty-input polls may drain an existing request. Pass current - // metadata through so a session-end reset can stage it before - // respawning, but only apply it immediately when answering the - // call itself requires a spawn. - match state.worker.empty_input_requires_spawn() { - Ok(true) => { - if let Err(err) = parse_tool_call_sandbox_state().and_then(|update| { - SharedServer::apply_tool_call_sandbox_state(state, update) - }) { - Err(err) - } else { - Ok(None) - } + let mut suppress_session_end_reset = false; + let (sandbox_state_result, local_error_is_mcp_error) = if raw_input.is_empty() { + // Empty-input polls only skip metadata when they are truly + // draining existing output. In pager mode, empty input can + // also be a pure local navigation command and should ignore + // inherit metadata until a later worker interaction. 
+ let needs_post_poll_reset = state.worker.empty_input_may_auto_reset_after_poll(); + if state.worker.empty_input_uses_local_pager_state() { + ( + match parse_tool_call_sandbox_state() { + Ok(update) => Ok(update), + Err(_) => { + suppress_session_end_reset = true; + Ok(None) + } + }, + false, + ) + } else { + match state.worker.empty_input_requires_spawn() { + Ok(true) => ( + parse_tool_call_sandbox_state().and_then(|update| { + let respawned = + SharedServer::apply_tool_call_sandbox_state(state, update)?; + if respawned { + state.response.retire_disclosed_timeout_bundle(); + } + Ok(None) + }), + true, + ), + Ok(false) if needs_post_poll_reset => ( + match parse_tool_call_sandbox_state() { + Ok(update) => Ok(update), + Err(_) => { + suppress_session_end_reset = true; + Ok(None) + } + }, + false, + ), + Ok(false) => (parse_optional_tool_call_sandbox_state(), false), + Err(err) => (Err(err), true), } - Ok(false) => parse_optional_tool_call_sandbox_state(), - Err(err) => Err(err), } } else { // A timed-out request still owns busy follow-ups, but a fresh // non-empty call after that request has already settled must // run under the current tool call's sandbox metadata. 
- let mut deferred_sandbox_state_update = None; if state.worker.pending_request() { state .worker .refresh_timeout_marker_with_wait(BUSY_FOLLOW_UP_RECHECK_WAIT); } - if state.worker.pending_request() { - if let Some((control, _remaining)) = - split_write_stdin_control_prefix(&raw_input) + let restart_control = matches!( + split_write_stdin_control_prefix(&raw_input), + Some((WriteStdinControlAction::Restart, _)) + ); + let bare_interrupt = matches!( + split_write_stdin_control_prefix(&raw_input), + Some((WriteStdinControlAction::Interrupt, remaining)) if remaining.is_empty() + ); + let needs_initial_state = + restart_control && state.worker.missing_inherited_state_without_worker(); + if needs_initial_state { + (parse_tool_call_sandbox_state(), true) + } else if state.worker.pending_request() && bare_interrupt { + ( + match parse_tool_call_sandbox_state() { + Ok(update) => Ok(update), + Err(_) => { + suppress_session_end_reset = true; + Ok(None) + } + }, + false, + ) + } else if state.worker.pending_request() { + let local_pager_follow_up = input_uses_local_pager_state(state, &raw_input); + if local_pager_follow_up + && split_write_stdin_control_prefix(&raw_input).is_none() { - control_input_on_meta_error = Some(( - match control { - WriteStdinControlAction::Interrupt => "\u{3}".to_string(), - WriteStdinControlAction::Restart => "\u{4}".to_string(), - }, - timeout_bundle_reuse_for_input(&raw_input), - matches!(control, WriteStdinControlAction::Interrupt), - )); - if let Err(err) = parse_tool_call_sandbox_state().map(|update| { - control_input_on_meta_error = None; - deferred_sandbox_state_update = update; - }) { - Err(err) - } else { - Ok(deferred_sandbox_state_update) - } + (Ok(None), false) } else { - parse_optional_tool_call_sandbox_state() + ( + match parse_tool_call_sandbox_state().and_then(|update| { + SharedServer::apply_tool_call_sandbox_state(state, update) + }) { + Ok(respawned) => { + if respawned { + state.response.retire_disclosed_timeout_bundle(); + 
raw_input = normalize_input_after_sandbox_respawn( + &raw_input, + local_pager_follow_up, + ); + } + Ok(None) + } + Err(err) => Err(err), + }, + true, + ) } } else { match state .worker .nonexecuting_follow_up_uses_existing_state(&raw_input) { - Ok(true) => { - // Local follow-ups like bare Ctrl-C or active pager - // commands can keep using existing state. Exact - // Ctrl-C is only in this path when it will not - // respawn a worker. - Ok(None) - } - Ok(false) => { - if matches!( - split_write_stdin_control_prefix(&raw_input), - Some((WriteStdinControlAction::Restart, _)) - ) { - if let Err(err) = parse_tool_call_sandbox_state().map(|update| { - deferred_sandbox_state_update = update; - }) { - Err(err) - } else { - Ok(deferred_sandbox_state_update) + Ok(true) => ( + match parse_tool_call_sandbox_state() { + Ok(update) => Ok(update), + Err(_) => { + suppress_session_end_reset = true; + Ok(None) } - } else { + }, + false, + ), + Ok(false) => { + let local_pager_follow_up = + input_uses_local_pager_state(state, &raw_input); + ( match parse_tool_call_sandbox_state().and_then(|update| { SharedServer::apply_tool_call_sandbox_state(state, update) }) { - Ok(()) => Ok(None), + Ok(respawned) => { + if respawned { + state.response.retire_disclosed_timeout_bundle(); + raw_input = normalize_input_after_sandbox_respawn( + &raw_input, + local_pager_follow_up, + ); + } + Ok(None) + } Err(err) => Err(err), - } - } + }, + true, + ) } - Err(err) => Err(err), + Err(err) => (Err(err), true), } } }; let deferred_sandbox_state_update = match sandbox_state_result { Ok(update) => update, Err(err) => { - if let Some((control_input, timeout_bundle_reuse, detach_control_reply)) = - control_input_on_meta_error.take() - { - let control_result = state.worker.write_stdin( - control_input, - worker_timeout, - server_timeout, - WriteStdinOptions { - pending_state_prechecked: true, - ..WriteStdinOptions::default() - }, - ); - let pending_request_after = state.worker.pending_request(); - let 
detached_prefix_item_count = if detach_control_reply { - control_result - .as_ref() - .map_or(0, prefixed_worker_reply_item_count) - } else { - 0 - }; - let mut result = finalize_visible_reply( - state, - control_result, - pending_request_after, - timeout_bundle_reuse, - detached_prefix_item_count, - use_inline_pager_materialization - && !pending_request_after - && !state.response.has_timeout_bundle_state(), - ); - result.is_error = Some(true); - result - .content - .push(rmcp::model::Content::text(format!("worker error: {err}"))); - strip_text_stream_meta(&mut result); - return result; - } - let mut result = state.response.finalize_local_error(err); + let mut result = state + .response + .finalize_local_error(err, local_error_is_mcp_error); strip_text_stream_meta(&mut result); return result; } }; + let prior_disclosed_timeout_bundle_id = state.response.disclosed_timeout_bundle_id(); + let mut timeout_bundle_reuse = timeout_bundle_reuse_for_input(&raw_input); let mut write_options = WriteStdinOptions { pending_state_prechecked: true, deferred_sandbox_state_update, + suppress_session_end_reset, ..WriteStdinOptions::default() }; let mut retried_after_meta_refresh = false; @@ -363,13 +383,24 @@ impl SharedServer { if !retried_after_meta_refresh && is_prechecked_follow_up_requires_meta(&err) => { + let local_pager_follow_up = input_uses_local_pager_state(state, &raw_input); match parse_tool_call_sandbox_state().and_then(|update| { SharedServer::apply_tool_call_sandbox_state(state, update) }) { - Ok(()) => { + Ok(respawned) => { + if respawned { + state.response.retire_disclosed_timeout_bundle(); + raw_input = normalize_input_after_sandbox_respawn( + &raw_input, + local_pager_follow_up, + ); + timeout_bundle_reuse = + timeout_bundle_reuse_for_input(&raw_input); + } retried_after_meta_refresh = true; write_options.pending_state_prechecked = false; write_options.deferred_sandbox_state_update = None; + write_options.suppress_session_end_reset = false; continue; } Err(err) 
=> break Err(err), @@ -380,6 +411,7 @@ impl SharedServer { }; let pending_request_after = state.worker.pending_request(); let detached_prefix_item_count = state.worker.detached_prefix_item_count(); + let respawned_during_write = state.worker.respawned_during_last_write(); let mut result = finalize_visible_reply( state, result, @@ -390,6 +422,11 @@ impl SharedServer { && !pending_request_after && !state.response.has_timeout_bundle_state(), ); + if respawned_during_write { + state + .response + .retire_timeout_bundle_if_matches(prior_disclosed_timeout_bundle_id); + } strip_text_stream_meta(&mut result); result }) @@ -397,6 +434,34 @@ impl SharedServer { } } +fn input_uses_local_pager_state(state: &ServerState, input: &str) -> bool { + if let Some((_control, remaining)) = split_write_stdin_control_prefix(input) { + state + .worker + .local_pager_follow_up_uses_existing_state(remaining) + } else { + state + .worker + .local_pager_follow_up_uses_existing_state(input) + } +} + +fn normalize_input_after_sandbox_respawn(input: &str, local_pager_follow_up: bool) -> String { + if let Some((control, remaining)) = split_write_stdin_control_prefix(input) { + if matches!(control, WriteStdinControlAction::Restart) && remaining.is_empty() { + input.to_string() + } else if local_pager_follow_up { + String::new() + } else { + remaining.to_string() + } + } else if local_pager_follow_up { + String::new() + } else { + input.to_string() + } +} + fn server_info(advertise_sandbox_capabilities: bool) -> ServerInfo { let capabilities = if advertise_sandbox_capabilities { ServerCapabilities::builder() @@ -549,7 +614,7 @@ macro_rules! 
define_backend_tool_server { Err(err) => Err(WorkerError::Sandbox(err.to_string())), }; if let Err(err) = sandbox_state_result { - let mut result = state.response.finalize_local_error(err); + let mut result = state.response.finalize_local_error(err, true); strip_text_stream_meta(&mut result); return result; } @@ -607,32 +672,6 @@ fn finalize_visible_reply( } } -fn prefixed_worker_reply_item_count(reply: &WorkerReply) -> usize { - let WorkerReply::Output { - contents, prompt, .. - } = reply; - let Some(prompt_text) = prompt.as_deref() else { - return contents.len(); - }; - if prompt_text.is_empty() { - return contents.len(); - } - let Some(idx) = contents - .iter() - .rposition(|content| matches!(content, WorkerContent::ContentText { .. })) - else { - return contents.len(); - }; - let WorkerContent::ContentText { text, .. } = &contents[idx] else { - return contents.len(); - }; - if matches!(text.strip_suffix(prompt_text), Some("")) { - contents.len().saturating_sub(1) - } else { - contents.len() - } -} - define_backend_tool_server!(RFilesToolServer, "../docs/tool-descriptions/repl_tool_r.md"); define_backend_tool_server!( RPagerToolServer, diff --git a/src/server/response.rs b/src/server/response.rs index 728494b..77fa12a 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -234,6 +234,36 @@ impl ResponseState { self.active_timeout_bundle.is_some() || self.staged_timeout_output.is_some() } + pub(crate) fn disclosed_timeout_bundle_id(&self) -> Option { + self.active_timeout_bundle + .as_ref() + .filter(|active| active.was_disclosed()) + .map(|active| active.id) + } + + pub(crate) fn retire_disclosed_timeout_bundle(&mut self) { + if self + .active_timeout_bundle + .as_ref() + .is_some_and(ActiveOutputBundle::was_disclosed) + { + self.active_timeout_bundle = None; + } + } + + pub(crate) fn retire_timeout_bundle_if_matches(&mut self, bundle_id: Option) { + let Some(bundle_id) = bundle_id else { + return; + }; + if self + .active_timeout_bundle + .as_ref() 
+ .is_some_and(|active| active.id == bundle_id) + { + self.active_timeout_bundle = None; + } + } + fn materialize_staged_timeout_output( &mut self, staged: &StagedTimeoutOutput, @@ -284,9 +314,19 @@ impl ResponseState { } /// Returns a local pre-execution error without disturbing any active timeout-bundle state. - pub(crate) fn finalize_local_error(&mut self, err: WorkerError) -> CallToolResult { + pub(crate) fn finalize_local_error( + &mut self, + err: WorkerError, + is_mcp_error: bool, + ) -> CallToolResult { eprintln!("worker write stdin error: {err}"); - finalize_error_batch(vec![Content::text(format!("worker error: {err}"))]) + let mut contents = vec![Content::text(format!("worker error: {err}"))]; + ensure_nonempty_contents(&mut contents); + if is_mcp_error { + CallToolResult::error(contents) + } else { + CallToolResult::success(contents) + } } /// Materializes a worker reply inline without applying files-mode bundle compaction. diff --git a/src/worker_process.rs b/src/worker_process.rs index 1832d27..0bd419e 100644 --- a/src/worker_process.rs +++ b/src/worker_process.rs @@ -44,6 +44,7 @@ use crate::sandbox::{ use crate::sandbox_cli::{ MISSING_INHERITED_SANDBOX_STATE_MESSAGE, SandboxCliPlan, resolve_effective_sandbox_state_with_defaults, sandbox_plan_requests_inherited_state, + validate_sandbox_plan_with_defaults, }; use crate::worker_protocol::{ ContentOrigin, TextStream, WORKER_MODE_ARG, WorkerContent, WorkerErrorCode, WorkerReply, @@ -93,6 +94,7 @@ fn raw_unix_kill(target: i32, signal: i32) -> i32 { struct GuardrailEvent { message: String, was_busy: bool, + is_error: bool, } #[derive(Clone)] @@ -468,7 +470,8 @@ impl From for WorkerError { } struct InputContext { - prefix_contents: Vec, + detached_prefix_contents: Vec, + reply_prefix_contents: Vec, prefix_is_error: bool, start_offset: u64, prefix_bytes: u64, @@ -476,6 +479,13 @@ struct InputContext { input_transcript: Option, } +#[derive(Default)] +struct PrefixCapture { + contents: Vec, + is_error: bool, + 
bytes: u64, +} + #[derive(Default)] struct InputFallback { transcript: Option, @@ -543,6 +553,7 @@ pub(crate) struct WriteStdinOptions { pub echo_input: bool, pub pending_state_prechecked: bool, pub deferred_sandbox_state_update: Option, + pub suppress_session_end_reset: bool, } impl WriteStdinOptions { @@ -552,6 +563,7 @@ impl WriteStdinOptions { echo_input: self.echo_input, pending_state_prechecked: false, deferred_sandbox_state_update, + suppress_session_end_reset: false, } } } @@ -620,12 +632,20 @@ pub struct WorkerManager { pending_request_input: Option, session_end_seen: bool, settled_pending_completion: Option, + preserved_detached_prefix: PrefixCapture, + reply_owned_prefix: PrefixCapture, + next_live_prefix_belongs_to_reply: bool, last_detached_prefix_item_count: usize, pager_prompt: Option, last_prompt: Option, last_spawn: Option, spawn_count: u64, guardrail: GuardrailShared, + pending_server_notice: Option, + write_in_progress: bool, + last_write_respawned: bool, + #[cfg(target_os = "linux")] + linux_bwrap_fallback_disabled: bool, } struct PreparedSandboxStateUpdate { @@ -644,6 +664,8 @@ impl WorkerManager { let sandbox_defaults = crate::sandbox::sandbox_state_defaults_with_environment(); let plan_requests_inherited_state = sandbox_plan_requests_inherited_state(&sandbox_plan); let sandbox_state = if plan_requests_inherited_state { + validate_sandbox_plan_with_defaults(&sandbox_plan, &sandbox_defaults) + .map_err(WorkerError::Sandbox)?; sandbox_defaults.clone() } else { resolve_effective_sandbox_state_with_defaults(&sandbox_plan, None, &sandbox_defaults) @@ -693,6 +715,9 @@ impl WorkerManager { pending_request_input: None, session_end_seen: false, settled_pending_completion: None, + preserved_detached_prefix: PrefixCapture::default(), + reply_owned_prefix: PrefixCapture::default(), + next_live_prefix_belongs_to_reply: false, last_detached_prefix_item_count: 0, pager_prompt: None, last_prompt: None, @@ -702,6 +727,11 @@ impl WorkerManager { event: 
                 Arc::new(Mutex::new(None)),
                 busy: Arc::new(AtomicBool::new(false)),
             },
+            pending_server_notice: None,
+            write_in_progress: false,
+            last_write_respawned: false,
+            #[cfg(target_os = "linux")]
+            linux_bwrap_fallback_disabled: false,
         })
     }

@@ -760,6 +790,38 @@ impl WorkerManager {
         Ok(needs_spawn)
     }

+    pub fn empty_input_polls_existing_output(&self) -> bool {
+        match self.oversized_output {
+            OversizedOutputMode::Files => {
+                self.pending_request
+                    || self.pending_output_tape.has_pending()
+                    || self.settled_pending_completion.is_some()
+            }
+            OversizedOutputMode::Pager => {
+                self.pending_request
+                    || self.output.has_pending_output()
+                    || self.settled_pending_completion.is_some()
+            }
+        }
+    }
+
+    pub fn empty_input_uses_local_pager_state(&self) -> bool {
+        matches!(self.oversized_output, OversizedOutputMode::Pager)
+            && self.pager.is_active()
+            && !self.empty_input_polls_existing_output()
+    }
+
+    pub fn empty_input_may_auto_reset_after_poll(&self) -> bool {
+        self.empty_input_polls_existing_output()
+            && (self.pending_request
+                || self.settled_pending_completion.is_some()
+                || self.session_end_seen)
+    }
+
+    pub fn missing_inherited_state_without_worker(&self) -> bool {
+        self.missing_inherited_sandbox_state() && self.process.is_none()
+    }
+
     pub fn nonexecuting_follow_up_uses_existing_state(
         &mut self,
         text: &str,
@@ -768,9 +830,9 @@ impl WorkerManager {
         return match control {
             WriteStdinControlAction::Interrupt => {
                 if remaining.is_empty() {
-                    Ok(!self.control_only_interrupt_requires_spawn()?)
+                    Ok(true)
                 } else {
-                    Ok(self.pager_follow_up_uses_existing_state(remaining)
+                    Ok(self.local_pager_follow_up_uses_existing_state(remaining)
                         && !self.control_only_interrupt_requires_spawn()?)
                 }
             }
@@ -778,7 +840,7 @@ impl WorkerManager {
             };
         }

-        Ok(self.pager_follow_up_uses_existing_state(text) || self.guardrail_busy_event_pending())
+        Ok(self.local_pager_follow_up_uses_existing_state(text))
     }

     fn control_only_interrupt_requires_spawn(&mut self) -> Result {
@@ -792,6 +854,16 @@ impl WorkerManager {
         self.last_detached_prefix_item_count
     }

+    pub fn respawned_during_last_write(&self) -> bool {
+        self.last_write_respawned
+    }
+
+    fn note_respawn_during_write(&mut self) {
+        if self.write_in_progress {
+            self.last_write_respawned = true;
+        }
+    }
+
     fn stage_deferred_sandbox_state_update(
         &mut self,
         update: Option,
@@ -817,6 +889,24 @@ impl WorkerManager {
         self.stage_deferred_sandbox_state_update(update)
     }

+    fn maybe_reset_after_session_end_with_options(
+        &mut self,
+        deferred_sandbox_state_update: Option,
+        suppress_session_end_reset: bool,
+        pending_state_prechecked: bool,
+    ) -> Result<(), WorkerError> {
+        if self.session_end_seen && !suppress_session_end_reset {
+            self.stage_session_end_sandbox_state_update(
+                deferred_sandbox_state_update,
+                pending_state_prechecked,
+            )?;
+        }
+        if !suppress_session_end_reset {
+            self.maybe_reset_after_session_end();
+        }
+        Ok(())
+    }
+
     fn apply_deferred_sandbox_state_update(
         &mut self,
         update: Option,
@@ -846,7 +936,7 @@ impl WorkerManager {
         }
     }

-    fn pager_follow_up_uses_existing_state(&self, text: &str) -> bool {
+    pub(crate) fn local_pager_follow_up_uses_existing_state(&self, text: &str) -> bool {
         matches!(self.oversized_output, OversizedOutputMode::Pager) && self.pager.is_active() && {
             let trimmed = text.trim();
             trimmed.is_empty() || trimmed.starts_with(':')
@@ -877,14 +967,18 @@ impl WorkerManager {
         server_timeout: Duration,
         options: WriteStdinOptions,
     ) -> Result {
-        match self.oversized_output {
+        self.write_in_progress = true;
+        self.last_write_respawned = false;
+        let result = match self.oversized_output {
             OversizedOutputMode::Files => {
                 self.write_stdin_files(text, worker_timeout, server_timeout, options)
             }
             OversizedOutputMode::Pager => {
                 self.write_stdin_pager(text, worker_timeout, server_timeout, options)
             }
-        }
+        };
+        self.write_in_progress = false;
+        result
     }

     /// Entry point for the public `repl` tool in default files mode.
@@ -897,6 +991,7 @@ impl WorkerManager {
     ) -> Result {
         let pending_state_prechecked = options.pending_state_prechecked;
         let deferred_sandbox_state_update = options.deferred_sandbox_state_update.clone();
+        let suppress_session_end_reset = options.suppress_session_end_reset;
         self.last_detached_prefix_item_count = 0;
         if let Some((control, remaining)) = split_write_stdin_control_prefix(&text) {
             self.clear_guardrail_busy_event();
@@ -905,6 +1000,7 @@
             if pending_state_prechecked
                 && control_requires_spawn
                 && deferred_sandbox_state_update.is_none()
+                && !suppress_session_end_reset
             {
                 return Err(prechecked_follow_up_requires_meta_error());
             }
@@ -920,12 +1016,19 @@
             };
             let control_reply = match control {
                 WriteStdinControlAction::Interrupt if stage_interrupt_after_session_end => {
-                    self.interrupt_files_control_tail(worker_timeout)
+                    self.interrupt_files(worker_timeout, None, true)
                 }
-                WriteStdinControlAction::Interrupt => self.interrupt_files(worker_timeout),
+                WriteStdinControlAction::Interrupt => self.interrupt_files(
+                    worker_timeout,
+                    tail_sandbox_state_update.clone(),
+                    suppress_session_end_reset,
+                ),
                 WriteStdinControlAction::Restart => self.restart_files(worker_timeout),
             }?;
-            if stage_interrupt_after_session_end && self.session_end_seen {
+            if stage_interrupt_after_session_end
+                && self.session_end_seen
+                && !suppress_session_end_reset
+            {
                 self.stage_session_end_sandbox_state_update(
                     tail_sandbox_state_update.take(),
                     pending_state_prechecked,
@@ -948,6 +1051,7 @@
         if self.guardrail_busy_event_pending() {
             // Don't execute new input; the previous request was aborted.
+            self.maybe_emit_guardrail_notice();
             let event = self
                 .guardrail
                 .event
@@ -961,7 +1065,7 @@
             let reply = self.build_reply_from_worker_error_files(&err, input_context);
             let _ = self.reset_preserving_detached_prefix_item_count();
             let reply = self.finalize_reply(reply);
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(None, false, false)?;
             return Ok(reply);
         }

@@ -977,13 +1081,11 @@
         {
             let reply = self.poll_pending_output_files(worker_timeout)?;
             let reply = self.finalize_reply(reply);
-            if self.session_end_seen {
-                self.stage_session_end_sandbox_state_update(
-                    deferred_sandbox_state_update,
-                    pending_state_prechecked,
-                )?;
-            }
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(
+                deferred_sandbox_state_update,
+                suppress_session_end_reset,
+                pending_state_prechecked,
+            )?;
             return Ok(reply);
         }
         if pending_state_prechecked && self.control_only_interrupt_requires_spawn()? {
@@ -993,12 +1095,12 @@ impl WorkerManager {
             let input_context = self.prepare_input_context_files();
             let reply = self.build_reply_from_worker_error_files(&err, input_context);
             let reply = self.finalize_reply(reply);
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(None, false, false)?;
             return Ok(reply);
         }
         let reply = self.build_idle_poll_reply_files();
         let reply = self.finalize_reply(reply);
-        self.maybe_reset_after_session_end();
+        self.maybe_reset_after_session_end_with_options(None, false, false)?;
         return Ok(reply);
     }
     if !pending_state_prechecked && self.pending_request {
@@ -1012,13 +1114,11 @@ impl WorkerManager {
         self.last_detached_prefix_item_count = detached_prefix_item_count;
         mark_busy_follow_up_reply(&mut reply.reply);
         let reply = self.finalize_reply(reply);
-        if self.session_end_seen {
-            self.stage_session_end_sandbox_state_update(
-                deferred_sandbox_state_update,
-                pending_state_prechecked,
-            )?;
-        }
-        self.maybe_reset_after_session_end();
+        self.maybe_reset_after_session_end_with_options(
+            deferred_sandbox_state_update,
+            suppress_session_end_reset,
+            pending_state_prechecked,
+        )?;
         return Ok(reply);
     }
     self.apply_deferred_sandbox_state_update(deferred_sandbox_state_update)?;
@@ -1026,7 +1126,7 @@
         let input_context = self.prepare_input_context_files();
         let reply = self.build_reply_from_worker_error_files(&err, input_context);
         let reply = self.finalize_reply(reply);
-        self.maybe_reset_after_session_end();
+        self.maybe_reset_after_session_end_with_options(None, false, false)?;
         return Ok(reply);
     }

@@ -1043,7 +1143,7 @@
         };
         let reply = self.build_reply_from_request_files(request, input_context)?;
         let reply = self.finalize_reply(reply);
-        self.maybe_reset_after_session_end();
+        self.maybe_reset_after_session_end_with_options(None, false, false)?;
         Ok(reply)
     }

@@ -1058,6 +1158,7 @@
         let echo_input = options.echo_input;
         let pending_state_prechecked = options.pending_state_prechecked;
         let deferred_sandbox_state_update = options.deferred_sandbox_state_update.clone();
+        let suppress_session_end_reset = options.suppress_session_end_reset;
         self.last_detached_prefix_item_count = 0;
         if let Some((control, remaining)) = split_write_stdin_control_prefix(&text) {
             self.clear_guardrail_busy_event();
@@ -1066,6 +1167,7 @@
             if pending_state_prechecked
                 && control_requires_spawn
                 && deferred_sandbox_state_update.is_none()
+                && !suppress_session_end_reset
             {
                 return Err(prechecked_follow_up_requires_meta_error());
             }
@@ -1081,12 +1183,19 @@
             };
             let control_reply = match control {
                 WriteStdinControlAction::Interrupt if stage_interrupt_after_session_end => {
-                    self.interrupt_pager_control_tail(worker_timeout)
+                    self.interrupt_pager(worker_timeout, None, true)
                 }
-                WriteStdinControlAction::Interrupt => self.interrupt_pager(worker_timeout),
+                WriteStdinControlAction::Interrupt => self.interrupt_pager(
+                    worker_timeout,
+                    tail_sandbox_state_update.clone(),
+                    suppress_session_end_reset,
+                ),
                 WriteStdinControlAction::Restart => self.restart_pager(worker_timeout),
             }?;
-            if stage_interrupt_after_session_end && self.session_end_seen {
+            if stage_interrupt_after_session_end
+                && self.session_end_seen
+                && !suppress_session_end_reset
+            {
                 self.stage_session_end_sandbox_state_update(
                     tail_sandbox_state_update.take(),
                     pending_state_prechecked,
@@ -1108,6 +1217,7 @@
         }

         if self.guardrail_busy_event_pending() {
+            self.maybe_emit_guardrail_notice();
             let event = self
                 .guardrail
                 .event
@@ -1123,7 +1233,7 @@
             let preserve_pager = self.pager.is_active();
             let _ = self.reset_with_pager_preserving_detached_prefix_item_count(preserve_pager);
             let reply = self.finalize_reply(reply);
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(None, false, false)?;
             return Ok(reply);
         }

@@ -1134,7 +1244,7 @@
         if trimmed.is_empty() || trimmed.starts_with(':') {
             if let Some(reply) = self.handle_pager_command(&text) {
                 let reply = self.finalize_reply(reply);
-                self.maybe_reset_after_session_end();
+                self.maybe_reset_after_session_end_with_options(None, true, false)?;
                 return Ok(reply);
             }
         } else {
@@ -1155,20 +1265,18 @@
         {
             let reply = self.poll_pending_output_pager(worker_timeout, page_bytes)?;
             let reply = self.finalize_reply(reply);
-            if self.session_end_seen {
-                self.stage_session_end_sandbox_state_update(
-                    deferred_sandbox_state_update,
-                    pending_state_prechecked,
-                )?;
-            }
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(
+                deferred_sandbox_state_update,
+                suppress_session_end_reset,
+                pending_state_prechecked,
+            )?;
             return Ok(reply);
         }
         if self.pager.is_active()
             && let Some(reply) = self.handle_pager_command(&text)
         {
             let reply = self.finalize_reply(reply);
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(None, true, false)?;
             return Ok(reply);
         }
         if pending_state_prechecked && self.control_only_interrupt_requires_spawn()? {
@@ -1180,7 +1288,7 @@
             let input_context = self.prepare_input_context_pager(&text, echo_input);
             let reply = self.build_reply_from_worker_error_pager(&err, input_context, page_bytes);
             let reply = self.finalize_reply(reply);
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(None, false, false)?;
             return Ok(reply);
         }
         if !empty_input {
@@ -1193,7 +1301,7 @@
         if empty_input {
             let reply = self.build_idle_poll_reply_pager();
             let reply = self.finalize_reply(reply);
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(None, false, false)?;
             return Ok(reply);
         }
         if !pending_state_prechecked && self.pending_request {
@@ -1207,13 +1315,11 @@
             self.last_detached_prefix_item_count = detached_prefix_item_count;
             mark_busy_follow_up_reply(&mut reply.reply);
             let reply = self.finalize_reply(reply);
-            if self.session_end_seen {
-                self.stage_session_end_sandbox_state_update(
-                    deferred_sandbox_state_update,
-                    pending_state_prechecked,
-                )?;
-            }
-            self.maybe_reset_after_session_end();
+            self.maybe_reset_after_session_end_with_options(
+                deferred_sandbox_state_update,
+                suppress_session_end_reset,
+                pending_state_prechecked,
+            )?;
             return Ok(reply);
         }
         self.apply_deferred_sandbox_state_update(deferred_sandbox_state_update)?;
@@ -1233,7 +1339,7 @@
         };
         let reply = self.build_reply_from_request_pager(request, input_context, page_bytes)?;
         let reply = self.finalize_reply(reply);
-        self.maybe_reset_after_session_end();
+        self.maybe_reset_after_session_end_with_options(None, false, false)?;
         Ok(reply)
     }

@@ -1319,6 +1425,9 @@
             && !completed_request
             && let Some(info) = self.settled_pending_completion.take()
         {
+            if info.session_end_seen {
+                self.note_session_end(false);
+            }
             completion = info;
             consumed_completion = true;
         }
@@ -1466,6 +1575,9 @@
             }
         }
         if !completed_request && let Some(info) = self.settled_pending_completion.take() {
+            if info.session_end_seen {
+                self.note_session_end(false);
+            }
             completion = info;
             completed_request = true;
             end_offset = self.output.end_offset().unwrap_or(end_offset);
@@ -1564,6 +1676,51 @@
     /// Drains detached output that arrived before the next accepted request so it can be prefixed
     /// into that request's visible reply.
     fn prepare_input_context_files(&mut self) -> InputContext {
+        let reply_prefix = self.take_current_prefix_files();
+        let (detached_prefix, reply_prefix) = self.take_prefixes_for_next_request(reply_prefix);
+        InputContext {
+            detached_prefix_contents: detached_prefix.contents,
+            reply_prefix_contents: reply_prefix.contents,
+            prefix_is_error: detached_prefix.is_error || reply_prefix.is_error,
+            start_offset: 0,
+            prefix_bytes: 0,
+            input_echo: None,
+            input_transcript: None,
+        }
+    }
+
+    fn prepare_input_context_pager(&mut self, text: &str, echo_input: bool) -> InputContext {
+        self.output.start_capture();
+
+        let had_pending_output = self.output.has_pending_output();
+        let saw_background_output = self.output.pending_output_since_last_reply();
+        let prompt_hint = self.current_prompt_hint();
+        self.remember_prompt(prompt_hint.clone());
+
+        let mut input_echo = echo_input
+            .then(|| text.to_string())
+            .and_then(|value| pager::build_input_echo(&value));
+        let input_transcript = build_input_transcript(prompt_hint.as_deref(), text);
+        let reply_prefix = self.take_current_prefix_pager(had_pending_output);
+        let (detached_prefix, reply_prefix) = self.take_prefixes_for_next_request(reply_prefix);
+
+        let start_offset = self.output.end_offset().unwrap_or(0);
+        if input_echo.is_none() && (echo_input || saw_background_output || had_pending_output) {
+            input_echo = pager::build_input_echo(text);
+        }
+
+        InputContext {
+            detached_prefix_contents: detached_prefix.contents,
+            reply_prefix_contents: reply_prefix.contents,
+            prefix_is_error: detached_prefix.is_error || reply_prefix.is_error,
+            start_offset,
+            prefix_bytes: detached_prefix.bytes.saturating_add(reply_prefix.bytes),
+            input_echo,
+            input_transcript,
+        }
+    }
+
+    fn take_current_prefix_files(&mut self) -> PrefixCapture {
         let settled_completion = self.settled_pending_completion.take();
         let fallback_input = settled_completion
             .as_ref()
@@ -1614,28 +1771,14 @@
             );
         }
 
-        InputContext {
-            prefix_contents: contents,
-            prefix_is_error: saw_stderr,
-            start_offset: 0,
-            prefix_bytes: 0,
-            input_echo: None,
-            input_transcript: None,
+        PrefixCapture {
+            contents,
+            is_error: saw_stderr,
+            bytes: 0,
         }
     }

-    fn prepare_input_context_pager(&mut self, text: &str, echo_input: bool) -> InputContext {
-        self.output.start_capture();
-
-        let had_pending_output = self.output.has_pending_output();
-        let saw_background_output = self.output.pending_output_since_last_reply();
-        let prompt_hint = self.current_prompt_hint();
-        self.remember_prompt(prompt_hint.clone());
-
-        let mut input_echo = echo_input
-            .then(|| text.to_string())
-            .and_then(|value| pager::build_input_echo(&value));
-        let input_transcript = build_input_transcript(prompt_hint.as_deref(), text);
+    fn take_current_prefix_pager(&mut self, had_pending_output: bool) -> PrefixCapture {
         let settled_completion = self.settled_pending_completion.take();

         let mut prefix_contents = Vec::new();
@@ -1668,18 +1811,10 @@
             prefix_bytes = pending_bytes;
         }

-        let start_offset = self.output.end_offset().unwrap_or(0);
-        if input_echo.is_none() && (echo_input || saw_background_output || had_pending_output) {
-            input_echo = pager::build_input_echo(text);
-        }
-
-        InputContext {
-            prefix_contents,
-            prefix_is_error,
-            start_offset,
-            prefix_bytes,
-            input_echo,
-            input_transcript,
+        PrefixCapture {
+            contents: prefix_contents,
+            is_error: prefix_is_error,
+            bytes: prefix_bytes,
         }
     }

@@ -1691,11 +1826,9 @@
     ) -> Result {
         let text = normalize_input_newlines(&text);
         let started_at = std::time::Instant::now();
-        if matches!(self.oversized_output, OversizedOutputMode::Files) {
-            let prompt = self.current_prompt_hint();
-            self.remember_prompt(prompt.clone());
-            self.pending_request_input = Some(text.clone());
-        }
+        let prompt = self.current_prompt_hint();
+        self.remember_prompt(prompt);
+        self.pending_request_input = Some(text.clone());
         let ipc = self
             .process
             .as_ref()
@@ -1723,8 +1856,9 @@
         err: &WorkerError,
         context: InputContext,
     ) -> ReplyWithOffset {
-        self.last_detached_prefix_item_count = context.prefix_contents.len();
-        let mut contents = context.prefix_contents;
+        self.last_detached_prefix_item_count = context.detached_prefix_contents.len();
+        let mut contents = context.detached_prefix_contents;
+        contents.extend(context.reply_prefix_contents);
         let formatted = self.drain_sealed_formatted_output();
         contents.extend(formatted.contents);
         contents.push(WorkerContent::server_stderr(format!("worker error: {err}")));
@@ -1746,10 +1880,11 @@
         context: InputContext,
         page_bytes: u64,
     ) -> ReplyWithOffset {
-        self.last_detached_prefix_item_count = context.prefix_contents.len();
+        self.last_detached_prefix_item_count = context.detached_prefix_contents.len();
         let end_offset = self.output.end_offset().unwrap_or(context.start_offset);
         let first_page_budget = page_bytes.saturating_sub(context.prefix_bytes);
-        let mut contents = context.prefix_contents;
+        let mut contents = context.detached_prefix_contents;
+        contents.extend(context.reply_prefix_contents);
         if let Some(echo) = context.input_echo {
             contents.push(WorkerContent::stdout(echo));
         }
@@ -1786,7 +1921,7 @@
         request: RequestState,
         context: InputContext,
     ) -> Result {
-        self.last_detached_prefix_item_count = context.prefix_contents.len();
+        self.last_detached_prefix_item_count = context.detached_prefix_contents.len();
         match self.wait_for_request_completion(request.timeout) {
             Ok(completion) => {
                 let mut session_end = completion.session_end_seen;
@@ -1799,7 +1934,8 @@
                 if session_end {
                     self.note_session_end(true);
                 }
-                let mut contents = context.prefix_contents;
+                let mut contents = context.detached_prefix_contents;
+                contents.extend(context.reply_prefix_contents);
                 let formatted = self.drain_final_formatted_output();
                 let is_error = context.prefix_is_error || formatted.saw_stderr;
                 contents.extend(formatted.contents);
@@ -1885,7 +2021,8 @@
                 }
                 self.pending_request = true;
                 self.pending_request_started_at = Some(request.started_at);
-                let mut contents = context.prefix_contents;
+                let mut contents = context.detached_prefix_contents;
+                contents.extend(context.reply_prefix_contents);
                 let formatted = self.drain_formatted_output();
                 contents.extend(formatted.contents);

@@ -1918,7 +2055,7 @@
         context: InputContext,
         page_bytes: u64,
     ) -> Result {
-        self.last_detached_prefix_item_count = context.prefix_contents.len();
+        self.last_detached_prefix_item_count = context.detached_prefix_contents.len();
         match self.wait_for_request_completion(request.timeout) {
             Ok(completion) => {
                 let fallback_input_transcript = context.input_transcript.clone();
@@ -1934,7 +2071,8 @@
                 }
                 let end_offset = self.output.end_offset().unwrap_or(context.start_offset);
                 let first_page_budget = page_bytes.saturating_sub(context.prefix_bytes);
-                let mut contents = context.prefix_contents;
+                let mut contents = context.detached_prefix_contents;
+                contents.extend(context.reply_prefix_contents);
                 if let Some(echo) = context.input_echo {
                     contents.push(WorkerContent::stdout(echo));
                 }
@@ -2039,7 +2177,8 @@
                 self.pending_request_started_at = Some(request.started_at);
                 let end_offset = self.output.end_offset().unwrap_or(0);
                 let first_page_budget = page_bytes.saturating_sub(context.prefix_bytes);
-                let mut contents = context.prefix_contents;
+                let mut contents = context.detached_prefix_contents;
+                contents.extend(context.reply_prefix_contents);
                 if let Some(echo) = context.input_echo {
                     contents.push(WorkerContent::stdout(echo));
                 }
@@ -2193,7 +2332,9 @@
     }

     fn should_settle_multiline_r_timeout(&self) -> bool {
-        if self.backend != Backend::R {
+        if self.backend != Backend::R
+            || !matches!(self.oversized_output, OversizedOutputMode::Files)
+        {
             return false;
         }
         self.pending_request_input
@@ -2262,26 +2403,49 @@
     }

     fn maybe_emit_guardrail_notice(&mut self) {
-        let mut slot = self
-            .guardrail
-            .event
-            .lock()
-            .expect("guardrail event mutex poisoned");
-        if slot.as_ref().is_some_and(|event| event.was_busy) {
+        self.maybe_emit_pending_server_notice();
+        let event = {
+            let mut slot = self
+                .guardrail
+                .event
+                .lock()
+                .expect("guardrail event mutex poisoned");
+            if slot.as_ref().is_some_and(|event| event.was_busy) {
+                return;
+            }
+            slot.take()
+        };
+        let Some(event) = event else {
             return;
-        }
-        let Some(event) = slot.take() else {
+        };
+        self.append_server_notice(event);
+    }
+
+    fn maybe_emit_pending_server_notice(&mut self) {
+        let Some(event) = self.pending_server_notice.take() else {
             return;
         };
+        self.append_server_notice(event);
+    }
+
+    fn append_server_notice(&mut self, event: GuardrailEvent) {
         match self.oversized_output {
-            OversizedOutputMode::Files => self
-                .pending_output_tape
-                .append_server_stderr_bytes(event.message.as_bytes()),
-            OversizedOutputMode::Pager => self.output_timeline.append_text(
-                event.message.as_bytes(),
-                true,
-                ContentOrigin::Server,
-            ),
+            OversizedOutputMode::Files => {
+                if event.is_error {
+                    self.pending_output_tape
+                        .append_server_stderr_bytes(event.message.as_bytes());
+                } else {
+                    self.pending_output_tape
+                        .append_stdout_status_line(event.message.as_bytes());
+                }
+            }
+            OversizedOutputMode::Pager => {
+                self.output_timeline.append_text(
+                    event.message.as_bytes(),
+                    event.is_error,
+                    ContentOrigin::Server,
+                );
+            }
         }
     }

@@ -2335,37 +2499,43 @@
     fn maybe_reset_after_session_end(&mut self) {
         if self.session_end_seen {
-            let _ = match self.oversized_output {
+            let result = match self.oversized_output {
                 OversizedOutputMode::Files => self.reset_preserving_detached_prefix_item_count(),
                 OversizedOutputMode::Pager => self
                     .reset_with_pager_preserving_detached_prefix_item_count(self.pager.is_active()),
             };
+            if result.is_ok() {
+                self.note_respawn_during_write();
+            }
             self.session_end_seen = false;
         }
     }

-    pub fn interrupt(&mut self, timeout: Duration) -> Result {
-        match self.oversized_output {
-            OversizedOutputMode::Files => self.interrupt_files(timeout),
-            OversizedOutputMode::Pager => self.interrupt_pager(timeout),
-        }
-    }
-
-    fn interrupt_files(&mut self, timeout: Duration) -> Result {
-        self.interrupt_files_inner(timeout, true)
-    }
-
-    fn interrupt_files_control_tail(
+    pub fn interrupt(
         &mut self,
         timeout: Duration,
+        deferred_sandbox_state_update: Option,
+        suppress_session_end_reset: bool,
     ) -> Result {
-        self.interrupt_files_inner(timeout, false)
+        match self.oversized_output {
+            OversizedOutputMode::Files => self.interrupt_files(
+                timeout,
+                deferred_sandbox_state_update,
+                suppress_session_end_reset,
+            ),
+            OversizedOutputMode::Pager => self.interrupt_pager(
+                timeout,
+                deferred_sandbox_state_update,
+                suppress_session_end_reset,
+            ),
+        }
     }

-    fn interrupt_files_inner(
+    fn interrupt_files(
         &mut self,
         timeout: Duration,
-        reset_after_session_end: bool,
+        deferred_sandbox_state_update: Option,
+        suppress_session_end_reset: bool,
     ) -> Result {
         crate::event_log::log(
             "worker_interrupt_begin",
             serde_json::json!({
@@ -2373,7 +2543,11 @@
                 "timeout_ms": timeout.as_millis(),
             }),
         );
-        self.ensure_process()?;
+        let interrupt_drains_existing_completion =
+            self.pending_request || self.settled_pending_completion.is_some();
+        if self.pending_request {
+            self.ensure_process()?;
+        }
         if self.pending_request
             && let Err(err) = self.driver.interrupt(
                 self.process
@@ -2391,7 +2565,7 @@
             return Err(err);
         }

-        if self.pending_request {
+        if interrupt_drains_existing_completion {
             let mut reply = self.poll_pending_output_files(timeout)?;
             let prompt = match &reply.reply {
                 WorkerReply::Output { prompt, .. } => prompt.clone(),
@@ -2404,9 +2578,11 @@
                 append_prompt_if_missing(contents, Some(prompt));
             }
             let reply = self.finalize_reply(reply);
-            if reset_after_session_end {
-                self.maybe_reset_after_session_end();
-            }
+            self.maybe_reset_after_session_end_with_options(
+                deferred_sandbox_state_update,
+                suppress_session_end_reset,
+                false,
+            )?;
             return Ok(reply);
         }

@@ -2473,10 +2649,16 @@
                 "session_end": session_end,
             }),
         );
-        Ok(self.finalize_reply(ReplyWithOffset {
+        let reply = self.finalize_reply(ReplyWithOffset {
             reply,
             end_offset: 0,
-        }))
+        });
+        self.maybe_reset_after_session_end_with_options(
+            deferred_sandbox_state_update,
+            suppress_session_end_reset,
+            false,
+        )?;
+        Ok(reply)
     }

     pub fn restart(&mut self, timeout: Duration) -> Result {
@@ -2502,28 +2684,21 @@
             let _ = process.shutdown_graceful(timeout);
         }
         self.guardrail.busy.store(false, Ordering::Relaxed);
+        self.maybe_emit_pending_server_notice();
         let reply = self.build_session_reset_reply_files("new session started");
+        self.clear_preserved_prefixes();
         self.reset_output_state_files(true);
+        self.note_respawn_during_write();
         crate::event_log::log("worker_restart_end", serde_json::json!({"status": "ok"}));
         Ok(self.finalize_reply(reply))
     }

-    fn interrupt_pager(&mut self, timeout: Duration) -> Result {
-        self.interrupt_pager_inner(timeout, true)
-    }
-
-    fn interrupt_pager_control_tail(
-        &mut self,
-        timeout: Duration,
-    ) -> Result {
-        self.interrupt_pager_inner(timeout, false)
-    }
-
-    fn interrupt_pager_inner(
+    fn interrupt_pager(
         &mut self,
         timeout: Duration,
-        reset_after_session_end: bool,
+        deferred_sandbox_state_update: Option,
+        suppress_session_end_reset: bool,
     ) -> Result {
         crate::event_log::log(
             "worker_interrupt_begin",
             serde_json::json!({
@@ -2531,7 +2706,11 @@
                 "timeout_ms": timeout.as_millis(),
             }),
         );
-        self.ensure_process()?;
+        let interrupt_drains_existing_completion =
+            self.pending_request || self.settled_pending_completion.is_some();
+        if self.pending_request {
+            self.ensure_process()?;
+        }
         if self.pending_request
             && let Err(err) = self.driver.interrupt(
                 self.process
@@ -2550,7 +2729,7 @@
         }

         let page_bytes = pager::resolve_page_bytes(None);
-        if self.pending_request {
+        if interrupt_drains_existing_completion {
             let mut reply = self.poll_pending_output_pager(timeout, page_bytes)?;
             let pager_active = self.pager.is_active();
             let prompt = match &reply.reply {
@@ -2566,9 +2745,11 @@
                 }
             }
             let reply = self.finalize_reply(reply);
-            if reset_after_session_end {
-                self.maybe_reset_after_session_end();
-            }
+            self.maybe_reset_after_session_end_with_options(
+                deferred_sandbox_state_update,
+                suppress_session_end_reset,
+                false,
+            )?;
             return Ok(reply);
         }

@@ -2657,7 +2838,13 @@
             "session_end": session_end,
             }),
         );
-        Ok(self.finalize_reply(ReplyWithOffset { reply, end_offset }))
+        let reply = self.finalize_reply(ReplyWithOffset { reply, end_offset });
+        self.maybe_reset_after_session_end_with_options(
+            deferred_sandbox_state_update,
+            suppress_session_end_reset,
+            false,
+        )?;
+        Ok(reply)
     }

     fn restart_pager(&mut self, timeout: Duration) -> Result {
@@ -2676,10 +2863,13 @@
             let _ = process.shutdown_graceful(timeout);
         }
         self.guardrail.busy.store(false, Ordering::Relaxed);
+        self.maybe_emit_pending_server_notice();
         let page_bytes = pager::resolve_page_bytes(None);
         let reply = self.build_session_reset_reply_pager(page_bytes, "new session started");
+        self.clear_preserved_prefixes();
         self.reset_output_state_pager(true, false);
+        self.note_respawn_during_write();
         crate::event_log::log("worker_restart_end", serde_json::json!({"status": "ok"}));
         Ok(self.finalize_reply(reply))
     }

@@ -2783,12 +2973,20 @@
         crate::sandbox::log_sandbox_policy_update(&update.sandbox_policy);
         let mut inherited_state = self.sandbox_defaults.clone();
         inherited_state.apply_update(update);
+        #[cfg(target_os = "linux")]
+        self.apply_linux_bwrap_fallback_override(&mut inherited_state);
         let resolved_state = resolve_effective_sandbox_state_with_defaults(
             &self.sandbox_plan,
             Some(&inherited_state),
             &self.sandbox_defaults,
         )
        .map_err(WorkerError::Sandbox)?;
+        #[cfg(target_os = "linux")]
+        let resolved_state = {
+            let mut resolved_state = resolved_state;
+            self.apply_linux_bwrap_fallback_override(&mut resolved_state);
+            resolved_state
+        };
         let missing_before = self.missing_inherited_sandbox_state();
         self.inherited_sandbox_state = Some(inherited_state);
         let changed = self.sandbox_state != resolved_state;
@@ -2807,6 +3005,13 @@
         })
     }

+    #[cfg(target_os = "linux")]
+    fn apply_linux_bwrap_fallback_override(&self, state: &mut SandboxState) {
+        if self.linux_bwrap_fallback_disabled {
+            state.use_linux_sandbox_bwrap = false;
+        }
+    }
+
     fn log_sandbox_state_update(
         prepared: &PreparedSandboxStateUpdate,
         timeout: Option,
@@ -2850,11 +3055,14 @@
                 OversizedOutputMode::Pager => self.spawn_process_with_pager(false)?,
             });
             respawned = true;
+            self.note_respawn_during_write();
         }
         Self::log_sandbox_state_update(&prepared, Some(timeout), respawned);
         return Ok(respawned);
     }

+        let aborted_request = self.pending_request;
+        let had_prior_session = self.last_spawn.is_some();
         if let Some(process) = self.process.take() {
             let _ = process.shutdown_graceful(timeout);
         }
@@ -2873,10 +3081,34 @@
             OversizedOutputMode::Pager => self.spawn_process_with_pager(false)?,
         });
         respawned = true;
+        self.note_respawn_during_write();
+        if had_prior_session {
+            self.stage_sandbox_change_restart_notice(aborted_request);
+            self.next_live_prefix_belongs_to_reply = true;
+        }
         Self::log_sandbox_state_update(&prepared, Some(timeout), respawned);
         Ok(respawned)
     }

+    fn stage_sandbox_change_restart_notice(&mut self, aborted_request: bool) {
+        let policy = serde_json::to_string(&self.sandbox_state.sandbox_policy)
+            .unwrap_or_else(|err| format!("{{\"serialize_error\":\"{}\"}}", err));
+        let mut message = String::from("[repl] sandbox policy changed; new session started\n");
+        if aborted_request {
+            message.push_str("[repl] previous request aborted because sandbox policy changed\n");
+        }
+        message.push_str(&format!("[repl] new sandbox policy: {policy}\n"));
+        let event = GuardrailEvent {
+            message,
+            was_busy: false,
+            is_error: false,
+        };
+        match &mut self.pending_server_notice {
+            Some(pending) => pending.message.push_str(&event.message),
+            None => self.pending_server_notice = Some(event),
+        }
+    }
+
     fn has_detached_output_to_preserve(&self) -> bool {
         match self.oversized_output {
             OversizedOutputMode::Files => {
@@ -2893,7 +3125,28 @@
     }

     fn reset_output_state_files_preserving_detached_output(&mut self) {
-        self.reset_output_state_files_inner(false, true);
+        self.seed_aborted_files_completion_for_respawn();
+        let prefix = self.take_current_prefix_files();
+        self.stage_prefix_before_respawn(prefix);
+        self.reset_output_state_files_inner(true, false);
+    }
+
+    fn seed_aborted_files_completion_for_respawn(&mut self) {
+        if !self.pending_request
+            || self.settled_pending_completion.is_some()
+            || self.pending_request_input.is_none()
+        {
+            return;
+        }
+
+        let prompt = self.last_prompt.clone();
+        self.settled_pending_completion = Some(CompletionInfo {
+            prompt: prompt.clone(),
+            prompt_variants: prompt.clone().map(|prompt| vec![prompt]),
+            echo_events: Vec::new(),
+            protocol_warnings: Vec::new(),
+            session_end_seen: false,
+        });
     }

     fn reset_output_state_files_inner(
@@ -2923,7 +3176,34 @@
     }

     fn reset_output_state_pager_preserving_detached_output(&mut self, preserve_pager: bool) {
-        self.reset_output_state_pager_inner(false, preserve_pager, true);
+        self.seed_aborted_pager_completion_for_respawn();
+        let had_pending_output = self.output.has_pending_output();
+        let prefix = self.take_current_prefix_pager(had_pending_output);
+        self.stage_prefix_before_respawn(prefix);
+        self.reset_output_state_pager_inner(true, preserve_pager, false);
+    }
+
+    fn seed_aborted_pager_completion_for_respawn(&mut self) {
+        if !self.pending_request
+            || self.settled_pending_completion.is_some()
+            || self.pending_request_input.is_none()
+        {
+            return;
+        }
+
+        let prompt = self.last_prompt.clone();
+        let prompt_variants = prompt.clone().map(|prompt| vec![prompt]);
+        let echo_events = match (prompt, self.pending_request_input.clone()) {
+            (Some(prompt), Some(line)) => vec![IpcEchoEvent { prompt, line }],
+            _ => Vec::new(),
+        };
+        self.settled_pending_completion = Some(CompletionInfo {
+            prompt: self.last_prompt.clone(),
+            prompt_variants,
+            echo_events,
+            protocol_warnings: Vec::new(),
+            session_end_seen: false,
+        });
     }

     fn reset_output_state_pager_inner(
@@ -2956,9 +3236,53 @@
         self.guardrail.busy.store(false, Ordering::Relaxed);
     }

-    fn remember_prompt(&mut self, prompt: Option) {
-        if let Some(prompt) = normalize_prompt(prompt) {
-            self.last_prompt = Some(prompt);
+    fn append_prefix_capture(target: &mut PrefixCapture, mut prefix: PrefixCapture) {
+        if prefix.contents.is_empty() {
+            prefix.bytes = 0;
+        }
+        if prefix.contents.is_empty() && !prefix.is_error {
+            return;
+        }
+        target.is_error |= prefix.is_error;
+        target.bytes = target
+            .bytes
+            .saturating_add(prefix_worker_text_bytes(&prefix.contents));
+        target.contents.append(&mut prefix.contents);
+    }
+
+    fn take_prefixes_for_next_request(
+        &mut self,
+        current_prefix: PrefixCapture,
+    ) -> (PrefixCapture, PrefixCapture) {
+        let mut detached_prefix = std::mem::take(&mut self.preserved_detached_prefix);
+        let mut reply_prefix = std::mem::take(&mut self.reply_owned_prefix);
+        if self.next_live_prefix_belongs_to_reply {
+            Self::append_prefix_capture(&mut reply_prefix, current_prefix);
+        } else {
+            Self::append_prefix_capture(&mut detached_prefix, current_prefix);
+        }
+        self.next_live_prefix_belongs_to_reply = false;
+        (detached_prefix, reply_prefix)
+    }
+
+    fn stage_prefix_before_respawn(&mut self, prefix: PrefixCapture) {
+        if self.next_live_prefix_belongs_to_reply {
+            Self::append_prefix_capture(&mut self.reply_owned_prefix, prefix);
+            self.next_live_prefix_belongs_to_reply = false;
+        } else {
+            Self::append_prefix_capture(&mut self.preserved_detached_prefix, prefix);
+        }
+    }
+
+    fn clear_preserved_prefixes(&mut self) {
+        self.preserved_detached_prefix = PrefixCapture::default();
+        self.reply_owned_prefix = PrefixCapture::default();
+        self.next_live_prefix_belongs_to_reply = false;
+    }
+
+    fn remember_prompt(&mut self, prompt: Option) {
+        if let Some(prompt) = normalize_prompt(prompt) {
+            self.last_prompt = Some(prompt);
         }
     }

@@ -3187,6 +3511,7 @@
             }),
         );

+        self.linux_bwrap_fallback_disabled = true;
         self.sandbox_state.use_linux_sandbox_bwrap = false;
         self.sandbox_defaults.use_linux_sandbox_bwrap = false;
         if let Some(inherited_state) = self.inherited_sandbox_state.as_mut() {
@@ -4345,6 +4670,20 @@ fn mark_busy_follow_up_reply(reply: &mut WorkerReply) {
     }
 }

+fn prefix_worker_text_bytes(contents: &[WorkerContent]) -> u64 {
+    contents
+        .iter()
+        .map(|content| match content {
+            WorkerContent::ContentText {
+                text,
+                origin: ContentOrigin::Worker,
+                ..
+            } => text.len() as u64,
+            WorkerContent::ContentText { .. } | WorkerContent::ContentImage { .. } => 0,
+        })
+        .sum()
+}
+
 struct WorkerProcess {
     child: Child,
     stdin_tx: mpsc::Sender,
@@ -5276,6 +5615,7 @@ fn start_memory_guardrail(
                 *slot = Some(GuardrailEvent {
                     message: message.clone(),
                     was_busy: busy,
+                    is_error: true,
                 });
             }
         }
@@ -5704,8 +6044,8 @@ fn worker_error_code(err: &WorkerError) -> Option {
 mod tests {
     use super::*;
     use crate::output_capture::{
-        OUTPUT_RING_CAPACITY_BYTES, OutputEventKind, OutputRing, ensure_output_ring,
-        reset_last_reply_marker_offset, reset_output_ring,
+        OUTPUT_RING_CAPACITY_BYTES, OutputEventKind, OutputRing, OutputTextSpan,
+        ensure_output_ring, reset_last_reply_marker_offset, reset_output_ring,
     };
     use crate::sandbox::SandboxPolicy;
     #[cfg(target_os = "linux")]
@@ -5727,7 +6067,7 @@
     fn output_ring_test_guard() -> MutexGuard<'static, ()> {
         crate::output_capture::output_ring_test_mutex()
             .lock()
-            .expect("output ring test lock")
+            .unwrap_or_else(|err| err.into_inner())
     }

     fn echo_event(prompt: &str, line: &str) -> IpcEchoEvent {
@@ -5748,6 +6088,31 @@
             .join("")
     }

+    fn pager_buffer_from_worker_text(text: &str) -> crate::pager::PagerBuffer {
+        pager_buffer_from_worker_text_with_source_end(text, text.len() as u64)
+    }
+
+    fn static_pager_buffer_from_worker_text(text: &str) -> crate::pager::PagerBuffer {
+        pager_buffer_from_worker_text_with_source_end(text, u64::MAX)
+    }
+
+    fn pager_buffer_from_worker_text_with_source_end(
+        text: &str,
+        source_end: u64,
+    ) -> crate::pager::PagerBuffer {
+        crate::pager::PagerBuffer::from_bytes_and_events(
+            text.as_bytes().to_vec(),
+            Vec::new(),
+            vec![OutputTextSpan {
+                start_byte: 0,
+                end_byte: text.len(),
+                is_stderr: false,
+                origin: ContentOrigin::Worker,
+            }],
+            source_end,
+        )
+    }
+
     #[cfg(target_family = "unix")]
     fn sleeping_test_child() -> Child {
         Command::new("sh")
@@ -6570,7 +6935,7 @@
         });

         let context = manager.prepare_input_context_files();
-        let text = contents_text(&context.prefix_contents);
+        let text = contents_text(&context.detached_prefix_contents);

         assert!(
             text.contains("DETACHED_OK\n"),
@@ -6586,6 +6951,49 @@
         );
     }

+    #[test]
+    fn interrupt_files_drains_settled_completion_without_leaking_echo() {
+        let mut manager = WorkerManager::new(
+            Backend::Python,
+            SandboxCliPlan::default(),
+            crate::oversized_output::OversizedOutputMode::Files,
+        )
+        .expect("worker manager");
+        manager
+            .pending_output_tape
+            .append_stdout_bytes(b">>> import time; time.sleep(0.07)\nDETACHED_OK\n");
+        manager.pending_request_input = Some("import time; time.sleep(0.07)\n".to_string());
+        manager.settled_pending_completion = Some(CompletionInfo {
+            prompt: Some(">>> ".to_string()),
+            prompt_variants: Some(vec![">>> ".to_string()]),
+            echo_events: Vec::new(),
+            protocol_warnings: Vec::new(),
+            session_end_seen: false,
+        });
+
+        let WorkerReply::Output { contents, .. } = manager
+            .interrupt(Duration::from_millis(10), None, false)
+            .expect("interrupt reply");
+        let text = contents_text(&contents);
+
+        assert!(
+            text.contains("DETACHED_OK\n"),
+            "expected the settled completion output to be preserved, got: {text:?}"
+        );
+        assert!(
+            !text.contains("import time; time.sleep(0.07)"),
+            "did not expect the settled completion echo to leak through interrupt handling, got: {text:?}"
+        );
+        assert!(
+            text.contains(">>> "),
+            "expected the settled completion to keep the prompt on the interrupt reply, got: {text:?}"
+        );
+        assert!(
+            manager.settled_pending_completion.is_none(),
+            "expected the settled completion to be consumed by the interrupt follow-up"
+        );
+    }
+
     #[test]
     fn files_reset_preserving_detached_output_keeps_pending_request_input_for_trim() {
         let mut manager = WorkerManager::new(
@@ -6609,7 +7017,7 @@
         manager.reset_output_state_files_preserving_detached_output();

         let context = manager.prepare_input_context_files();
-        let text = contents_text(&context.prefix_contents);
+        let text = contents_text(&context.detached_prefix_contents);

         assert!(
text.contains("DETACHED_OK\n"), @@ -6625,6 +7033,78 @@ mod tests { ); } + #[test] + fn files_respawned_pending_request_trims_echo_without_settled_completion() { + let mut manager = WorkerManager::new( + Backend::Python, + SandboxCliPlan::default(), + crate::oversized_output::OversizedOutputMode::Files, + ) + .expect("worker manager"); + manager.pending_request = true; + manager.last_prompt = Some(">>> ".to_string()); + manager + .pending_output_tape + .append_stdout_bytes(b">>> import time; time.sleep(0.2)\nDETACHED_OK\n"); + manager.pending_request_input = Some("import time; time.sleep(0.2)\n".to_string()); + + manager.reset_output_state_files_preserving_detached_output(); + + let context = manager.prepare_input_context_files(); + let text = contents_text(&context.detached_prefix_contents); + + assert!( + text.contains("DETACHED_OK\n"), + "expected aborted pending output to survive the respawned reset, got: {text:?}" + ); + assert!( + !text.contains("import time; time.sleep(0.2)"), + "did not expect the aborted request echo to leak across the respawn boundary, got: {text:?}" + ); + assert!( + manager.pending_request_input.is_none(), + "expected the aborted request input fallback to be consumed once the detached prefix is prepared" + ); + } + + #[test] + fn pager_respawned_pending_request_trims_echo_without_echo_events() { + let _guard = output_ring_test_guard(); + let _output_ring = ensure_output_ring(OUTPUT_RING_CAPACITY_BYTES); + reset_output_ring(); + reset_last_reply_marker_offset(); + + let mut manager = WorkerManager::new( + Backend::Python, + SandboxCliPlan::default(), + crate::oversized_output::OversizedOutputMode::Pager, + ) + .expect("worker manager"); + manager.pending_request = true; + manager.last_prompt = Some(">>> ".to_string()); + manager.pending_request_input = Some("import time; time.sleep(0.2)\n".to_string()); + manager.output.start_capture(); + manager.output_timeline.append_text( + b">>> import time; time.sleep(0.2)\nDETACHED_OK\n", + false, + 
ContentOrigin::Worker, + ); + + manager.reset_output_state_pager_preserving_detached_output(false); + + let context = manager.prepare_input_context_pager("1+1", false); + let text = contents_text(&context.detached_prefix_contents); + + assert!( + text.contains("DETACHED_OK\n"), + "expected aborted pager output to survive the respawned reset, got: {text:?}" + ); + assert!( + !text.contains("import time; time.sleep(0.2)"), + "did not expect the aborted pager echo to leak across the respawn boundary, got: {text:?}" + ); + } + #[test] fn files_prepare_input_context_seals_split_utf8_at_request_boundary() { let mut manager = WorkerManager::new( @@ -6637,7 +7117,7 @@ mod tests { let first = manager.prepare_input_context_files(); assert_eq!( - contents_text(&first.prefix_contents), + contents_text(&first.detached_prefix_contents), "\\xC3", "expected an accepted request to seal the detached utf-8 lead byte into the prefix" ); @@ -6648,7 +7128,7 @@ mod tests { let second = manager.prepare_input_context_files(); assert_eq!( - contents_text(&second.prefix_contents), + contents_text(&second.detached_prefix_contents), "\\xA9\n", "expected the next request output to stay split after the detached prefix was sealed" ); @@ -6733,41 +7213,165 @@ mod tests { let context = manager.prepare_input_context_files(); assert_eq!( - context.prefix_contents, + context.detached_prefix_contents, vec![WorkerContent::stdout("> Sys.sleep(5)\n")], "expected a sealed files-mode prefix without settled completion metadata to keep echoed input" ); } #[test] - fn pager_prepare_input_context_trims_echo_from_settled_completion() { - let _guard = output_ring_test_guard(); - let _output_ring = ensure_output_ring(OUTPUT_RING_CAPACITY_BYTES); - reset_output_ring(); - reset_last_reply_marker_offset(); + fn files_preserved_detached_prefix_stays_separate_from_new_session_startup_output() { + let mut manager = WorkerManager::new( + Backend::Python, + SandboxCliPlan::default(), + 
crate::oversized_output::OversizedOutputMode::Files, + ) + .expect("worker manager"); + manager + .pending_output_tape + .append_stdout_bytes(b"OLD_TAIL\n"); + + manager.reset_output_state_files_preserving_detached_output(); + manager.next_live_prefix_belongs_to_reply = true; + manager + .pending_output_tape + .append_stdout_bytes(b"NEW_SESSION_STARTUP\n"); + + let context = manager.prepare_input_context_files(); + + assert_eq!( + contents_text(&context.detached_prefix_contents), + "OLD_TAIL\n", + "expected preserved detached output to stay isolated from the replacement session" + ); + assert_eq!( + contents_text(&context.reply_prefix_contents), + "NEW_SESSION_STARTUP\n", + "expected fresh-session startup output to stay with the new reply prefix" + ); + } + #[test] + fn busy_guardrail_event_survives_sandbox_restart_notice() { let mut manager = WorkerManager::new( Backend::R, SandboxCliPlan::default(), - crate::oversized_output::OversizedOutputMode::Pager, + crate::oversized_output::OversizedOutputMode::Files, ) .expect("worker manager"); - manager.output.start_capture(); - manager.output_timeline.append_text( - b"> Sys.sleep(0.2); 1+1\n[1] 2\n", - false, - ContentOrigin::Worker, + manager.exe_path = PathBuf::from("definitely-missing-worker-exe"); + manager.stage_sandbox_change_restart_notice(true); + manager.guardrail.busy.store(true, Ordering::Relaxed); + { + let mut slot = manager + .guardrail + .event + .lock() + .expect("guardrail event mutex poisoned"); + *slot = Some(GuardrailEvent { + message: "[repl] worker killed by memory guardrail\n".to_string(), + was_busy: true, + is_error: true, + }); + } + + let reply = manager + .write_stdin_files( + "1+1".to_string(), + Duration::from_millis(10), + Duration::from_millis(10), + WriteStdinOptions::default(), + ) + .expect("guardrail reply"); + let WorkerReply::Output { contents, .. 
} = reply; + let text = contents_text(&contents); + + assert!( + text.contains("sandbox policy changed; new session started"), + "expected the queued restart notice to stay visible, got: {text:?}" ); - manager.settled_pending_completion = Some(CompletionInfo { - prompt: Some("> ".to_string()), - prompt_variants: Some(vec!["> ".to_string()]), - echo_events: vec![echo_event("> ", "Sys.sleep(0.2); 1+1\n")], - protocol_warnings: Vec::new(), - session_end_seen: false, - }); + assert!( + text.contains("worker error: [repl] worker killed by memory guardrail"), + "expected the busy guardrail error to remain authoritative, got: {text:?}" + ); + assert!( + !manager.guardrail_busy_event_pending(), + "expected the busy guardrail slot to be consumed by the local retry reply" + ); + assert!( + manager.pending_server_notice.is_none(), + "expected the restart notice to be emitted instead of lingering" + ); + assert!( + manager.process.is_none(), + "did not expect the unit test to retain a spawned worker" + ); + } + + #[test] + fn bare_restart_flushes_queued_sandbox_change_notice() { + let mut manager = WorkerManager::new( + Backend::R, + SandboxCliPlan::default(), + crate::oversized_output::OversizedOutputMode::Files, + ) + .expect("worker manager"); + manager.stage_sandbox_change_restart_notice(true); + + let reply = manager + .write_stdin_files( + "\u{4}".to_string(), + Duration::from_millis(10), + Duration::from_millis(10), + WriteStdinOptions::default(), + ) + .expect("restart reply"); + let WorkerReply::Output { contents, .. 
} = reply; + let text = contents_text(&contents); + + assert!( + text.contains("sandbox policy changed; new session started"), + "expected bare restart to flush the queued sandbox notice, got: {text:?}" + ); + assert!( + text.contains("[repl] new session started"), + "expected the explicit restart notice to remain visible, got: {text:?}" + ); + assert!( + manager.pending_server_notice.is_none(), + "expected the queued sandbox notice to be consumed by the restart reply" + ); + } + + #[test] + fn pager_collapsed_settled_completion_trims_echo_and_keeps_output() { + let range = OutputRange { + start_offset: 0, + end_offset: 25, + bytes: b"> Sys.sleep(0.2); 1+1\n[1] 2\n".to_vec(), + events: Vec::new(), + text_spans: vec![OutputTextSpan { + start_byte: 0, + end_byte: 25, + is_stderr: false, + origin: ContentOrigin::Worker, + }], + }; - let context = manager.prepare_input_context_pager("3+3", false); - let text = contents_text(&context.prefix_contents); + let collapsed = collapse_echo_with_attribution( + range, + &[echo_event("> ", "Sys.sleep(0.2); 1+1\n")], + 0, + &["> ".to_string()], + EchoCollapseMode::CollapseForFinalReply, + ); + let contents = pager::contents_from_collapsed_output( + collapsed.bytes, + collapsed.events, + collapsed.text_spans, + 25, + ); + let text = contents_text(&contents); assert!( text.contains("[1] 2\n"), @@ -6777,10 +7381,6 @@ mod tests { !text.contains("Sys.sleep(0.2); 1+1"), "did not expect settled pager echo to leak into the next input context, got: {text:?}" ); - assert!( - manager.settled_pending_completion.is_none(), - "expected settled completion metadata to be consumed with the detached prefix" - ); } #[test] @@ -6798,17 +7398,12 @@ mod tests { .expect("worker manager"); manager.process = Some(test_worker_process(sleeping_test_child())); - manager.output.start_capture(); - manager.output_timeline.append_text( - b"line0001\nline0002\nline0003\nline0004\n", + manager.pager.activate( + 
pager_buffer_from_worker_text("line0001\nline0002\nline0003\nline0004\n"), false, - ContentOrigin::Worker, ); - let end_offset = manager.output.end_offset().expect("output end offset"); - let SnapshotWithImages { buffer, .. } = - snapshot_page_with_images(&manager.output, end_offset, 16); - manager.pager.activate(buffer.expect("pager buffer"), false); + manager.output.start_capture(); manager .output_timeline .append_text(b"detached\n", false, ContentOrigin::Worker); @@ -6850,15 +7445,10 @@ manager.process = Some(test_worker_process(sleeping_test_child())); manager.exe_path = PathBuf::from("definitely-missing-worker-exe"); - manager.output.start_capture(); let output = (1..=24).map(|n| format!("L{n:04}\n")).collect::<String>(); manager - .output_timeline - .append_text(output.as_bytes(), false, ContentOrigin::Worker); - let end_offset = manager.output.end_offset().expect("output end offset"); - let SnapshotWithImages { buffer, .. } = - snapshot_page_with_images(&manager.output, end_offset, 16); - manager.pager.activate(buffer.expect("pager buffer"), false); + .pager + .activate(static_pager_buffer_from_worker_text(&output), false); { let process = manager.process.as_mut().expect("worker process"); @@ -6898,6 +7488,48 @@ ); } + #[test] + fn bare_restart_clears_preserved_detached_prefixes() { + let mut manager = WorkerManager::new( + Backend::R, + SandboxCliPlan::default(), + crate::oversized_output::OversizedOutputMode::Files, + ) + .expect("worker manager"); + manager.preserved_detached_prefix = PrefixCapture { + contents: vec![WorkerContent::worker_stdout("OLD_DETACHED\n")], + is_error: false, + bytes: "OLD_DETACHED\n".len() as u64, + }; + manager.reply_owned_prefix = PrefixCapture { + contents: vec![WorkerContent::worker_stdout("OLD_REPLY\n")], + is_error: false, + bytes: "OLD_REPLY\n".len() as u64, + }; + manager.next_live_prefix_belongs_to_reply = true; + + let reply = manager + .write_stdin_files( + "\u{4}".to_string(),
Duration::from_millis(10), + Duration::from_millis(10), + WriteStdinOptions::default(), + ) + .expect("restart reply"); + let WorkerReply::Output { contents, .. } = reply; + let reply_text = contents_text(&contents); + assert!( + !reply_text.contains("OLD_DETACHED") && !reply_text.contains("OLD_REPLY"), + "did not expect preserved detached prefixes in restart reply, got: {reply_text:?}" + ); + + let context = manager.prepare_input_context_files(); + assert!( + context.detached_prefix_contents.is_empty() && context.reply_prefix_contents.is_empty(), + "did not expect explicit restart to leak old prefixes into the next input" + ); + } + #[test] fn pager_empty_input_preserves_idle_guardrail_notice() { let _guard = output_ring_test_guard(); @@ -6921,6 +7553,7 @@ mod tests { *slot = Some(GuardrailEvent { message: "[repl] worker was idle; new session started\n".to_string(), was_busy: false, + is_error: false, }); } @@ -7166,6 +7799,107 @@ mod tests { ); } + #[cfg(target_os = "linux")] + #[test] + fn linux_bwrap_startup_retry_stays_disabled_after_followup_plan_bwrap_override() { + let plan = SandboxCliPlan { + operations: vec![ + crate::sandbox_cli::SandboxCliOperation::SetMode( + crate::sandbox_cli::SandboxModeArg::Inherit, + ), + crate::sandbox_cli::SandboxCliOperation::Config( + crate::sandbox_cli::SandboxConfigOperation::SetUseLinuxSandboxBwrap(true), + ), + ], + }; + let mut manager = WorkerManager::new( + Backend::Python, + plan, + crate::oversized_output::OversizedOutputMode::Files, + ) + .expect("worker manager"); + let mut inherited_state = manager.sandbox_defaults.clone(); + inherited_state.apply_update(SandboxStateUpdate { + sandbox_policy: SandboxPolicy::WorkspaceWrite { + writable_roots: Vec::new(), + network_access: false, + exclude_tmpdir_env_var: false, + exclude_slash_tmp: false, + }, + sandbox_cwd: Some(std::env::temp_dir()), + use_linux_sandbox_bwrap: None, + use_legacy_landlock: None, + }); + manager.inherited_sandbox_state = 
Some(inherited_state.clone()); + manager.sandbox_state = resolve_effective_sandbox_state_with_defaults( + &manager.sandbox_plan, + Some(&inherited_state), + &manager.sandbox_defaults, + ) + .expect("resolved initial sandbox state"); + assert!( + manager.sandbox_state.use_linux_sandbox_bwrap, + "test setup should start with the plan-level bwrap override enabled" + ); + + let retry = manager.maybe_retry_spawn_without_linux_bwrap( + &WorkerError::Protocol("ipc disconnected while waiting for backend info".to_string()), + false, + ); + assert!(retry, "expected startup failure to disable bwrap"); + + let update = sandbox_state_update_from_codex_meta(&json!({ + "sandboxPolicy": { + "type": "workspace-write", + "writable_roots": [], + "network_access": false, + "exclude_tmpdir_env_var": false, + "exclude_slash_tmp": false + }, + "sandboxCwd": std::env::temp_dir(), + "useLegacyLandlock": false, + "codexLinuxSandboxExe": "/tmp/codex-linux-sandbox" + })) + .expect("Codex sandbox metadata"); + manager + .update_sandbox_state(update, Duration::from_millis(1)) + .expect("follow-up sandbox state"); + + assert!( + !manager.sandbox_state.use_linux_sandbox_bwrap, + "plan-level bwrap overrides should not re-enable bwrap after the local fallback" + ); + } + + #[test] + fn inherit_ending_invalid_plan_fails_during_startup_validation() { + let plan = SandboxCliPlan { + operations: vec![ + crate::sandbox_cli::SandboxCliOperation::SetMode( + crate::sandbox_cli::SandboxModeArg::ReadOnly, + ), + crate::sandbox_cli::SandboxCliOperation::AddWritableRoot(std::env::temp_dir()), + crate::sandbox_cli::SandboxCliOperation::SetMode( + crate::sandbox_cli::SandboxModeArg::Inherit, + ), + ], + }; + + let err = match WorkerManager::new( + Backend::Python, + plan, + crate::oversized_output::OversizedOutputMode::Files, + ) { + Ok(_) => panic!("invalid inherit-ending plan should fail during startup"), + Err(err) => err, + }; + + assert!( + matches!(err, WorkerError::Sandbox(ref message) if 
message.contains("--add-writable-root can only be used while sandbox mode is workspace-write")), + "unexpected error: {err}" + ); + } + #[test] fn inherit_workspace_write_refinements_wait_for_client_state() { let writable_root = std::env::temp_dir(); @@ -7284,7 +8018,7 @@ mod tests { } #[test] - fn exact_interrupt_requires_current_sandbox_when_worker_would_respawn() { + fn exact_interrupt_remains_local_when_worker_would_respawn() { let plan = SandboxCliPlan { operations: vec![crate::sandbox_cli::SandboxCliOperation::SetMode( crate::sandbox_cli::SandboxModeArg::Inherit, @@ -7298,10 +8032,10 @@ mod tests { .expect("worker manager"); assert!( - !manager + manager .nonexecuting_follow_up_uses_existing_state("\u{3}") .expect("interrupt follow-up classification"), - "a bare Ctrl-C should require current per-call sandbox metadata when it would respawn" + "a bare Ctrl-C should stay a local follow-up even when it would otherwise respawn" ); } @@ -7366,6 +8100,7 @@ mod tests { *slot = Some(GuardrailEvent { message: "[repl] previous request aborted; retry your last input\n".to_string(), was_busy: true, + is_error: true, }); } @@ -7378,7 +8113,7 @@ mod tests { } #[test] - fn nonempty_input_with_busy_guardrail_uses_existing_state() { + fn nonempty_input_with_busy_guardrail_requires_current_state() { let plan = SandboxCliPlan { operations: vec![crate::sandbox_cli::SandboxCliOperation::SetMode( crate::sandbox_cli::SandboxModeArg::Inherit, @@ -7399,14 +8134,15 @@ mod tests { *slot = Some(GuardrailEvent { message: "[repl] previous request aborted; retry your last input\n".to_string(), was_busy: true, + is_error: true, }); } assert!( - manager + !manager .nonexecuting_follow_up_uses_existing_state("1+1") .expect("follow-up classification"), - "busy-guardrail retries should keep pending recovery local" + "busy-guardrail retries should require current per-call sandbox metadata" ); } @@ -7432,6 +8168,7 @@ mod tests { *slot = Some(GuardrailEvent { message: "[repl] worker was idle; new 
session started\n".to_string(), was_busy: false, + is_error: false, }); } diff --git a/tests/sandbox_state_updates.rs b/tests/sandbox_state_updates.rs index 30300ab..5e93bcd 100644 --- a/tests/sandbox_state_updates.rs +++ b/tests/sandbox_state_updates.rs @@ -47,6 +47,27 @@ fn collect_text(result: &CallToolResult) -> String { .join("\n") } +fn home_env_vars(home_dir: &Path) -> Vec<(String, String)> { + let home = home_dir.to_string_lossy().to_string(); + #[cfg_attr(not(windows), allow(unused_mut))] + let mut env_vars = vec![ + ("HOME".to_string(), home.clone()), + ("R_USER".to_string(), home.clone()), + ]; + #[cfg(windows)] + { + env_vars.push(("USERPROFILE".to_string(), home.clone())); + if home.len() >= 3 + && home.as_bytes()[1] == b':' + && (home.as_bytes()[2] == b'\\' || home.as_bytes()[2] == b'/') + { + env_vars.push(("HOMEDRIVE".to_string(), home[..2].to_string())); + env_vars.push(("HOMEPATH".to_string(), home[2..].to_string())); + } + } + env_vars +} + fn linux_sandbox_exe_value(use_legacy_landlock: bool) -> Value { #[cfg(target_os = "linux")] { @@ -166,6 +187,16 @@ fn outside_workspace_target(label: &str) -> TestResult<std::path::PathBuf> { Ok(base.join(format!(".mcp-repl-{label}-{nanos}.txt"))) } +fn home_scratch_dir(label: &str) -> TestResult<tempfile::TempDir> { + let base = std::env::var_os("HOME") + .or_else(|| std::env::var_os("USERPROFILE")) + .map(std::path::PathBuf::from) + .ok_or_else(|| "missing HOME/USERPROFILE for sandbox test home".to_string())?; + Ok(Builder::new() + .prefix(&format!(".tmp-{label}-")) + .tempdir_in(base)?)
+} + fn repo_scratch_dir(label: &str) -> TestResult { Ok(Builder::new() .prefix(&format!(".tmp-{label}-")) @@ -257,6 +288,21 @@ async fn spawn_inherit_pager_server(cwd: &Path, page_chars: u64) -> TestResult, +) -> TestResult { + common::spawn_server_with_args_env_and_cwd_and_pager_page_chars( + vec!["--sandbox".to_string(), "inherit".to_string()], + env, + Some(cwd.to_path_buf()), + page_chars, + ) + .await +} + fn timeout_then_tail_code() -> &'static str { r#" Sys.sleep(0.2) @@ -268,6 +314,28 @@ flush.console() "# } +fn timeout_then_paged_tail_code() -> &'static str { + r#" +line <- paste(rep("foo", 80), collapse = " ") +for (i in 1:300) cat(sprintf("line%04d %s\n", i, line)) +flush.console() +Sys.sleep(1.0) +cat("TAIL\n") +flush.console() +"# +} + +#[cfg(unix)] +fn timeout_then_paged_exit_code() -> &'static str { + r#" +line <- paste(rep("foo", 80), collapse = " ") +for (i in 1:300) cat(sprintf("line%04d %s\n", i, line)) +flush.console() +Sys.sleep(0.2) +q("no", status = 0, runLast = FALSE) +"# +} + fn timeout_then_done_code() -> &'static str { r#" Sys.sleep(0.2) @@ -322,6 +390,16 @@ tryCatch({ "# } +fn timeout_then_done_code_after(wait_secs: f64) -> String { + format!( + r#" +Sys.sleep({wait_secs:.3}) +cat("DONE\n") +flush.console() +"# + ) +} + fn timeout_then_large_completion_code() -> &'static str { Box::leak( format!( @@ -341,6 +419,36 @@ fn timeout_then_large_completion_code() -> &'static str { ) } +fn timeout_then_large_completion_and_quit_code() -> &'static str { + Box::leak( + format!( + "small <- paste(rep('s', {UNDER_HARD_SPILL_TEXT_LEN}), collapse = ''); \ + big <- paste(rep('t', {OVER_HARD_SPILL_TEXT_LEN}), collapse = ''); \ + cat('FIRST_START\\n'); \ + cat(small); \ + cat('\\nFIRST_END\\n'); \ + flush.console(); \ + Sys.sleep(0.5); \ + cat('SECOND_START\\n'); \ + cat(big); \ + cat('\\nSECOND_END\\n'); \ + flush.console(); \ + quit('no')" + ) + .into_boxed_str(), + ) +} + +fn oversized_follow_up_code(marker: &str) -> String { + format!( + "big <- 
paste(rep('u', {OVER_HARD_SPILL_TEXT_LEN}), collapse = ''); \ + cat('{marker}_START\\n'); \ + cat(big); \ + cat('\\n{marker}_END\\n'); \ + flush.console()" + ) +} + fn test_delay_ms(default_ms: u64, windows_ms: u64) -> std::time::Duration { std::time::Duration::from_millis(if cfg!(windows) { windows_ms @@ -486,6 +594,12 @@ async fn sandbox_inherit_without_state_meta_fails_on_first_tool_call() -> TestRe !text.contains("2"), "did not expect successful evaluation, got: {text}" ); + assert_eq!( + result.is_error, + Some(true), + "expected missing metadata on the first worker interaction to set isError, got: {:?}", + result.is_error + ); session.cancel().await?; Ok(()) } @@ -520,6 +634,12 @@ async fn sandbox_inherit_with_malformed_state_meta_fails_on_first_tool_call() -> !text.contains("2"), "did not expect successful evaluation, got: {text}" ); + assert_eq!( + result.is_error, + Some(true), + "expected malformed metadata on the first worker interaction to set isError, got: {:?}", + result.is_error + ); session.cancel().await?; Ok(()) } @@ -632,6 +752,32 @@ async fn sandbox_inherit_empty_poll_with_existing_worker_ignores_bad_state_meta( Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn sandbox_inherit_empty_repl_without_state_meta_sets_is_error() -> TestResult<()> { + let _guard = test_guard(); + let temp = tempdir()?; + let session = spawn_inherit_server(temp.path()).await?; + let result = session.write_stdin_raw_with("", Some(2.0)).await?; + let text = collect_text(&result); + if backend_unavailable(&text) { + eprintln!("sandbox_state_updates backend unavailable in this environment; skipping"); + session.cancel().await?; + return Ok(()); + } + assert!( + text.contains(MISSING_INHERITED_STATE_MESSAGE), + "expected empty inherit repl call without metadata to fail closed, got: {text}" + ); + assert_eq!( + result.is_error, + Some(true), + "expected empty inherit repl preflight failure to set isError, got: {:?}", + result.is_error + ); + session.cancel().await?; 
+ Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn sandbox_inherit_interrupt_follow_up_ignores_local_meta_errors() -> TestResult<()> { let _guard = test_guard(); @@ -673,6 +819,51 @@ async fn sandbox_inherit_interrupt_follow_up_ignores_local_meta_errors() -> Test Ok(()) } +#[cfg(unix)] +#[tokio::test(flavor = "multi_thread")] +async fn sandbox_inherit_pending_bare_interrupt_ignores_missing_state_meta() -> TestResult<()> { + let _guard = test_guard(); + let temp = tempdir()?; + let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?; + let timeout = session + .write_stdin_raw_with_meta( + interrupt_then_prompt_code(), + Some(0.05), + Some(workspace_write_meta(temp.path())), + ) + .await?; + let timeout_text = collect_text(&timeout); + if backend_unavailable(&timeout_text) { + eprintln!("sandbox_state_updates backend unavailable in this environment; skipping"); + session.cancel().await?; + return Ok(()); + } + assert!( + timeout_text.contains("< TestResult<()> { let _guard = test_guard(); @@ -713,7 +904,9 @@ async fn sandbox_inherit_metadata_error_preserves_hidden_timeout_bundle() -> Tes let mut final_text = String::new(); for _ in 0..10 { - let final_poll = session.write_stdin_raw_with("", Some(2.0)).await?; + let final_poll = session + .write_stdin_raw_with_meta("", Some(2.0), Some(workspace_write_meta(temp.path()))) + .await?; final_text = common::result_text(&final_poll); if !final_text.contains("< Te ); assert!( quit_text.contains(">"), - "expected prompt after pager quit, got: {quit_text}" + "expected prompt after :q, got: {quit_text}" ); session.cancel().await?; Ok(()) } #[tokio::test(flavor = "multi_thread")] -async fn sandbox_inherit_pending_interrupt_tail_with_bad_meta_still_interrupts() -> TestResult<()> { +async fn sandbox_inherit_active_pager_command_ignores_state_meta_changes() -> TestResult<()> { let _guard = test_guard(); let temp = tempdir()?; - let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?; - 
-    let input = format!(
-        "small <- paste(rep('s', {UNDER_HARD_SPILL_TEXT_LEN}), collapse = ''); detached <- paste(rep('d', {OVER_HARD_SPILL_TEXT_LEN}), collapse = ''); cat('SMALL_START\\n'); cat(small); cat('\\nSMALL_END\\n'); flush.console(); tryCatch({{ Sys.sleep(30) }}, interrupt = function(e) {{ cat('DETACHED_START\\n'); cat(detached); cat('\\nDETACHED_END\\n'); flush.console() }})"
-    );
-    let first = session
-        .write_stdin_raw_with_meta(input, Some(0.05), Some(workspace_write_meta(temp.path())))
+    let session = spawn_inherit_pager_server(temp.path(), 120).await?;
+    let initial = session
+        .write_stdin_raw_with_meta(
+            "line <- paste(rep(\"foo\", 80), collapse = \" \"); for (i in 1:300) cat(sprintf(\"line%04d %s\\n\", i, line))",
+            Some(30.0),
+            Some(workspace_write_meta(temp.path())),
+        )
         .await?;
-    let first_text = common::result_text(&first);
-    if backend_unavailable(&first_text) {
+    let initial_text = common::result_text(&initial);
+    if backend_unavailable(&initial_text) {
         eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
         session.cancel().await?;
         return Ok(());
     }
     assert!(
-        bundle_transcript_path(&first_text).is_none(),
-        "did not expect timeout bundle disclosure before the interrupt-side metadata error, got: {first_text:?}"
+        initial_text.contains("--More--"),
+        "expected pager to activate before local pager command test, got: {initial_text:?}"
     );
-    tokio::time::sleep(test_delay_ms(260, 700)).await;
-    let interrupt_error = session
-        .write_stdin_raw_with_meta(
-            "\u{3}cat('AFTER_INTERRUPT\\n')",
-            Some(10.0),
-            Some(json!({ SANDBOX_STATE_META_CAPABILITY: "invalid" })),
-        )
+    let quit = session
+        .write_stdin_raw_with_meta(":q", Some(30.0), Some(full_access_meta(temp.path())))
         .await?;
-    assert_eq!(
-        interrupt_error.is_error,
-        Some(true),
-        "expected malformed metadata follow-up to be reported as an MCP tool error"
-    );
-    let interrupt_error_text = common::result_text(&interrupt_error);
-    assert!(
-        interrupt_error_text.contains("failed to parse Codex sandbox state metadata"),
-        "expected malformed metadata error after local interrupt, got: {interrupt_error_text}"
-    );
-    let transcript_path = bundle_transcript_path(&interrupt_error_text).unwrap_or_else(|| {
-        panic!(
-            "expected the interrupt-side metadata error reply to disclose the detached timeout transcript, got: {interrupt_error_text:?}"
-        )
-    });
-    let transcript = fs::read_to_string(&transcript_path)?;
+    let quit_text = common::result_text(&quit);
     assert!(
-        transcript.contains("SMALL_START") && transcript.contains("SMALL_END"),
-        "expected the earlier timed-out output to remain on the transcript path, got: {transcript:?}"
+        !quit_text.contains("sandbox policy changed; new session started"),
+        "did not expect active pager command to restart the worker, got: {quit_text}"
     );
     assert!(
-        transcript.contains("DETACHED_START") && transcript.contains("DETACHED_END"),
-        "expected the interrupt-side detached output to remain on the transcript path, got: {transcript:?}"
+        !quit_text.contains("new sandbox policy"),
+        "did not expect active pager command to apply the new sandbox policy immediately, got: {quit_text}"
     );
-
-    let mut recovery_text = String::new();
-    for _ in 0..20 {
-        let recovery = session
-            .write_stdin_raw_with_meta("1+1", Some(0.5), Some(workspace_write_meta(temp.path())))
-            .await?;
-        recovery_text = common::result_text(&recovery);
-        if !recovery_text.contains("[repl] input discarded while worker busy")
-            && !recovery_text.contains("<"),
+        "expected prompt after pager quit, got: {quit_text}"
     );
+    session.cancel().await?;
     Ok(())
 }

+#[cfg(unix)]
 #[tokio::test(flavor = "multi_thread")]
-async fn sandbox_inherit_pending_restart_with_bad_meta_clears_timeout_state() -> TestResult<()> {
+async fn sandbox_inherit_session_ended_pager_command_ignores_state_meta_changes() -> TestResult<()>
+{
     let _guard = test_guard();
-    let temp = tempdir()?;
-    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
-    let first = session
+    let scratch = repo_scratch_dir("sandbox-ended-pager-local-state-meta")?;
+    let debug_dir = scratch.path().join("debug");
+    let session = spawn_inherit_pager_server_with_env(
+        scratch.path(),
+        120,
+        vec![(
+            "MCP_REPL_DEBUG_DIR".to_string(),
+            debug_dir.to_string_lossy().to_string(),
+        )],
+    )
+    .await?;
+    let timed_out = session
         .write_stdin_raw_with_meta(
-            timeout_then_tail_code(),
+            timeout_then_paged_exit_code(),
             Some(0.05),
-            Some(workspace_write_meta(temp.path())),
+            Some(workspace_write_meta(scratch.path())),
         )
         .await?;
-    let first_text = common::result_text(&first);
-    if backend_unavailable(&first_text) {
+    let timed_out_text = common::result_text(&timed_out);
+    if backend_unavailable(&timed_out_text) {
         eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
         session.cancel().await?;
         return Ok(());
     }
-    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
-
-    let restart_error = session
-        .write_stdin_raw_with_meta(
-            "\u{4}cat('AFTER_RESTART\\n')",
-            Some(0.1),
-            Some(json!({ SANDBOX_STATE_META_CAPABILITY: "invalid" })),
-        )
-        .await?;
-    assert_eq!(
-        restart_error.is_error,
-        Some(true),
-        "expected malformed metadata restart follow-up to be reported as an MCP tool error"
-    );
-    let restart_error_text = common::result_text(&restart_error);
-    assert!(
-        restart_error_text.contains("failed to parse Codex sandbox state metadata"),
-        "expected malformed metadata error after local restart, got: {restart_error_text}"
-    );
     assert!(
-        restart_error_text.contains("new session started"),
-        "expected the restart-side metadata error reply to include the restart notice, got: {restart_error_text}"
+        timed_out_text.contains("--More--"),
+        "expected timed-out request to leave pager active, got: {timed_out_text}"
     );
+    tokio::time::sleep(test_delay_ms(350, 700)).await;
-    let recovery = session
-        .write_stdin_raw_with_meta("1+1", Some(1.0), Some(workspace_write_meta(temp.path())))
+    let quit = session
+        .write_stdin_raw_with_meta(":q", Some(5.0), Some(read_only_meta(scratch.path())))
         .await?;
-    let recovery_text = common::result_text(&recovery);
-    session.cancel().await?;
-
-    assert!(
-        recovery_text.contains("[1] 2"),
-        "expected the next valid call to run in the restarted session, got: {recovery_text}"
-    );
+    let quit_text = common::result_text(&quit);
+    if quit_text.contains("< TestResult<()> {
+async fn sandbox_inherit_pending_pager_command_ignores_missing_state_meta() -> TestResult<()> {
     let _guard = test_guard();
     let temp = tempdir()?;
-    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
-    let first = session
+    let session = spawn_inherit_pager_server(temp.path(), 120).await?;
+    let timed_out = session
         .write_stdin_raw_with_meta(
-            timeout_then_tail_code(),
-            Some(0.05),
+            timeout_then_paged_tail_code(),
+            Some(0.5),
             Some(workspace_write_meta(temp.path())),
         )
         .await?;
-    let first_text = collect_text(&first);
-    if backend_unavailable(&first_text) {
+    let timed_out_text = common::result_text(&timed_out);
+    if backend_unavailable(&timed_out_text) {
         eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
         session.cancel().await?;
         return Ok(());
     }
-    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+    assert!(
+        timed_out_text.contains("--More--"),
+        "expected timed-out request to leave pager active, got: {timed_out_text}"
+    );
+
+    let quit = session.write_stdin_raw_with(":q", Some(1.0)).await?;
+    let quit_text = common::result_text(&quit);
+    assert!(
+        !quit_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "expected pending pager command to ignore missing inherited metadata, got: {quit_text}"
+    );
+    assert!(
+        !quit_text.contains("sandbox policy changed; new session started"),
+        "did not expect pending pager command to restart the worker, got: {quit_text}"
+    );
+    assert!(
+        !quit_text.contains("unexpected ':'"),
+        "expected :q to remain pager-local while a request is pending, got: {quit_text}"
+    );
+
+    tokio::time::sleep(test_delay_ms(1100, 1500)).await;
+    let poll = session
+        .write_stdin_raw_with_meta("", Some(2.0), Some(workspace_write_meta(temp.path())))
+        .await?;
+    let poll_text = common::result_text(&poll);
+    session.cancel().await?;
+
+    assert!(
+        poll_text.contains("TAIL"),
+        "expected pending request to keep running after pager-local :q, got: {poll_text}"
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_active_pager_empty_input_ignores_missing_state_meta() -> TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let session = spawn_inherit_pager_server(temp.path(), 120).await?;
+    let initial = session
+        .write_stdin_raw_with_meta(
+            "line <- paste(rep(\"foo\", 80), collapse = \" \"); for (i in 1:300) cat(sprintf(\"line%04d %s\\n\", i, line))",
+            Some(30.0),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let initial_text = common::result_text(&initial);
+    if backend_unavailable(&initial_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    assert!(
+        initial_text.contains("--More--"),
+        "expected pager to activate before empty pager command test, got: {initial_text:?}"
+    );
+
+    let page_advance = session.write_stdin_raw_with("", Some(30.0)).await?;
+    let page_advance_text = common::result_text(&page_advance);
+    session.cancel().await?;
+
+    assert!(
+        !page_advance_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "expected active pager empty input to ignore missing inherited metadata, got: {page_advance_text}"
+    );
+    assert!(
+        page_advance_text.contains("--More--") || page_advance_text.contains("(END"),
+        "expected active pager empty input to stay in pager mode, got: {page_advance_text}"
+    );
+    assert_ne!(
+        page_advance.is_error,
+        Some(true),
+        "did not expect active pager empty input to set isError, got: {:?}",
+        page_advance.is_error
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_pending_interrupt_tail_with_bad_meta_fails_closed() -> TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            timeout_then_tail_code(),
+            Some(0.05),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    tokio::time::sleep(test_delay_ms(260, 700)).await;
+
+    let interrupt_error = session
+        .write_stdin_raw_with_meta(
+            "\u{3}cat('AFTER_INTERRUPT\\n')",
+            Some(10.0),
+            Some(json!({ SANDBOX_STATE_META_CAPABILITY: "invalid" })),
+        )
+        .await?;
+    assert_eq!(
+        interrupt_error.is_error,
+        Some(true),
+        "expected malformed metadata follow-up to be reported as an MCP tool error"
+    );
+    let interrupt_error_text = common::result_text(&interrupt_error);
+    assert!(
+        interrupt_error_text.contains("failed to parse Codex sandbox state metadata"),
+        "expected malformed metadata error on rejected interrupt follow-up, got: {interrupt_error_text}"
+    );
+    assert!(
+        interrupt_error.is_error == Some(true),
+        "expected rejected interrupt follow-up to set isError, got: {:?}",
+        interrupt_error.is_error
+    );
+    assert!(
+        !interrupt_error_text.contains("new session started"),
+        "did not expect rejected interrupt follow-up to mutate session state, got: {interrupt_error_text}"
+    );
+
+    let busy_follow_up = session
+        .write_stdin_raw_with_meta("1+1", Some(0.1), Some(workspace_write_meta(temp.path())))
+        .await?;
+    let busy_follow_up_text = common::result_text(&busy_follow_up);
+    assert!(
+        busy_follow_up_text.contains("[repl] input discarded while worker busy")
+            || busy_follow_up_text.contains("< TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            format!("x <- 1\n{}", timeout_then_tail_code()),
+            Some(0.05),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let restart_error = session
+        .write_stdin_raw_with_meta(
+            "\u{4}cat('AFTER_RESTART\\n')",
+            Some(0.1),
+            Some(json!({ SANDBOX_STATE_META_CAPABILITY: "invalid" })),
+        )
+        .await?;
+    assert_eq!(
+        restart_error.is_error,
+        Some(true),
+        "expected malformed metadata restart follow-up to be reported as an MCP tool error"
+    );
+    let restart_error_text = common::result_text(&restart_error);
+    assert!(
+        restart_error_text.contains("failed to parse Codex sandbox state metadata"),
+        "expected malformed metadata error on rejected restart follow-up, got: {restart_error_text}"
+    );
+    assert!(
+        restart_error.is_error == Some(true),
+        "expected rejected restart follow-up to set isError, got: {:?}",
+        restart_error.is_error
+    );
+    assert!(
+        !restart_error_text.contains("new session started"),
+        "did not expect rejected restart follow-up to restart the worker, got: {restart_error_text}"
+    );
+
+    tokio::time::sleep(std::time::Duration::from_millis(1100)).await;
+
+    let recovery = session
+        .write_stdin_raw_with_meta(
+            variable_probe_code(),
+            Some(1.0),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let recovery_text = common::result_text(&recovery);
+    session.cancel().await?;
+
+    assert!(
+        recovery_text.contains("X_EXISTS:TRUE"),
+        "expected rejected restart follow-up to preserve the running session, got: {recovery_text}"
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_pending_interrupt_tail_restarts_on_state_change() -> TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            format!("x <- 1\n{}", timeout_then_tail_code()),
+            Some(0.05),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    tokio::time::sleep(test_delay_ms(260, 700)).await;
+
+    let follow_up = session
+        .write_stdin_raw_with_meta(
+            format!("\u{3}{}", variable_probe_code()),
+            Some(1.0),
+            Some(full_access_meta(temp.path())),
+        )
+        .await?;
+    let follow_up_text = common::result_text(&follow_up);
+    session.cancel().await?;
+
+    assert!(
+        follow_up_text.contains("sandbox policy changed; new session started"),
+        "expected interrupt tail with changed metadata to restart the worker, got: {follow_up_text}"
+    );
+    assert!(
+        follow_up_text.contains("new sandbox policy"),
+        "expected restart notice to include the new sandbox policy, got: {follow_up_text}"
+    );
+    assert!(
+        follow_up_text.contains("X_EXISTS:FALSE"),
+        "expected interrupt tail to run in the restarted session, got: {follow_up_text}"
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_pending_follow_up_restarts_on_new_state_meta() -> TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            format!("x <- 1\n{}", timeout_then_tail_code()),
+            Some(0.05),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let first_text = collect_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let second = session
+        .write_stdin_raw_with_meta(
+            variable_probe_code(),
+            Some(1.0),
+            Some(full_access_meta(temp.path())),
+        )
+        .await?;
+    let second_text = collect_text(&second);
+    assert!(
+        second_text.contains("sandbox policy changed; new session started"),
+        "expected changed metadata to restart the worker instead of preserving the pending request, got: {second_text}"
+    );
+    assert!(
+        second_text.contains("X_EXISTS:FALSE"),
+        "expected changed metadata to reset the worker session before running the follow-up, got: {second_text}"
+    );
+    assert!(
+        !second_text.contains("[repl] input discarded while worker busy"),
+        "did not expect changed metadata to keep the old busy session alive, got: {second_text}"
+    );
+    session.cancel().await?;
+    Ok(())
+}
+
+#[cfg(unix)]
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_busy_follow_up_stages_current_meta_before_session_end_reset()
+-> TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let debug_dir = temp.path().join("debug");
+    let session = spawn_inherit_files_server(
+        temp.path(),
+        vec![(
+            "MCP_REPL_DEBUG_DIR".to_string(),
+            debug_dir.to_string_lossy().to_string(),
+        )],
+    )
+    .await?;
+
+    let timeout = session
+        .write_stdin_raw_with_meta(
+            timeout_then_tail_exit_code(),
+            Some(0.05),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let timeout_text = collect_text(&timeout);
+    if backend_unavailable(&timeout_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    assert!(
+        timeout_text.contains("< TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            timeout_then_tail_code(),
+            Some(0.05),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let first_text = collect_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let poll = session
+        .write_stdin_raw_with_meta("", Some(2.0), Some(full_access_meta(temp.path())))
+        .await?;
+    let poll_text = collect_text(&poll);
+    assert!(
+        poll_text.contains("TAIL"),
+        "expected empty poll to continue draining the original request, got: {poll_text}"
+    );
+    session.cancel().await?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_pending_empty_poll_ignores_missing_state_meta() -> TestResult<()> {
+    let _guard = test_guard();
+    let temp = tempdir()?;
+    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            timeout_then_tail_code(),
+            Some(0.05),
+            Some(workspace_write_meta(temp.path())),
+        )
+        .await?;
+    let first_text = collect_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let poll = session.write_stdin_raw_with("", Some(2.0)).await?;
+    let poll_text = collect_text(&poll);
+    assert!(
+        poll_text.contains("TAIL"),
+        "expected empty poll without metadata to continue draining the original request, got: {poll_text}"
+    );
+    assert!(
+        !poll_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "did not expect empty draining poll without metadata to fail closed, got: {poll_text}"
+    );
+    assert_ne!(
+        poll.is_error,
+        Some(true),
+        "did not expect empty draining poll without metadata to set isError, got: {:?}",
+        poll.is_error
+    );
+    session.cancel().await?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_empty_poll_session_end_respawn_uses_current_state_meta() -> TestResult<()>
+{
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-empty-poll-session-end-respawn")?;
+    let home_dir = home_scratch_dir("sandbox-empty-poll-session-end-respawn-home")?;
+    let startup_target = home_dir.path().join("startup-spawn.txt");
+    let encoded_target = encode_path(&startup_target)?;
+    fs::write(
+        home_dir.path().join(".Rprofile"),
+        format!(
+            "invisible(suppressWarnings(tryCatch({{ writeLines(\"startup\", {encoded_target}) }}, error = function(e) NULL)))\n"
+        ),
+    )?;
+
+    let session =
+        spawn_inherit_files_server(scratch.path(), home_env_vars(home_dir.path())).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            "Sys.sleep(0.2)\nquit(\"no\")",
+            Some(0.05),
+            Some(full_access_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+
+    let _ = fs::remove_file(&startup_target);
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let drained = session
+        .write_stdin_raw_with_meta("", Some(2.0), Some(read_only_meta(scratch.path())))
+        .await?;
+    let drained_text = common::result_text(&drained);
+    assert!(
+        drained_text.contains("session ended")
+            || drained_text.contains("ipc disconnected while waiting for request completion"),
+        "expected timed-out quit request to end the session on the draining poll, got: {drained_text}"
+    );
+
+    let prompt = session
+        .write_stdin_raw_with_meta("", Some(2.0), Some(read_only_meta(scratch.path())))
+        .await?;
+    let prompt_text = common::result_text(&prompt);
+    session.cancel().await?;
+
+    assert!(
+        prompt_text.contains("<>"),
+        "expected a replacement idle session after draining the ended request, got: {prompt_text}"
+    );
+    assert!(
+        !startup_target.exists(),
+        "expected drained-session respawn to honor the current empty-poll read-only metadata"
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_empty_poll_respawn_retires_disclosed_timeout_bundle() -> TestResult<()> {
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-empty-poll-retires-timeout-bundle")?;
+    let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            timeout_then_large_completion_and_quit_code(),
+            Some(0.05),
+            Some(full_access_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+
+    let mut drained = None;
+    for _ in 0..20 {
+        let poll = session
+            .write_stdin_raw_with_meta("", Some(2.0), Some(full_access_meta(scratch.path())))
+            .await?;
+        let poll_text = common::result_text(&poll);
+        if bundle_transcript_path(&poll_text).is_some()
+            && (poll_text.contains("session ended")
+                || poll_text.contains("ipc disconnected while waiting for request completion"))
+        {
+            drained = Some(poll);
+            break;
+        }
+        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
+    }
+    let drained = drained.unwrap_or_else(|| {
+        panic!("expected draining poll to disclose the settled timeout transcript before respawn")
+    });
+    let drained_text = common::result_text(&drained);
+    let first_transcript_path = bundle_transcript_path(&drained_text).unwrap_or_else(|| {
+        panic!(
+            "expected draining poll to disclose the settled timeout transcript, got: {drained_text:?}"
+        )
+    });
+    let first_transcript_before = fs::read_to_string(&first_transcript_path)?;
+    assert!(
+        first_transcript_before.contains("SECOND_START")
+            && first_transcript_before.contains("SECOND_END"),
+        "expected the disclosed timeout transcript to contain the settled completion chunk, got: {first_transcript_before:?}"
+    );
+
+    let respawned = session
+        .write_stdin_raw_with_meta("", Some(2.0), Some(read_only_meta(scratch.path())))
+        .await?;
+    let respawned_text = common::result_text(&respawned);
+    assert!(
+        respawned_text.contains("<>"),
+        "expected the empty poll to respawn the ended session before the fresh follow-up, got: {respawned_text:?}"
+    );
+
+    let follow_up = session
+        .write_stdin_raw_with_meta(
+            oversized_follow_up_code("FOLLOW_UP"),
+            Some(10.0),
+            Some(read_only_meta(scratch.path())),
+        )
+        .await?;
+    let follow_up_text = common::result_text(&follow_up);
+    let follow_up_transcript_path = bundle_transcript_path(&follow_up_text);
+    let first_transcript_after = fs::read_to_string(&first_transcript_path)?;
+    let follow_up_transcript = follow_up_transcript_path
+        .as_ref()
+        .map(fs::read_to_string)
+        .transpose()?
+        .unwrap_or_default();
+
+    session.cancel().await?;
+
+    if let Some(follow_up_transcript_path) = follow_up_transcript_path {
+        assert_ne!(
+            first_transcript_path, follow_up_transcript_path,
+            "expected the empty-poll respawn to stop reusing the old disclosed timeout bundle"
+        );
+    }
+    assert!(
+        !first_transcript_after.contains("FOLLOW_UP_START"),
+        "did not expect the fresh post-respawn output in the prior disclosed timeout transcript: {first_transcript_after:?}"
+    );
+    assert!(
+        follow_up_text.contains("FOLLOW_UP_START")
+            || follow_up_transcript.contains("FOLLOW_UP_START"),
+        "expected the fresh post-respawn output to stay with the new turn, got reply {follow_up_text:?} and transcript {follow_up_transcript:?}"
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_empty_poll_session_end_without_state_meta_does_not_respawn_stale_worker()
+-> TestResult<()> {
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-empty-poll-session-end-missing-meta")?;
+    let home_dir = home_scratch_dir("sandbox-empty-poll-session-end-missing-meta-home")?;
+    let startup_target = home_dir.path().join("startup-spawn.txt");
+    let encoded_target = encode_path(&startup_target)?;
+    fs::write(
+        home_dir.path().join(".Rprofile"),
+        format!(
+            "invisible(suppressWarnings(tryCatch({{ writeLines(\"startup\", {encoded_target}) }}, error = function(e) NULL)))\n"
+        ),
+    )?;
+
+    let session =
+        spawn_inherit_files_server(scratch.path(), home_env_vars(home_dir.path())).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            "Sys.sleep(0.2)\nquit(\"no\")",
+            Some(0.05),
+            Some(full_access_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+
+    let _ = fs::remove_file(&startup_target);
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let drained = session.write_stdin_raw_with("", Some(2.0)).await?;
+    let drained_text = common::result_text(&drained);
+    assert!(
+        drained_text.contains("session ended")
+            || drained_text.contains("ipc disconnected while waiting for request completion"),
+        "expected timed-out quit request to end the session on the draining poll, got: {drained_text}"
+    );
+    assert!(
+        !drained_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "did not expect draining poll without metadata to replace local output with a metadata error, got: {drained_text}"
+    );
+    assert_ne!(
+        drained.is_error,
+        Some(true),
+        "did not expect draining poll without metadata to set isError, got: {:?}",
+        drained.is_error
+    );
+    assert!(
+        !startup_target.exists(),
+        "did not expect a draining poll without metadata to respawn a stale worker"
+    );
+
+    let prompt = session.write_stdin_raw_with("", Some(2.0)).await?;
+    let prompt_text = common::result_text(&prompt);
+    session.cancel().await?;
+
+    assert!(
+        prompt_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "expected the next empty poll to fail closed once a new worker spawn was required, got: {prompt_text}"
+    );
+    assert_eq!(
+        prompt.is_error,
+        Some(true),
+        "expected the spawn-needed follow-up poll without metadata to set isError, got: {:?}",
+        prompt.is_error
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_bare_interrupt_after_session_end_uses_current_state_meta() -> TestResult<()>
+{
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-bare-interrupt-session-end-meta")?;
+    let home_dir = home_scratch_dir("sandbox-bare-interrupt-session-end-meta-home")?;
+    let startup_target = home_dir.path().join("startup-spawn.txt");
+    let encoded_target = encode_path(&startup_target)?;
+    fs::write(
+        home_dir.path().join(".Rprofile"),
+        format!(
+            "invisible(suppressWarnings(tryCatch({{ writeLines(\"startup\", {encoded_target}) }}, error = function(e) NULL)))\n"
+        ),
+    )?;
+
+    let session =
+        spawn_inherit_files_server(scratch.path(), home_env_vars(home_dir.path())).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            "Sys.sleep(0.2)\nquit(\"no\")",
+            Some(0.05),
+            Some(full_access_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+
+    let _ = fs::remove_file(&startup_target);
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let interrupt = session
+        .write_stdin_raw_with_meta("\u{3}", Some(2.0), Some(read_only_meta(scratch.path())))
+        .await?;
+    let interrupt_text = common::result_text(&interrupt);
+    assert!(
+        !interrupt_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "did not expect bare interrupt with current metadata to fail closed, got: {interrupt_text}"
+    );
+
+    let prompt = session.write_stdin_raw_with("", Some(2.0)).await?;
+    let prompt_text = common::result_text(&prompt);
+    session.cancel().await?;
+
+    assert!(
+        prompt_text.contains("<>"),
+        "expected bare interrupt to let the session respawn under the current metadata, got: {prompt_text}"
+    );
+    assert!(
+        !startup_target.exists(),
+        "expected bare interrupt respawn to honor the current read-only metadata"
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_bare_interrupt_after_session_end_without_state_meta_does_not_respawn_stale_worker()
+-> TestResult<()> {
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-bare-interrupt-session-end-missing-meta")?;
+    let home_dir = home_scratch_dir("sandbox-bare-interrupt-session-end-missing-meta-home")?;
+    let startup_target = home_dir.path().join("startup-spawn.txt");
+    let encoded_target = encode_path(&startup_target)?;
+    fs::write(
+        home_dir.path().join(".Rprofile"),
+        format!(
+            "invisible(suppressWarnings(tryCatch({{ writeLines(\"startup\", {encoded_target}) }}, error = function(e) NULL)))\n"
+        ),
+    )?;
+
+    let session =
+        spawn_inherit_files_server(scratch.path(), home_env_vars(home_dir.path())).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            "Sys.sleep(0.2)\nquit(\"no\")",
+            Some(0.05),
+            Some(full_access_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+
+    let _ = fs::remove_file(&startup_target);
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let interrupt = session.write_stdin_raw_with("\u{3}", Some(2.0)).await?;
+    let interrupt_text = common::result_text(&interrupt);
+    assert!(
+        !interrupt_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "did not expect bare interrupt without metadata to fail closed, got: {interrupt_text}"
+    );
+    assert!(
+        !startup_target.exists(),
+        "did not expect bare interrupt without metadata to respawn a stale worker"
+    );
+
+    let prompt = session.write_stdin_raw_with("", Some(2.0)).await?;
+    let prompt_text = common::result_text(&prompt);
+    session.cancel().await?;
+
+    assert!(
+        prompt_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "expected the next empty poll to fail closed once a new worker spawn was required, got: {prompt_text}"
+    );
+    assert_eq!(
+        prompt.is_error,
+        Some(true),
+        "expected the spawn-needed poll after bare interrupt without metadata to set isError, got: {:?}",
+        prompt.is_error
+    );
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_applies_new_state_meta_after_timed_out_request_settles() -> TestResult<()>
+{
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-timeout-settle-fresh-call")?;
+    let target = scratch.path().join("fresh-call-write.txt");
+    let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            timeout_then_done_code(),
+            Some(0.05),
+            Some(read_only_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = collect_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
+
+    let second = session
+        .write_stdin_raw_with_meta(
+            write_file_code(&target)?,
+            Some(10.0),
+            Some(workspace_write_meta(scratch.path())),
+        )
+        .await?;
+    let second_text = collect_text(&second);
+    assert!(
+        second_text.contains("WRITE_OK"),
+        "expected fresh follow-up call to apply current sandbox metadata, got: {second_text}"
+    );
+    assert!(
+        !second_text.contains("WRITE_ERROR:"),
+        "did not expect stale settled timeout state to keep the old sandbox, got: {second_text}"
+    );
+    session.cancel().await?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_metadata_change_keeps_settled_timeout_output() -> TestResult<()> {
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-timeout-tail-across-state-change")?;
+    let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            timeout_then_tail_code(),
+            Some(0.05),
+            Some(read_only_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = collect_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    assert!(
+        !first_text.contains("TAIL"),
+        "expected the late completion chunk to remain detached from the timeout reply, got: {first_text}"
+    );
+    tokio::time::sleep(test_delay_ms(1400, 1800)).await;
     let second = session
-        .write_stdin_raw_with_meta("1+1", Some(0.1), Some(full_access_meta(temp.path())))
+        .write_stdin_raw_with_meta(
+            "1+1",
+            Some(10.0),
+            Some(workspace_write_meta(scratch.path())),
+        )
         .await?;
     let second_text = collect_text(&second);
     assert!(
-        second_text.contains("[repl] input discarded while worker busy"),
-        "expected busy follow-up to preserve the pending request, got: {second_text}"
+        second_text.contains("TAIL"),
+        "expected settled timeout output to survive sandbox respawn, got: {second_text}"
+    );
+    assert!(
+        second_text.contains("[1] 2"),
+        "expected the fresh call to still execute after the preserved timeout tail, got: {second_text}"
+    );
+    session.cancel().await?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_metadata_change_keeps_timeout_bundle_output() -> TestResult<()> {
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-timeout-bundle-across-state-change")?;
+    let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
+    let first = session
+        .write_stdin_raw_with_meta(
+            timeout_then_large_completion_code(),
+            Some(0.05),
+            Some(read_only_meta(scratch.path())),
+        )
+        .await?;
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    assert!(
+        bundle_transcript_path(&first_text).is_none(),
+        "did not expect the initial timeout reply to disclose a transcript path, got: {first_text:?}"
+    );
+
+    tokio::time::sleep(test_delay_ms(900, 1200)).await;
+
+    let second = session
+        .write_stdin_raw_with_meta(
+            "1+1",
+            Some(10.0),
+            Some(workspace_write_meta(scratch.path())),
+        )
+        .await?;
+    let second_text = common::result_text(&second);
+    let transcript_path = bundle_transcript_path(&second_text).unwrap_or_else(|| {
+        panic!(
+            "expected the metadata-changing follow-up to preserve and disclose the timeout transcript, got: {second_text:?}"
+        )
+    });
+    let transcript = fs::read_to_string(&transcript_path)?;
+
+    session.cancel().await?;
+
+    assert!(
+        transcript.contains("FIRST_START") && transcript.contains("FIRST_END"),
+        "expected the preserved timeout transcript to include the first timed-out chunk, got: {transcript:?}"
+    );
+    assert!(
+        transcript.contains("SECOND_START") && transcript.contains("SECOND_END"),
+        "expected the preserved timeout transcript to include the settled completion chunk, got: {transcript:?}"
     );
     assert!(
-        !second_text.contains("[1] 2"),
-        "did not expect changed metadata to start a fresh request, got: {second_text}"
+        second_text.contains("[1] 2") || transcript.contains("[1] 2"),
+        "expected the fresh follow-up result to execute after preserving the timeout transcript, got reply {second_text:?} and transcript {transcript:?}"
     );
-
     session.cancel().await?;
     Ok(())
 }

-#[cfg(unix)]
 #[tokio::test(flavor = "multi_thread")]
-async fn sandbox_inherit_busy_follow_up_stages_current_meta_before_session_end_reset()
+async fn sandbox_inherit_restart_tail_after_sandbox_respawn_keeps_timeout_bundle_output()
 -> TestResult<()> {
     let _guard = test_guard();
-    let temp = tempdir()?;
-    let debug_dir = temp.path().join("debug");
-    let session = spawn_inherit_files_server(
-        temp.path(),
-        vec![(
-            "MCP_REPL_DEBUG_DIR".to_string(),
-            debug_dir.to_string_lossy().to_string(),
-        )],
-    )
-    .await?;
-
-    let timeout = session
+    let scratch = repo_scratch_dir("sandbox-timeout-bundle-across-restart-tail-respawn")?;
+    let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
+    let first = session
         .write_stdin_raw_with_meta(
-            timeout_then_tail_exit_code(),
+            timeout_then_large_completion_code(),
             Some(0.05),
-            Some(workspace_write_meta(temp.path())),
+            Some(read_only_meta(scratch.path())),
         )
         .await?;
-    let timeout_text = collect_text(&timeout);
-    if backend_unavailable(&timeout_text) {
+    let first_text = common::result_text(&first);
+    if backend_unavailable(&first_text) {
         eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
         session.cancel().await?;
         return Ok(());
     }
     assert!(
-        timeout_text.contains("< TestResult<()> {
+async fn sandbox_inherit_disclosed_timeout_bundle_is_retired_on_state_change() -> TestResult<()> {
     let _guard = test_guard();
-    let temp = tempdir()?;
-    let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
+    let scratch = repo_scratch_dir("sandbox-disclosed-timeout-bundle-respawn")?;
+    let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
     let first = session
         .write_stdin_raw_with_meta(
-            timeout_then_tail_code(),
+            timeout_then_large_completion_code(),
             Some(0.05),
-            Some(workspace_write_meta(temp.path())),
+            Some(read_only_meta(scratch.path())),
         )
         .await?;
-    let first_text = collect_text(&first);
+    let first_text = common::result_text(&first);
     if backend_unavailable(&first_text) {
         eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
         session.cancel().await?;
         return Ok(());
     }
-    tokio::time::sleep(std::time::Duration::from_millis(260)).await;
-    let poll = session
-        .write_stdin_raw_with_meta("", Some(2.0), Some(full_access_meta(temp.path())))
-        .await?;
-    let poll_text = collect_text(&poll);
+    let first_transcript_path = loop {
+        let poll = session
+            .write_stdin_raw_with_meta("", Some(2.0), Some(read_only_meta(scratch.path())))
+            .await?;
+        let first_poll_text = common::result_text(&poll);
+        if let Some(path) = bundle_transcript_path(&first_poll_text) {
+            break path;
+        }
+        if !first_poll_text.contains("< TestResult<()>
-{
+async fn sandbox_inherit_busy_follow_up_never_executes_under_stale_sandbox() -> TestResult<()> {
     let _guard = test_guard();
-    let scratch = repo_scratch_dir("sandbox-timeout-settle-fresh-call")?;
-    let target = scratch.path().join("fresh-call-write.txt");
+    for delay_ms in [90_u64, 100, 110, 120, 130, 140, 150, 160] {
+        let scratch = repo_scratch_dir(&format!("sandbox-busy-recheck-{delay_ms}"))?;
+        let target = scratch
+            .path()
+            .join(format!("stale-follow-up-{delay_ms}.txt"));
+        let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
+        let first = session
+            .write_stdin_raw_with_meta(
+                timeout_then_done_code_after(0.22),
+                Some(0.05),
+                Some(workspace_write_meta(scratch.path())),
+            )
+            .await?;
+        let first_text = collect_text(&first);
+        if backend_unavailable(&first_text) {
+            eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+            session.cancel().await?;
+            return Ok(());
+        }
+
+        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
+
+        let second = session
+            .write_stdin_raw_with_meta(
+                write_file_code(&target)?,
+                Some(10.0),
+                Some(read_only_meta(scratch.path())),
+            )
+            .await?;
+        let second_text = collect_text(&second);
+        assert!(
+            !second_text.contains("WRITE_OK"),
+            "did not expect stale sandbox execution after busy follow-up at delay {delay_ms}ms, got: {second_text}"
+        );
+        assert!(
+            !target.exists(),
+            "did not expect follow-up to create {} at delay {delay_ms}ms",
+            target.display()
+        );
+        session.cancel().await?;
+    }
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_restart_follow_up_applies_current_state_meta() -> TestResult<()> {
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-restart-follow-up-state-meta")?;
+    let target = scratch.path().join("restart-follow-up-write.txt");
     let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
     let first = session
         .write_stdin_raw_with_meta(
-            timeout_then_done_code(),
+            timeout_then_tail_code(),
             Some(0.05),
-            Some(read_only_meta(scratch.path())),
+            Some(workspace_write_meta(scratch.path())),
         )
         .await?;
     let first_text = collect_text(&first);
@@ -1080,121 +2240,136 @@ async fn sandbox_inherit_applies_new_state_meta_after_timed_out_request_settles(
     let second = session
         .write_stdin_raw_with_meta(
-            write_file_code(&target)?,
+            format!("\u{4}{}", write_file_code(&target)?),
             Some(10.0),
-            Some(workspace_write_meta(scratch.path())),
+            Some(read_only_meta(scratch.path())),
         )
         .await?;
     let second_text = collect_text(&second);
     assert!(
-        second_text.contains("WRITE_OK"),
-        "expected fresh follow-up call to apply current sandbox metadata, got: {second_text}"
+        second_text.contains("new session started"),
+        "expected restart follow-up reply to include restart notice, got: {second_text}"
     );
     assert!(
-        !second_text.contains("WRITE_ERROR:"),
-        "did not expect stale settled timeout state to keep the old sandbox, got: {second_text}"
+        !second_text.contains("WRITE_OK"),
+        "did not expect restart follow-up to run under stale workspace-write metadata, got: {second_text}"
+    );
+    assert!(
+        !target.exists(),
+        "did not expect restart follow-up to create {}",
+        target.display()
     );
     session.cancel().await?;
     Ok(())
 }
 
 #[tokio::test(flavor = "multi_thread")]
-async fn sandbox_inherit_metadata_change_keeps_settled_timeout_output() -> TestResult<()> {
+async fn sandbox_inherit_bare_restart_stays_restart_after_sandbox_respawn() -> TestResult<()> {
     let _guard = test_guard();
-    let scratch = repo_scratch_dir("sandbox-timeout-tail-across-state-change")?;
+    let scratch = repo_scratch_dir("sandbox-bare-restart-after-respawn")?;
     let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
     let first = session
         .write_stdin_raw_with_meta(
             timeout_then_tail_code(),
             Some(0.05),
-            Some(read_only_meta(scratch.path())),
+            Some(workspace_write_meta(scratch.path())),
         )
         .await?;
-    let first_text = collect_text(&first);
+    let first_text = common::result_text(&first);
     if backend_unavailable(&first_text) {
         eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
         session.cancel().await?;
         return Ok(());
     }
+    tokio::time::sleep(test_delay_ms(260, 700)).await;
+
+    let restart = session
+        .write_stdin_raw_with_meta("\u{4}", Some(1.0), Some(read_only_meta(scratch.path())))
+        .await?;
+    let restart_text = common::result_text(&restart);
     assert!(
-        !first_text.contains("TAIL"),
-        "expected the late completion chunk to remain detached from the timeout reply, got: {first_text}"
+        restart_text.contains("new session started"),
+        "expected bare Ctrl-D after sandbox respawn to remain an explicit restart, got: {restart_text}"
+    );
+    assert!(
+        restart_text.contains("sandbox policy changed; new session started"),
+        "expected bare Ctrl-D after sandbox respawn to flush the sandbox-change notice, got: {restart_text}"
+    );
+    assert!(
+        !restart_text.contains("MID") && !restart_text.contains("TAIL"),
+        "did not expect bare Ctrl-D after sandbox respawn to drain preserved timeout output, got: {restart_text}"
+    );
+    assert!(
+        !restart_text.contains("<>"),
+        "did not expect bare Ctrl-D after sandbox respawn to degrade into an empty poll, got: {restart_text}"
     );
-    tokio::time::sleep(test_delay_ms(1400, 1800)).await;
-    let second = session
-        .write_stdin_raw_with_meta(
-            "1+1",
-            Some(10.0),
-            Some(workspace_write_meta(scratch.path())),
-        )
+    let follow_up = session
+        .write_stdin_raw_with_meta("1+1", Some(1.0), Some(read_only_meta(scratch.path())))
         .await?;
-    let second_text = collect_text(&second);
+    let follow_up_text = common::result_text(&follow_up);
+    session.cancel().await?;
+
     assert!(
-        second_text.contains("TAIL"),
-        "expected settled timeout output to survive sandbox respawn, got: {second_text}"
+        !follow_up_text.contains("sandbox policy changed; new session started"),
+        "did not expect the sandbox-change notice to leak into the next unrelated reply, got: {follow_up_text}"
     );
     assert!(
-        second_text.contains("[1] 2"),
-        "expected the fresh call to still execute after the preserved timeout tail, got: {second_text}"
+        !follow_up_text.contains("MID") && !follow_up_text.contains("TAIL"),
+        "did not expect preserved timeout output to leak into the next unrelated reply, got: {follow_up_text}"
+    );
+    assert!(
+        follow_up_text.contains("[1] 2"),
+        "expected the post-restart follow-up to run normally, got: {follow_up_text}"
     );
-    session.cancel().await?;
     Ok(())
 }
 
 #[tokio::test(flavor = "multi_thread")]
-async fn sandbox_inherit_metadata_change_keeps_timeout_bundle_output() -> TestResult<()> {
+async fn sandbox_inherit_active_pager_bare_restart_stays_restart_after_sandbox_respawn()
+-> TestResult<()> {
     let _guard = test_guard();
-    let scratch = repo_scratch_dir("sandbox-timeout-bundle-across-state-change")?;
-    let session = spawn_inherit_files_server(scratch.path(), Vec::new()).await?;
-    let first = session
+    let scratch = repo_scratch_dir("sandbox-pager-bare-restart-after-respawn")?;
+    let session = spawn_inherit_pager_server(scratch.path(), 120).await?;
+    let initial = session
         .write_stdin_raw_with_meta(
-            timeout_then_large_completion_code(),
-            Some(0.05),
-            Some(read_only_meta(scratch.path())),
+            "line <- paste(rep(\"foo\", 80), collapse = \" \"); for (i in 1:300) cat(sprintf(\"line%04d %s\\n\", i, line))",
+            Some(30.0),
+            Some(workspace_write_meta(scratch.path())),
         )
         .await?;
-    let first_text = common::result_text(&first);
-    if backend_unavailable(&first_text) {
+    let initial_text = common::result_text(&initial);
+    if backend_unavailable(&initial_text) {
         eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
         session.cancel().await?;
         return Ok(());
     }
     assert!(
-        bundle_transcript_path(&first_text).is_none(),
-        "did not expect the initial timeout reply to disclose a transcript path, got: {first_text:?}"
+        initial_text.contains("--More--"),
+        "expected pager to activate before bare Ctrl-D restart test, got: {initial_text:?}"
     );
-    tokio::time::sleep(test_delay_ms(900, 1200)).await;
-
-    let second = session
-        .write_stdin_raw_with_meta(
-            "1+1",
-            Some(10.0),
-            Some(workspace_write_meta(scratch.path())),
-        )
+    let restart = session
+        .write_stdin_raw_with_meta("\u{4}", Some(10.0), Some(read_only_meta(scratch.path())))
         .await?;
-    let second_text = common::result_text(&second);
-    let transcript_path = bundle_transcript_path(&second_text).unwrap_or_else(|| {
-        panic!(
-            "expected the metadata-changing follow-up to preserve and disclose the timeout transcript, got: {second_text:?}"
-        )
-    });
-    let transcript = fs::read_to_string(&transcript_path)?;
-
+    let restart_text = common::result_text(&restart);
     session.cancel().await?;
     assert!(
-        transcript.contains("FIRST_START") && transcript.contains("FIRST_END"),
-        "expected the preserved timeout transcript to include the first timed-out chunk, got: {transcript:?}"
+        restart_text.contains("new session started"),
+        "expected active-pager bare Ctrl-D after sandbox respawn to remain an explicit restart, got: {restart_text}"
     );
     assert!(
-        transcript.contains("SECOND_START") && transcript.contains("SECOND_END"),
-        "expected the preserved timeout transcript to include the settled completion chunk, got: {transcript:?}"
+        restart_text.contains("[repl] new session started"),
+        "expected active-pager bare Ctrl-D to emit the explicit restart reply, got: {restart_text}"
     );
     assert!(
-        second_text.contains("[1] 2") || transcript.contains("[1] 2"),
-        "expected the fresh follow-up result to execute after preserving the timeout transcript, got reply {second_text:?} and transcript {transcript:?}"
+        restart_text.contains("sandbox policy changed; new session started"),
+        "expected active-pager bare Ctrl-D to flush the sandbox-change notice, got: {restart_text}"
+    );
+    assert!(
+        !restart_text.contains("--More--"),
+        "did not expect active-pager bare Ctrl-D to degrade into pager navigation, got: {restart_text}"
     );
     Ok(())
 }
@@ -1335,6 +2510,37 @@ async fn sandbox_inherit_rejects_read_only_network_access_meta() -> TestResult<(
     Ok(())
 }
 
+#[tokio::test(flavor = "multi_thread")]
+async fn sandbox_inherit_read_only_meta_blocks_write_in_cwd() -> TestResult<()> {
+    let _guard = test_guard();
+    let scratch = repo_scratch_dir("sandbox-read-only")?;
+    let target = scratch.path().join("blocked.txt");
+    let session = spawn_inherit_server(scratch.path()).await?;
+    let result = session
+        .write_stdin_raw_with_meta(
+            write_file_code(&target)?,
+            Some(10.0),
+            Some(read_only_meta(scratch.path())),
+        )
+        .await?;
+    let text = collect_text(&result);
+    if backend_unavailable(&text) {
+        eprintln!("sandbox_state_updates backend unavailable in this environment; skipping");
+        session.cancel().await?;
+        return Ok(());
+    }
+    assert!(
+        text.contains("WRITE_ERROR:"),
+        "expected read-only metadata to block write in cwd, got: {text}"
+    );
+    assert!(
+        !text.contains("WRITE_OK"),
+        "did not expect read-only metadata to allow write in cwd, got: {text}"
+    );
+    session.cancel().await?;
+    Ok(())
+}
+
 #[tokio::test(flavor = "multi_thread")]
 async fn sandbox_inherit_full_access_meta_allows_write_outside_cwd() -> TestResult<()> {
     let _guard = test_guard();
@@ -1606,7 +2812,8 @@ async fn sandbox_inherit_without_state_meta_fails_on_repl_reset() -> TestResult<
     assert_eq!(
         result.is_error,
         Some(true),
-        "expected missing sandbox-state-meta reset to be reported as an MCP tool error"
+        "expected repl_reset without required metadata to set isError, got: {:?}",
+        result.is_error
     );
     session.cancel().await?;
     Ok(())
@@ -2009,8 +3216,7 @@ async fn sandbox_inherit_empty_poll_stages_current_meta_before_session_end_reset
 #[cfg(unix)]
 #[tokio::test(flavor = "multi_thread")]
-async fn sandbox_inherit_empty_poll_without_meta_fails_before_session_end_reset() -> TestResult<()>
-{
+async fn sandbox_inherit_empty_poll_without_meta_defers_session_end_respawn() -> TestResult<()> {
     let _guard = test_guard();
     let temp = tempdir()?;
     let session = spawn_inherit_files_server(temp.path(), Vec::new()).await?;
@@ -2036,16 +3242,33 @@ async fn sandbox_inherit_empty_poll_without_meta_fails_before_session_end_reset(
     let poll = session.write_stdin_raw_with("", Some(5.0)).await?;
     let poll_text = collect_text(&poll);
+    assert_ne!(
+        poll.is_error,
+        Some(true),
+        "did not expect omitted metadata empty poll to fail while draining local output, got: {poll_text}"
+    );
+    assert!(
+        poll_text.contains("session ended")
+            || poll_text.contains("ipc disconnected while waiting for request completion"),
+        "expected empty poll without metadata to report the ended session locally, got: {poll_text}"
+    );
+    assert!(
+        !poll_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "did not expect empty poll without metadata to respawn while draining local output, got: {poll_text}"
+    );
+
+    let follow_up = session.write_stdin_raw_with("", Some(5.0)).await?;
+    let follow_up_text = collect_text(&follow_up);
     session.cancel().await?;
     assert_eq!(
-        poll.is_error,
+        follow_up.is_error,
         Some(true),
-        "expected omitted metadata empty poll to fail before respawn, got: {poll_text}"
+        "expected later spawn-needed empty poll without metadata to fail, got: {follow_up_text}"
     );
     assert!(
-        poll_text.contains(MISSING_INHERITED_STATE_MESSAGE),
-        "expected missing sandbox metadata error before empty-poll respawn, got: {poll_text}"
+        follow_up_text.contains(MISSING_INHERITED_STATE_MESSAGE),
+        "expected missing sandbox metadata error once a fresh worker was needed, got: {follow_up_text}"
    );
     Ok(())
 }
diff --git a/tests/snapshots/mcp_transcripts__snapshots_support_multiple_calls_and_sessions@transcript.snap b/tests/snapshots/mcp_transcripts__snapshots_support_multiple_calls_and_sessions@transcript.snap
index 6ed6983..521e9de 100644
--- a/tests/snapshots/mcp_transcripts__snapshots_support_multiple_calls_and_sessions@transcript.snap
+++ b/tests/snapshots/mcp_transcripts__snapshots_support_multiple_calls_and_sessions@transcript.snap
@@ -1,6 +1,6 @@
 ---
 source: tests/mcp_transcripts.rs
-expression: snapshot.render_transcript()
+expression: transcript
 ---
 == session: session_1 ==
 1) r_repl timeout_ms=10000
diff --git a/tests/snapshots/write_stdin_batch__write_stdin_timeout_then_busy_then_recovers@transcript.snap b/tests/snapshots/write_stdin_batch__write_stdin_timeout_then_busy_then_recovers@transcript.snap
index dd3d8c2..24d3a2f 100644
--- a/tests/snapshots/write_stdin_batch__write_stdin_timeout_then_busy_then_recovers@transcript.snap
+++ b/tests/snapshots/write_stdin_batch__write_stdin_timeout_then_busy_then_recovers@transcript.snap
@@ -1,6 +1,6 @@
 ---
 source: tests/write_stdin_batch.rs
-expression: snapshot.render_transcript()
+expression: transcript
 ---
 == session: timeout_list ==
 1) r_repl timeout_ms=2000