From 8a57414ea19bdd3981dc7efa857f727ff6a0a9e3 Mon Sep 17 00:00:00 2001 From: Corv Date: Tue, 6 Jan 2026 21:10:44 +0100 Subject: [PATCH 1/4] Add deletion tracking and directory fd support for shutil.rmtree - Add s_unlink and s_rmdir handlers that queue deletions for approval - Support opening directories with O_RDONLY for fd-based operations - Implement fdopendir, rewinddir, dirfd for directory enumeration - Add *at syscalls: openat, unlinkat, fstatat64 - Fix RealDir/RealFile stat() to return real filesystem inodes (required for os.path.samestat compatibility) - Add dup/rpy_dup_noninheritable for os.listdir(fd) - Fix closedir to close underlying fd per POSIX (prevents fd leaks) - Increase fd limits for deep recursive directory trees - Add PendingDeletion dataclass and session integration - Update approve TUI to display pending deletions - Add audit logging for deletion events - Show deletion count in session summary - Move verbose DRY-RUN output to --debug mode only - Add SHANNOT_SANDBOX=1 environment variable for sandbox detection - Fix unlinkat to detect file vs directory from actual filesystem --- shannot/approve.py | 133 +++++++++++ shannot/audit.py | 27 +++ shannot/cli.py | 1 + shannot/interact.py | 5 +- shannot/mix_subprocess.py | 21 +- shannot/mix_vfs.py | 425 +++++++++++++++++++++++++++++++++--- shannot/pending_deletion.py | 141 ++++++++++++ shannot/run_session.py | 34 +++ shannot/session.py | 111 ++++++++++ shannot/virtualizedproc.py | 2 + 10 files changed, 870 insertions(+), 30 deletions(-) create mode 100644 shannot/pending_deletion.py diff --git a/shannot/approve.py b/shannot/approve.py index 59c08bd..b6a09b7 100644 --- a/shannot/approve.py +++ b/shannot/approve.py @@ -154,6 +154,7 @@ def render(self) -> None: cmd_count = len(session.commands) write_count = len(session.pending_writes) + delete_count = len(session.pending_deletions) date = session.created_at[:10] name = session.name[:30] @@ -161,6 +162,8 @@ def render(self) -> None: counts = f"{cmd_count:>2} cmds" if write_count: counts += f", {write_count} writes" + if delete_count: + counts += f", {delete_count} deletes" # Show remote target if present remote_tag = "" @@ -294,10 +297,41 @@ def render(self) -> None: if len(s.pending_writes) > 3: print(f" ... ({len(s.pending_writes) - 3} more)") + # Show pending deletions summary (collapsed by directory) + if s.pending_deletions: + from .pending_deletion import format_size, summarize_deletions + + print() + total_size = sum(d.get("size", 0) for d in s.pending_deletions) + count = len(s.pending_deletions) + print(f" \033[1mDeletions ({count} items, {format_size(total_size)}):\033[0m") + + # Group by root directory + summaries = summarize_deletions(s.pending_deletions) + for i, summary in enumerate(summaries[:3]): + root = summary["root"] + file_count = summary["file_count"] + dir_count = summary["dir_count"] + size = summary["total_size"] + + parts = [] + if file_count: + parts.append(f"{file_count} files") + if dir_count: + parts.append(f"{dir_count} dirs") + detail = ", ".join(parts) + + size_str = format_size(size) + print(f" {i + 1:>3}. \033[31mDELETE\033[0m {root} ({detail}, {size_str})") + if len(summaries) > 3: + print(f" ... ({len(summaries) - 3} more directories)") + print() help_text = " \033[90m[Up/Down] scroll [v] view script" if s.pending_writes: help_text += " [w] view writes" + if s.pending_deletions: + help_text += " [d] view deletes" help_text += " [x] execute [r] reject [Esc] back\033[0m" print(help_text) @@ -321,6 +355,9 @@ def handle_key(self, key: str) -> Action | View | None: elif key == "w" and self.session.pending_writes: return PendingWritesListView(self.session) + elif key == "d" and self.session.pending_deletions: + return PendingDeletionsListView(self.session) + elif key == "x": return Action("execute", [self.session]) @@ -479,6 +516,84 @@ def handle_key(self, key: str) -> Action | View | None: return None +# ============================================================================== +# Pending Deletions List View +# ============================================================================== + + +class PendingDeletionsListView(View): + """List of pending file/directory deletions for a session.""" + + def __init__(self, session: Session): + self.session = session + self.cursor = 0 + + def render(self) -> None: + clear_screen() + rows, cols = get_terminal_size() + + from .pending_deletion import format_size + + total_size = sum(d.get("size", 0) for d in self.session.pending_deletions) + print(f"\033[1m Pending Deletions: {self.session.name} ({format_size(total_size)}) \033[0m") + print() + + if not self.session.pending_deletions: + print(" No pending deletions.") + print() + print(" \033[90m[Esc] back\033[0m") + return + + visible_rows = rows - 8 + if visible_rows < 3: + visible_rows = 3 + + start = max(0, self.cursor - visible_rows // 2) + visible = self.session.pending_deletions[start : start + visible_rows] + + for i, del_data in enumerate(visible): + idx = start + i + pointer = "\033[36m>\033[0m" if idx == self.cursor else " " + path = del_data.get("path", "?") + target_type = del_data.get("target_type", "file") + size = del_data.get("size", 0) + remote = "\033[33m[R]\033[0m" if del_data.get("remote") else " " + + type_icon = "DIR" if target_type == "directory" else " " + size_str = format_size(size) if size > 0 else "" + + display_path = path[: cols - 35] + if len(path) > cols - 35: + display_path += "..." + + line = f" {pointer} \033[31mDEL\033[0m {type_icon} {remote} {display_path:<50}" + print(f"{line} {size_str:>10}") + + print() + remaining = len(self.session.pending_deletions) - start - len(visible) + if remaining > 0: + print(f" ... and {remaining} more") + print() + print(" \033[90m[Up/Down] scroll [Esc] back\033[0m") + + def handle_key(self, key: str) -> Action | View | None: + if not self.session.pending_deletions: + if key in ("b", "\x1b"): + return Action("back") + return None + + if key in ("b", "\x1b"): + return Action("back") + + elif key in ("j", "\x1b[B"): + self.cursor = min(self.cursor + 1, len(self.session.pending_deletions) - 1) + + elif key in ("k", "\x1b[A"): + self.cursor = max(self.cursor - 1, 0) + + return None + + # ============================================================================== # Pending Write Diff View # ============================================================================== @@ -569,6 +684,8 @@ def render(self) -> None: counts = f"{len(s.commands)} commands" if s.pending_writes: counts += f", {len(s.pending_writes)} writes" + if s.pending_deletions: + counts += f", {len(s.pending_deletions)} deletes" print(f" - {s.name} ({counts})") print() @@ -680,6 +797,22 @@ def _build_lines(self) -> list[str]: lines.append(f"✗ {path} ({error})") lines.append("") + # Show completed deletions (if any) + if self.session.completed_deletions: + lines.append("--- deletions ---") + for del_info in self.session.completed_deletions: + path = del_info.get("path", "") + if del_info.get("skipped"): + continue # Don't show skipped items + if del_info.get("success"): + target_type = del_info.get("target_type", "file") + type_label = " [dir]" if target_type == "directory" else "" + lines.append(f"✓ {path}{type_label}") + else: + error = del_info.get("error", "unknown") + lines.append(f"✗ {path} ({error})") + lines.append("") + lines.append("--- stdout ---") if self.session.stdout: lines.extend(self.session.stdout.split("\n")) diff --git a/shannot/audit.py b/shannot/audit.py index c0ee917..918058c 100644 --- a/shannot/audit.py +++ b/shannot/audit.py @@ -43,6 +43,8 @@ "command_decision", "file_write_queued", "file_write_executed", + "file_deletion_queued", + "file_deletion_executed", "approval_decision", "execution_started", "execution_completed", @@ -224,6 +226,7 @@ def log_session_created(session: Session) -> None: "script_path": session.script_path, "commands_count": len(session.commands), "writes_count": len(session.pending_writes), + "deletions_count": len(session.pending_deletions), }, target=session.target, ) @@ -238,6 +241,7 @@ def log_session_loaded(session: Session) -> None: "status": session.status, "commands_count": len(session.commands), "writes_count": len(session.pending_writes), + "deletions_count": len(session.pending_deletions), }, target=session.target, ) @@ -300,6 +304,28 @@ def log_file_write_queued( ) +def log_file_deletion_queued( + session_id: str | None, + path: str, + target_type: str, + size_bytes: int, + remote: bool, + target: str | None = None, +) -> None: + """Log file/directory deletion queueing event.""" + get_logger().log( + "file_deletion_queued", + session_id, + { + "path": path, + "target_type": target_type, + "size_bytes": size_bytes, + "remote": remote, + }, + target=target, + ) + + def log_approval_decision( sessions: list[Session], action: Literal["approved", "rejected"], @@ -326,6 +352,7 @@ def log_execution_started(session: Session) -> None: { "commands_to_execute": len(session.commands), "writes_to_execute": len(session.pending_writes), + "deletions_to_execute": len(session.pending_deletions), }, target=session.target, ) diff --git a/shannot/cli.py b/shannot/cli.py index f2efef5..60e8e80 100644 --- a/shannot/cli.py +++ b/shannot/cli.py @@ -427,6 +427,7 @@ def cmd_run_remote(args: argparse.Namespace) -> int: print(f" Target: {args.target}") print(f" Commands queued: {len(session.commands)}") print(f" File writes queued: {len(session.pending_writes)}") + print(f" Deletions queued: {len(session.pending_deletions)}") print(" Run 'shannot approve' to review and execute.") return 0 else: diff --git a/shannot/interact.py b/shannot/interact.py index c391243..4991cb9 100644 --- a/shannot/interact.py +++ b/shannot/interact.py @@ -99,10 +99,12 @@ class SandboxedProc( elif option == "--dry-run": SandboxedProc.subprocess_dry_run = True SandboxedProc.vfs_track_writes = True # Track file writes for approval + SandboxedProc.vfs_track_deletions = True # Track file deletions for approval elif option == "--session-id": session_id = value - # Enable write tracking for session execution (writes committed after) + # Enable tracking for session execution (writes/deletions committed after) SandboxedProc.vfs_track_writes = True + SandboxedProc.vfs_track_deletions = True elif option == "--script-name": SandboxedProc.subprocess_script_name = value # type: ignore[misc] sandbox_args["script_name"] = value @@ -319,6 +321,7 @@ class SandboxedProc( print(f"\n*** Session created: {session.id} ***") print(f" Commands queued: {len(session.commands)}") print(f" File writes queued: {len(session.pending_writes)}") + print(f" Deletions queued: {len(session.pending_deletions)}") print(" Run 'shannot approve' to review and execute.") else: print("\n*** No commands or writes were queued. ***") diff --git a/shannot/mix_subprocess.py b/shannot/mix_subprocess.py index 2387cfd..588d4d9 100644 --- a/shannot/mix_subprocess.py +++ b/shannot/mix_subprocess.py @@ -47,6 +47,7 @@ class MixSubprocess: subprocess_pending = [] # Commands awaiting approval subprocess_approved = set() # Commands approved this session file_writes_pending = [] # File writes awaiting approval + file_deletions_pending = [] # File/dir deletions awaiting approval # Execution tracking (populated during execution, NOT dry-run) # Note: Use _get_executed_commands() to access - ensures instance-level list @@ -277,14 +278,18 @@ def save_pending(self): def finalize_session(self): """ - Create a Session from queued commands and writes after script completes. + Create a Session from queued commands, writes, and deletions. Call this at the end of a dry-run execution to bundle all - queued commands and file writes into a reviewable session. + queued operations into a reviewable session. Returns the created Session, or None if nothing was queued. """ - if not self.subprocess_pending and not self.file_writes_pending: + if ( + not self.subprocess_pending + and not self.file_writes_pending + and not self.file_deletions_pending + ): return None from .session import create_session @@ -297,6 +302,14 @@ def finalize_session(self): elif isinstance(write, dict): pending_write_dicts.append(write) + # Convert PendingDeletion objects to dicts for serialization + pending_deletion_dicts = [] + for deletion in self.file_deletions_pending: + if hasattr(deletion, "to_dict"): + pending_deletion_dicts.append(deletion.to_dict()) + elif isinstance(deletion, dict): + pending_deletion_dicts.append(deletion) + session = create_session( script_path=self.subprocess_script_path or "", commands=list(self.subprocess_pending), @@ -305,10 +318,12 @@ def finalize_session(self): analysis=self.subprocess_analysis or "", sandbox_args=self.subprocess_sandbox_args, pending_writes=pending_write_dicts, + pending_deletions=pending_deletion_dicts, ) self.subprocess_pending.clear() self.file_writes_pending.clear() + self.file_deletions_pending.clear() return session def load_session_commands(self, session): diff --git a/shannot/mix_vfs.py b/shannot/mix_vfs.py index bc04290..979d5b4 100644 --- a/shannot/mix_vfs.py +++ b/shannot/mix_vfs.py @@ -20,6 +20,18 @@ GID = 1000 INO_COUNTER = 0 +# O_DIRECTORY flag for opening directories (platform-specific) +if sys.platform == "darwin": + O_DIRECTORY = 0x100000 +else: + O_DIRECTORY = 0x10000 # Linux + +# AT_FDCWD constant for *at syscalls (use cwd instead of dirfd) +AT_FDCWD = -100 + +# AT_REMOVEDIR flag for unlinkat() +AT_REMOVEDIR = 0x200 + class FSObject: """Base class for virtual filesystem objects. @@ -164,6 +176,28 @@ def join(self, name: str) -> FSObject: # don't allow access to symlinks and other special files raise OSError(errno.EACCES, path) + def stat(self): + """Return stat using real filesystem inode for samestat compatibility.""" + # Get real stat for inode/dev (needed for samestat to work) + if self.follow_links: + real_st = os.stat(self.path) + else: + real_st = os.lstat(self.path) + + # Build synthetic stat with real inode + st_mode = self.kind + st_mode |= stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH + st_mode |= stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH # Directories are executable + return new_stat( + st_ino=real_st.st_ino, + st_dev=real_st.st_dev, + st_nlink=1, + st_size=0, + st_mode=st_mode, + st_uid=0, # Read-only + st_gid=0, + ) + class OverlayDir(RealDir): """RealDir with virtual file overrides. @@ -224,12 +258,30 @@ def open(self) -> BinaryIO: except OSError as e: raise OSError(e.errno, "open failed") from e + def stat(self): + """Return stat using real filesystem inode for samestat compatibility.""" + real_st = os.stat(self.path) + + # Build synthetic stat with real inode + st_mode = self.kind + st_mode |= stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH + return new_stat( + st_ino=real_st.st_ino, + st_dev=real_st.st_dev, + st_nlink=1, + st_size=real_st.st_size, + st_mode=st_mode, + st_uid=0, # Read-only + st_gid=0, + ) + class OpenDir: """Iterator state for an open directory (used by opendir/readdir).""" - def __init__(self, node): + def __init__(self, node, fd=-1): self.node = node + self.fd = fd # fd from fdopendir(), or -1 for opendir() self.iter_names = iter(node.keys()) def readdir(self): @@ -278,17 +330,21 @@ class MixVFS: # The allowed 'fd' to return. You might increase the range if your # subprocess needs more fd's. - virtual_fd_range = range(3, 50) + virtual_fd_range = range(3, 512) # Support deep recursive rmtree # This is the number of simultaneous open directories. The value of 0 # prevents opendir() from working at all, which is fine in some situations # (notably with pypy2-sandbox, but not with pypy3-sandbox). - virtual_fd_directories = 20 + virtual_fd_directories = 100 # Support deep recursive rmtree # Write tracking vfs_track_writes = False # When True, queue writes for approval file_writes_pending = [] # List of PendingWrite objects + # Deletion tracking + vfs_track_deletions = False # When True, queue deletions for approval + file_deletions_pending = [] # List of PendingDeletion objects + def __init__(self, *args, **kwds): try: self.vfs_root = kwds.pop("vfs_root") @@ -305,7 +361,169 @@ def __init__(self, *args, **kwds): s_mkdir = sigerror("mkdir(pi)i", errno.EPERM, -1) s_fcntl = sigerror("fcntl(iii)i", errno.ENOSYS, -1) - s_unlink = sigerror("unlink(p)i", errno.EPERM, -1) + + @vfs_signature("unlink(p)i", filearg=0) + def s_unlink(self, p_pathname): + """Handle file deletion request.""" + path = self.vfs_fetch_path(p_pathname) + + # If tracking is disabled, deny the operation + if not self.vfs_track_deletions: + raise OSError(errno.EPERM, "unlink not permitted") + + # Verify it's a file (not a directory) + try: + node = self.vfs_getnode(p_pathname) + if node.is_dir(): + raise OSError(errno.EISDIR, "is a directory") + except OSError as e: + if e.errno == errno.ENOENT: + # File might exist on real filesystem (e.g., home dir) + pass + else: + raise + + # Get file size from real filesystem if possible + size = 0 + try: + real_path = Path(path) + if real_path.exists() and real_path.is_file(): + size = real_path.stat().st_size + except (OSError, PermissionError): + pass + + # Queue the deletion for approval + from .pending_deletion import PendingDeletion + + is_remote = hasattr(self, "remote_target") and self.remote_target is not None + + pending = PendingDeletion( + path=path, + target_type="file", + size=size, + remote=is_remote, + ) + self.file_deletions_pending.append(pending) + + if self.debug_errors: + sys.stderr.write(f"[DRY-RUN] DELETE {path}\n") + + return 0 # Pretend success (deferred) + + @vfs_signature("rmdir(p)i", filearg=0) + def s_rmdir(self, p_pathname): + """Handle directory deletion request.""" + path = self.vfs_fetch_path(p_pathname) + + # If tracking is disabled, deny the operation + if not self.vfs_track_deletions: + raise OSError(errno.EPERM, "rmdir not permitted") + + # Verify it's a directory + try: + node = self.vfs_getnode(p_pathname) + if not node.is_dir(): + raise OSError(errno.ENOTDIR, "not a directory") + except OSError as e: + if e.errno == errno.ENOENT: + # Directory might exist on real filesystem + pass + else: + raise + + # Queue the deletion for approval + from .pending_deletion import PendingDeletion + + is_remote = hasattr(self, "remote_target") and self.remote_target is not None + + pending = PendingDeletion( + path=path, + target_type="directory", + size=0, + remote=is_remote, + ) + self.file_deletions_pending.append(pending) + + if self.debug_errors: + sys.stderr.write(f"[DRY-RUN] RMDIR {path}\n") + + return 0 # Pretend success (deferred) + + @vfs_signature("unlinkat(ipi)i", filearg=1) + def s_unlinkat(self, dirfd, p_pathname, flags): + """Delete file/directory relative to directory fd. + + Used by shutil.rmtree with dir_fd parameter. + flags can be AT_REMOVEDIR (0x200) to remove directory instead of file. + """ + path = self.vfs_fetch_path(p_pathname) + + # Build full path for deletion tracking + if path.startswith("/"): + full_path = path + elif dirfd == AT_FDCWD: + full_path = os.path.join(self.virtual_cwd, path) + else: + # Get the directory node's real path + try: + f, node = self.vfs_open_fds[dirfd] + except KeyError: + raise OSError(errno.EBADF, "bad file descriptor") from None + + if not node.is_dir(): + raise OSError(errno.ENOTDIR, "not a directory") + + # For RealDir nodes, get the real path + if hasattr(node, "path"): + full_path = os.path.join(node.path, path) + else: + # Virtual directory - construct path + raise OSError(errno.EPERM, "unlinkat on virtual directory not supported") + + # Determine target type from flags and actual filesystem + # AT_REMOVEDIR flag indicates directory, but we also check filesystem + # to handle cases where sandboxed code misidentifies the target + real_path = Path(full_path) + is_directory = (flags & AT_REMOVEDIR) != 0 + if real_path.exists(): + is_directory = real_path.is_dir() + + if not self.vfs_track_deletions: + raise OSError(errno.EPERM, "deletion not permitted") + + from .pending_deletion import PendingDeletion + + is_remote = hasattr(self, "remote_target") and self.remote_target is not None + + if is_directory: + pending = PendingDeletion( + path=full_path, + target_type="directory", + size=0, + remote=is_remote, + ) + if self.debug_errors: + sys.stderr.write(f"[DRY-RUN] RMDIR {full_path}\n") + else: + size = 0 + try: + if real_path.exists() and real_path.is_file(): + size = real_path.stat().st_size + except (OSError, PermissionError): + pass + + pending = PendingDeletion( + path=full_path, + target_type="file", + size=size, + remote=is_remote, + ) + if self.debug_errors: + sys.stderr.write(f"[DRY-RUN] DELETE {full_path}\n") + + self.file_deletions_pending.append(pending) + + return 0 # Pretend success (deferred) @staticmethod def vfs_pypy_lib_directory(library_path, exclude=None): @@ -367,14 +585,16 @@ def vfs_write_stat(self, p_statbuf, node): self.sandio.write_buffer(p_statbuf, bytes_data) # type: ignore[attr-defined] def vfs_allocate_fd(self, f, node): - if node.is_dir(): - raise ValueError("cannot allocate fd for directory") + """Allocate fd for an open file or directory. + + For regular files, f is an open file object. + For directories, f is None (directory fds are used with fdopendir/unlinkat). + """ for fd in self.virtual_fd_range: - if fd not in self.vfs_open_fds: + if fd not in self.vfs_open_fds and fd not in self.vfs_write_buffers: self.vfs_open_fds[fd] = (f, node) return fd - else: - raise OSError(errno.EMFILE, "trying to open too many files") + raise OSError(errno.EMFILE, "trying to open too many files") def vfs_get_file(self, fd): """Return the open file for file descriptor `fd`.""" @@ -452,18 +672,84 @@ def s_fstat64(self, fd, p_statbuf): raise OSError(errno.EBADF, "bad file descriptor") from None self.vfs_write_stat(p_statbuf, node) + def vfs_resolve_at(self, dirfd, p_pathname): + """Resolve a path relative to a directory fd. + + If dirfd is AT_FDCWD or path is absolute, resolves from VFS root. + Otherwise, resolves relative to the directory represented by dirfd. + + Returns the resolved VFS node. + """ + path = self.vfs_fetch_path(p_pathname) + + # Absolute path - ignore dirfd + if path.startswith("/"): + return self.vfs_getnode(p_pathname) + + # AT_FDCWD - use virtual_cwd + if dirfd == AT_FDCWD: + return self.vfs_getnode(p_pathname) + + # Relative to dirfd + try: + f, node = self.vfs_open_fds[dirfd] + except KeyError: + raise OSError(errno.EBADF, "bad file descriptor") from None + + if not node.is_dir(): + raise OSError(errno.ENOTDIR, "not a directory") + + # Navigate from the directory node + for name in path.split("/"): + if name and name != ".": + node = node.join(name) + + return node + @vfs_signature("fstatat(ippi)i", filearg=1) def s_fstatat(self, dirfd, p_pathname, p_statbuf, flags): """Stat relative to directory fd. - For sandbox, we ignore dirfd (treat as AT_FDCWD) and resolve - paths from VFS root. - TODO: Handle AT_SYMLINK_NOFOLLOW (0x20) flag when VFS supports symlinks. """ - node = self.vfs_getnode(p_pathname) + node = self.vfs_resolve_at(dirfd, p_pathname) self.vfs_write_stat(p_statbuf, node) + # macOS uses fstatat64 as an alias + @vfs_signature("fstatat64(ippi)i", filearg=1) + def s_fstatat64(self, dirfd, p_pathname, p_statbuf, flags): + return self.s_fstatat(dirfd, p_pathname, p_statbuf, flags) + + @vfs_signature("openat(ipii)i", filearg=1) + def s_openat(self, dirfd, p_pathname, flags, mode): + """Open file relative to directory fd. + + Used by shutil.rmtree for fd-based operations. + """ + path = self.vfs_fetch_path(p_pathname) + + # For absolute paths or AT_FDCWD, delegate to s_open + if path.startswith("/") or dirfd == AT_FDCWD: + return self.s_open(p_pathname, flags, mode) + + # Resolve relative to dirfd + node = self.vfs_resolve_at(dirfd, p_pathname) + + write_mode = flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) != os.O_RDONLY + + if write_mode: + # Write mode not supported via openat + raise OSError(errno.EPERM, "write via openat not supported") + + # Handle directory opening + if node.is_dir(): + return self.vfs_allocate_fd(None, node) + + if not node.access(os.R_OK): + raise OSError(errno.EACCES, node) + f = node.open() + return self.vfs_allocate_fd(f, node) + @vfs_signature("access(pi)i", filearg=0) def s_access(self, p_pathname, mode): node = self.vfs_getnode(p_pathname) @@ -475,6 +761,7 @@ def s_open(self, p_pathname, flags, mode): path = self.vfs_fetch_path(p_pathname) write_mode = flags & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) != os.O_RDONLY create_mode = flags & os.O_CREAT + directory_mode = flags & O_DIRECTORY # Handle write mode with tracking if write_mode: @@ -485,13 +772,14 @@ def s_open(self, p_pathname, flags, mode): original = None try: node = self.vfs_getnode(p_pathname) - if not node.is_dir(): - try: - f = node.open() - original = f.read() - f.close() - except OSError: - pass + if node.is_dir(): + raise OSError(errno.EISDIR, "Is a directory") + try: + f = node.open() + original = f.read() + f.close() + except OSError: + pass except OSError: if not create_mode: raise @@ -504,6 +792,18 @@ def s_open(self, p_pathname, flags, mode): # Read-only mode node = self.vfs_getnode(p_pathname) + + # Handle directory opening (for shutil.rmtree fd-based operations) + if node.is_dir(): + if not node.access(os.R_OK): + raise OSError(errno.EACCES, node) + # Return fd for directory (file object is None) + return self.vfs_allocate_fd(None, node) + + # O_DIRECTORY flag but path is not a directory + if directory_mode: + raise OSError(errno.ENOTDIR, "Not a directory") + if not node.access(os.R_OK): raise OSError(errno.EACCES, node) f = node.open() @@ -580,10 +880,34 @@ def s_close(self, fd): ) return - # Regular file close - f = self.vfs_get_file(fd) + # Regular file/directory close + try: + f, node = self.vfs_open_fds[fd] + except KeyError: + raise OSError(errno.EBADF, "bad file descriptor") from None del self.vfs_open_fds[fd] - f.close() + # For directory fds, f is None (no file object to close) + if f is not None: + f.close() + + @vfs_signature("dup(i)i") + def s_dup(self, oldfd): + """Duplicate a file descriptor.""" + try: + f, node = self.vfs_open_fds[oldfd] + except KeyError: + raise OSError(errno.EBADF, "bad file descriptor") from None + + # Allocate new fd pointing to same file/node + return self.vfs_allocate_fd(f, node) + + @signature("rpy_dup_noninheritable(i)i") + def s_rpy_dup_noninheritable(self, oldfd): + """PyPy's dup with close-on-exec flag - same as dup for sandbox. + + Used by os.listdir(fd) and os.scandir(fd). + """ + return self.s_dup(oldfd) @vfs_signature("write(ipi)i") def s_write(self, fd, p_buf, count): @@ -715,11 +1039,60 @@ def s_readdir(self, p_dir): @vfs_signature("closedir(p)i") def s_closedir(self, p_dir): - del self.vfs_open_dirs[p_dir.addr] + fdir = self.vfs_open_dirs.pop(p_dir.addr, None) + # Per POSIX, closedir after fdopendir must close the underlying fd + if fdir is not None and fdir.fd >= 0: + self.vfs_open_fds.pop(fdir.fd, None) self.sandio.free(p_dir) # type: ignore[attr-defined] return 0 + @signature("rewinddir(p)v") + def s_rewinddir(self, p_dir): + """Reset directory stream to beginning. + + Used by os.listdir(fd) after fdopendir. + """ + try: + fdir = self.vfs_open_dirs[p_dir.addr] + except KeyError: + return # Invalid DIR* - just ignore + # Reset the iterator + fdir.index = 0 + + @vfs_signature("fdopendir(i)p") + def s_fdopendir(self, fd): + """Open a directory stream from a file descriptor. + + Used by os.listdir(fd) and os.scandir(fd) in shutil.rmtree. + """ + # Check concurrent directory limit + if len(self.vfs_open_dirs) >= self.virtual_fd_directories: + if self.virtual_fd_directories == 0: + raise OSError(errno.EPERM, "fdopendir() not allowed") + raise OSError(errno.EMFILE, "trying to open too many directories") + + # Get the node from the fd + try: + f, node = self.vfs_open_fds[fd] + except KeyError: + raise OSError(errno.EBADF, "bad file descriptor") from None + + if not node.is_dir(): + raise OSError(errno.ENOTDIR, "not a directory") + + # Create directory iterator, storing the fd for dirfd() + fdir = OpenDir(node, fd=fd) + p = self.sandio.malloc(b"\x00" * SIZEOF_DIRENT) # type: ignore[attr-defined] + self.vfs_open_dirs[p.addr] = fdir + return p + @vfs_signature("dirfd(p)i") def s_dirfd(self, p_dir): - """Return fd for directory stream - not supported, return error.""" - raise OSError(errno.ENOTSUP, "dirfd not supported") + """Return fd for directory stream opened via fdopendir().""" + try: + fdir = self.vfs_open_dirs[p_dir.addr] + except KeyError: + raise OSError(errno.EBADF, "bad directory stream") from None + if fdir.fd < 0: + raise OSError(errno.ENOTSUP, "dirfd not supported for opendir streams") + return fdir.fd diff --git a/shannot/pending_deletion.py b/shannot/pending_deletion.py new file mode 100644 index 0000000..854ad00 --- /dev/null +++ b/shannot/pending_deletion.py @@ -0,0 +1,141 @@ +"""Pending deletion tracking for approval system.""" + +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass +from pathlib import PurePosixPath +from typing import Literal + +DeletionType = Literal["file", "directory"] + + +@dataclass +class PendingDeletion: + """ + A file or directory deletion awaiting approval. + + Tracks the path, type (file/directory), and size for generating + previews and executing approved deletions. + """ + + path: str # Virtual/remote path to delete + target_type: DeletionType # "file" or "directory" + size: int = 0 # Size in bytes (files only) + remote: bool = False # Whether this is a remote (SSH) deletion + + def get_preview(self) -> str: + """Get a preview suitable for approval display.""" + type_label = "directory" if self.target_type == "directory" else "file" + remote_tag = " [remote]" if self.remote else "" + if self.size > 0: + size_str = self.size_human() + return f"DELETE {type_label}: {self.path} ({size_str}){remote_tag}" + return f"DELETE {type_label}: {self.path}{remote_tag}" + + def size_human(self) -> str: + """Return human-readable size string.""" + size = self.size + if size < 1024: + return f"{size} B" + elif size < 1024 * 1024: + return f"{size / 1024:.1f} KB" + else: + return f"{size / (1024 * 1024):.1f} MB" + + def to_dict(self) -> dict: + """Serialize to dict for JSON storage.""" + return { + "path": self.path, + "target_type": self.target_type, + "size": self.size, + "remote": self.remote, + } + + @classmethod + def from_dict(cls, data: dict) -> PendingDeletion: + """Deserialize from dict.""" + return cls( + path=data["path"], + target_type=data.get("target_type", "file"), + size=data.get("size", 0), + remote=data.get("remote", False), + ) + + +def summarize_deletions(deletions: list[dict]) -> list[dict]: + """ + Group deletions by root directory for TUI display. + + Parameters + ---------- + deletions + List of deletion dicts (from PendingDeletion.to_dict()) + + Returns + ------- + list[dict] + Grouped summaries: [{root, file_count, dir_count, total_size}, ...] + """ + if not deletions: + return [] + + # Find common prefixes to group deletions + groups: dict[str, dict] = defaultdict( + lambda: {"file_count": 0, "dir_count": 0, "total_size": 0, "paths": []} + ) + + for d in deletions: + path = d.get("path", "") + target_type = d.get("target_type", "file") + size = d.get("size", 0) + + # Find the root directory (first two components after home) + # e.g., ~/.cache/uv/cache/xyz -> ~/.cache/uv/ + parts = PurePosixPath(path).parts + if len(parts) >= 3: + # Take first 3 parts: ('/', 'Users', 'name', '.cache', 'uv', ...) + # We want the directory being deleted, e.g., ~/.cache/uv/ + home_idx = None + for i, p in enumerate(parts): + if p.startswith("."): # Found dotfile/dotdir + home_idx = i + break + if home_idx is not None and home_idx + 1 < len(parts): + root = str(PurePosixPath(*parts[: home_idx + 2])) + "/" + else: + root = str(PurePosixPath(*parts[:3])) + "/" + else: + root = path + + groups[root]["paths"].append(path) + groups[root]["total_size"] += size + if target_type == "directory": + groups[root]["dir_count"] += 1 + else: + groups[root]["file_count"] += 1 + + # Convert to list and sort by total size descending + result = [ + { + "root": root, + "file_count": data["file_count"], + "dir_count": data["dir_count"], + "total_size": data["total_size"], + } + for root, data in groups.items() + ] + result.sort(key=lambda x: x["total_size"], reverse=True) + return result + + +def format_size(size: int) -> str: + """Format size in bytes to human-readable string.""" + if size < 1024: + return f"{size} B" + elif size < 1024 * 1024: + return f"{size / 1024:.1f} KB" + elif size < 1024 * 1024 * 1024: + return f"{size / (1024 * 1024):.1f} MB" + else: + return f"{size / (1024 * 1024 * 1024):.1f} GB" diff --git a/shannot/run_session.py b/shannot/run_session.py index f8773d7..6e661e9 100644 --- a/shannot/run_session.py +++ b/shannot/run_session.py @@ -78,6 +78,36 @@ def format_execution_summary(session: Session, *, use_color: bool = True) -> str error = write_info.get("error", "unknown error") lines.append(f" {mark} {path} ({error})") + # Files/directories deleted + if session.completed_deletions: + if lines: + lines.append("") + # Count actual deletions (not skipped) + actual = [d for d in session.completed_deletions if not d.get("skipped")] + skipped = len(session.completed_deletions) - len(actual) + skip_note = f" ({skipped} already removed)" if skipped else "" + lines.append(f"Deleted {len(actual)} item(s){skip_note}:") + for del_info in session.completed_deletions: + path = del_info.get("path", "") + success = del_info.get("success", False) + skipped_item = del_info.get("skipped", False) + target_type = del_info.get("target_type", "file") + + if skipped_item: + continue # Don't show skipped items + + if use_color: + mark = f"{GREEN}✓{RESET}" if success else f"{RED}✗{RESET}" + else: + mark = "✓" if success else "✗" + + type_label = " [dir]" if target_type == "directory" else "" + if success: + lines.append(f" {mark} {path}{type_label}") + else: + error = del_info.get("error", "unknown error") + lines.append(f" {mark} {path} ({error})") + return "\n".join(lines) @@ -182,6 +212,9 @@ def execute_session_direct(session) -> int: # Commit pending writes to filesystem completed_writes = session.commit_writes() + # Commit pending deletions to filesystem + completed_deletions = session.commit_deletions() + # Update session with results session.stdout = stdout session.stderr = stderr @@ -190,6 +223,7 @@ def execute_session_direct(session) -> int: session.status = "executed" if exit_code == 0 else "failed" session.executed_commands = executed_commands session.completed_writes = completed_writes + session.completed_deletions = completed_deletions session.save() # Print output diff --git a/shannot/session.py b/shannot/session.py index 54d8c60..0db6877 100644 --- a/shannot/session.py +++ b/shannot/session.py @@ -29,6 +29,7 @@ class Session: script_path: str # Original script path commands: list[str] = field(default_factory=list) # Queued commands pending_writes: list[dict] = field(default_factory=list) # Queued file writes + pending_deletions: list[dict] = field(default_factory=list) # Queued file/dir deletions analysis: str = "" # Description of what script does status: SessionStatus = "pending" created_at: str = "" # ISO timestamp @@ -43,6 +44,7 @@ class Session: # Execution tracking (populated after execution completes) executed_commands: list[dict] = field(default_factory=list) # {cmd, exit_code} completed_writes: list[dict] = field(default_factory=list) # {path, success, size/error} + completed_deletions: list[dict] = field(default_factory=list) # {path, success, target_type} # Remote execution fields target: str | None = None # SSH target (user@host) if remote @@ -129,6 +131,113 @@ def commit_writes(self) -> list[dict]: return results + def commit_deletions(self) -> list[dict]: + """ + Commit pending deletions to real filesystem. + + Deletions are processed in order (files before parent directories, + as captured by rmtree). Already-deleted items are skipped. + + Returns list of results: {path, success, target_type} or {path, success, error} + """ + results = [] + for del_data in self.pending_deletions: + path = del_data.get("path", "") + target_type = del_data.get("target_type", "file") + + try: + target_path = Path(path) + + if not target_path.exists(): + # Already deleted (e.g., parent rmtree already removed it) + results.append( + { + "path": path, + "success": True, + "target_type": target_type, + "skipped": True, + } + ) + continue + + if target_type == "directory": + target_path.rmdir() + else: + target_path.unlink() + + results.append( + { + "path": path, + "success": True, + "target_type": target_type, + } + ) + except Exception as e: + results.append( + { + "path": path, + "success": False, + "error": str(e), + } + ) + + return results + + def commit_deletions_remote(self, ssh: object) -> list[dict]: + """ + Commit pending deletions to remote filesystem via SSH. + + Parameters + ---------- + ssh + An SSHConnection instance with run() method. + + Returns + ------- + list[dict] + List of results: {path, success, target_type} or {path, success, error} + """ + import shlex + + results = [] + for del_data in self.pending_deletions: + path = del_data.get("path", "") + target_type = del_data.get("target_type", "file") + + try: + if target_type == "directory": + result = ssh.run(f"rmdir {shlex.quote(path)} 2>/dev/null || true") # type: ignore[union-attr] + else: + result = ssh.run(f"rm -f {shlex.quote(path)}") # type: ignore[union-attr] + + if result.returncode == 0: + results.append( + { + "path": path, + "success": True, + "target_type": target_type, + } + ) + else: + stderr = result.stderr.decode("utf-8", errors="replace").strip() + results.append( + { + "path": path, + "success": False, + "error": stderr or "deletion failed", + } + ) + except Exception as e: + results.append( + { + "path": path, + "success": False, + "error": str(e), + } + ) + + return results + def commit_writes_remote(self, ssh: object) -> list[dict]: """ Commit pending writes to remote filesystem via SSH. @@ -336,6 +445,7 @@ def create_session( analysis: str = "", sandbox_args: dict | None = None, pending_writes: list[dict] | None = None, + pending_deletions: list[dict] | None = None, ) -> Session: """Create a new session from a dry-run execution.""" if name is None: @@ -347,6 +457,7 @@ def create_session( script_path=script_path, commands=commands, pending_writes=pending_writes or [], + pending_deletions=pending_deletions or [], analysis=analysis, sandbox_args=sandbox_args or {}, ) diff --git a/shannot/virtualizedproc.py b/shannot/virtualizedproc.py index 3eb44ec..5873533 100644 --- a/shannot/virtualizedproc.py +++ b/shannot/virtualizedproc.py @@ -570,6 +570,7 @@ def s_get_environ(self): env_vars = [ f"HOME={self.virtual_home}", f"USER={self.virtual_user}", + "SHANNOT_SANDBOX=1", ] # Allocate each string and collect pointers ptrs = [] @@ -603,6 +604,7 @@ def s_getenv(self, p_name): env_vars = { "HOME": self.virtual_home, "USER": self.virtual_user, + "SHANNOT_SANDBOX": "1", } value = env_vars.get(name) if value is None: From 9b97f64dde885525c1315634e57ee927c66cca49 Mon Sep 17 00:00:00 2001 From: Corv Date: Tue, 6 Jan 2026 22:02:47 +0100 Subject: [PATCH 2/4] Bump version to 0.10.0 and update changelog --- CHANGELOG.md | 7 ++++++- pyproject.toml | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 059ab4d..336ad99 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,16 +5,21 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.10.0] - 2026-01-06 ### Features +- **Deletion tracking**: `shutil.rmtree()` and file deletions are now queued for approval + - Full support for fd-based directory operations (fdopendir, dirfd, unlinkat) + - Deletions displayed in approve TUI with file/directory counts + - Audit logging for deletion events - Add PyPy sandbox support for macOS ARM64 - Add slash commands for changelog, pr, and release skills - Mount home directory read-only in sandbox VFS ### Enhancements +- Add `SHANNOT_SANDBOX=1` environment variable for sandbox detection by scripts - Add darwin/arm64 struct sizes to validation - Add `check_output` to subprocess stub for `platform.node()` support - Add `pwd` stub and populate environ with HOME/USER for `expanduser()` support diff --git a/pyproject.toml b/pyproject.toml index 547f40d..f996f10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "shannot" -version = "0.9.4" +version = "0.10.0" description = "Sandboxed system administration for LLM agents" readme = "README.md" license = {text = "Apache-2.0"} From 00e840ab08a22d83767097936736b7d8e4d09c85 Mon Sep 17 00:00:00 2001 From: Corv Date: Tue, 6 Jan 2026 22:14:20 +0100 Subject: [PATCH 3/4] Fix basedpyright type errors in mix_vfs.py - Add type annotations to s_rewinddir in virtualizedproc.py and mix_vfs.py to fix incompatible base class method signature errors - Add mixin attribute declarations (debug_errors, virtual_cwd, remote_target) to MixVFS class for type checking - Simplify hasattr checks to direct attribute access now that defaults exist - Import Ptr in mix_vfs.py for type annotation --- shannot/mix_vfs.py | 16 +++++++++++----- shannot/virtualizedproc.py | 2 +- uv.lock | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/shannot/mix_vfs.py b/shannot/mix_vfs.py index 979d5b4..42e5ace 100644 --- a/shannot/mix_vfs.py +++ b/shannot/mix_vfs.py @@ -9,7 +9,7 @@ from pathlib import Path from typing import BinaryIO -from .sandboxio import NULL +from .sandboxio import NULL, Ptr from .structs import DT_DIR, DT_REG, IS_MACOS, SIZEOF_DIRENT, new_dirent, new_stat, struct_to_bytes from .virtualizedproc import sigerror, signature @@ -345,6 +345,12 @@ class MixVFS: vfs_track_deletions = False # When True, queue deletions for approval file_deletions_pending = [] # List of PendingDeletion objects + # Attributes expected from composed class (VirtualizedProc, MixRemote, etc.) + # These are declared here for type checking but overridden by mixins + debug_errors: bool = False + virtual_cwd: str = "/" + remote_target: str | None = None + def __init__(self, *args, **kwds): try: self.vfs_root = kwds.pop("vfs_root") @@ -395,7 +401,7 @@ def s_unlink(self, p_pathname): # Queue the deletion for approval from .pending_deletion import PendingDeletion - is_remote = hasattr(self, "remote_target") and self.remote_target is not None + is_remote = self.remote_target is not None pending = PendingDeletion( path=path, @@ -434,7 +440,7 @@ def s_rmdir(self, p_pathname): # Queue the deletion for approval from .pending_deletion import PendingDeletion - is_remote = hasattr(self, "remote_target") and self.remote_target is not None + is_remote = self.remote_target is not None pending = PendingDeletion( path=path, @@ -493,7 +499,7 @@ def s_unlinkat(self, dirfd, p_pathname, flags): from .pending_deletion import PendingDeletion - is_remote = hasattr(self, "remote_target") and self.remote_target is not None + is_remote = self.remote_target is not None if is_directory: pending = PendingDeletion( @@ -1047,7 +1053,7 @@ def s_closedir(self, p_dir): return 0 @signature("rewinddir(p)v") - def s_rewinddir(self, p_dir): + def s_rewinddir(self, p_dir: Ptr) -> None: """Reset directory stream to beginning. Used by os.listdir(fd) after fdopendir. diff --git a/shannot/virtualizedproc.py b/shannot/virtualizedproc.py index 5873533..bf2a634 100644 --- a/shannot/virtualizedproc.py +++ b/shannot/virtualizedproc.py @@ -692,7 +692,7 @@ def s_get_stdout(self, *args): raise Exception("subprocess calls the unsupported RPython get_stdout() helper") @signature("rewinddir(p)v") - def s_rewinddir(self, *args): + def s_rewinddir(self, p_dir: Ptr) -> None: raise Exception("subprocess calls the unsupported rewinddir() function") @signature("rpy_cpu_count()i") diff --git a/uv.lock b/uv.lock index 1387551..47eba34 100644 --- a/uv.lock +++ b/uv.lock @@ -793,7 +793,7 @@ wheels = [ [[package]] name = "shannot" -version = "0.9.4" +version = "0.10.0" source = { editable = "." } [package.optional-dependencies] From 862222a6857175d79241b50794dedabfadfe3964 Mon Sep 17 00:00:00 2001 From: Corv Date: Tue, 6 Jan 2026 22:19:20 +0100 Subject: [PATCH 4/4] Add explanatory comments to empty except clauses Address CodeQL warnings about empty except blocks by adding comments explaining why errors are intentionally ignored: - File size is optional metadata when tracking deletions - Original file content may be unreadable for new/permission-restricted files --- shannot/mix_vfs.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/shannot/mix_vfs.py b/shannot/mix_vfs.py index 42e5ace..106139b 100644 --- a/shannot/mix_vfs.py +++ b/shannot/mix_vfs.py @@ -389,25 +389,23 @@ def s_unlink(self, p_pathname): else: raise - # Get file size from real filesystem if possible + # Get file size from real filesystem if possible (optional metadata) size = 0 try: real_path = Path(path) if real_path.exists() and real_path.is_file(): size = real_path.stat().st_size except (OSError, PermissionError): - pass + pass # Size is optional; use 0 if we can't stat the file # Queue the deletion for approval from .pending_deletion import PendingDeletion - is_remote = self.remote_target is not None - pending = PendingDeletion( path=path, target_type="file", size=size, - remote=is_remote, + remote=self.remote_target is not None, ) self.file_deletions_pending.append(pending) @@ -516,7 +514,7 @@ def s_unlinkat(self, dirfd, p_pathname, flags): if real_path.exists() and real_path.is_file(): size = real_path.stat().st_size except (OSError, PermissionError): - pass + pass # Size is optional; use 0 if we can't stat the file pending = PendingDeletion( path=full_path, @@ -785,7 +783,7 @@ def s_open(self, p_pathname, flags, mode): original = f.read() f.close() except OSError: - pass + pass # File exists but can't be read; treat as new file except OSError: if not create_mode: raise