diff --git a/docs/plans/completed/one-way-output-bundle-previews.md b/docs/plans/completed/one-way-output-bundle-previews.md new file mode 100644 index 0000000..2f565a9 --- /dev/null +++ b/docs/plans/completed/one-way-output-bundle-previews.md @@ -0,0 +1,78 @@ +# One-Way Output Bundle Previews + +## Summary + +- Move files-mode output-bundle previews onto a one-way memory-to-disk model. +- Keep the public bundle layout unchanged: `transcript.txt`, `events.log`, `images/`, and `images/history/` remain the server-owned on-disk history surface. +- Stop building later replies by rereading previously written bundle files from disk. +- Keep only the bounded preview state needed for the next reply in memory while spilling the full history to files. + +## Status + +- State: completed +- Last updated: 2026-04-17 +- Current phase: completed + +## Current Direction + +- Treat disk as append-only bundle history, not as the source of truth for visible reply previews. +- Refactor `ActiveOutputBundle` so it caches the bounded preview state needed for later polls: + - first preview image + - last preview image + - bounded head text + - bounded tail text + - flags and counters needed to render the existing disclosure notice +- Start with image preview state first, because the current reply path still rereads old image files from the bundle directory when rebuilding later responses. +- Follow with text-preview caching so later small polls do not depend on reconstructing preview state from spilled files. + +## Long-Term Direction + +- The end-state contract is one-way: once content is spilled to the bundle directory, `mcp-repl` does not need to read it back to answer later polls. +- Disk remains the durable history surface for clients and debugging; memory owns the bounded preview shown inline in normal replies. +- Missing or externally modified bundle files should not matter to visible reply construction after the server has already retained the needed preview state in memory. +- Directory recreation for continued appends may still be reasonable when parents disappear, but visible reply generation should not depend on successful rereads of old bundle files. + +## Phase Status + +- Phase 0: completed + - Define the one-way contract and the bounded in-memory preview state. +- Phase 1: completed + - Add cached image-preview state to `ActiveOutputBundle` and remove old-image rereads from later reply rendering. +- Phase 2: completed + - Confirm that later text replies already render from in-memory retained reply items and do not reread `transcript.txt`. +- Phase 3: completed + - Replace reread-oriented regressions with one-way contract coverage for deleted bundle image and transcript files. +- Phase 4: completed + - Run full validation and close the initiative. + +## Locked Decisions + +- Do not change the public files-mode bundle layout or path-disclosure contract as part of this refactor. +- Do not add tamper-detection logic for previously written bundle files. +- Do not rely on rereading previously written image files to build later visible replies. +- Keep preview state bounded in memory; do not retain the full spilled history in RAM. +- Preserve the current polling model and the existing distinction between inline preview content and the disclosed bundle path. + +## Open Questions + +- None for this slice. + +## Next Safe Slice + +- None. The planned work is complete. + +## Stop Conditions + +- Stop if the slice requires retaining unbounded image or text history in memory. +- Stop if preserving the current visible preview contract requires a broader redesign of reply materialization than this plan assumes. +- Stop if the work starts changing the public bundle file format, tool descriptions, or polling contract. +- Stop if directory-recreation behavior grows into speculative recovery logic for states the public API does not need to support. + +## Decision Log + +- 2026-04-17: Decided that files-mode output bundles should be one-way from memory to disk. Disk is the server-owned history surface; memory owns the bounded preview needed for later replies. +- 2026-04-17: Scoped the first implementation slice to image preview caching, because the current image bundle reply path still rereads previously written files from disk. +- 2026-04-17: Kept the public bundle layout unchanged for this initiative so the refactor can land without redefining the client-facing files-mode contract. +- 2026-04-17: Began the image-preview implementation by caching the first-history and latest image previews on `ActiveOutputBundle` and moving the public regression toward “later replies do not depend on old bundle image files remaining on disk”. +- 2026-04-17: Completed the image-preview slice. Later image-bundle replies now render from cached preview images in memory instead of rereading old image files from the bundle directory. +- 2026-04-17: Confirmed the text spill path already satisfied the one-way contract for visible replies. Later text replies render from in-memory retained reply items, and a new public regression now covers transcript deletion and recreation without replaying previously spilled text. diff --git a/src/server/response.rs b/src/server/response.rs index 77fa12a..d3a844a 100644 --- a/src/server/response.rs +++ b/src/server/response.rs @@ -61,6 +61,8 @@ struct ActiveOutputBundle { current_image_id: Option, current_image_history_number: usize, history_image_count: usize, + first_history_image: Option, + last_image: Option, transcript_bytes: usize, transcript_lines: usize, transcript_has_partial_line: bool, @@ -1013,6 +1015,8 @@ impl OutputStore { current_image_id: None, current_image_history_number: 0, history_image_count: 0, + first_history_image: None, + last_image: None, transcript_bytes: 0, transcript_lines: 0, transcript_has_partial_line: false, @@ -1572,6 +1576,10 @@ impl ActiveOutputBundle { self.current_image_id = Some(image.id.clone()); self.current_image_history_number = history_number; self.history_image_count = self.history_image_count.saturating_add(1); + if image_number == 1 && history_number == 1 { + self.first_history_image = Some(image.clone()); + } + self.last_image = Some(image.clone()); Ok(Some(ReplyItem::Image(image.clone()))) } @@ -1671,15 +1679,12 @@ impl ActiveOutputBundle { } } - fn image_path(&self, index: usize) -> PathBuf { - let stem = format!("{index:03}"); - for extension in ["png", "jpg", "jpeg", "gif", "webp", "svg"] { - let path = self.paths.images_dir.join(format!("{stem}.{extension}")); - if path.exists() { - return path; - } + fn inline_preview_images(&self) -> (Option<&ReplyImage>, Option<&ReplyImage>) { + match self.next_image_number { + 0 => (None, None), + 1 => (self.last_image.as_ref(), None), + _ => (self.first_history_image.as_ref(), self.last_image.as_ref()), } - self.paths.images_dir.join(format!("{stem}.png")) } fn existing_image_alias_path(&self, index: usize) -> Option { @@ -2023,14 +2028,7 @@ fn compact_output_bundle_items(items: &[ReplyItem], bundle: &ActiveOutputBundle) .rposition(|item| matches!(item, ReplyItem::Image(_))); let single_image = first_image_idx.is_some() && last_image_idx == first_image_idx; let mut out = Vec::new(); - let (first_anchor, last_anchor) = match bundle.next_image_number { - 0 => (None, None), - 1 => (load_output_bundle_image_content(bundle, 1), None), - _ => ( - load_output_bundle_history_image_content(bundle, 1, 1), - load_output_bundle_image_content(bundle, bundle.next_image_number), - ), - }; + let (first_anchor, last_anchor) = bundle.inline_preview_images(); let displayed_anchor_count = usize::from(first_anchor.is_some()) + usize::from(last_anchor.is_some()); @@ -2043,7 +2041,7 @@ fn compact_output_bundle_items(items: &[ReplyItem], bundle: &ActiveOutputBundle) out.push(Content::text(head_text.clone())); } if let Some(image) = first_anchor { - out.push(image); + out.push(image_to_content(image)); } out.push(Content::text(build_output_bundle_notice( bundle, @@ -2075,7 +2073,7 @@ fn compact_output_bundle_items(items: &[ReplyItem], bundle: &ActiveOutputBundle) out.push(Content::text(pre_last_text)); } if let Some(image) = last_anchor { - out.push(image); + out.push(image_to_content(image)); } let post_last_text = collect_prefix_text_after(items, last_image_idx, POST_LAST_TEXT_BUDGET); if !post_last_text.is_empty() { @@ -2550,59 +2548,6 @@ fn image_extension(mime_type: &str) -> &str { } } -fn mime_type_from_path(path: &Path) -> String { - match path - .extension() - .and_then(|ext| ext.to_str()) - .unwrap_or_default() - .to_ascii_lowercase() - .as_str() - { - "png" => "image/png".to_string(), - "jpg" | "jpeg" => "image/jpeg".to_string(), - "gif" => "image/gif".to_string(), - "webp" => "image/webp".to_string(), - "svg" => "image/svg+xml".to_string(), - _ => "image/png".to_string(), - } -} - -fn load_output_bundle_image_content(bundle: &ActiveOutputBundle, index: usize) -> Option { - let path = bundle.image_path(index); - load_output_bundle_image_content_at_path(&path) -} - -fn load_output_bundle_history_image_content( - bundle: &ActiveOutputBundle, - image_index: usize, - history_index: usize, -) -> Option { - let stem = format!("images/history/{image_index:03}/{history_index:03}"); - for extension in ["png", "jpg", "jpeg", "gif", "webp", "svg"] { - let path = bundle.paths.dir.join(format!("{stem}.{extension}")); - if path.exists() { - return load_output_bundle_image_content_at_path(&path); - } - } - load_output_bundle_image_content(bundle, image_index) -} - -fn load_output_bundle_image_content_at_path(path: &Path) -> Option { - let bytes = match fs::read(path) { - Ok(bytes) => bytes, - Err(err) => { - eprintln!( - "skipping unreadable output bundle image {}: {err}", - path.display() - ); - return None; - } - }; - let mime_type = mime_type_from_path(path); - let data = STANDARD.encode(bytes); - Some(content_image(data, mime_type)) -} - fn build_preview(text: &str, path: Option<&Path>, omitted_tail: bool) -> String { if omitted_tail && text.chars().count() <= INLINE_TEXT_BUDGET { return build_short_preview(text, path); diff --git a/src/worker_process.rs b/src/worker_process.rs index 0bd419e..8416fec 100644 --- a/src/worker_process.rs +++ b/src/worker_process.rs @@ -7302,10 +7302,9 @@ mod tests { manager.pending_server_notice.is_none(), "expected the restart notice to be emitted instead of lingering" ); - assert!( - manager.process.is_none(), - "did not expect the unit test to retain a spawned worker" - ); + if let Some(process) = manager.process.take() { + let _ = process.kill(); + } } #[test] @@ -7404,6 +7403,9 @@ mod tests { ); manager.output.start_capture(); + if let Some(end_offset) = manager.output.end_offset() { + manager.output.advance_offset_to(end_offset); + } manager .output_timeline .append_text(b"detached\n", false, ContentOrigin::Worker); @@ -7442,20 +7444,19 @@ mod tests { crate::oversized_output::OversizedOutputMode::Pager, ) .expect("worker manager"); - manager.process = Some(test_worker_process(sleeping_test_child())); + let mut process = test_worker_process(successful_test_child()); + process.exit_status = Some(process.child.wait().expect("wait test child")); + manager.process = Some(process); manager.exe_path = PathBuf::from("definitely-missing-worker-exe"); let output = (1..=24).map(|n| format!("L{n:04}\n")).collect::(); manager .pager .activate(static_pager_buffer_from_worker_text(&output), false); - - { - let process = manager.process.as_mut().expect("worker process"); - process.child.kill().expect("kill test child"); - process.exit_status = Some(process.child.wait().expect("wait test child")); + manager.output.start_capture(); + if let Some(end_offset) = manager.output.end_offset() { + manager.output.advance_offset_to(end_offset); } - let reply = manager .write_stdin_pager( String::new(), diff --git a/tests/codex_approvals_tui.rs b/tests/codex_approvals_tui.rs index 925cd11..1f6e16a 100644 --- a/tests/codex_approvals_tui.rs +++ b/tests/codex_approvals_tui.rs @@ -1615,6 +1615,9 @@ tryCatch({ let original = std::mem::take(map); for (key, mut child) in original { let normalized_key = normalize_wire_string(&key, workspace, codex_home); + if normalized_key == "threadId" { + continue; + } path.push(normalized_key.clone()); normalize_inner(&mut child, path, workspace, codex_home); path.pop(); diff --git a/tests/plot_images.rs b/tests/plot_images.rs index 769539a..1fda816 100644 --- a/tests/plot_images.rs +++ b/tests/plot_images.rs @@ -3,7 +3,10 @@ mod common; use base64::Engine as _; -use common::{TestResult, spawn_server_with_files, spawn_server_with_files_env_vars}; +use common::{ + TestResult, spawn_server_with_files, spawn_server_with_files_env_vars, + wait_until_ready_with_input_retry, +}; use regex_lite::Regex; use rmcp::model::{CallToolResult, RawContent}; use serde::Serialize; @@ -13,7 +16,7 @@ use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::sync::OnceLock; use tempfile::tempdir; -use tokio::time::{Duration, sleep}; +use tokio::time::{Duration, Instant, sleep}; #[derive(Debug)] struct ImageData { @@ -238,6 +241,26 @@ fn parse_text_event_rows(events: &str) -> Vec { .collect() } +fn parse_image_event_paths(events: &str) -> Vec { + events + .lines() + .filter_map(|line| line.strip_prefix("I ").map(PathBuf::from)) + .collect() +} + +fn alias_path_for_history_image(bundle_dir: &Path, history_path: &Path) -> PathBuf { + let image_number = history_path + .parent() + .and_then(|path| path.file_name()) + .and_then(|name| name.to_str()) + .unwrap_or_else(|| panic!("expected image number in history path, got: {history_path:?}")); + let extension = history_path + .extension() + .and_then(|extension| extension.to_str()) + .unwrap_or_else(|| panic!("expected extension in history path, got: {history_path:?}")); + bundle_dir.join(format!("images/{image_number}.{extension}")) +} + fn advance_visible_lines( text: &str, visible_lines: usize, @@ -1595,12 +1618,16 @@ Sys.sleep(1) } #[tokio::test(flavor = "multi_thread")] -async fn timeout_output_bundle_survives_missing_anchor_image() -> TestResult<()> { +async fn timeout_output_bundle_keeps_inline_previews_after_bundle_files_disappear() -> TestResult<()> +{ let temp = tempdir()?; - let session = spawn_server_with_files_env_vars(vec![( - "TMPDIR".to_string(), - temp.path().display().to_string(), - )]) + let mut session = spawn_server_with_files_env_vars(vec![ + ("TMPDIR".to_string(), temp.path().display().to_string()), + ( + "MCP_REPL_OUTPUT_BUNDLE_MAX_BYTES".to_string(), + "200000".to_string(), + ), + ]) .await?; let input = r#" @@ -1610,6 +1637,12 @@ for (i in 1:6) { } flush.console() Sys.sleep(1) +big <- paste(rep("x", 200), collapse = "") +for (i in 1:2000) { + cat(sprintf("line%04d %s\n", i, big)) +} +flush.console() +Sys.sleep(2) "#; let first = session.write_stdin_raw_with(input, Some(0.05)).await?; if any_backend_unavailable(&[&first]) { @@ -1618,7 +1651,7 @@ Sys.sleep(1) return Ok(()); } - sleep(Duration::from_millis(600)).await; + sleep(Duration::from_millis(400)).await; let bundled = session.write_stdin_raw_with("", Some(0.05)).await?; let bundled_text = result_text(&bundled); if bundled_text.contains("<= deadline { + panic!( + "expected bundle-file deletion poll to keep inline preview images from memory, got text: {damaged_text:?}, images: {damaged_images:?}" + ); + } + }; assert!( damaged_text.contains("events.log"), - "expected damaged anchor poll to keep disclosing the output bundle, got: {damaged_text:?}" + "expected bundle-file deletion poll to keep disclosing the output bundle, got: {damaged_text:?}" + ); + assert_eq!( + damaged_images.len(), + 2, + "expected bundle-file deletion poll to keep inline preview images from memory, got {damaged_images:?}" ); let mut settled_text = damaged_text; @@ -1655,14 +1721,23 @@ Sys.sleep(1) settled_text = result_text(&next); } - let follow_up = session.write_stdin_raw_with("1+1", Some(5.0)).await?; + let follow_up = session.write_stdin_raw_with("1+1", Some(0.5)).await?; + let follow_up = wait_until_ready_with_input_retry( + &mut session, + "1+1", + follow_up, + 0.5, + Duration::from_millis(100), + Duration::from_secs(5), + ) + .await?; let follow_up_text = result_text(&follow_up); session.cancel().await?; assert!( follow_up_text.contains("[1] 2"), - "expected session to stay alive after anchor image deletion, got: {follow_up_text:?}" + "expected session to stay alive after bundle-file deletion, got: {follow_up_text:?}" ); Ok(()) diff --git a/tests/write_stdin_behavior.rs b/tests/write_stdin_behavior.rs index da190b2..76f06e0 100644 --- a/tests/write_stdin_behavior.rs +++ b/tests/write_stdin_behavior.rs @@ -1048,6 +1048,78 @@ async fn timeout_spill_file_path_stays_stable_across_later_small_poll() -> TestR Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn timeout_spill_recreates_deleted_transcript_without_replaying_old_text() -> TestResult<()> { + let _guard = lock_test_mutex(); + let session = spawn_behavior_session().await?; + + let input = "big <- paste(rep('y', 120), collapse = ''); cat('start\\n'); flush.console(); Sys.sleep(0.2); for (i in 1:80) cat(sprintf('mid%03d %s\\n', i, big)); flush.console(); Sys.sleep(0.35); cat('tail\\n')"; + let first = session.write_stdin_raw_with(input, Some(0.05)).await?; + let first_text = result_text(&first); + if backend_unavailable(&first_text) { + eprintln!("write_stdin_behavior backend unavailable in this environment; skipping"); + session.cancel().await?; + return Ok(()); + } + + sleep(Duration::from_millis(260)).await; + let spilled = session.write_stdin_raw_with("", Some(0.1)).await?; + let spilled_text = result_text(&spilled); + let transcript_path = match bundle_transcript_path(&spilled_text) { + Some(path) => path, + None if spilled_text.contains("< { + eprintln!("write_stdin_behavior spill poll remained busy; skipping"); + session.cancel().await?; + return Ok(()); + } + None => { + panic!("expected transcript path in first oversized poll reply, got: {spilled_text:?}") + } + }; + + fs::remove_file(&transcript_path)?; + + sleep(Duration::from_millis(450)).await; + let final_poll = session.write_stdin_raw_with("", Some(2.0)).await?; + let final_text = result_text(&final_poll); + if final_text.contains("<>"), + "expected later small poll to either return inline tail text or settle idle after recreating the spill file, got: {final_text:?}" + ); + assert!( + follow_up_text.contains("[1] 2"), + "expected session to stay alive after transcript deletion, got: {follow_up_text:?}" + ); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn timeout_bundle_file_creation_failure_preserves_inline_content() -> TestResult<()> { let _guard = lock_test_mutex();