From e0e5003c64f95027f00954116fdd53ed479b5ba1 Mon Sep 17 00:00:00 2001 From: Dane Urban Date: Mon, 4 May 2026 16:20:03 -0700 Subject: [PATCH] Add session creation/deletion + enforced ttl --- code-interpreter/app/api/routes.py | 81 +++++- code-interpreter/app/main.py | 34 ++- code-interpreter/app/models/schemas.py | 27 ++ .../app/services/executor_base.py | 38 +++ .../app/services/executor_docker.py | 242 +++++++++++++----- .../app/services/executor_kubernetes.py | 230 ++++++++++++----- .../integration_tests/test_sessions_docker.py | 219 ++++++++++++++++ .../test_sessions_kubernetes.py | 206 +++++++++++++++ .../integration_tests/test_sessions_routes.py | 170 ++++++++++++ 9 files changed, 1116 insertions(+), 131 deletions(-) create mode 100644 code-interpreter/tests/integration_tests/test_sessions_docker.py create mode 100644 code-interpreter/tests/integration_tests/test_sessions_kubernetes.py create mode 100644 code-interpreter/tests/integration_tests/test_sessions_routes.py diff --git a/code-interpreter/app/api/routes.py b/code-interpreter/app/api/routes.py index 371ce23..d63e60b 100644 --- a/code-interpreter/app/api/routes.py +++ b/code-interpreter/app/api/routes.py @@ -8,6 +8,9 @@ from app.app_configs import get_settings from app.models.schemas import ( + CreateSessionRequest, + CreateSessionResponse, + ExecuteFile, ExecuteRequest, ExecuteResponse, FileMetadataResponse, @@ -19,7 +22,7 @@ WorkspaceFile, ) from app.services.executor_base import EntryKind, StreamChunk, StreamResult, WorkspaceEntry -from app.services.executor_factory import execute_python, execute_python_streaming +from app.services.executor_factory import execute_python, execute_python_streaming, get_executor from app.services.file_storage import FileStorageService router = APIRouter() @@ -46,8 +49,8 @@ def _validate_timeout(req: ExecuteRequest) -> None: ) -def _stage_request_files( - req: ExecuteRequest, +def _resolve_uploaded_files( + files: list[ExecuteFile], storage: FileStorageService, ) -> tuple[list[tuple[str, bytes]], dict[str, bytes]]: """Resolve uploaded file IDs into content for the executor. @@ -56,7 +59,7 @@ def _stage_request_files( """ staged_files: list[tuple[str, bytes]] = [] input_files_map: dict[str, bytes] = {} - for file in req.files: + for file in files: try: content, _ = storage.get_file(file.file_id) except FileNotFoundError as exc: @@ -69,6 +72,17 @@ def _stage_request_files( return staged_files, input_files_map +def _stage_request_files( + req: ExecuteRequest, + storage: FileStorageService, +) -> tuple[list[tuple[str, bytes]], dict[str, bytes]]: + """Resolve uploaded file IDs into content for the executor. + + Returns (staged_files, input_files_map). + """ + return _resolve_uploaded_files(req.files, storage) + + def _save_workspace_files( entries: tuple[WorkspaceEntry, ...], input_files_map: dict[str, bytes], @@ -248,3 +262,62 @@ def delete_file(file_id: str) -> Response: ) return Response(status_code=status.HTTP_204_NO_CONTENT) + + +@router.post( + "/sessions", + response_model=CreateSessionResponse, + status_code=status.HTTP_201_CREATED, +) +def create_session(req: CreateSessionRequest) -> CreateSessionResponse: + """Create a long-lived code-executor pod with the given TTL. + + The pod is guaranteed to be torn down at or before the TTL expires, even + if the API service crashes and restarts. + """ + settings = get_settings() + storage = get_file_storage() + staged_files, _ = _resolve_uploaded_files(req.files, storage) + + try: + info = get_executor().create_session( + ttl_seconds=req.ttl_seconds, + files=staged_files, + cpu_time_limit_sec=settings.cpu_time_limit_sec, + memory_limit_mb=settings.memory_limit_mb, + ) + except NotImplementedError as exc: + raise HTTPException( + status_code=status.HTTP_501_NOT_IMPLEMENTED, + detail=str(exc), + ) from exc + except ValueError as exc: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=str(exc), + ) from exc + + return CreateSessionResponse( + session_id=info.session_id, + expires_at=info.expires_at, + ) + + +@router.delete("/sessions/{session_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_session(session_id: str) -> Response: + """Tear down a session pod by ID.""" + try: + deleted = get_executor().delete_session(session_id) + except NotImplementedError as exc: + raise HTTPException( + status_code=status.HTTP_501_NOT_IMPLEMENTED, + detail=str(exc), + ) from exc + + if not deleted: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Session '{session_id}' not found", + ) + + return Response(status_code=status.HTTP_204_NO_CONTENT) diff --git a/code-interpreter/app/main.py b/code-interpreter/app/main.py index 25a4579..0ac7e1c 100644 --- a/code-interpreter/app/main.py +++ b/code-interpreter/app/main.py @@ -1,9 +1,10 @@ from __future__ import annotations +import asyncio import logging import subprocess from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, suppress from shutil import which from typing import Final @@ -14,6 +15,8 @@ from app.models.schemas import HealthResponse from app.services.executor_factory import get_executor +SESSION_REAPER_INTERVAL_SEC = 30 + # Configure logging logging.basicConfig( level=logging.INFO, @@ -78,6 +81,24 @@ def _ensure_docker_image_available() -> None: ) from e +async def _reap_expired_sessions_once() -> None: + """Run a single reap pass via the configured executor.""" + try: + count = await asyncio.to_thread(get_executor().reap_expired_sessions) + except Exception: + logger.warning("Session reaper pass failed", exc_info=True) + return + if count > 0: + logger.info("Reaped %d expired session(s)", count) + + +async def _session_reaper_loop() -> None: + """Periodically delete sessions whose TTL has elapsed.""" + while True: + await asyncio.sleep(SESSION_REAPER_INTERVAL_SEC) + await _reap_expired_sessions_once() + + @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Manage application lifespan events.""" @@ -87,9 +108,16 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: _ensure_docker_image_available() logger.info("Docker executor image is ready") - yield + # Reap any sessions whose TTL elapsed while the service was down. + await _reap_expired_sessions_once() + reaper_task = asyncio.create_task(_session_reaper_loop()) - # Shutdown: Add any cleanup logic here if needed in the future + try: + yield + finally: + reaper_task.cancel() + with suppress(asyncio.CancelledError): + await reaper_task def create_app() -> FastAPI: diff --git a/code-interpreter/app/models/schemas.py b/code-interpreter/app/models/schemas.py index 5d85b19..3101682 100644 --- a/code-interpreter/app/models/schemas.py +++ b/code-interpreter/app/models/schemas.py @@ -120,3 +120,30 @@ class ListFilesResponse(BaseModel): class HealthResponse(BaseModel): status: Literal["ok", "error"] message: StrictStr | None = None + + +DEFAULT_SESSION_TTL_SEC = 15 * 60 +MAX_SESSION_TTL_SEC = 24 * 60 * 60 + + +class CreateSessionRequest(BaseModel): + files: list[ExecuteFile] = Field( + default_factory=list, + description="Files to stage in the session workspace at create time.", + ) + ttl_seconds: StrictInt = Field( + DEFAULT_SESSION_TTL_SEC, + ge=1, + le=MAX_SESSION_TTL_SEC, + description=( + "Session lifetime in seconds. The session pod is automatically " + "destroyed after this duration even if the API service crashes." + ), + ) + + +class CreateSessionResponse(BaseModel): + session_id: StrictStr = Field(..., description="Identifier for the session pod/container.") + expires_at: float = Field( + ..., description="Unix timestamp when the session is scheduled to expire." + ) diff --git a/code-interpreter/app/services/executor_base.py b/code-interpreter/app/services/executor_base.py index a0e8519..c5ee0df 100644 --- a/code-interpreter/app/services/executor_base.py +++ b/code-interpreter/app/services/executor_base.py @@ -106,6 +106,20 @@ class HealthCheck: message: str | None = None +@dataclass(frozen=True, slots=True) +class SessionInfo: + """Identifying information for a long-lived session.""" + + session_id: str + expires_at: float + + +SESSION_NAME_PREFIX = "code-session-" +SESSION_APP_LABEL = "code-interpreter" +SESSION_COMPONENT_LABEL = "session" +SESSION_EXPIRES_AT_KEY = "code-interpreter.expires-at" + + class ExecutorProtocol(Protocol): def execute_python( self, @@ -168,6 +182,30 @@ def execute_python_streaming( """ raise NotImplementedError(f"{type(self).__name__} does not support streaming execution") + def create_session( + self, + *, + ttl_seconds: int, + files: Sequence[tuple[str, bytes]] | None = None, + cpu_time_limit_sec: int | None = None, + memory_limit_mb: int | None = None, + ) -> SessionInfo: + """Create a long-lived execution environment. + + Returns identifying information for the session. The session is + guaranteed to be torn down at or before ``expires_at`` even if this + process crashes. + """ + raise NotImplementedError(f"{type(self).__name__} does not support sessions") + + def delete_session(self, session_id: str) -> bool: + """Tear down a session by ID. Returns True if found and deleted.""" + raise NotImplementedError(f"{type(self).__name__} does not support sessions") + + def reap_expired_sessions(self) -> int: + """Delete sessions whose TTL has elapsed. Returns number reaped.""" + return 0 + @staticmethod def truncate_output(stream: bytes, max_bytes: int) -> str: if len(stream) <= max_bytes: diff --git a/code-interpreter/app/services/executor_docker.py b/code-interpreter/app/services/executor_docker.py index 8219f7c..af18d25 100644 --- a/code-interpreter/app/services/executor_docker.py +++ b/code-interpreter/app/services/executor_docker.py @@ -8,7 +8,7 @@ import tarfile import time import uuid -from collections.abc import Generator, Sequence +from collections.abc import Generator, Mapping, Sequence from contextlib import contextmanager, suppress from dataclasses import dataclass from pathlib import Path @@ -21,10 +21,15 @@ PYTHON_EXECUTOR_DOCKER_RUN_ARGS, ) from app.services.executor_base import ( + SESSION_APP_LABEL, + SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, + SESSION_NAME_PREFIX, BaseExecutor, EntryKind, ExecutionResult, HealthCheck, + SessionInfo, StreamChunk, StreamEvent, StreamResult, @@ -134,58 +139,51 @@ def _validate_relative_path(self, path_str: str) -> Path: def _create_tar_archive( self, - code: str, + code: str | None = None, files: Sequence[tuple[str, bytes]] | None = None, last_line_interactive: bool = True, ) -> bytes: - """Create a tar archive containing the code and any additional files. + """Create a tar archive optionally containing an entrypoint and files. Args: - last_line_interactive: If True, wrap code so the last line prints its value - if it's a bare expression (only the last line is affected). + code: If provided, written as ``__main__.py`` at the archive root. + last_line_interactive: If True and code is provided, wrap the code so + the last line prints its value if it's a bare expression. """ tar_buffer = io.BytesIO() with tarfile.open(fileobj=tar_buffer, mode="w") as tar: - # Add __main__.py - optionally wrap in last-line-interactive mode - code_to_execute = code - if last_line_interactive: - # Wrap to make the last expression value print to stdout like Jupyter/REPL - code_to_execute = wrap_last_line_interactive(code) - - code_bytes = code_to_execute.encode("utf-8") - code_info = tarfile.TarInfo(name="__main__.py") - code_info.size = len(code_bytes) - code_info.mode = 0o644 - tar.addfile(code_info, io.BytesIO(code_bytes)) - - # Track directories we've created - created_dirs = set() - - # Add any additional files - if files: - for file_path, content in files: - # Validate the path - validated_path = self._validate_relative_path(file_path) - if validated_path == Path("__main__.py"): - raise ValueError( - "File path '__main__.py' is reserved for the execution entrypoint." - ) - - # Create parent directories if needed - parent_parts = validated_path.parts[:-1] - for i in range(len(parent_parts)): - dir_path = "/".join(parent_parts[: i + 1]) - if dir_path not in created_dirs: - dir_info = tarfile.TarInfo(name=dir_path + "/") - dir_info.type = tarfile.DIRTYPE - dir_info.mode = 0o755 - tar.addfile(dir_info) - created_dirs.add(dir_path) - - file_info = tarfile.TarInfo(name=validated_path.as_posix()) - file_info.size = len(content) - file_info.mode = 0o644 - tar.addfile(file_info, io.BytesIO(content)) + if code is not None: + code_to_execute = ( + wrap_last_line_interactive(code) if last_line_interactive else code + ) + code_bytes = code_to_execute.encode("utf-8") + code_info = tarfile.TarInfo(name="__main__.py") + code_info.size = len(code_bytes) + code_info.mode = 0o644 + tar.addfile(code_info, io.BytesIO(code_bytes)) + + created_dirs: set[str] = set() + for file_path, content in files or (): + validated_path = self._validate_relative_path(file_path) + if code is not None and validated_path == Path("__main__.py"): + raise ValueError( + "File path '__main__.py' is reserved for the execution entrypoint." + ) + + parent_parts = validated_path.parts[:-1] + for i in range(len(parent_parts)): + dir_path = "/".join(parent_parts[: i + 1]) + if dir_path not in created_dirs: + dir_info = tarfile.TarInfo(name=dir_path + "/") + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o755 + tar.addfile(dir_info) + created_dirs.add(dir_path) + + file_info = tarfile.TarInfo(name=validated_path.as_posix()) + file_info.size = len(content) + file_info.mode = 0o644 + tar.addfile(file_info, io.BytesIO(content)) return tar_buffer.getvalue() @@ -245,11 +243,16 @@ def _build_run_command( container_name: str, cpu_time_limit_sec: int | None, memory_limit_mb: int | None, - timeout_ms: int, + sleep_seconds: int, + labels: Mapping[str, str] | None = None, ) -> list[str]: - """Build the ``docker run`` command for an ephemeral container.""" - # Start the container in detached mode - # We need CAP_CHOWN to set up the workspace, but we'll drop privileges for execution + """Build a detached ``docker run`` command. + + ``sleep_seconds`` controls how long the container's idle ``sleep`` lasts; + callers must ensure it exceeds their work duration. ``labels`` are + attached for later filtering (e.g. by the session reaper). + """ + # We need CAP_CHOWN to set up the workspace, but drop privileges for execution cmd: list[str] = [ self.docker_binary, "run", @@ -267,7 +270,6 @@ def _build_run_command( "64", "--security-opt", "no-new-privileges", - # Keep CAP_CHOWN to allow setting up workspace permissions "--cap-drop", "ALL", "--cap-add", @@ -288,31 +290,26 @@ def _build_run_command( "MPLCONFIGDIR=/tmp/matplotlib", ] + for key, value in (labels or {}).items(): + cmd.extend(["--label", f"{key}={value}"]) + if cpu_time_limit_sec is not None: - cpu_limit = max(int(cpu_time_limit_sec), 1) + cpu_limit = max(cpu_time_limit_sec, 1) cmd.extend(["--ulimit", f"cpu={cpu_limit}:{cpu_limit}"]) if memory_limit_mb is not None: - memory_limit = max(int(memory_limit_mb), 16) + memory_limit = max(memory_limit_mb, 16) mem_flag = f"{memory_limit}m" cmd.extend(["--memory", mem_flag, "--memory-swap", mem_flag]) if self.run_args: cmd.extend(shlex.split(self.run_args)) - # Just sleep - workspace is already created as tmpfs with correct ownership - cmd.extend([self.image, "sleep", str((timeout_ms * 1000) + 10)]) + cmd.extend([self.image, "sleep", str(sleep_seconds)]) return cmd - def _stage_files_in_container( - self, - container_name: str, - code: str, - files: Sequence[tuple[str, bytes]] | None, - last_line_interactive: bool, - ) -> None: - """Create a tar archive and stream it into the container workspace.""" - tar_archive = self._create_tar_archive(code, files, last_line_interactive) + def _upload_tar_to_container(self, container_name: str, tar_archive: bytes) -> None: + """Stream a tar archive into the container workspace.""" tar_cmd = [ self.docker_binary, "exec", @@ -331,6 +328,17 @@ def _stage_files_in_container( f"Failed to extract files: {tar_proc.stderr.decode('utf-8', errors='replace')}" ) + def _stage_files_in_container( + self, + container_name: str, + code: str, + files: Sequence[tuple[str, bytes]] | None, + last_line_interactive: bool, + ) -> None: + """Create a tar archive and stream it into the container workspace.""" + tar_archive = self._create_tar_archive(code, files, last_line_interactive) + self._upload_tar_to_container(container_name, tar_archive) + @contextmanager def _run_in_container( self, @@ -351,7 +359,10 @@ def _run_in_container( container_name = f"code-exec-{uuid.uuid4().hex}" cmd = self._build_run_command( - container_name, cpu_time_limit_sec, memory_limit_mb, timeout_ms + container_name=container_name, + cpu_time_limit_sec=cpu_time_limit_sec, + memory_limit_mb=memory_limit_mb, + sleep_seconds=(timeout_ms * 1000) + 10, ) start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603 if start_proc.returncode != 0: @@ -388,6 +399,109 @@ def _run_in_container( finally: self._kill_container(container_name) + def create_session( + self, + *, + ttl_seconds: int, + files: Sequence[tuple[str, bytes]] | None = None, + cpu_time_limit_sec: int | None = None, + memory_limit_mb: int | None = None, + ) -> SessionInfo: + container_name = f"{SESSION_NAME_PREFIX}{uuid.uuid4().hex}" + expires_at = time.time() + ttl_seconds + + cmd = self._build_run_command( + container_name=container_name, + cpu_time_limit_sec=cpu_time_limit_sec, + memory_limit_mb=memory_limit_mb, + sleep_seconds=ttl_seconds, + labels={ + "app": SESSION_APP_LABEL, + "component": SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY: str(expires_at), + }, + ) + start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603 + if start_proc.returncode != 0: + raise RuntimeError(f"Failed to start session container: {start_proc.stderr}") + + try: + if files: + tar_archive = self._create_tar_archive(files=files) + self._upload_tar_to_container(container_name, tar_archive) + except Exception: + self._kill_container(container_name) + raise + + logger.info("Created session container %s (expires at %s)", container_name, expires_at) + return SessionInfo(session_id=container_name, expires_at=expires_at) + + def delete_session(self, session_id: str) -> bool: + if not session_id.startswith(SESSION_NAME_PREFIX): + return False + result = subprocess.run( # nosec B603 + [self.docker_binary, "rm", "-f", session_id], + capture_output=True, + text=True, + ) + if result.returncode == 0: + return True + # docker rm -f exits non-zero only when the container does not exist. + stderr = (result.stderr or "").lower() + if "no such container" in stderr or "not found" in stderr: + return False + raise RuntimeError(f"Failed to delete session {session_id}: {result.stderr}") + + def reap_expired_sessions(self) -> int: + list_cmd = [ + self.docker_binary, + "ps", + "-a", + "--filter", + f"label=app={SESSION_APP_LABEL}", + "--filter", + f"label=component={SESSION_COMPONENT_LABEL}", + "--format", + '{{.Names}}\t{{.Label "' + SESSION_EXPIRES_AT_KEY + '"}}', + ] + try: + list_result = subprocess.run( # nosec B603 + list_cmd, capture_output=True, text=True, timeout=10 + ) + except subprocess.TimeoutExpired: + logger.warning("Timed out listing session containers for reap") + return 0 + + if list_result.returncode != 0: + logger.warning("Failed to list session containers: %s", list_result.stderr) + return 0 + + now = time.time() + reaped = 0 + for line in list_result.stdout.splitlines(): + name, _, expires_str = line.partition("\t") + name = name.strip() + expires_str = expires_str.strip() + if not name or not expires_str: + continue + try: + expires_at = float(expires_str) + except ValueError: + continue + if expires_at >= now: + continue + rm_result = subprocess.run( # nosec B603 + [self.docker_binary, "rm", "-f", name], + capture_output=True, + text=True, + ) + if rm_result.returncode == 0: + reaped += 1 + logger.info("Reaped expired session container %s", name) + else: + logger.warning("Failed to reap session container %s: %s", name, rm_result.stderr) + return reaped + def execute_python( self, *, diff --git a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index 5247631..f4afe36 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -6,7 +6,7 @@ import tarfile import time import uuid -from collections.abc import Generator, Sequence +from collections.abc import Generator, Mapping, Sequence from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path @@ -28,10 +28,15 @@ KUBERNETES_EXECUTOR_SERVICE_ACCOUNT, ) from app.services.executor_base import ( + SESSION_APP_LABEL, + SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, + SESSION_NAME_PREFIX, BaseExecutor, EntryKind, ExecutionResult, HealthCheck, + SessionInfo, StreamChunk, StreamEvent, StreamResult, @@ -45,6 +50,8 @@ POD_DELETE_RETRY_DELAY_SECONDS = 0.2 POD_DELETE_CONFIRM_TIMEOUT_SECONDS = 2.0 +SESSION_LABEL_SELECTOR = f"app={SESSION_APP_LABEL},component={SESSION_COMPONENT_LABEL}" + def _parse_exit_code(error: str) -> int | None: """Parse the exit code from a Kubernetes exec error channel message.""" @@ -131,30 +138,36 @@ def check_health(self) -> HealthCheck: def _create_pod_manifest( self, pod_name: str, - memory_limit_mb: int | None, - cpu_time_limit_sec: int | None, + *, + command: Sequence[str], + labels: Mapping[str, str], + annotations: Mapping[str, str] | None = None, + active_deadline_seconds: int | None = None, + memory_limit_mb: int | None = None, + cpu_time_limit_sec: int | None = None, ) -> V1Pod: - """Create a Kubernetes pod manifest for code execution.""" + """Build a Kubernetes pod manifest for an isolated executor container. - resources: dict[str, dict[str, Any]] = { - "limits": {}, - "requests": {}, - } + ``command`` is the executor container's command (e.g. ``["sleep", "3600"]``). + ``active_deadline_seconds``, when set, instructs kubelet to stop the pod + at that age — used by sessions to enforce TTL even if the API is down. + """ + resources: dict[str, dict[str, Any]] = {"limits": {}, "requests": {}} if memory_limit_mb is not None: - memory_limit = max(int(memory_limit_mb), 16) + memory_limit = max(memory_limit_mb, 16) resources["limits"]["memory"] = f"{memory_limit}Mi" resources["requests"]["memory"] = f"{min(memory_limit, 64)}Mi" if cpu_time_limit_sec is not None: - cpu_limit = max(int(cpu_time_limit_sec), 1) + cpu_limit = max(cpu_time_limit_sec, 1) resources["limits"]["cpu"] = str(cpu_limit) resources["requests"]["cpu"] = "100m" container = V1Container( name="executor", image=self.image, - command=["sleep", "3600"], + command=list(command), working_dir="/workspace", resources=resources if resources["limits"] else None, security_context={ @@ -204,6 +217,7 @@ def _create_pod_manifest( init_containers=[network_lockdown_container], containers=[container], restart_policy="Never", + active_deadline_seconds=active_deadline_seconds, service_account_name=self.service_account if self.service_account else None, volumes=[ {"name": "workspace", "emptyDir": {"sizeLimit": "100Mi"}}, @@ -218,70 +232,65 @@ def _create_pod_manifest( metadata = V1ObjectMeta( name=pod_name, namespace=self.namespace, - labels={ - "app": "code-interpreter", - "component": "executor", - }, + labels=dict(labels), + annotations=dict(annotations) if annotations else None, ) return V1Pod(api_version="v1", kind="Pod", metadata=metadata, spec=spec) def _create_tar_archive( self, - code: str, + code: str | None = None, files: Sequence[tuple[str, bytes]] | None = None, last_line_interactive: bool = True, ) -> bytes: - """Create a tar archive containing the code and any additional files. + """Create a tar archive optionally containing an entrypoint and files. Args: - last_line_interactive: If True, wrap code so the last line prints its value - if it's a bare expression (only the last line is affected). + code: If provided, written as ``__main__.py`` at the archive root. + last_line_interactive: If True and code is provided, wrap the code so + the last line prints its value if it's a bare expression. """ tar_buffer = io.BytesIO() with tarfile.open(fileobj=tar_buffer, mode="w") as tar: - # Add __main__.py - optionally wrap in last-line-interactive mode - code_to_execute = code - if last_line_interactive: - # Wrap to make the last expression value print to stdout like Jupyter/REPL - code_to_execute = wrap_last_line_interactive(code) - - code_bytes = code_to_execute.encode("utf-8") - code_info = tarfile.TarInfo(name="__main__.py") - code_info.size = len(code_bytes) - code_info.mode = 0o644 - code_info.uid = 65532 - code_info.gid = 65532 - tar.addfile(code_info, io.BytesIO(code_bytes)) - - created_dirs = set() - - if files: - for file_path, content in files: - validated_path = self._validate_relative_path(file_path) - if validated_path == Path("__main__.py"): - raise ValueError( - "File path '__main__.py' is reserved for the execution entrypoint." - ) + if code is not None: + code_to_execute = ( + wrap_last_line_interactive(code) if last_line_interactive else code + ) + code_bytes = code_to_execute.encode("utf-8") + code_info = tarfile.TarInfo(name="__main__.py") + code_info.size = len(code_bytes) + code_info.mode = 0o644 + code_info.uid = 65532 + code_info.gid = 65532 + tar.addfile(code_info, io.BytesIO(code_bytes)) + + created_dirs: set[str] = set() + for file_path, content in files or (): + validated_path = self._validate_relative_path(file_path) + if code is not None and validated_path == Path("__main__.py"): + raise ValueError( + "File path '__main__.py' is reserved for the execution entrypoint." + ) - parent_parts = validated_path.parts[:-1] - for i in range(len(parent_parts)): - dir_path = "/".join(parent_parts[: i + 1]) - if dir_path not in created_dirs: - dir_info = tarfile.TarInfo(name=dir_path + "/") - dir_info.type = tarfile.DIRTYPE - dir_info.mode = 0o755 - dir_info.uid = 65532 - dir_info.gid = 65532 - tar.addfile(dir_info) - created_dirs.add(dir_path) - - file_info = tarfile.TarInfo(name=validated_path.as_posix()) - file_info.size = len(content) - file_info.mode = 0o644 - file_info.uid = 65532 - file_info.gid = 65532 - tar.addfile(file_info, io.BytesIO(content)) + parent_parts = validated_path.parts[:-1] + for i in range(len(parent_parts)): + dir_path = "/".join(parent_parts[: i + 1]) + if dir_path not in created_dirs: + dir_info = tarfile.TarInfo(name=dir_path + "/") + dir_info.type = tarfile.DIRTYPE + dir_info.mode = 0o755 + dir_info.uid = 65532 + dir_info.gid = 65532 + tar.addfile(dir_info) + created_dirs.add(dir_path) + + file_info = tarfile.TarInfo(name=validated_path.as_posix()) + file_info.size = len(content) + file_info.mode = 0o644 + file_info.uid = 65532 + file_info.gid = 65532 + tar.addfile(file_info, io.BytesIO(content)) return tar_buffer.getvalue() @@ -404,6 +413,8 @@ def _run_in_pod( pod_manifest = self._create_pod_manifest( pod_name=pod_name, + command=["sleep", "3600"], + labels={"app": "code-interpreter", "component": "executor"}, memory_limit_mb=memory_limit_mb, cpu_time_limit_sec=cpu_time_limit_sec, ) @@ -592,6 +603,105 @@ def _cleanup_pod(self, pod_name: str) -> None: POD_DELETE_RETRIES, ) + def create_session( + self, + *, + ttl_seconds: int, + files: Sequence[tuple[str, bytes]] | None = None, + cpu_time_limit_sec: int | None = None, + memory_limit_mb: int | None = None, + ) -> SessionInfo: + pod_name = f"{SESSION_NAME_PREFIX}{uuid.uuid4().hex}" + expires_at = time.time() + ttl_seconds + + manifest = self._create_pod_manifest( + pod_name=pod_name, + command=["sleep", str(ttl_seconds)], + labels={"app": SESSION_APP_LABEL, "component": SESSION_COMPONENT_LABEL}, + annotations={SESSION_EXPIRES_AT_KEY: str(expires_at)}, + active_deadline_seconds=ttl_seconds, + memory_limit_mb=memory_limit_mb, + cpu_time_limit_sec=cpu_time_limit_sec, + ) + + logger.info( + "Creating session pod %s in namespace %s (ttl=%ss)", + pod_name, + self.namespace, + ttl_seconds, + ) + self.v1.create_namespaced_pod(namespace=self.namespace, body=manifest) + + try: + self._wait_for_pod_ready(pod_name) + if files: + tar_archive = self._create_tar_archive(files=files) + self._upload_tar_to_pod(pod_name, tar_archive) + except Exception: + self._cleanup_pod(pod_name) + raise + + return SessionInfo(session_id=pod_name, expires_at=expires_at) + + def delete_session(self, session_id: str) -> bool: + if not session_id.startswith(SESSION_NAME_PREFIX): + return False + try: + self.v1.delete_namespaced_pod( + name=session_id, + namespace=self.namespace, + body=client.V1DeleteOptions(grace_period_seconds=0), + ) + except ApiException as e: + if e.status == 404: + return False + raise + return True + + def reap_expired_sessions(self) -> int: + try: + pods = self.v1.list_namespaced_pod( + namespace=self.namespace, + label_selector=SESSION_LABEL_SELECTOR, + ) + except ApiException as e: + logger.warning("Failed to list session pods for reap: %s", e) + return 0 + + now = time.time() + reaped = 0 + for pod in pods.items: + metadata = pod.metadata + annotations = metadata.annotations or {} + expires_str = annotations.get(SESSION_EXPIRES_AT_KEY) + if expires_str is None: + continue + try: + expires_at = float(expires_str) + except ValueError: + logger.warning( + "Session pod %s has invalid expires-at annotation %r", + metadata.name, + expires_str, + ) + continue + if expires_at >= now: + continue + try: + self.v1.delete_namespaced_pod( + name=metadata.name, + namespace=self.namespace, + body=client.V1DeleteOptions(grace_period_seconds=0), + ) + except ApiException as e: + if e.status == 404: + continue + logger.warning("Failed to reap session pod %s: %s", metadata.name, e) + continue + reaped += 1 + logger.info("Reaped expired session pod %s", metadata.name) + return reaped + def execute_python( self, *, diff --git a/code-interpreter/tests/integration_tests/test_sessions_docker.py b/code-interpreter/tests/integration_tests/test_sessions_docker.py new file mode 100644 index 0000000..3281f8d --- /dev/null +++ b/code-interpreter/tests/integration_tests/test_sessions_docker.py @@ -0,0 +1,219 @@ +"""Unit tests for DockerExecutor session methods. + +Mocks subprocess so the session lifecycle can be exercised without a real +Docker daemon. +""" + +from __future__ import annotations + +import subprocess +import time +from unittest.mock import MagicMock, patch + +import pytest + +from app.services.executor_base import ( + SESSION_APP_LABEL, + SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, + SESSION_NAME_PREFIX, +) +from app.services.executor_docker import DockerExecutor + + +@pytest.fixture() +def executor() -> DockerExecutor: + """Create a DockerExecutor bypassing __init__ (no docker binary needed).""" + inst = DockerExecutor.__new__(DockerExecutor) + inst.docker_binary = "/usr/bin/docker" + inst.image = "test:latest" + inst.run_args = "" + return inst + + +def _completed( + returncode: int, stdout: str = "", stderr: str = "" +) -> subprocess.CompletedProcess[str]: + """Build a CompletedProcess in text mode (subprocess calls use text=True).""" + return subprocess.CompletedProcess(args=[], returncode=returncode, stdout=stdout, stderr=stderr) + + +def _label_values(cmd: list[str]) -> list[str]: + return [cmd[i + 1] for i, arg in enumerate(cmd) if arg == "--label"] + + +# --------------------------------------------------------------------------- +# create_session +# --------------------------------------------------------------------------- + + +def test_create_session_returns_session_info(executor: DockerExecutor) -> None: + before = time.time() + with patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)): + info = executor.create_session(ttl_seconds=600) + + assert info.session_id.startswith(SESSION_NAME_PREFIX) + assert before + 600 <= info.expires_at <= time.time() + 600 + + +def test_create_session_runs_docker_with_session_labels(executor: DockerExecutor) -> None: + with patch("app.services.executor_docker.subprocess.run") as run: + run.return_value = _completed(0) + executor.create_session(ttl_seconds=600) + + cmd = run.call_args.args[0] + label_values = _label_values(cmd) + assert f"app={SESSION_APP_LABEL}" in label_values + assert f"component={SESSION_COMPONENT_LABEL}" in label_values + assert any(v.startswith(f"{SESSION_EXPIRES_AT_KEY}=") for v in label_values) + + +def test_create_session_sleeps_for_ttl(executor: DockerExecutor) -> None: + """The container's idle command must be ``sleep `` so it self-destructs at TTL.""" + with patch("app.services.executor_docker.subprocess.run") as run: + run.return_value = _completed(0) + executor.create_session(ttl_seconds=600) + + cmd = run.call_args.args[0] + assert cmd[-3:] == [executor.image, "sleep", "600"] + assert "--rm" in cmd # ensures self-cleanup at TTL + + +def test_create_session_stages_files(executor: DockerExecutor) -> None: + with ( + patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)), + patch.object(executor, "_upload_tar_to_container") as upload, + ): + info = executor.create_session(ttl_seconds=300, files=[("data.txt", b"hello")]) + + upload.assert_called_once() + container_arg, tar_arg = upload.call_args.args + assert container_arg == info.session_id + assert isinstance(tar_arg, bytes) + assert len(tar_arg) > 0 + + +def test_create_session_skips_upload_when_no_files(executor: DockerExecutor) -> None: + with ( + patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)), + patch.object(executor, "_upload_tar_to_container") as upload, + ): + executor.create_session(ttl_seconds=300) + + upload.assert_not_called() + + +def test_create_session_kills_container_on_staging_failure(executor: DockerExecutor) -> None: + with ( + patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)), + patch.object(executor, "_upload_tar_to_container", side_effect=RuntimeError("boom")), + patch.object(executor, "_kill_container") as kill, + pytest.raises(RuntimeError, match="boom"), + ): + executor.create_session(ttl_seconds=300, files=[("data.txt", b"x")]) + + kill.assert_called_once() + + +def test_create_session_raises_when_docker_run_fails(executor: DockerExecutor) -> None: + with ( + patch( + "app.services.executor_docker.subprocess.run", + return_value=_completed(1, stderr="docker daemon down"), + ), + pytest.raises(RuntimeError, match="Failed to start session container"), + ): + executor.create_session(ttl_seconds=300) + + +# --------------------------------------------------------------------------- +# delete_session +# --------------------------------------------------------------------------- + + +def test_delete_session_returns_true_on_success(executor: DockerExecutor) -> None: + with patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)): + assert executor.delete_session(f"{SESSION_NAME_PREFIX}abc") is True + + +def test_delete_session_returns_false_on_no_such_container(executor: DockerExecutor) -> None: + with patch( + "app.services.executor_docker.subprocess.run", + return_value=_completed(1, stderr="Error: No such container: code-session-abc"), + ): + assert executor.delete_session(f"{SESSION_NAME_PREFIX}abc") is False + + +def test_delete_session_rejects_non_session_id(executor: DockerExecutor) -> None: + """Prefix check prevents accidentally deleting unrelated containers.""" + run_mock = MagicMock() + with patch("app.services.executor_docker.subprocess.run", run_mock): + assert executor.delete_session("random-name") is False + run_mock.assert_not_called() + + +def test_delete_session_raises_on_unexpected_failure(executor: DockerExecutor) -> None: + with ( + patch( + "app.services.executor_docker.subprocess.run", + return_value=_completed(1, stderr="some other failure"), + ), + pytest.raises(RuntimeError, match="Failed to delete session"), + ): + executor.delete_session(f"{SESSION_NAME_PREFIX}abc") + + +# --------------------------------------------------------------------------- +# reap_expired_sessions +# --------------------------------------------------------------------------- + + +def test_reap_deletes_expired_containers(executor: DockerExecutor) -> None: + now = time.time() + list_output = f"code-session-old\t{now - 100}\ncode-session-new\t{now + 100}\n" + + with patch("app.services.executor_docker.subprocess.run") as run: + run.side_effect = [ + _completed(0, stdout=list_output), # ps + _completed(0), # rm code-session-old + ] + assert executor.reap_expired_sessions() == 1 + + rm_call = run.call_args_list[1] + assert rm_call.args[0] == [executor.docker_binary, "rm", "-f", "code-session-old"] + + +def test_reap_skips_invalid_lines(executor: DockerExecutor) -> None: + list_output = "code-session-bad\tnot-a-number\n\ncode-session-empty\t\n" + with patch("app.services.executor_docker.subprocess.run") as run: + run.return_value = _completed(0, stdout=list_output) + assert executor.reap_expired_sessions() == 0 + # Only the list call should have been made — no rm + assert run.call_count == 1 + + +def test_reap_returns_zero_when_list_fails(executor: DockerExecutor) -> None: + with patch("app.services.executor_docker.subprocess.run") as run: + run.return_value = _completed(1, stderr="docker not available") + assert executor.reap_expired_sessions() == 0 + + +def test_reap_returns_zero_on_list_timeout(executor: DockerExecutor) -> None: + with patch( + "app.services.executor_docker.subprocess.run", + side_effect=subprocess.TimeoutExpired(cmd="docker", timeout=10), + ): + assert executor.reap_expired_sessions() == 0 + + +def test_reap_continues_when_individual_rm_fails(executor: DockerExecutor) -> None: + """A failed rm of one container shouldn't stop us from reaping others.""" + now = time.time() + list_output = f"code-session-a\t{now - 100}\ncode-session-b\t{now - 100}\n" + with patch("app.services.executor_docker.subprocess.run") as run: + run.side_effect = [ + _completed(0, stdout=list_output), # ps + _completed(1, stderr="rm failed"), # rm a + _completed(0), # rm b + ] + assert executor.reap_expired_sessions() == 1 diff --git a/code-interpreter/tests/integration_tests/test_sessions_kubernetes.py b/code-interpreter/tests/integration_tests/test_sessions_kubernetes.py new file mode 100644 index 0000000..706c159 --- /dev/null +++ b/code-interpreter/tests/integration_tests/test_sessions_kubernetes.py @@ -0,0 +1,206 @@ +"""Unit tests for KubernetesExecutor session methods. + +Mocks the Kubernetes API so the session lifecycle can be exercised without +a real cluster. +""" + +from __future__ import annotations + +import time +from unittest.mock import MagicMock, patch + +import pytest +from kubernetes.client.exceptions import ApiException # type: ignore[import-untyped] + +from app.services.executor_base import ( + SESSION_APP_LABEL, + SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, + SESSION_NAME_PREFIX, +) +from app.services.executor_kubernetes import ( + SESSION_LABEL_SELECTOR, + KubernetesExecutor, +) + + +@pytest.fixture() +def executor() -> KubernetesExecutor: + """Create a KubernetesExecutor bypassing __init__ (no cluster needed).""" + inst = KubernetesExecutor.__new__(KubernetesExecutor) + inst.v1 = MagicMock() + inst.namespace = "test" + inst.image = "test:latest" + inst.service_account = "" + pod_mock = MagicMock() + pod_mock.status.phase = "Running" + inst.v1.read_namespaced_pod.return_value = pod_mock + return inst + + +def _make_pod(name: str, expires_at: float | None) -> MagicMock: + pod = MagicMock() + pod.metadata.name = name + pod.metadata.annotations = ( + {SESSION_EXPIRES_AT_KEY: str(expires_at)} if expires_at is not None else {} + ) + return pod + + +# --------------------------------------------------------------------------- +# create_session +# --------------------------------------------------------------------------- + + +def test_create_session_returns_session_info(executor: KubernetesExecutor) -> None: + before = time.time() + info = executor.create_session(ttl_seconds=600) + + assert info.session_id.startswith(SESSION_NAME_PREFIX) + assert before + 600 <= info.expires_at <= time.time() + 600 + + +def test_create_session_pod_carries_session_metadata(executor: KubernetesExecutor) -> None: + info = executor.create_session(ttl_seconds=600) + + pod = executor.v1.create_namespaced_pod.call_args.kwargs["body"] + assert pod.metadata.name == info.session_id + assert pod.metadata.labels["app"] == SESSION_APP_LABEL + assert pod.metadata.labels["component"] == SESSION_COMPONENT_LABEL + assert pod.metadata.annotations[SESSION_EXPIRES_AT_KEY] == str(info.expires_at) + + +def test_create_session_sets_active_deadline(executor: KubernetesExecutor) -> None: + """active_deadline_seconds is what makes kubelet stop the pod at TTL even if API is down.""" + executor.create_session(ttl_seconds=600) + + pod = executor.v1.create_namespaced_pod.call_args.kwargs["body"] + assert pod.spec.active_deadline_seconds == 600 + assert pod.spec.containers[0].command == ["sleep", "600"] + + +def test_create_session_stages_files(executor: KubernetesExecutor) -> None: + with patch.object(executor, "_upload_tar_to_pod") as upload: + info = executor.create_session( + ttl_seconds=300, + files=[("data.txt", b"hello")], + ) + + upload.assert_called_once() + pod_name_arg, tar_arg = upload.call_args.args + assert pod_name_arg == info.session_id + assert isinstance(tar_arg, bytes) + assert len(tar_arg) > 0 + + +def test_create_session_skips_upload_when_no_files(executor: KubernetesExecutor) -> None: + with patch.object(executor, "_upload_tar_to_pod") as upload: + executor.create_session(ttl_seconds=300) + + upload.assert_not_called() + + +def test_create_session_cleans_up_on_staging_failure(executor: KubernetesExecutor) -> None: + with ( + patch.object(executor, "_upload_tar_to_pod", side_effect=RuntimeError("boom")), + patch.object(executor, "_cleanup_pod") as cleanup, + pytest.raises(RuntimeError, match="boom"), + ): + executor.create_session(ttl_seconds=300, files=[("data.txt", b"x")]) + + cleanup.assert_called_once() + + +# --------------------------------------------------------------------------- +# delete_session +# --------------------------------------------------------------------------- + + +def test_delete_session_returns_true_on_success(executor: KubernetesExecutor) -> None: + assert executor.delete_session(f"{SESSION_NAME_PREFIX}abc") is True + executor.v1.delete_namespaced_pod.assert_called_once() + + +def test_delete_session_returns_false_on_404(executor: KubernetesExecutor) -> None: + executor.v1.delete_namespaced_pod.side_effect = ApiException(status=404) + assert executor.delete_session(f"{SESSION_NAME_PREFIX}abc") is False + + +def test_delete_session_rejects_non_session_id(executor: KubernetesExecutor) -> None: + """Prefix check prevents accidentally deleting unrelated pods.""" + assert executor.delete_session("code-exec-abc") is False + executor.v1.delete_namespaced_pod.assert_not_called() + + +def test_delete_session_propagates_other_api_errors(executor: KubernetesExecutor) -> None: + executor.v1.delete_namespaced_pod.side_effect = ApiException(status=500) + with pytest.raises(ApiException): + executor.delete_session(f"{SESSION_NAME_PREFIX}abc") + + +# --------------------------------------------------------------------------- +# reap_expired_sessions +# --------------------------------------------------------------------------- + + +def test_reap_deletes_expired_pods(executor: KubernetesExecutor) -> None: + now = time.time() + expired_pod = _make_pod("code-session-old", expires_at=now - 100) + fresh_pod = _make_pod("code-session-new", expires_at=now + 100) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[expired_pod, fresh_pod]) + + reaped = executor.reap_expired_sessions() + + assert reaped == 1 + executor.v1.list_namespaced_pod.assert_called_once_with( + namespace=executor.namespace, + label_selector=SESSION_LABEL_SELECTOR, + ) + executor.v1.delete_namespaced_pod.assert_called_once() + delete_kwargs = executor.v1.delete_namespaced_pod.call_args.kwargs + assert delete_kwargs["name"] == "code-session-old" + + +def test_reap_skips_pods_without_annotation(executor: KubernetesExecutor) -> None: + pod = _make_pod("code-session-bare", expires_at=None) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[pod]) + + assert executor.reap_expired_sessions() == 0 + executor.v1.delete_namespaced_pod.assert_not_called() + + +def test_reap_skips_pods_with_invalid_annotation(executor: KubernetesExecutor) -> None: + pod = MagicMock() + pod.metadata.name = "code-session-bad" + pod.metadata.annotations = {SESSION_EXPIRES_AT_KEY: "not-a-number"} + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[pod]) + + assert executor.reap_expired_sessions() == 0 + executor.v1.delete_namespaced_pod.assert_not_called() + + +def test_reap_handles_list_failure(executor: KubernetesExecutor) -> None: + executor.v1.list_namespaced_pod.side_effect = ApiException(status=500) + assert executor.reap_expired_sessions() == 0 + + +def test_reap_continues_when_individual_delete_404s(executor: KubernetesExecutor) -> None: + """Race: pod can vanish between list and delete — treat as already-gone.""" + now = time.time() + expired_a = _make_pod("code-session-a", expires_at=now - 100) + expired_b = _make_pod("code-session-b", expires_at=now - 100) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[expired_a, expired_b]) + executor.v1.delete_namespaced_pod.side_effect = [ApiException(status=404), None] + + assert executor.reap_expired_sessions() == 1 + + +def test_reap_keeps_going_when_delete_errors(executor: KubernetesExecutor) -> None: + """A 500 on one delete shouldn't prevent others from being reaped.""" + now = time.time() + expired_a = _make_pod("code-session-a", expires_at=now - 100) + expired_b = _make_pod("code-session-b", expires_at=now - 100) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[expired_a, expired_b]) + executor.v1.delete_namespaced_pod.side_effect = [ApiException(status=500), None] + + assert executor.reap_expired_sessions() == 1 diff --git a/code-interpreter/tests/integration_tests/test_sessions_routes.py b/code-interpreter/tests/integration_tests/test_sessions_routes.py new file mode 100644 index 0000000..2a602ba --- /dev/null +++ b/code-interpreter/tests/integration_tests/test_sessions_routes.py @@ -0,0 +1,170 @@ +"""Route-layer tests for /v1/sessions. + +Patches the executor so the routes can be exercised without a real Docker +daemon or Kubernetes cluster. +""" + +from __future__ import annotations + +from collections.abc import Generator +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient + +from app.main import create_app +from app.services.executor_base import SessionInfo +from app.services.executor_factory import get_executor + + +@pytest.fixture(autouse=True) +def _clear_executor_cache() -> Generator[None, None, None]: + get_executor.cache_clear() + yield + get_executor.cache_clear() + + +def test_create_session_returns_id_and_expires_at() -> None: + mock_executor = MagicMock() + mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-abc", expires_at=12345.6 + ) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post("/v1/sessions", json={}) + + assert response.status_code == 201 + body = response.json() + assert body["session_id"] == "code-session-abc" + assert body["expires_at"] == 12345.6 + + +def test_create_session_defaults_ttl_to_15_minutes() -> None: + mock_executor = MagicMock() + mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-x", expires_at=0.0 + ) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + client.post("/v1/sessions", json={}) + + assert mock_executor.create_session.call_args.kwargs["ttl_seconds"] == 900 + + +def test_create_session_uses_provided_ttl() -> None: + mock_executor = MagicMock() + mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-x", expires_at=0.0 + ) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + client.post("/v1/sessions", json={"ttl_seconds": 300}) + + assert mock_executor.create_session.call_args.kwargs["ttl_seconds"] == 300 + + +def test_create_session_rejects_non_positive_ttl() -> None: + client = TestClient(create_app()) + response = client.post("/v1/sessions", json={"ttl_seconds": 0}) + assert response.status_code == 422 + + +def test_create_session_rejects_oversized_ttl() -> None: + client = TestClient(create_app()) + response = client.post("/v1/sessions", json={"ttl_seconds": 86_401}) + assert response.status_code == 422 + + +def test_create_session_returns_404_for_unknown_file_id() -> None: + client = TestClient(create_app()) + response = client.post( + "/v1/sessions", + json={"files": [{"path": "data.txt", "file_id": "does-not-exist"}]}, + ) + assert response.status_code == 404 + assert "not found" in response.json()["detail"].lower() + + +def test_create_session_resolves_file_ids_into_content() -> None: + """Files referenced by file_id must be loaded and passed to the executor.""" + client = TestClient(create_app()) + upload_resp = client.post( + "/v1/files", + files={"file": ("data.txt", b"hello bytes", "application/octet-stream")}, + ) + file_id = upload_resp.json()["file_id"] + + mock_executor = MagicMock() + mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-x", expires_at=0.0 + ) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + response = client.post( + "/v1/sessions", + json={"files": [{"path": "inputs/data.txt", "file_id": file_id}]}, + ) + + assert response.status_code == 201 + files_arg = mock_executor.create_session.call_args.kwargs["files"] + assert files_arg == [("inputs/data.txt", b"hello bytes")] + + +def test_create_session_returns_422_when_executor_raises_value_error() -> None: + mock_executor = MagicMock() + mock_executor.create_session.side_effect = ValueError("bad path") + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post("/v1/sessions", json={}) + + assert response.status_code == 422 + assert "bad path" in response.json()["detail"] + + +def test_create_session_returns_501_when_unsupported() -> None: + mock_executor = MagicMock() + mock_executor.create_session.side_effect = NotImplementedError("nope") + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post("/v1/sessions", json={}) + + assert response.status_code == 501 + + +def test_delete_session_returns_204_when_found() -> None: + mock_executor = MagicMock() + mock_executor.delete_session.return_value = True + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.delete("/v1/sessions/code-session-abc") + + assert response.status_code == 204 + mock_executor.delete_session.assert_called_once_with("code-session-abc") + + +def test_delete_session_returns_404_when_unknown() -> None: + mock_executor = MagicMock() + mock_executor.delete_session.return_value = False + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.delete("/v1/sessions/code-session-missing") + + assert response.status_code == 404 + + +def test_delete_session_returns_501_when_unsupported() -> None: + mock_executor = MagicMock() + mock_executor.delete_session.side_effect = NotImplementedError("nope") + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.delete("/v1/sessions/code-session-abc") + + assert response.status_code == 501