From 1d2e375e100f9e83e7562225183b680898307e1e Mon Sep 17 00:00:00 2001 From: Dane Urban Date: Tue, 5 May 2026 15:31:11 -0700 Subject: [PATCH] . --- code-interpreter/app/api/routes.py | 55 ++++- code-interpreter/app/models/schemas.py | 20 ++ .../app/services/executor_base.py | 23 ++ .../app/services/executor_docker.py | 72 +++++++ .../app/services/executor_kubernetes.py | 134 +++++++++--- .../test_session_bash_docker.py | 197 ++++++++++++++++++ .../test_session_bash_kubernetes.py | 180 ++++++++++++++++ .../test_session_bash_routes.py | 192 +++++++++++++++++ 8 files changed, 840 insertions(+), 33 deletions(-) create mode 100644 code-interpreter/tests/integration_tests/test_session_bash_docker.py create mode 100644 code-interpreter/tests/integration_tests/test_session_bash_kubernetes.py create mode 100644 code-interpreter/tests/integration_tests/test_session_bash_routes.py diff --git a/code-interpreter/app/api/routes.py b/code-interpreter/app/api/routes.py index d63e60b..8794fcc 100644 --- a/code-interpreter/app/api/routes.py +++ b/code-interpreter/app/api/routes.py @@ -8,6 +8,8 @@ from app.app_configs import get_settings from app.models.schemas import ( + BashExecRequest, + BashExecResponse, CreateSessionRequest, CreateSessionResponse, ExecuteFile, @@ -21,7 +23,13 @@ UploadFileResponse, WorkspaceFile, ) -from app.services.executor_base import EntryKind, StreamChunk, StreamResult, WorkspaceEntry +from app.services.executor_base import ( + EntryKind, + SessionNotFoundError, + StreamChunk, + StreamResult, + WorkspaceEntry, +) from app.services.executor_factory import execute_python, execute_python_streaming, get_executor from app.services.file_storage import FileStorageService @@ -321,3 +329,48 @@ def delete_session(session_id: str) -> Response: ) return Response(status_code=status.HTTP_204_NO_CONTENT) + + +@router.post( + "/sessions/{session_id}/bash", + response_model=BashExecResponse, + status_code=status.HTTP_200_OK, +) +def session_exec_bash(session_id: str, req: BashExecRequest) -> BashExecResponse: + """Run a bash command inside an existing session. + + The session pod has no network access (enforced at session creation), and + that restriction continues to apply for every command run via this route. + """ + settings = get_settings() + if req.timeout_ms > settings.max_exec_timeout_ms: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail=f"timeout_ms exceeds maximum of {settings.max_exec_timeout_ms} ms", + ) + + try: + result = get_executor().execute_bash_in_session( + session_id, + cmd=req.cmd, + timeout_ms=req.timeout_ms, + max_output_bytes=settings.max_output_bytes, + ) + except SessionNotFoundError as exc: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=str(exc), + ) from exc + except NotImplementedError as exc: + raise HTTPException( + status_code=status.HTTP_501_NOT_IMPLEMENTED, + detail=str(exc), + ) from exc + + return BashExecResponse( + stdout=result.stdout, + stderr=result.stderr, + exit_code=result.exit_code, + timed_out=result.timed_out, + duration_ms=result.duration_ms, + ) diff --git a/code-interpreter/app/models/schemas.py b/code-interpreter/app/models/schemas.py index 3101682..91ca319 100644 --- a/code-interpreter/app/models/schemas.py +++ b/code-interpreter/app/models/schemas.py @@ -147,3 +147,23 @@ class CreateSessionResponse(BaseModel): expires_at: float = Field( ..., description="Unix timestamp when the session is scheduled to expire." ) + + +DEFAULT_BASH_TIMEOUT_MS = 30_000 + + +class BashExecRequest(BaseModel): + cmd: StrictStr = Field(..., description="Bash command to execute in the session.") + timeout_ms: StrictInt = Field( + DEFAULT_BASH_TIMEOUT_MS, + ge=1, + description="Per-command execution timeout in milliseconds.", + ) + + +class BashExecResponse(BaseModel): + stdout: StrictStr + stderr: StrictStr + exit_code: int | None + timed_out: bool + duration_ms: StrictInt diff --git a/code-interpreter/app/services/executor_base.py b/code-interpreter/app/services/executor_base.py index c5ee0df..33d4aa8 100644 --- a/code-interpreter/app/services/executor_base.py +++ b/code-interpreter/app/services/executor_base.py @@ -120,6 +120,14 @@ class SessionInfo: SESSION_EXPIRES_AT_KEY = "code-interpreter.expires-at" +class SessionNotFoundError(LookupError): + """Raised when a session ID does not refer to an existing session.""" + + def __init__(self, session_id: str) -> None: + super().__init__(f"Session '{session_id}' not found") + self.session_id = session_id + + class ExecutorProtocol(Protocol): def execute_python( self, @@ -206,6 +214,21 @@ def reap_expired_sessions(self) -> int: """Delete sessions whose TTL has elapsed. Returns number reaped.""" return 0 + def execute_bash_in_session( + self, + session_id: str, + *, + cmd: str, + timeout_ms: int, + max_output_bytes: int, + ) -> ExecutionResult: + """Run a bash command inside an existing session. + + Raises ``SessionNotFoundError`` when the session does not exist. + Network restrictions established at session creation remain in force. + """ + raise NotImplementedError(f"{type(self).__name__} does not support sessions") + @staticmethod def truncate_output(stream: bytes, max_bytes: int) -> str: if len(stream) <= max_bytes: diff --git a/code-interpreter/app/services/executor_docker.py b/code-interpreter/app/services/executor_docker.py index 3e2757c..a69393f 100644 --- a/code-interpreter/app/services/executor_docker.py +++ b/code-interpreter/app/services/executor_docker.py @@ -30,6 +30,7 @@ ExecutionResult, HealthCheck, SessionInfo, + SessionNotFoundError, StreamChunk, StreamEvent, StreamResult, @@ -40,6 +41,12 @@ logger = logging.getLogger(__name__) +def _looks_like_missing_container(stderr: bytes) -> bool: + """Heuristic: ``docker exec`` writes these to stderr when the target is gone.""" + text = stderr.decode("utf-8", errors="replace").lower() + return "no such container" in text or "is not running" in text + + @dataclass class _ExecContext: """Holds the live container and process for the duration of an execution.""" @@ -502,6 +509,71 @@ def reap_expired_sessions(self) -> int: logger.warning("Failed to reap session container %s: %s", name, rm_result.stderr) return reaped + def execute_bash_in_session( + self, + session_id: str, + *, + cmd: str, + timeout_ms: int, + max_output_bytes: int, + ) -> ExecutionResult: + """Run a bash command inside an existing session container. + + The container was created with ``--network none`` at session-create time + and that network namespace is what the exec inherits — no additional + flags are needed (or accepted) for ``docker exec``. + """ + if not session_id.startswith(SESSION_NAME_PREFIX): + raise SessionNotFoundError(session_id) + + exec_cmd = [ + self.docker_binary, + "exec", + "-u", + "65532:65532", + session_id, + "bash", + "-c", + cmd, + ] + + start = time.perf_counter() + proc = subprocess.Popen( # nosec B603 + exec_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + try: + stdout_bytes, stderr_bytes = proc.communicate(timeout=timeout_ms / 1000.0) + timed_out = False + except subprocess.TimeoutExpired: + timed_out = True + # Kill bash inside the container; pkill matches all bash procs in the + # container — acceptable since the agent runs commands sequentially. + subprocess.run( # nosec B603 + [self.docker_binary, "exec", session_id, "pkill", "-9", "bash"], + capture_output=True, + ) + proc.kill() + stdout_bytes, stderr_bytes = proc.communicate() + + duration_ms = int((time.perf_counter() - start) * 1000) + exit_code = None if timed_out else proc.returncode + + if not timed_out and proc.returncode != 0 and _looks_like_missing_container(stderr_bytes): + raise SessionNotFoundError(session_id) + + return ExecutionResult( + stdout=self.truncate_output(stdout_bytes or b"", max_output_bytes), + stderr=self.truncate_output(stderr_bytes or b"", max_output_bytes), + exit_code=exit_code, + timed_out=timed_out, + duration_ms=duration_ms, + files=tuple(), + ) + def execute_python( self, *, diff --git a/code-interpreter/app/services/executor_kubernetes.py b/code-interpreter/app/services/executor_kubernetes.py index f4afe36..c88c2af 100644 --- a/code-interpreter/app/services/executor_kubernetes.py +++ b/code-interpreter/app/services/executor_kubernetes.py @@ -37,6 +37,7 @@ ExecutionResult, HealthCheck, SessionInfo, + SessionNotFoundError, StreamChunk, StreamEvent, StreamResult, @@ -376,19 +377,63 @@ def _upload_tar_to_pod(self, pod_name: str, tar_archive: bytes) -> None: f"stderr: {tar_stderr.decode('utf-8', errors='replace')}" ) - def _kill_python_process(self, pod_name: str) -> None: - """Kill the Python process running in the pod.""" + def _kill_processes_in_pod(self, pod_name: str, process_name: str) -> None: + """Best-effort SIGKILL of all processes named ``process_name`` in the pod.""" try: self._stream_pod_exec( pod_name, - command=["pkill", "-9", "python"], + command=["pkill", "-9", process_name], stderr=False, stdin=False, stdout=False, tty=False, ) except Exception: - logger.warning("Failed to kill Python process in pod %s", pod_name, exc_info=True) + logger.warning( + "Failed to kill %s process in pod %s", process_name, pod_name, exc_info=True + ) + + def _kill_python_process(self, pod_name: str) -> None: + """Kill the Python process running in the pod.""" + self._kill_processes_in_pod(pod_name, "python") + + def _drain_exec_stream( + self, + exec_resp: ws_client.WSClient, + timeout_ms: int, + ) -> tuple[bytes, bytes, int | None, bool]: + """Read stdout/stderr from an exec stream until completion or timeout. + + Returns ``(stdout_bytes, stderr_bytes, exit_code, timed_out)``. + """ + stdout_data = b"" + stderr_data = b"" + exit_code: int | None = None + timed_out = False + + end_time = time.time() + timeout_ms / 1000.0 + + while exec_resp.is_open(): + remaining = end_time - time.time() + if remaining <= 0: + timed_out = True + break + + exec_resp.update(timeout=min(remaining, 1)) + + if exec_resp.peek_stdout(): + stdout_data += exec_resp.read_stdout().encode("utf-8") + + if exec_resp.peek_stderr(): + stderr_data += exec_resp.read_stderr().encode("utf-8") + + error = exec_resp.read_channel(ws_client.ERROR_CHANNEL) + if error: + exit_code = _parse_exit_code(error) + break + + exec_resp.close() + return stdout_data, stderr_data, exit_code, timed_out @contextmanager def _run_in_pod( @@ -702,6 +747,56 @@ def reap_expired_sessions(self) -> int: logger.info("Reaped expired session pod %s", metadata.name) return reaped + def execute_bash_in_session( + self, + session_id: str, + *, + cmd: str, + timeout_ms: int, + max_output_bytes: int, + ) -> ExecutionResult: + """Run a bash command inside an existing session pod. + + Network restrictions established at pod creation (the iptables init + container) remain in force — exec inherits the pod's network namespace. + """ + if not session_id.startswith(SESSION_NAME_PREFIX): + raise SessionNotFoundError(session_id) + + try: + self.v1.read_namespaced_pod(session_id, self.namespace) + except ApiException as e: + if e.status == 404: + raise SessionNotFoundError(session_id) from e + raise + + start = time.perf_counter() + exec_resp = self._stream_pod_exec( + session_id, + command=["bash", "-c", cmd], + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) + + stdout_data, stderr_data, exit_code, timed_out = self._drain_exec_stream( + exec_resp, timeout_ms + ) + + if timed_out: + self._kill_processes_in_pod(session_id, "bash") + + duration_ms = int((time.perf_counter() - start) * 1000) + return ExecutionResult( + stdout=self.truncate_output(stdout_data, max_output_bytes), + stderr=self.truncate_output(stderr_data, max_output_bytes), + exit_code=None if timed_out else exit_code, + timed_out=timed_out, + duration_ms=duration_ms, + files=tuple(), + ) + def execute_python( self, *, @@ -731,34 +826,9 @@ def execute_python( logger.debug("Writing stdin to Python process") ctx.exec_resp.write_stdin(stdin) - stdout_data = b"" - stderr_data = b"" - exit_code: int | None = None - timed_out = False - - timeout_sec = timeout_ms / 1000.0 - end_time = time.time() + timeout_sec - - while ctx.exec_resp.is_open(): - remaining = end_time - time.time() - if remaining <= 0: - timed_out = True - break - - ctx.exec_resp.update(timeout=min(remaining, 1)) - - if ctx.exec_resp.peek_stdout(): - stdout_data += ctx.exec_resp.read_stdout().encode("utf-8") - - if ctx.exec_resp.peek_stderr(): - stderr_data += ctx.exec_resp.read_stderr().encode("utf-8") - - error = ctx.exec_resp.read_channel(ws_client.ERROR_CHANNEL) - if error: - exit_code = _parse_exit_code(error) - break - - ctx.exec_resp.close() + stdout_data, stderr_data, exit_code, timed_out = self._drain_exec_stream( + ctx.exec_resp, timeout_ms + ) if timed_out: self._kill_python_process(ctx.pod_name) diff --git a/code-interpreter/tests/integration_tests/test_session_bash_docker.py b/code-interpreter/tests/integration_tests/test_session_bash_docker.py new file mode 100644 index 0000000..3132859 --- /dev/null +++ b/code-interpreter/tests/integration_tests/test_session_bash_docker.py @@ -0,0 +1,197 @@ +"""Unit tests for DockerExecutor.execute_bash_in_session.""" + +from __future__ import annotations + +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from app.services.executor_base import SESSION_NAME_PREFIX, SessionNotFoundError +from app.services.executor_docker import DockerExecutor + + +@pytest.fixture() +def executor() -> DockerExecutor: + inst = DockerExecutor.__new__(DockerExecutor) + inst.docker_binary = "/usr/bin/docker" + inst.image = "test:latest" + inst.run_args = "" + return inst + + +def _popen_mock( + *, + stdout: bytes = b"", + stderr: bytes = b"", + returncode: int = 0, + raise_timeout: bool = False, +) -> MagicMock: + proc = MagicMock() + if raise_timeout: + proc.communicate.side_effect = [ + subprocess.TimeoutExpired(cmd="docker", timeout=1), + (stdout, stderr), + ] + else: + proc.communicate.return_value = (stdout, stderr) + proc.returncode = returncode + return proc + + +def test_bash_returns_stdout_and_exit_code(executor: DockerExecutor) -> None: + proc = _popen_mock(stdout=b"hi\n", returncode=0) + with patch("app.services.executor_docker.subprocess.Popen", return_value=proc): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="echo hi", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + assert result.stdout == "hi\n" + assert result.exit_code == 0 + assert result.timed_out is False + assert result.files == () + + +def test_bash_passes_through_nonzero_exit(executor: DockerExecutor) -> None: + proc = _popen_mock(stderr=b"boom\n", returncode=2) + with patch("app.services.executor_docker.subprocess.Popen", return_value=proc): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="false", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + assert result.exit_code == 2 + assert result.stderr == "boom\n" + + +def test_bash_uses_docker_exec_with_bash_dash_c(executor: DockerExecutor) -> None: + proc = _popen_mock(returncode=0) + with patch("app.services.executor_docker.subprocess.Popen", return_value=proc) as popen: + executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="ls -la", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + cmd = popen.call_args.args[0] + assert cmd[:2] == [executor.docker_binary, "exec"] + assert cmd[-3:] == ["bash", "-c", "ls -la"] + assert f"{SESSION_NAME_PREFIX}abc" in cmd + # No --network flag — exec inherits the container's locked-down network namespace. + assert "--network" not in cmd + + +def test_bash_runs_as_unprivileged_user(executor: DockerExecutor) -> None: + """The exec must drop to 65532:65532, matching the session container's user.""" + proc = _popen_mock(returncode=0) + with patch("app.services.executor_docker.subprocess.Popen", return_value=proc) as popen: + executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="id", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + cmd = popen.call_args.args[0] + user_idx = cmd.index("-u") + assert cmd[user_idx + 1] == "65532:65532" + + +def test_bash_times_out_and_kills_bash(executor: DockerExecutor) -> None: + proc = _popen_mock(raise_timeout=True) + with ( + patch("app.services.executor_docker.subprocess.Popen", return_value=proc), + patch("app.services.executor_docker.subprocess.run") as run, + ): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="sleep 999", + timeout_ms=10, + max_output_bytes=65_536, + ) + + assert result.timed_out is True + assert result.exit_code is None + proc.kill.assert_called_once() + # Verify pkill -9 bash was invoked inside the container. + pkill_calls = [ + c + for c in run.call_args_list + if c.args[0] + == [ + executor.docker_binary, + "exec", + f"{SESSION_NAME_PREFIX}abc", + "pkill", + "-9", + "bash", + ] + ] + assert len(pkill_calls) == 1 + + +def test_bash_rejects_non_session_id(executor: DockerExecutor) -> None: + with ( + patch("app.services.executor_docker.subprocess.Popen") as popen, + pytest.raises(SessionNotFoundError), + ): + executor.execute_bash_in_session( + "random-name", + cmd="true", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + popen.assert_not_called() + + +def test_bash_raises_session_not_found_when_container_missing( + executor: DockerExecutor, +) -> None: + proc = _popen_mock( + stderr=b"Error response from daemon: No such container: code-session-abc\n", + returncode=1, + ) + with ( + patch("app.services.executor_docker.subprocess.Popen", return_value=proc), + pytest.raises(SessionNotFoundError), + ): + executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="true", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + +def test_bash_treats_other_nonzero_as_command_failure(executor: DockerExecutor) -> None: + """Generic non-zero exit + stderr should NOT be misclassified as 'session missing'.""" + proc = _popen_mock(stderr=b"command not found: foo\n", returncode=127) + with patch("app.services.executor_docker.subprocess.Popen", return_value=proc): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="foo", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + assert result.exit_code == 127 + assert result.stderr == "command not found: foo\n" + + +def test_bash_truncates_stdout_to_max_bytes(executor: DockerExecutor) -> None: + proc = _popen_mock(stdout=b"x" * 200, returncode=0) + with patch("app.services.executor_docker.subprocess.Popen", return_value=proc): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="cat big", + timeout_ms=5_000, + max_output_bytes=20, + ) + + assert "[truncated]" in result.stdout diff --git a/code-interpreter/tests/integration_tests/test_session_bash_kubernetes.py b/code-interpreter/tests/integration_tests/test_session_bash_kubernetes.py new file mode 100644 index 0000000..a0a2282 --- /dev/null +++ b/code-interpreter/tests/integration_tests/test_session_bash_kubernetes.py @@ -0,0 +1,180 @@ +"""Unit tests for KubernetesExecutor.execute_bash_in_session.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest +from kubernetes.client.exceptions import ApiException # type: ignore[import-untyped] + +from app.services.executor_base import SESSION_NAME_PREFIX, SessionNotFoundError +from app.services.executor_kubernetes import KubernetesExecutor + + +@pytest.fixture() +def executor() -> KubernetesExecutor: + inst = KubernetesExecutor.__new__(KubernetesExecutor) + inst.v1 = MagicMock() + inst.namespace = "test" + inst.image = "test:latest" + inst.service_account = "" + return inst + + +class _FakeExecResp: + """Minimal stand-in for a Kubernetes WebSocket exec stream.""" + + def __init__( + self, + stdout_chunks: list[str] | None = None, + stderr_chunks: list[str] | None = None, + exit_status: str = "{'status': 'Success'}", + ) -> None: + self._stdout = list(stdout_chunks or []) + self._stderr = list(stderr_chunks or []) + self._exit_status = exit_status + self._exit_delivered = False + self._closed = False + + def is_open(self) -> bool: + if self._closed: + return False + return bool(self._stdout or self._stderr or not self._exit_delivered) + + def update(self, timeout: float = 1) -> None: # noqa: ARG002 + pass + + def peek_stdout(self) -> bool: + return bool(self._stdout) + + def read_stdout(self) -> str: + return self._stdout.pop(0) + + def peek_stderr(self) -> bool: + return bool(self._stderr) + + def read_stderr(self) -> str: + return self._stderr.pop(0) + + def read_channel(self, _channel: int) -> str: + if not self._stdout and not self._stderr and not self._exit_delivered: + self._exit_delivered = True + return self._exit_status + return "" + + def close(self) -> None: + self._closed = True + + +def test_bash_returns_stdout_and_exit_code(executor: KubernetesExecutor) -> None: + fake = _FakeExecResp(stdout_chunks=["hi\n"]) + with patch.object(executor, "_stream_pod_exec", return_value=fake): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="echo hi", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + assert result.stdout == "hi\n" + assert result.exit_code == 0 + assert result.timed_out is False + assert result.files == () + + +def test_bash_passes_through_nonzero_exit(executor: KubernetesExecutor) -> None: + fake = _FakeExecResp( + stderr_chunks=["nope\n"], + exit_status="{'status': 'Failure', 'details': {'exitCode': 7}}", + ) + with patch.object(executor, "_stream_pod_exec", return_value=fake): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="false", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + assert result.exit_code == 7 + assert result.stderr == "nope\n" + + +def test_bash_invokes_bash_dash_c(executor: KubernetesExecutor) -> None: + fake = _FakeExecResp() + with patch.object(executor, "_stream_pod_exec", return_value=fake) as exec_mock: + executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="ls -la", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + call = exec_mock.call_args + assert call.kwargs["command"] == ["bash", "-c", "ls -la"] + # Network-related kwargs should be absent — exec inherits the pod's locked-down namespace. + assert "network" not in call.kwargs + + +def test_bash_times_out_and_kills_bash(executor: KubernetesExecutor) -> None: + fake = _FakeExecResp() # never delivers exit + with ( + patch.object(executor, "_stream_pod_exec", return_value=fake), + patch.object(executor, "_kill_processes_in_pod") as kill, + ): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="sleep 999", + timeout_ms=0, # forces immediate timeout + max_output_bytes=65_536, + ) + + assert result.timed_out is True + assert result.exit_code is None + kill.assert_called_once_with(f"{SESSION_NAME_PREFIX}abc", "bash") + + +def test_bash_rejects_non_session_id(executor: KubernetesExecutor) -> None: + with pytest.raises(SessionNotFoundError): + executor.execute_bash_in_session( + "code-exec-abc", + cmd="true", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + executor.v1.read_namespaced_pod.assert_not_called() + + +def test_bash_raises_session_not_found_on_404(executor: KubernetesExecutor) -> None: + executor.v1.read_namespaced_pod.side_effect = ApiException(status=404) + with pytest.raises(SessionNotFoundError): + executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}gone", + cmd="true", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + +def test_bash_propagates_other_api_errors(executor: KubernetesExecutor) -> None: + executor.v1.read_namespaced_pod.side_effect = ApiException(status=500) + with pytest.raises(ApiException): + executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="true", + timeout_ms=5_000, + max_output_bytes=65_536, + ) + + +def test_bash_truncates_stdout_to_max_bytes(executor: KubernetesExecutor) -> None: + fake = _FakeExecResp(stdout_chunks=["x" * 200]) + with patch.object(executor, "_stream_pod_exec", return_value=fake): + result = executor.execute_bash_in_session( + f"{SESSION_NAME_PREFIX}abc", + cmd="cat big", + timeout_ms=5_000, + max_output_bytes=20, + ) + + assert "[truncated]" in result.stdout + assert len(result.stdout) <= 50 # head + suffix diff --git a/code-interpreter/tests/integration_tests/test_session_bash_routes.py b/code-interpreter/tests/integration_tests/test_session_bash_routes.py new file mode 100644 index 0000000..8cca7b0 --- /dev/null +++ b/code-interpreter/tests/integration_tests/test_session_bash_routes.py @@ -0,0 +1,192 @@ +"""Route-layer tests for POST /v1/sessions/{id}/bash.""" + +from __future__ import annotations + +from collections.abc import Generator +from unittest.mock import MagicMock, patch + +import pytest +from fastapi.testclient import TestClient + +from app.main import create_app +from app.services.executor_base import ExecutionResult, SessionNotFoundError +from app.services.executor_factory import get_executor + + +@pytest.fixture(autouse=True) +def _clear_executor_cache() -> Generator[None, None, None]: + get_executor.cache_clear() + yield + get_executor.cache_clear() + + +def _result( + *, + stdout: str = "", + stderr: str = "", + exit_code: int | None = 0, + timed_out: bool = False, + duration_ms: int = 5, +) -> ExecutionResult: + return ExecutionResult( + stdout=stdout, + stderr=stderr, + exit_code=exit_code, + timed_out=timed_out, + duration_ms=duration_ms, + files=tuple(), + ) + + +def test_bash_returns_execution_result() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.return_value = _result( + stdout="hi\n", exit_code=0, duration_ms=12 + ) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post( + "/v1/sessions/code-session-abc/bash", + json={"cmd": "echo hi"}, + ) + + assert response.status_code == 200 + body = response.json() + assert body == { + "stdout": "hi\n", + "stderr": "", + "exit_code": 0, + "timed_out": False, + "duration_ms": 12, + } + + call = mock_executor.execute_bash_in_session.call_args + assert call.args == ("code-session-abc",) + assert call.kwargs["cmd"] == "echo hi" + + +def test_bash_passes_through_nonzero_exit() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.return_value = _result(stderr="boom\n", exit_code=2) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post( + "/v1/sessions/code-session-abc/bash", + json={"cmd": "false"}, + ) + + assert response.status_code == 200 + body = response.json() + assert body["exit_code"] == 2 + assert body["stderr"] == "boom\n" + + +def test_bash_timed_out_is_reported() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.return_value = _result(exit_code=None, timed_out=True) + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post( + "/v1/sessions/code-session-abc/bash", + json={"cmd": "sleep 100"}, + ) + + assert response.status_code == 200 + body = response.json() + assert body["timed_out"] is True + assert body["exit_code"] is None + + +def test_bash_default_timeout_is_30s() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.return_value = _result() + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + client.post("/v1/sessions/code-session-x/bash", json={"cmd": "true"}) + + assert mock_executor.execute_bash_in_session.call_args.kwargs["timeout_ms"] == 30_000 + + +def test_bash_uses_provided_timeout() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.return_value = _result() + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + client.post( + "/v1/sessions/code-session-x/bash", + json={"cmd": "true", "timeout_ms": 5000}, + ) + + assert mock_executor.execute_bash_in_session.call_args.kwargs["timeout_ms"] == 5_000 + + +def test_bash_rejects_timeout_above_cap() -> None: + """The route's cap mirrors /v1/execute (max_exec_timeout_ms, default 60s).""" + client = TestClient(create_app()) + response = client.post( + "/v1/sessions/code-session-x/bash", + json={"cmd": "true", "timeout_ms": 1_000_000}, + ) + assert response.status_code == 422 + + +def test_bash_rejects_non_positive_timeout() -> None: + client = TestClient(create_app()) + response = client.post( + "/v1/sessions/code-session-x/bash", + json={"cmd": "true", "timeout_ms": 0}, + ) + assert response.status_code == 422 + + +def test_bash_requires_cmd() -> None: + client = TestClient(create_app()) + response = client.post("/v1/sessions/code-session-x/bash", json={}) + assert response.status_code == 422 + + +def test_bash_returns_404_when_session_not_found() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.side_effect = SessionNotFoundError("code-session-missing") + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post( + "/v1/sessions/code-session-missing/bash", + json={"cmd": "true"}, + ) + + assert response.status_code == 404 + assert "not found" in response.json()["detail"].lower() + + +def test_bash_returns_501_when_unsupported() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.side_effect = NotImplementedError("nope") + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + response = client.post( + "/v1/sessions/code-session-x/bash", + json={"cmd": "true"}, + ) + + assert response.status_code == 501 + + +def test_bash_max_output_bytes_passed_from_settings() -> None: + mock_executor = MagicMock() + mock_executor.execute_bash_in_session.return_value = _result() + + with patch("app.api.routes.get_executor", return_value=mock_executor): + client = TestClient(create_app()) + client.post("/v1/sessions/code-session-x/bash", json={"cmd": "true"}) + + # The route should forward the configured cap, not let callers override it. + kwargs = mock_executor.execute_bash_in_session.call_args.kwargs + assert kwargs["max_output_bytes"] > 0