Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion code-interpreter/app/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from app.app_configs import get_settings
from app.models.schemas import (
BashExecRequest,
BashExecResponse,
CreateSessionRequest,
CreateSessionResponse,
ExecuteFile,
Expand All @@ -21,7 +23,13 @@
UploadFileResponse,
WorkspaceFile,
)
from app.services.executor_base import EntryKind, StreamChunk, StreamResult, WorkspaceEntry
from app.services.executor_base import (
EntryKind,
SessionNotFoundError,
StreamChunk,
StreamResult,
WorkspaceEntry,
)
from app.services.executor_factory import execute_python, execute_python_streaming, get_executor
from app.services.file_storage import FileStorageService

Expand Down Expand Up @@ -321,3 +329,48 @@ def delete_session(session_id: str) -> Response:
)

return Response(status_code=status.HTTP_204_NO_CONTENT)


@router.post(
"/sessions/{session_id}/bash",
response_model=BashExecResponse,
status_code=status.HTTP_200_OK,
)
def session_exec_bash(session_id: str, req: BashExecRequest) -> BashExecResponse:
"""Run a bash command inside an existing session.

The session pod has no network access (enforced at session creation), and
that restriction continues to apply for every command run via this route.
"""
settings = get_settings()
if req.timeout_ms > settings.max_exec_timeout_ms:
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=f"timeout_ms exceeds maximum of {settings.max_exec_timeout_ms} ms",
)

try:
result = get_executor().execute_bash_in_session(
session_id,
cmd=req.cmd,
timeout_ms=req.timeout_ms,
max_output_bytes=settings.max_output_bytes,
)
except SessionNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=str(exc),
) from exc
except NotImplementedError as exc:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED,
detail=str(exc),
) from exc

return BashExecResponse(
stdout=result.stdout,
stderr=result.stderr,
exit_code=result.exit_code,
timed_out=result.timed_out,
duration_ms=result.duration_ms,
)
20 changes: 20 additions & 0 deletions code-interpreter/app/models/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,23 @@ class CreateSessionResponse(BaseModel):
expires_at: float = Field(
..., description="Unix timestamp when the session is scheduled to expire."
)


DEFAULT_BASH_TIMEOUT_MS = 30_000


class BashExecRequest(BaseModel):
cmd: StrictStr = Field(..., description="Bash command to execute in the session.")
timeout_ms: StrictInt = Field(
DEFAULT_BASH_TIMEOUT_MS,
ge=1,
description="Per-command execution timeout in milliseconds.",
)


class BashExecResponse(BaseModel):
stdout: StrictStr
stderr: StrictStr
exit_code: int | None
timed_out: bool
duration_ms: StrictInt
23 changes: 23 additions & 0 deletions code-interpreter/app/services/executor_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ class SessionInfo:
SESSION_EXPIRES_AT_KEY = "code-interpreter.expires-at"


class SessionNotFoundError(LookupError):
"""Raised when a session ID does not refer to an existing session."""

def __init__(self, session_id: str) -> None:
super().__init__(f"Session '{session_id}' not found")
self.session_id = session_id


