diff --git a/code-interpreter/app/api/routes.py b/code-interpreter/app/api/routes.py index f90f600..d63e60b 100644 --- a/code-interpreter/app/api/routes.py +++ b/code-interpreter/app/api/routes.py @@ -76,7 +76,10 @@ def _stage_request_files( req: ExecuteRequest, storage: FileStorageService, ) -> tuple[list[tuple[str, bytes]], dict[str, bytes]]: - """Resolve uploaded file IDs into content for the executor.""" + """Resolve uploaded file IDs into content for the executor. + + Returns (staged_files, input_files_map). + """ return _resolve_uploaded_files(req.files, storage) @@ -267,9 +270,10 @@ def delete_file(file_id: str) -> Response: status_code=status.HTTP_201_CREATED, ) def create_session(req: CreateSessionRequest) -> CreateSessionResponse: - """Create a long-lived code-executor pod. + """Create a long-lived code-executor pod with the given TTL. - The session must be torn down explicitly via DELETE /v1/sessions/{id}. + The pod is guaranteed to be torn down at or before the TTL expires, even + if the API service crashes and restarts. 
""" settings = get_settings() storage = get_file_storage() @@ -277,6 +281,7 @@ def create_session(req: CreateSessionRequest) -> CreateSessionResponse: try: info = get_executor().create_session( + ttl_seconds=req.ttl_seconds, files=staged_files, cpu_time_limit_sec=settings.cpu_time_limit_sec, memory_limit_mb=settings.memory_limit_mb, @@ -292,7 +297,10 @@ def create_session(req: CreateSessionRequest) -> CreateSessionResponse: detail=str(exc), ) from exc - return CreateSessionResponse(session_id=info.session_id) + return CreateSessionResponse( + session_id=info.session_id, + expires_at=info.expires_at, + ) @router.delete("/sessions/{session_id}", status_code=status.HTTP_204_NO_CONTENT) diff --git a/code-interpreter/app/main.py b/code-interpreter/app/main.py index 25a4579..0ac7e1c 100644 --- a/code-interpreter/app/main.py +++ b/code-interpreter/app/main.py @@ -1,9 +1,10 @@ from __future__ import annotations +import asyncio import logging import subprocess from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager, suppress from shutil import which from typing import Final @@ -14,6 +15,8 @@ from app.models.schemas import HealthResponse from app.services.executor_factory import get_executor +SESSION_REAPER_INTERVAL_SEC = 30 + # Configure logging logging.basicConfig( level=logging.INFO, @@ -78,6 +81,24 @@ def _ensure_docker_image_available() -> None: ) from e +async def _reap_expired_sessions_once() -> None: + """Run a single reap pass via the configured executor.""" + try: + count = await asyncio.to_thread(get_executor().reap_expired_sessions) + except Exception: + logger.warning("Session reaper pass failed", exc_info=True) + return + if count > 0: + logger.info("Reaped %d expired session(s)", count) + + +async def _session_reaper_loop() -> None: + """Periodically delete sessions whose TTL has elapsed.""" + while True: + await asyncio.sleep(SESSION_REAPER_INTERVAL_SEC) + await 
_reap_expired_sessions_once() + + @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: """Manage application lifespan events.""" @@ -87,9 +108,16 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: _ensure_docker_image_available() logger.info("Docker executor image is ready") - yield + # Reap any sessions whose TTL elapsed while the service was down. + await _reap_expired_sessions_once() + reaper_task = asyncio.create_task(_session_reaper_loop()) - # Shutdown: Add any cleanup logic here if needed in the future + try: + yield + finally: + reaper_task.cancel() + with suppress(asyncio.CancelledError): + await reaper_task def create_app() -> FastAPI: diff --git a/code-interpreter/app/models/schemas.py b/code-interpreter/app/models/schemas.py index 53a82a6..3101682 100644 --- a/code-interpreter/app/models/schemas.py +++ b/code-interpreter/app/models/schemas.py @@ -122,12 +122,28 @@ class HealthResponse(BaseModel): message: StrictStr | None = None +DEFAULT_SESSION_TTL_SEC = 15 * 60 +MAX_SESSION_TTL_SEC = 24 * 60 * 60 + + class CreateSessionRequest(BaseModel): files: list[ExecuteFile] = Field( default_factory=list, description="Files to stage in the session workspace at create time.", ) + ttl_seconds: StrictInt = Field( + DEFAULT_SESSION_TTL_SEC, + ge=1, + le=MAX_SESSION_TTL_SEC, + description=( + "Session lifetime in seconds. The session pod is automatically " + "destroyed after this duration even if the API service crashes." + ), + ) class CreateSessionResponse(BaseModel): session_id: StrictStr = Field(..., description="Identifier for the session pod/container.") + expires_at: float = Field( + ..., description="Unix timestamp when the session is scheduled to expire." 
+ ) diff --git a/code-interpreter/app/services/executor_base.py b/code-interpreter/app/services/executor_base.py index d9297f4..c5ee0df 100644 --- a/code-interpreter/app/services/executor_base.py +++ b/code-interpreter/app/services/executor_base.py @@ -111,11 +111,13 @@ class SessionInfo: """Identifying information for a long-lived session.""" session_id: str + expires_at: float SESSION_NAME_PREFIX = "code-session-" SESSION_APP_LABEL = "code-interpreter" SESSION_COMPONENT_LABEL = "session" +SESSION_EXPIRES_AT_KEY = "code-interpreter.expires-at" class ExecutorProtocol(Protocol): @@ -183,14 +185,16 @@ def execute_python_streaming( def create_session( self, *, + ttl_seconds: int, files: Sequence[tuple[str, bytes]] | None = None, cpu_time_limit_sec: int | None = None, memory_limit_mb: int | None = None, ) -> SessionInfo: """Create a long-lived execution environment. - Returns identifying information for the session. The caller is - responsible for invoking ``delete_session`` when finished. + Returns identifying information for the session. The session is + guaranteed to be torn down at or before ``expires_at`` even if this + process crashes. """ raise NotImplementedError(f"{type(self).__name__} does not support sessions") @@ -198,6 +202,10 @@ def delete_session(self, session_id: str) -> bool: """Tear down a session by ID. Returns True if found and deleted.""" raise NotImplementedError(f"{type(self).__name__} does not support sessions") + def reap_expired_sessions(self) -> int: + """Delete sessions whose TTL has elapsed. 
Returns number reaped.""" + return 0 + @staticmethod def truncate_output(stream: bytes, max_bytes: int) -> str: if len(stream) <= max_bytes: diff --git a/code-interpreter/app/services/executor_docker.py b/code-interpreter/app/services/executor_docker.py index 967ed4a..3e2757c 100644 --- a/code-interpreter/app/services/executor_docker.py +++ b/code-interpreter/app/services/executor_docker.py @@ -23,6 +23,7 @@ from app.services.executor_base import ( SESSION_APP_LABEL, SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, SESSION_NAME_PREFIX, BaseExecutor, EntryKind, @@ -38,10 +39,6 @@ logger = logging.getLogger(__name__) -# Sessions keep their idle container alive for at most this many seconds; a -# follow-up PR replaces this with a per-session TTL plus a reaper. -SESSION_MAX_LIFETIME_SECONDS = 24 * 60 * 60 - @dataclass class _ExecContext: @@ -405,20 +402,23 @@ def _run_in_container( def create_session( self, *, + ttl_seconds: int, files: Sequence[tuple[str, bytes]] | None = None, cpu_time_limit_sec: int | None = None, memory_limit_mb: int | None = None, ) -> SessionInfo: container_name = f"{SESSION_NAME_PREFIX}{uuid.uuid4().hex}" + expires_at = time.time() + ttl_seconds cmd = self._build_run_command( container_name=container_name, cpu_time_limit_sec=cpu_time_limit_sec, memory_limit_mb=memory_limit_mb, - sleep_seconds=SESSION_MAX_LIFETIME_SECONDS, + sleep_seconds=ttl_seconds, labels={ "app": SESSION_APP_LABEL, "component": SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY: str(expires_at), }, ) start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603 @@ -433,8 +433,8 @@ def create_session( self._kill_container(container_name) raise - logger.info("Created session container %s", container_name) - return SessionInfo(session_id=container_name) + logger.info("Created session container %s (expires at %s)", container_name, expires_at) + return SessionInfo(session_id=container_name, expires_at=expires_at) def delete_session(self, session_id: str) -> bool: 
if not session_id.startswith(SESSION_NAME_PREFIX):
@@ -444,15 +444,84 @@ def delete_session(self, session_id: str) -> bool:
             capture_output=True,
             text=True,
         )
-        # `docker rm -f ` exits 0 on modern Docker, so check stderr
-        # for the "not found" message regardless of exit code.
+        # Docker CLI 20.10+ exits 0 for `docker rm -f` on a missing container
+        # and only reports the miss on stderr, so the stderr check must run
+        # before the exit code is trusted.
         stderr = (result.stderr or "").lower()
         if "no such container" in stderr or "not found" in stderr:
             return False
         if result.returncode == 0:
             return True
         raise RuntimeError(f"Failed to delete session {session_id}: {result.stderr}")
 
+    def reap_expired_sessions(self) -> int:
+        """Force-remove session containers whose expiry label has passed.
+
+        Lists containers carrying the session ``app``/``component`` labels,
+        parses each ``SESSION_EXPIRES_AT_KEY`` label as a Unix timestamp and
+        runs ``docker rm -f`` on every container whose timestamp is before
+        now. Containers with a missing or unparsable label are left alone.
+        Listing or removal failures (including timeouts) are logged and
+        skipped rather than raised.
+
+        Returns:
+            Number of containers whose ``docker rm -f`` exited 0.
+        """
+        list_cmd = [
+            self.docker_binary,
+            "ps",
+            "-a",
+            "--filter",
+            f"label=app={SESSION_APP_LABEL}",
+            "--filter",
+            f"label=component={SESSION_COMPONENT_LABEL}",
+            "--format",
+            f'{{{{.Names}}}}\t{{{{.Label "{SESSION_EXPIRES_AT_KEY}"}}}}',
+        ]
+        try:
+            list_result = subprocess.run(  # nosec B603
+                list_cmd, capture_output=True, text=True, timeout=10
+            )
+        except subprocess.TimeoutExpired:
+            logger.warning("Timed out listing session containers for reap")
+            return 0
+
+        if list_result.returncode != 0:
+            logger.warning("Failed to list session containers: %s", list_result.stderr)
+            return 0
+
+        now = time.time()
+        reaped = 0
+        for line in list_result.stdout.splitlines():
+            name, _, expires_str = line.partition("\t")
+            name = name.strip()
+            expires_str = expires_str.strip()
+            if not name or not expires_str:
+                continue
+            try:
+                expires_at = float(expires_str)
+            except ValueError:
+                continue
+            if expires_at >= now:
+                continue
+            try:
+                # Bound the call: an unresponsive daemon must not hang the reaper thread.
+                rm_result = subprocess.run(  # nosec B603
+                    [self.docker_binary, "rm", "-f", name],
+                    capture_output=True,
+                    text=True,
+                    timeout=10,
+                )
+            except subprocess.TimeoutExpired:
+                logger.warning("Timed out reaping session container %s", name)
+                continue
+            if rm_result.returncode == 0:
+                reaped += 1
+                logger.info("Reaped expired session container %s", name)
+            else:
+                logger.warning("Failed to reap session container %s: %s", name, rm_result.stderr)
+        return reaped
+
     def execute_python(
         self,
         *,
diff --git
a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index 27c05f1..f4afe36 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -30,6 +30,7 @@ from app.services.executor_base import ( SESSION_APP_LABEL, SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, SESSION_NAME_PREFIX, BaseExecutor, EntryKind, @@ -49,9 +50,7 @@ POD_DELETE_RETRY_DELAY_SECONDS = 0.2 POD_DELETE_CONFIRM_TIMEOUT_SECONDS = 2.0 -# Sessions keep their idle pod alive for at most this many seconds; a follow-up -# PR replaces this with a per-session TTL plus a reaper. -SESSION_MAX_LIFETIME_SECONDS = 24 * 60 * 60 +SESSION_LABEL_SELECTOR = f"app={SESSION_APP_LABEL},component={SESSION_COMPONENT_LABEL}" def _parse_exit_code(error: str) -> int | None: @@ -607,21 +606,30 @@ def _cleanup_pod(self, pod_name: str) -> None: def create_session( self, *, + ttl_seconds: int, files: Sequence[tuple[str, bytes]] | None = None, cpu_time_limit_sec: int | None = None, memory_limit_mb: int | None = None, ) -> SessionInfo: pod_name = f"{SESSION_NAME_PREFIX}{uuid.uuid4().hex}" + expires_at = time.time() + ttl_seconds manifest = self._create_pod_manifest( pod_name=pod_name, - command=["sleep", str(SESSION_MAX_LIFETIME_SECONDS)], + command=["sleep", str(ttl_seconds)], labels={"app": SESSION_APP_LABEL, "component": SESSION_COMPONENT_LABEL}, + annotations={SESSION_EXPIRES_AT_KEY: str(expires_at)}, + active_deadline_seconds=ttl_seconds, memory_limit_mb=memory_limit_mb, cpu_time_limit_sec=cpu_time_limit_sec, ) - logger.info("Creating session pod %s in namespace %s", pod_name, self.namespace) + logger.info( + "Creating session pod %s in namespace %s (ttl=%ss)", + pod_name, + self.namespace, + ttl_seconds, + ) self.v1.create_namespaced_pod(namespace=self.namespace, body=manifest) try: @@ -633,7 +641,7 @@ def create_session( self._cleanup_pod(pod_name) raise - return SessionInfo(session_id=pod_name) + 
return SessionInfo(session_id=pod_name, expires_at=expires_at) def delete_session(self, session_id: str) -> bool: if not session_id.startswith(SESSION_NAME_PREFIX): @@ -650,6 +658,50 @@ def delete_session(self, session_id: str) -> bool: raise return True + def reap_expired_sessions(self) -> int: + try: + pods = self.v1.list_namespaced_pod( + namespace=self.namespace, + label_selector=SESSION_LABEL_SELECTOR, + ) + except ApiException as e: + logger.warning("Failed to list session pods for reap: %s", e) + return 0 + + now = time.time() + reaped = 0 + for pod in pods.items: + metadata = pod.metadata + annotations = metadata.annotations or {} + expires_str = annotations.get(SESSION_EXPIRES_AT_KEY) + if expires_str is None: + continue + try: + expires_at = float(expires_str) + except ValueError: + logger.warning( + "Session pod %s has invalid expires-at annotation %r", + metadata.name, + expires_str, + ) + continue + if expires_at >= now: + continue + try: + self.v1.delete_namespaced_pod( + name=metadata.name, + namespace=self.namespace, + body=client.V1DeleteOptions(grace_period_seconds=0), + ) + except ApiException as e: + if e.status == 404: + continue + logger.warning("Failed to reap session pod %s: %s", metadata.name, e) + continue + reaped += 1 + logger.info("Reaped expired session pod %s", metadata.name) + return reaped + def execute_python( self, *, diff --git a/code-interpreter/tests/integration_tests/test_sessions_docker.py b/code-interpreter/tests/integration_tests/test_sessions_docker.py index a1a3a68..3281f8d 100644 --- a/code-interpreter/tests/integration_tests/test_sessions_docker.py +++ b/code-interpreter/tests/integration_tests/test_sessions_docker.py @@ -7,6 +7,7 @@ from __future__ import annotations import subprocess +import time from unittest.mock import MagicMock, patch import pytest @@ -14,6 +15,7 @@ from app.services.executor_base import ( SESSION_APP_LABEL, SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, SESSION_NAME_PREFIX, ) from 
app.services.executor_docker import DockerExecutor @@ -46,21 +48,35 @@ def _label_values(cmd: list[str]) -> list[str]: def test_create_session_returns_session_info(executor: DockerExecutor) -> None: + before = time.time() with patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)): - info = executor.create_session() + info = executor.create_session(ttl_seconds=600) assert info.session_id.startswith(SESSION_NAME_PREFIX) + assert before + 600 <= info.expires_at <= time.time() + 600 def test_create_session_runs_docker_with_session_labels(executor: DockerExecutor) -> None: with patch("app.services.executor_docker.subprocess.run") as run: run.return_value = _completed(0) - executor.create_session() + executor.create_session(ttl_seconds=600) cmd = run.call_args.args[0] label_values = _label_values(cmd) assert f"app={SESSION_APP_LABEL}" in label_values assert f"component={SESSION_COMPONENT_LABEL}" in label_values + assert any(v.startswith(f"{SESSION_EXPIRES_AT_KEY}=") for v in label_values) + + +def test_create_session_sleeps_for_ttl(executor: DockerExecutor) -> None: + """The container's idle command must be ``sleep `` so it self-destructs at TTL.""" + with patch("app.services.executor_docker.subprocess.run") as run: + run.return_value = _completed(0) + executor.create_session(ttl_seconds=600) + + cmd = run.call_args.args[0] + assert cmd[-3:] == [executor.image, "sleep", "600"] + assert "--rm" in cmd # ensures self-cleanup at TTL def test_create_session_stages_files(executor: DockerExecutor) -> None: @@ -68,7 +84,7 @@ def test_create_session_stages_files(executor: DockerExecutor) -> None: patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)), patch.object(executor, "_upload_tar_to_container") as upload, ): - info = executor.create_session(files=[("data.txt", b"hello")]) + info = executor.create_session(ttl_seconds=300, files=[("data.txt", b"hello")]) upload.assert_called_once() container_arg, tar_arg = 
upload.call_args.args @@ -82,7 +98,7 @@ def test_create_session_skips_upload_when_no_files(executor: DockerExecutor) -> patch("app.services.executor_docker.subprocess.run", return_value=_completed(0)), patch.object(executor, "_upload_tar_to_container") as upload, ): - executor.create_session() + executor.create_session(ttl_seconds=300) upload.assert_not_called() @@ -94,7 +110,7 @@ def test_create_session_kills_container_on_staging_failure(executor: DockerExecu patch.object(executor, "_kill_container") as kill, pytest.raises(RuntimeError, match="boom"), ): - executor.create_session(files=[("data.txt", b"x")]) + executor.create_session(ttl_seconds=300, files=[("data.txt", b"x")]) kill.assert_called_once() @@ -107,7 +123,7 @@ def test_create_session_raises_when_docker_run_fails(executor: DockerExecutor) - ), pytest.raises(RuntimeError, match="Failed to start session container"), ): - executor.create_session() + executor.create_session(ttl_seconds=300) # --------------------------------------------------------------------------- @@ -121,10 +137,9 @@ def test_delete_session_returns_true_on_success(executor: DockerExecutor) -> Non def test_delete_session_returns_false_on_no_such_container(executor: DockerExecutor) -> None: - """Modern Docker exits 0 even when the container is missing — stderr is the signal.""" with patch( "app.services.executor_docker.subprocess.run", - return_value=_completed(0, stderr="Error: No such container: code-session-abc"), + return_value=_completed(1, stderr="Error: No such container: code-session-abc"), ): assert executor.delete_session(f"{SESSION_NAME_PREFIX}abc") is False @@ -146,3 +161,59 @@ def test_delete_session_raises_on_unexpected_failure(executor: DockerExecutor) - pytest.raises(RuntimeError, match="Failed to delete session"), ): executor.delete_session(f"{SESSION_NAME_PREFIX}abc") + + +# --------------------------------------------------------------------------- +# reap_expired_sessions +# 
--------------------------------------------------------------------------- + + +def test_reap_deletes_expired_containers(executor: DockerExecutor) -> None: + now = time.time() + list_output = f"code-session-old\t{now - 100}\ncode-session-new\t{now + 100}\n" + + with patch("app.services.executor_docker.subprocess.run") as run: + run.side_effect = [ + _completed(0, stdout=list_output), # ps + _completed(0), # rm code-session-old + ] + assert executor.reap_expired_sessions() == 1 + + rm_call = run.call_args_list[1] + assert rm_call.args[0] == [executor.docker_binary, "rm", "-f", "code-session-old"] + + +def test_reap_skips_invalid_lines(executor: DockerExecutor) -> None: + list_output = "code-session-bad\tnot-a-number\n\ncode-session-empty\t\n" + with patch("app.services.executor_docker.subprocess.run") as run: + run.return_value = _completed(0, stdout=list_output) + assert executor.reap_expired_sessions() == 0 + # Only the list call should have been made — no rm + assert run.call_count == 1 + + +def test_reap_returns_zero_when_list_fails(executor: DockerExecutor) -> None: + with patch("app.services.executor_docker.subprocess.run") as run: + run.return_value = _completed(1, stderr="docker not available") + assert executor.reap_expired_sessions() == 0 + + +def test_reap_returns_zero_on_list_timeout(executor: DockerExecutor) -> None: + with patch( + "app.services.executor_docker.subprocess.run", + side_effect=subprocess.TimeoutExpired(cmd="docker", timeout=10), + ): + assert executor.reap_expired_sessions() == 0 + + +def test_reap_continues_when_individual_rm_fails(executor: DockerExecutor) -> None: + """A failed rm of one container shouldn't stop us from reaping others.""" + now = time.time() + list_output = f"code-session-a\t{now - 100}\ncode-session-b\t{now - 100}\n" + with patch("app.services.executor_docker.subprocess.run") as run: + run.side_effect = [ + _completed(0, stdout=list_output), # ps + _completed(1, stderr="rm failed"), # rm a + _completed(0), # rm b + 
] + assert executor.reap_expired_sessions() == 1 diff --git a/code-interpreter/tests/integration_tests/test_sessions_kubernetes.py b/code-interpreter/tests/integration_tests/test_sessions_kubernetes.py index 3fada96..706c159 100644 --- a/code-interpreter/tests/integration_tests/test_sessions_kubernetes.py +++ b/code-interpreter/tests/integration_tests/test_sessions_kubernetes.py @@ -6,6 +6,7 @@ from __future__ import annotations +import time from unittest.mock import MagicMock, patch import pytest @@ -14,9 +15,13 @@ from app.services.executor_base import ( SESSION_APP_LABEL, SESSION_COMPONENT_LABEL, + SESSION_EXPIRES_AT_KEY, SESSION_NAME_PREFIX, ) -from app.services.executor_kubernetes import KubernetesExecutor +from app.services.executor_kubernetes import ( + SESSION_LABEL_SELECTOR, + KubernetesExecutor, +) @pytest.fixture() @@ -33,28 +38,53 @@ def executor() -> KubernetesExecutor: return inst +def _make_pod(name: str, expires_at: float | None) -> MagicMock: + pod = MagicMock() + pod.metadata.name = name + pod.metadata.annotations = ( + {SESSION_EXPIRES_AT_KEY: str(expires_at)} if expires_at is not None else {} + ) + return pod + + # --------------------------------------------------------------------------- # create_session # --------------------------------------------------------------------------- def test_create_session_returns_session_info(executor: KubernetesExecutor) -> None: - info = executor.create_session() + before = time.time() + info = executor.create_session(ttl_seconds=600) + assert info.session_id.startswith(SESSION_NAME_PREFIX) + assert before + 600 <= info.expires_at <= time.time() + 600 def test_create_session_pod_carries_session_metadata(executor: KubernetesExecutor) -> None: - info = executor.create_session() + info = executor.create_session(ttl_seconds=600) pod = executor.v1.create_namespaced_pod.call_args.kwargs["body"] assert pod.metadata.name == info.session_id assert pod.metadata.labels["app"] == SESSION_APP_LABEL assert 
pod.metadata.labels["component"] == SESSION_COMPONENT_LABEL + assert pod.metadata.annotations[SESSION_EXPIRES_AT_KEY] == str(info.expires_at) + + +def test_create_session_sets_active_deadline(executor: KubernetesExecutor) -> None: + """active_deadline_seconds is what makes kubelet stop the pod at TTL even if API is down.""" + executor.create_session(ttl_seconds=600) + + pod = executor.v1.create_namespaced_pod.call_args.kwargs["body"] + assert pod.spec.active_deadline_seconds == 600 + assert pod.spec.containers[0].command == ["sleep", "600"] def test_create_session_stages_files(executor: KubernetesExecutor) -> None: with patch.object(executor, "_upload_tar_to_pod") as upload: - info = executor.create_session(files=[("data.txt", b"hello")]) + info = executor.create_session( + ttl_seconds=300, + files=[("data.txt", b"hello")], + ) upload.assert_called_once() pod_name_arg, tar_arg = upload.call_args.args @@ -65,7 +95,7 @@ def test_create_session_stages_files(executor: KubernetesExecutor) -> None: def test_create_session_skips_upload_when_no_files(executor: KubernetesExecutor) -> None: with patch.object(executor, "_upload_tar_to_pod") as upload: - executor.create_session() + executor.create_session(ttl_seconds=300) upload.assert_not_called() @@ -76,7 +106,7 @@ def test_create_session_cleans_up_on_staging_failure(executor: KubernetesExecuto patch.object(executor, "_cleanup_pod") as cleanup, pytest.raises(RuntimeError, match="boom"), ): - executor.create_session(files=[("data.txt", b"x")]) + executor.create_session(ttl_seconds=300, files=[("data.txt", b"x")]) cleanup.assert_called_once() @@ -106,3 +136,71 @@ def test_delete_session_propagates_other_api_errors(executor: KubernetesExecutor executor.v1.delete_namespaced_pod.side_effect = ApiException(status=500) with pytest.raises(ApiException): executor.delete_session(f"{SESSION_NAME_PREFIX}abc") + + +# --------------------------------------------------------------------------- +# reap_expired_sessions +# 
--------------------------------------------------------------------------- + + +def test_reap_deletes_expired_pods(executor: KubernetesExecutor) -> None: + now = time.time() + expired_pod = _make_pod("code-session-old", expires_at=now - 100) + fresh_pod = _make_pod("code-session-new", expires_at=now + 100) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[expired_pod, fresh_pod]) + + reaped = executor.reap_expired_sessions() + + assert reaped == 1 + executor.v1.list_namespaced_pod.assert_called_once_with( + namespace=executor.namespace, + label_selector=SESSION_LABEL_SELECTOR, + ) + executor.v1.delete_namespaced_pod.assert_called_once() + delete_kwargs = executor.v1.delete_namespaced_pod.call_args.kwargs + assert delete_kwargs["name"] == "code-session-old" + + +def test_reap_skips_pods_without_annotation(executor: KubernetesExecutor) -> None: + pod = _make_pod("code-session-bare", expires_at=None) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[pod]) + + assert executor.reap_expired_sessions() == 0 + executor.v1.delete_namespaced_pod.assert_not_called() + + +def test_reap_skips_pods_with_invalid_annotation(executor: KubernetesExecutor) -> None: + pod = MagicMock() + pod.metadata.name = "code-session-bad" + pod.metadata.annotations = {SESSION_EXPIRES_AT_KEY: "not-a-number"} + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[pod]) + + assert executor.reap_expired_sessions() == 0 + executor.v1.delete_namespaced_pod.assert_not_called() + + +def test_reap_handles_list_failure(executor: KubernetesExecutor) -> None: + executor.v1.list_namespaced_pod.side_effect = ApiException(status=500) + assert executor.reap_expired_sessions() == 0 + + +def test_reap_continues_when_individual_delete_404s(executor: KubernetesExecutor) -> None: + """Race: pod can vanish between list and delete — treat as already-gone.""" + now = time.time() + expired_a = _make_pod("code-session-a", expires_at=now - 100) + expired_b = 
_make_pod("code-session-b", expires_at=now - 100) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[expired_a, expired_b]) + executor.v1.delete_namespaced_pod.side_effect = [ApiException(status=404), None] + + assert executor.reap_expired_sessions() == 1 + + +def test_reap_keeps_going_when_delete_errors(executor: KubernetesExecutor) -> None: + """A 500 on one delete shouldn't prevent others from being reaped.""" + now = time.time() + expired_a = _make_pod("code-session-a", expires_at=now - 100) + expired_b = _make_pod("code-session-b", expires_at=now - 100) + executor.v1.list_namespaced_pod.return_value = MagicMock(items=[expired_a, expired_b]) + executor.v1.delete_namespaced_pod.side_effect = [ApiException(status=500), None] + + assert executor.reap_expired_sessions() == 1 diff --git a/code-interpreter/tests/integration_tests/test_sessions_routes.py b/code-interpreter/tests/integration_tests/test_sessions_routes.py index e7bcd77..2a602ba 100644 --- a/code-interpreter/tests/integration_tests/test_sessions_routes.py +++ b/code-interpreter/tests/integration_tests/test_sessions_routes.py @@ -24,9 +24,11 @@ def _clear_executor_cache() -> Generator[None, None, None]: get_executor.cache_clear() -def test_create_session_returns_session_id() -> None: +def test_create_session_returns_id_and_expires_at() -> None: mock_executor = MagicMock() - mock_executor.create_session.return_value = SessionInfo(session_id="code-session-abc") + mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-abc", expires_at=12345.6 + ) with patch("app.api.routes.get_executor", return_value=mock_executor): client = TestClient(create_app()) @@ -35,6 +37,45 @@ def test_create_session_returns_session_id() -> None: assert response.status_code == 201 body = response.json() assert body["session_id"] == "code-session-abc" + assert body["expires_at"] == 12345.6 + + +def test_create_session_defaults_ttl_to_15_minutes() -> None: + mock_executor = MagicMock() + 
mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-x", expires_at=0.0 + ) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + client.post("/v1/sessions", json={}) + + assert mock_executor.create_session.call_args.kwargs["ttl_seconds"] == 900 + + +def test_create_session_uses_provided_ttl() -> None: + mock_executor = MagicMock() + mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-x", expires_at=0.0 + ) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + client.post("/v1/sessions", json={"ttl_seconds": 300}) + + assert mock_executor.create_session.call_args.kwargs["ttl_seconds"] == 300 + + +def test_create_session_rejects_non_positive_ttl() -> None: + client = TestClient(create_app()) + response = client.post("/v1/sessions", json={"ttl_seconds": 0}) + assert response.status_code == 422 + + +def test_create_session_rejects_oversized_ttl() -> None: + client = TestClient(create_app()) + response = client.post("/v1/sessions", json={"ttl_seconds": 86_401}) + assert response.status_code == 422 def test_create_session_returns_404_for_unknown_file_id() -> None: @@ -57,7 +98,9 @@ def test_create_session_resolves_file_ids_into_content() -> None: file_id = upload_resp.json()["file_id"] mock_executor = MagicMock() - mock_executor.create_session.return_value = SessionInfo(session_id="code-session-x") + mock_executor.create_session.return_value = SessionInfo( + session_id="code-session-x", expires_at=0.0 + ) with patch("app.api.routes.get_executor", return_value=mock_executor): response = client.post(