From 59fafdbf989e85d58ee2beb20a06752aa64e8af9 Mon Sep 17 00:00:00 2001 From: Dane Urban Date: Tue, 5 May 2026 15:50:38 -0700 Subject: [PATCH] refactor(executors): extract reusable primitives for upcoming session work Pure refactor - no behavior change. Existing test suite passes unmodified. * `DockerExecutor._build_run_command` now takes `sleep_seconds` and an optional `labels` mapping, so future callers can spin up containers with custom durations and label sets. * `DockerExecutor._create_tar_archive`'s `code` argument is now optional, letting callers stage files without an entrypoint. * `_upload_tar_to_container` extracted from `_stage_files_in_container` for reuse. * `KubernetesExecutor._create_pod_manifest` is parameterized over `command`, `labels`, `annotations`, and `active_deadline_seconds`. * `KubernetesExecutor._create_tar_archive` mirrors the Docker change. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../app/services/executor_docker.py | 134 +++++++++--------- .../app/services/executor_kubernetes.py | 124 ++++++++-------- 2 files changed, 134 insertions(+), 124 deletions(-) diff --git a/code-interpreter/app/services/executor_docker.py b/code-interpreter/app/services/executor_docker.py index 8219f7c..f4ada0c 100644 --- a/code-interpreter/app/services/executor_docker.py +++ b/code-interpreter/app/services/executor_docker.py @@ -8,7 +8,7 @@ import tarfile import time import uuid -from collections.abc import Generator, Sequence +from collections.abc import Generator, Mapping, Sequence from contextlib import contextmanager, suppress from dataclasses import dataclass from pathlib import Path @@ -134,58 +134,51 @@ def _validate_relative_path(self, path_str: str) -> Path: def _create_tar_archive( self, - code: str, + code: str | None = None, files: Sequence[tuple[str, bytes]] | None = None, last_line_interactive: bool = True, ) -> bytes: - """Create a tar archive containing the code and any additional files. + """Create a tar archive optionally containing an entrypoint and files. Args: - last_line_interactive: If True, wrap code so the last line prints its value - if it's a bare expression (only the last line is affected). + code: If provided, written as ``__main__.py`` at the archive root. + last_line_interactive: If True and code is provided, wrap the code so + the last line prints its value if it's a bare expression. """ tar_buffer = io.BytesIO() with tarfile.open(fileobj=tar_buffer, mode="w") as tar: - # Add __main__.py - optionally wrap in last-line-interactive mode - code_to_execute = code - if last_line_interactive: - # Wrap to make the last expression value print to stdout like Jupyter/REPL - code_to_execute = wrap_last_line_interactive(code) - - code_bytes = code_to_execute.encode("utf-8") - code_info = tarfile.TarInfo(name="__main__.py") - code_info.size = len(code_bytes) - code_info.mode = 0o644 - tar.addfile(code_info, io.BytesIO(code_bytes)) - - # Track directories we've created - created_dirs = set() - - # Add any additional files - if files: - for file_path, content in files: - # Validate the path - validated_path = self._validate_relative_path(file_path) - if validated_path == Path("__main__.py"): - raise ValueError( - "File path '__main__.py' is reserved for the execution entrypoint." - ) - - # Create parent directories if needed - parent_parts = validated_path.parts[:-1] - for i in range(len(parent_parts)): - dir_path = "/".join(parent_parts[: i + 1]) - if dir_path not in created_dirs: - dir_info = tarfile.TarInfo(name=dir_path + "/") - dir_info.type = tarfile.DIRTYPE - dir_info.mode = 0o755 - tar.addfile(dir_info) - created_dirs.add(dir_path) - - file_info = tarfile.TarInfo(name=validated_path.as_posix()) - file_info.size = len(content) - file_info.mode = 0o644 - tar.addfile(file_info, io.BytesIO(content)) + if code is not None: + code_to_execute = ( + wrap_last_line_interactive(code) if last_line_interactive else code + ) + code_bytes = code_to_execute.encode("utf-8") + code_info = tarfile.TarInfo(name="__main__.py") + code_info.size = len(code_bytes) + code_info.mode = 0o644 + tar.addfile(code_info, io.BytesIO(code_bytes)) + + created_dirs: set[str] = set() + for file_path, content in files or (): + validated_path = self._validate_relative_path(file_path) + if code is not None and validated_path == Path("__main__.py"): + raise ValueError( + "File path '__main__.py' is reserved for the execution entrypoint." + ) + + parent_parts = validated_path.parts[:-1] + for i in range(len(parent_parts)): + dir_path = "/".join(parent_parts[: i + 1]) + if dir_path not in created_dirs: + dir_info = tarfile.TarInfo(name=dir_path + "/") + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o755 + tar.addfile(dir_info) + created_dirs.add(dir_path) + + file_info = tarfile.TarInfo(name=validated_path.as_posix()) + file_info.size = len(content) + file_info.mode = 0o644 + tar.addfile(file_info, io.BytesIO(content)) return tar_buffer.getvalue() @@ -245,11 +238,16 @@ def _build_run_command( container_name: str, cpu_time_limit_sec: int | None, memory_limit_mb: int | None, - timeout_ms: int, + sleep_seconds: int, + labels: Mapping[str, str] | None = None, ) -> list[str]: - """Build the ``docker run`` command for an ephemeral container.""" - # Start the container in detached mode - # We need CAP_CHOWN to set up the workspace, but we'll drop privileges for execution + """Build a detached ``docker run`` command. + + ``sleep_seconds`` controls how long the container's idle ``sleep`` lasts; + callers must ensure it exceeds their work duration. ``labels`` are + attached for later filtering (e.g. by the session reaper). + """ + # We need CAP_CHOWN to set up the workspace, but drop privileges for execution cmd: list[str] = [ self.docker_binary, "run", @@ -267,7 +265,6 @@ def _build_run_command( "64", "--security-opt", "no-new-privileges", - # Keep CAP_CHOWN to allow setting up workspace permissions "--cap-drop", "ALL", "--cap-add", @@ -288,31 +285,26 @@ def _build_run_command( "MPLCONFIGDIR=/tmp/matplotlib", ] + for key, value in (labels or {}).items(): + cmd.extend(["--label", f"{key}={value}"]) + if cpu_time_limit_sec is not None: - cpu_limit = max(int(cpu_time_limit_sec), 1) + cpu_limit = max(cpu_time_limit_sec, 1) cmd.extend(["--ulimit", f"cpu={cpu_limit}:{cpu_limit}"]) if memory_limit_mb is not None: - memory_limit = max(int(memory_limit_mb), 16) + memory_limit = max(memory_limit_mb, 16) mem_flag = f"{memory_limit}m" cmd.extend(["--memory", mem_flag, "--memory-swap", mem_flag]) if self.run_args: cmd.extend(shlex.split(self.run_args)) - # Just sleep - workspace is already created as tmpfs with correct ownership - cmd.extend([self.image, "sleep", str((timeout_ms * 1000) + 10)]) + cmd.extend([self.image, "sleep", str(sleep_seconds)]) return cmd - def _stage_files_in_container( - self, - container_name: str, - code: str, - files: Sequence[tuple[str, bytes]] | None, - last_line_interactive: bool, - ) -> None: - """Create a tar archive and stream it into the container workspace.""" - tar_archive = self._create_tar_archive(code, files, last_line_interactive) + def _upload_tar_to_container(self, container_name: str, tar_archive: bytes) -> None: + """Stream a tar archive into the container workspace.""" tar_cmd = [ self.docker_binary, "exec", @@ -331,6 +323,17 @@ def _stage_files_in_container( f"Failed to extract files: {tar_proc.stderr.decode('utf-8', errors='replace')}" ) + def _stage_files_in_container( + self, + container_name: str, + code: str, + files: Sequence[tuple[str, bytes]] | None, + last_line_interactive: bool, + ) -> None: + """Create a tar archive and stream it into the container workspace.""" + tar_archive = self._create_tar_archive(code, files, last_line_interactive) + self._upload_tar_to_container(container_name, tar_archive) + @contextmanager def _run_in_container( self, @@ -351,7 +354,10 @@ def _run_in_container( container_name = f"code-exec-{uuid.uuid4().hex}" cmd = self._build_run_command( - container_name, cpu_time_limit_sec, memory_limit_mb, timeout_ms + container_name=container_name, + cpu_time_limit_sec=cpu_time_limit_sec, + memory_limit_mb=memory_limit_mb, + sleep_seconds=(timeout_ms * 1000) + 10, ) start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603 if start_proc.returncode != 0: diff --git a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index 5247631..96967af 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -6,7 +6,7 @@ import tarfile import time import uuid -from collections.abc import Generator, Sequence +from collections.abc import Generator, Mapping, Sequence from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path @@ -131,30 +131,36 @@ def check_health(self) -> HealthCheck: def _create_pod_manifest( self, pod_name: str, - memory_limit_mb: int | None, - cpu_time_limit_sec: int | None, + *, + command: Sequence[str], + labels: Mapping[str, str], + annotations: Mapping[str, str] | None = None, + active_deadline_seconds: int | None = None, + memory_limit_mb: int | None = None, + cpu_time_limit_sec: int | None = None, ) -> V1Pod: - """Create a Kubernetes pod manifest for code execution.""" + """Build a Kubernetes pod manifest for an isolated executor container. - resources: dict[str, dict[str, Any]] = { - "limits": {}, - "requests": {}, - } + ``command`` is the executor container's command (e.g. ``["sleep", "3600"]``). + ``active_deadline_seconds``, when set, instructs kubelet to stop the pod + at that age — used by sessions to enforce TTL even if the API is down. + """ + resources: dict[str, dict[str, Any]] = {"limits": {}, "requests": {}} if memory_limit_mb is not None: - memory_limit = max(int(memory_limit_mb), 16) + memory_limit = max(memory_limit_mb, 16) resources["limits"]["memory"] = f"{memory_limit}Mi" resources["requests"]["memory"] = f"{min(memory_limit, 64)}Mi" if cpu_time_limit_sec is not None: - cpu_limit = max(int(cpu_time_limit_sec), 1) + cpu_limit = max(cpu_time_limit_sec, 1) resources["limits"]["cpu"] = str(cpu_limit) resources["requests"]["cpu"] = "100m" container = V1Container( name="executor", image=self.image, - command=["sleep", "3600"], + command=list(command), working_dir="/workspace", resources=resources if resources["limits"] else None, security_context={ @@ -204,6 +210,7 @@ def _create_pod_manifest( init_containers=[network_lockdown_container], containers=[container], restart_policy="Never", + active_deadline_seconds=active_deadline_seconds, service_account_name=self.service_account if self.service_account else None, volumes=[ {"name": "workspace", "emptyDir": {"sizeLimit": "100Mi"}}, @@ -218,70 +225,65 @@ def _create_pod_manifest( metadata = V1ObjectMeta( name=pod_name, namespace=self.namespace, - labels={ - "app": "code-interpreter", - "component": "executor", - }, + labels=dict(labels), + annotations=dict(annotations) if annotations else None, ) return V1Pod(api_version="v1", kind="Pod", metadata=metadata, spec=spec) def _create_tar_archive( self, - code: str, + code: str | None = None, files: Sequence[tuple[str, bytes]] | None = None, last_line_interactive: bool = True, ) -> bytes: - """Create a tar archive containing the code and any additional files. + """Create a tar archive optionally containing an entrypoint and files. Args: - last_line_interactive: If True, wrap code so the last line prints its value - if it's a bare expression (only the last line is affected). + code: If provided, written as ``__main__.py`` at the archive root. + last_line_interactive: If True and code is provided, wrap the code so + the last line prints its value if it's a bare expression. """ tar_buffer = io.BytesIO() with tarfile.open(fileobj=tar_buffer, mode="w") as tar: - # Add __main__.py - optionally wrap in last-line-interactive mode - code_to_execute = code - if last_line_interactive: - # Wrap to make the last expression value print to stdout like Jupyter/REPL - code_to_execute = wrap_last_line_interactive(code) - - code_bytes = code_to_execute.encode("utf-8") - code_info = tarfile.TarInfo(name="__main__.py") - code_info.size = len(code_bytes) - code_info.mode = 0o644 - code_info.uid = 65532 - code_info.gid = 65532 - tar.addfile(code_info, io.BytesIO(code_bytes)) - - created_dirs = set() - - if files: - for file_path, content in files: - validated_path = self._validate_relative_path(file_path) - if validated_path == Path("__main__.py"): - raise ValueError( - "File path '__main__.py' is reserved for the execution entrypoint." - ) + if code is not None: + code_to_execute = ( + wrap_last_line_interactive(code) if last_line_interactive else code + ) + code_bytes = code_to_execute.encode("utf-8") + code_info = tarfile.TarInfo(name="__main__.py") + code_info.size = len(code_bytes) + code_info.mode = 0o644 + code_info.uid = 65532 + code_info.gid = 65532 + tar.addfile(code_info, io.BytesIO(code_bytes)) + + created_dirs: set[str] = set() + for file_path, content in files or (): + validated_path = self._validate_relative_path(file_path) + if code is not None and validated_path == Path("__main__.py"): + raise ValueError( + "File path '__main__.py' is reserved for the execution entrypoint." + ) - parent_parts = validated_path.parts[:-1] - for i in range(len(parent_parts)): - dir_path = "/".join(parent_parts[: i + 1]) - if dir_path not in created_dirs: - dir_info = tarfile.TarInfo(name=dir_path + "/") - dir_info.type = tarfile.DIRTYPE - dir_info.mode = 0o755 - dir_info.uid = 65532 - dir_info.gid = 65532 - tar.addfile(dir_info) - created_dirs.add(dir_path) - - file_info = tarfile.TarInfo(name=validated_path.as_posix()) - file_info.size = len(content) - file_info.mode = 0o644 - file_info.uid = 65532 - file_info.gid = 65532 - tar.addfile(file_info, io.BytesIO(content)) + parent_parts = validated_path.parts[:-1] + for i in range(len(parent_parts)): + dir_path = "/".join(parent_parts[: i + 1]) + if dir_path not in created_dirs: + dir_info = tarfile.TarInfo(name=dir_path + "/") + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o755 + dir_info.uid = 65532 + dir_info.gid = 65532 + tar.addfile(dir_info) + created_dirs.add(dir_path) + + file_info = tarfile.TarInfo(name=validated_path.as_posix()) + file_info.size = len(content) + file_info.mode = 0o644 + file_info.uid = 65532 + file_info.gid = 65532 + tar.addfile(file_info, io.BytesIO(content)) return tar_buffer.getvalue() @@ -404,6 +406,8 @@ def _run_in_pod( pod_manifest = self._create_pod_manifest( pod_name=pod_name, + command=["sleep", "3600"], + labels={"app": "code-interpreter", "component": "executor"}, memory_limit_mb=memory_limit_mb, cpu_time_limit_sec=cpu_time_limit_sec, )