class ExecutorProtocol(Protocol):
def execute_python(
self,
Expand Down Expand Up @@ -206,6 +214,21 @@ def reap_expired_sessions(self) -> int:
"""Delete sessions whose TTL has elapsed. Returns number reaped."""
return 0

def execute_bash_in_session(
self,
session_id: str,
*,
cmd: str,
timeout_ms: int,
max_output_bytes: int,
) -> ExecutionResult:
"""Run a bash command inside an existing session.

Raises ``SessionNotFoundError`` when the session does not exist.
Network restrictions established at session creation remain in force.
"""
raise NotImplementedError(f"{type(self).__name__} does not support sessions")

@staticmethod
def truncate_output(stream: bytes, max_bytes: int) -> str:
if len(stream) <= max_bytes:
Expand Down
72 changes: 72 additions & 0 deletions code-interpreter/app/services/executor_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
ExecutionResult,
HealthCheck,
SessionInfo,
SessionNotFoundError,
StreamChunk,
StreamEvent,
StreamResult,
Expand All @@ -40,6 +41,12 @@
logger = logging.getLogger(__name__)


def _looks_like_missing_container(stderr: bytes) -> bool:
"""Heuristic: ``docker exec`` writes these to stderr when the target is gone."""
text = stderr.decode("utf-8", errors="replace").lower()
return "no such container" in text or "is not running" in text


@dataclass
class _ExecContext:
"""Holds the live container and process for the duration of an execution."""
Expand Down Expand Up @@ -502,6 +509,71 @@ def reap_expired_sessions(self) -> int:
logger.warning("Failed to reap session container %s: %s", name, rm_result.stderr)
return reaped

def execute_bash_in_session(
self,
session_id: str,
*,
cmd: str,
timeout_ms: int,
max_output_bytes: int,
) -> ExecutionResult:
"""Run a bash command inside an existing session container.

The container was created with ``--network none`` at session-create time
and that network namespace is what the exec inherits — no additional
flags are needed (or accepted) for ``docker exec``.
"""
if not session_id.startswith(SESSION_NAME_PREFIX):
raise SessionNotFoundError(session_id)

exec_cmd = [
self.docker_binary,
"exec",
"-u",
"65532:65532",
session_id,
"bash",
"-c",
cmd,
]

start = time.perf_counter()
proc = subprocess.Popen( # nosec B603
exec_cmd,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

try:
stdout_bytes, stderr_bytes = proc.communicate(timeout=timeout_ms / 1000.0)
timed_out = False
except subprocess.TimeoutExpired:
timed_out = True
# Kill bash inside the container; pkill matches all bash procs in the
# container — acceptable since the agent runs commands sequentially.
subprocess.run( # nosec B603
[self.docker_binary, "exec", session_id, "pkill", "-9", "bash"],
capture_output=True,
)
proc.kill()
stdout_bytes, stderr_bytes = proc.communicate()

duration_ms = int((time.perf_counter() - start) * 1000)
exit_code = None if timed_out else proc.returncode

if not timed_out and proc.returncode != 0 and _looks_like_missing_container(stderr_bytes):
raise SessionNotFoundError(session_id)

return ExecutionResult(
stdout=self.truncate_output(stdout_bytes or b"", max_output_bytes),
stderr=self.truncate_output(stderr_bytes or b"", max_output_bytes),
exit_code=exit_code,
timed_out=timed_out,
duration_ms=duration_ms,
files=tuple(),
)

def execute_python(
self,
*,
Expand Down
134 changes: 102 additions & 32 deletions code-interpreter/app/services/executor_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
ExecutionResult,
HealthCheck,
SessionInfo,
SessionNotFoundError,
StreamChunk,
StreamEvent,
StreamResult,
Expand Down Expand Up @@ -376,19 +377,63 @@ def _upload_tar_to_pod(self, pod_name: str, tar_archive: bytes) -> None:
f"stderr: {tar_stderr.decode('utf-8', errors='replace')}"
)

def _kill_python_process(self, pod_name: str) -> None:
"""Kill the Python process running in the pod."""
def _kill_processes_in_pod(self, pod_name: str, process_name: str) -> None:
"""Best-effort SIGKILL of all processes named ``process_name`` in the pod."""
try:
self._stream_pod_exec(
pod_name,
command=["pkill", "-9", "python"],
command=["pkill", "-9", process_name],
stderr=False,
stdin=False,
stdout=False,
tty=False,
)
except Exception:
logger.warning("Failed to kill Python process in pod %s", pod_name, exc_info=True)
logger.warning(
"Failed to kill %s process in pod %s", process_name, pod_name, exc_info=True
)

def _kill_python_process(self, pod_name: str) -> None:
"""Kill the Python process running in the pod."""
self._kill_processes_in_pod(pod_name, "python")

def _drain_exec_stream(
self,
exec_resp: ws_client.WSClient,
timeout_ms: int,
) -> tuple[bytes, bytes, int | None, bool]:
"""Read stdout/stderr from an exec stream until completion or timeout.

Returns ``(stdout_bytes, stderr_bytes, exit_code, timed_out)``.
"""
stdout_data = b""
stderr_data = b""
exit_code: int | None = None
timed_out = False

end_time = time.time() + timeout_ms / 1000.0

while exec_resp.is_open():
remaining = end_time - time.time()
if remaining <= 0:
timed_out = True
break

exec_resp.update(timeout=min(remaining, 1))

if exec_resp.peek_stdout():
stdout_data += exec_resp.read_stdout().encode("utf-8")

if exec_resp.peek_stderr():
stderr_data += exec_resp.read_stderr().encode("utf-8")

error = exec_resp.read_channel(ws_client.ERROR_CHANNEL)
if error:
exit_code = _parse_exit_code(error)
break

exec_resp.close()
return stdout_data, stderr_data, exit_code, timed_out

@contextmanager
def _run_in_pod(
Expand Down Expand Up @@ -702,6 +747,56 @@ def reap_expired_sessions(self) -> int:
logger.info("Reaped expired session pod %s", metadata.name)
return reaped

def execute_bash_in_session(
self,
session_id: str,
*,
cmd: str,
timeout_ms: int,
max_output_bytes: int,
) -> ExecutionResult:
"""Run a bash command inside an existing session pod.

Network restrictions established at pod creation (the iptables init
container) remain in force — exec inherits the pod's network namespace.
"""
if not session_id.startswith(SESSION_NAME_PREFIX):
raise SessionNotFoundError(session_id)

try:
self.v1.read_namespaced_pod(session_id, self.namespace)
except ApiException as e:
if e.status == 404:
raise SessionNotFoundError(session_id) from e
raise

start = time.perf_counter()
exec_resp = self._stream_pod_exec(
session_id,
command=["bash", "-c", cmd],
stderr=True,
stdin=False,
stdout=True,
tty=False,
)

stdout_data, stderr_data, exit_code, timed_out = self._drain_exec_stream(
exec_resp, timeout_ms
)

if timed_out:
self._kill_processes_in_pod(session_id, "bash")

duration_ms = int((time.perf_counter() - start) * 1000)
return ExecutionResult(
stdout=self.truncate_output(stdout_data, max_output_bytes),
stderr=self.truncate_output(stderr_data, max_output_bytes),
exit_code=None if timed_out else exit_code,
timed_out=timed_out,
duration_ms=duration_ms,
files=tuple(),
)

def execute_python(
self,
*,
Expand Down Expand Up @@ -731,34 +826,9 @@ def execute_python(
logger.debug("Writing stdin to Python process")
ctx.exec_resp.write_stdin(stdin)

stdout_data = b""
stderr_data = b""
exit_code: int | None = None
timed_out = False

timeout_sec = timeout_ms / 1000.0
end_time = time.time() + timeout_sec

while ctx.exec_resp.is_open():
remaining = end_time - time.time()
if remaining <= 0:
timed_out = True
break

ctx.exec_resp.update(timeout=min(remaining, 1))

if ctx.exec_resp.peek_stdout():
stdout_data += ctx.exec_resp.read_stdout().encode("utf-8")

if ctx.exec_resp.peek_stderr():
stderr_data += ctx.exec_resp.read_stderr().encode("utf-8")

error = ctx.exec_resp.read_channel(ws_client.ERROR_CHANNEL)
if error:
exit_code = _parse_exit_code(error)
break

ctx.exec_resp.close()
stdout_data, stderr_data, exit_code, timed_out = self._drain_exec_stream(
ctx.exec_resp, timeout_ms
)

if timed_out:
self._kill_python_process(ctx.pod_name)
Expand Down
Loading
Loading