Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 69 additions & 4 deletions code-interpreter/app/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

from app.app_configs import get_settings
from app.models.schemas import (
CreateSessionRequest,
CreateSessionResponse,
ExecuteFile,
ExecuteRequest,
ExecuteResponse,
FileMetadataResponse,
Expand All @@ -19,7 +22,7 @@
WorkspaceFile,
)
from app.services.executor_base import EntryKind, StreamChunk, StreamResult, WorkspaceEntry
from app.services.executor_factory import execute_python, execute_python_streaming
from app.services.executor_factory import execute_python, execute_python_streaming, get_executor
from app.services.file_storage import FileStorageService

router = APIRouter()
Expand All @@ -46,8 +49,8 @@ def _validate_timeout(req: ExecuteRequest) -> None:
)


def _stage_request_files(
req: ExecuteRequest,
def _resolve_uploaded_files(
files: list[ExecuteFile],
storage: FileStorageService,
) -> tuple[list[tuple[str, bytes]], dict[str, bytes]]:
"""Resolve uploaded file IDs into content for the executor.
Expand All @@ -56,7 +59,7 @@ def _stage_request_files(
"""
staged_files: list[tuple[str, bytes]] = []
input_files_map: dict[str, bytes] = {}
for file in req.files:
for file in files:
try:
content, _ = storage.get_file(file.file_id)
except FileNotFoundError as exc:
Expand All @@ -69,6 +72,14 @@ def _stage_request_files(
return staged_files, input_files_map


def _stage_request_files(
    req: ExecuteRequest, storage: FileStorageService
) -> tuple[list[tuple[str, bytes]], dict[str, bytes]]:
    """Stage the files referenced by an execute request.

    Thin adapter kept for existing callers: it forwards ``req.files`` and
    ``storage`` to ``_resolve_uploaded_files`` and returns that result
    unchanged.
    """
    requested_files = req.files
    return _resolve_uploaded_files(requested_files, storage)


def _save_workspace_files(
entries: tuple[WorkspaceEntry, ...],
input_files_map: dict[str, bytes],
Expand Down Expand Up @@ -248,3 +259,57 @@ def delete_file(file_id: str) -> Response:
)

return Response(status_code=status.HTTP_204_NO_CONTENT)


@router.post(
    "/sessions",
    status_code=status.HTTP_201_CREATED,
    response_model=CreateSessionResponse,
)
def create_session(req: CreateSessionRequest) -> CreateSessionResponse:
    """Create a long-lived execution session and return its identifier.

    The uploaded file IDs in the request are resolved through file storage
    and handed to the active executor together with the CPU-time and memory
    limits taken from the application settings.

    An executor that does not support sessions (``NotImplementedError``)
    yields 501; a ``ValueError`` from the executor yields 422.

    The session must be torn down explicitly via DELETE /v1/sessions/{id}.
    """
    settings = get_settings()
    storage = get_file_storage()
    # Only the (path, content) list is needed here; the ID->content map is unused.
    staged_files, _ = _resolve_uploaded_files(req.files, storage)

    try:
        executor = get_executor()
        session = executor.create_session(
            files=staged_files,
            cpu_time_limit_sec=settings.cpu_time_limit_sec,
            memory_limit_mb=settings.memory_limit_mb,
        )
    except (NotImplementedError, ValueError) as exc:
        if isinstance(exc, NotImplementedError):
            failure_status = status.HTTP_501_NOT_IMPLEMENTED
        else:
            failure_status = status.HTTP_422_UNPROCESSABLE_ENTITY
        raise HTTPException(status_code=failure_status, detail=str(exc)) from exc

    return CreateSessionResponse(session_id=session.session_id)


@router.delete("/sessions/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_session(session_id: str) -> Response:
    """Tear down the session identified by ``session_id``.

    Responds 204 when the executor reports the session was deleted, 404 when
    it reports no such session, and 501 when the active executor does not
    support sessions.
    """
    try:
        was_removed = get_executor().delete_session(session_id)
    except NotImplementedError as exc:
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail=str(exc),
        ) from exc

    if was_removed:
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Session '{session_id}' not found",
    )
11 changes: 11 additions & 0 deletions code-interpreter/app/models/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,14 @@ class ListFilesResponse(BaseModel):
class HealthResponse(BaseModel):
status: Literal["ok", "error"]
message: StrictStr | None = None


class CreateSessionRequest(BaseModel):
    # Request body for session creation. Documented with comments rather than
    # a class docstring so the generated schema description stays unchanged.

    # Optional; defaults to an empty list when the client sends no files.
    files: list[ExecuteFile] = Field(
        description="Files to stage in the session workspace at create time.",
        default_factory=list,
    )


class CreateSessionResponse(BaseModel):
    # Response body for session creation. Documented with comments rather than
    # a class docstring so the generated schema description stays unchanged.

    # Required; the value callers later pass to the session delete endpoint.
    session_id: StrictStr = Field(
        ...,
        description="Identifier for the session pod/container.",
    )
30 changes: 30 additions & 0 deletions code-interpreter/app/services/executor_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ class HealthCheck:
message: str | None = None


@dataclass(frozen=True, slots=True)
class SessionInfo:
"""Identifying information for a long-lived session."""

session_id: str


# Prefix of every generated session container/pod name; the executors'
# delete_session implementations reject IDs that do not start with it.
SESSION_NAME_PREFIX = "code-session-"
# Value of the "app" label attached to session containers/pods.
SESSION_APP_LABEL = "code-interpreter"
# Value of the "component" label attached to session containers/pods.
SESSION_COMPONENT_LABEL = "session"


class ExecutorProtocol(Protocol):
def execute_python(
self,
Expand Down Expand Up @@ -168,6 +180,24 @@ def execute_python_streaming(
"""
raise NotImplementedError(f"{type(self).__name__} does not support streaming execution")

def create_session(
    self,
    *,
    files: Sequence[tuple[str, bytes]] | None = None,
    cpu_time_limit_sec: int | None = None,
    memory_limit_mb: int | None = None,
) -> SessionInfo:
    """Create a long-lived execution environment.

    Default implementation for executors without session support: it always
    raises. Overriding executors return identifying information for the new
    session, and the caller is responsible for invoking ``delete_session``
    when finished.

    Raises:
        NotImplementedError: always, naming the concrete executor class.
    """
    backend_name = type(self).__name__
    raise NotImplementedError(f"{backend_name} does not support sessions")

def delete_session(self, session_id: str) -> bool:
    """Tear down a session by ID.

    Overriding executors return True if the session was found and deleted.
    This default implementation always raises ``NotImplementedError`` naming
    the concrete executor class.
    """
    backend_name = type(self).__name__
    raise NotImplementedError(f"{backend_name} does not support sessions")

@staticmethod
def truncate_output(stream: bytes, max_bytes: int) -> str:
if len(stream) <= max_bytes:
Expand Down
59 changes: 59 additions & 0 deletions code-interpreter/app/services/executor_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
PYTHON_EXECUTOR_DOCKER_RUN_ARGS,
)
from app.services.executor_base import (
SESSION_APP_LABEL,
SESSION_COMPONENT_LABEL,
SESSION_NAME_PREFIX,
BaseExecutor,
EntryKind,
ExecutionResult,
HealthCheck,
SessionInfo,
StreamChunk,
StreamEvent,
StreamResult,
Expand All @@ -34,6 +38,10 @@

logger = logging.getLogger(__name__)

# Upper bound, in seconds (24 hours), on how long a session's idle container is
# kept alive; passed as ``sleep_seconds`` when the session container is started.
# A follow-up PR replaces this with a per-session TTL plus a reaper.
SESSION_MAX_LIFETIME_SECONDS = 24 * 60 * 60


@dataclass
class _ExecContext:
Expand Down Expand Up @@ -394,6 +402,57 @@ def _run_in_container(
finally:
self._kill_container(container_name)

def create_session(
    self,
    *,
    files: Sequence[tuple[str, bytes]] | None = None,
    cpu_time_limit_sec: int | None = None,
    memory_limit_mb: int | None = None,
) -> SessionInfo:
    """Start a session container and optionally seed it with files.

    The container gets a fresh ``SESSION_NAME_PREFIX`` + UUID-hex name, the
    session app/component labels, and ``SESSION_MAX_LIFETIME_SECONDS`` as its
    sleep duration. When ``files`` is non-empty they are packed into a tar
    archive and uploaded into the container.

    Returns:
        ``SessionInfo`` whose ``session_id`` is the container name.

    Raises:
        RuntimeError: if the run command exits with a non-zero status.
        Exception: any error raised while staging files is re-raised after
            the container has been killed.
    """
    session_name = SESSION_NAME_PREFIX + uuid.uuid4().hex
    session_labels = {
        "app": SESSION_APP_LABEL,
        "component": SESSION_COMPONENT_LABEL,
    }

    run_cmd = self._build_run_command(
        container_name=session_name,
        cpu_time_limit_sec=cpu_time_limit_sec,
        memory_limit_mb=memory_limit_mb,
        sleep_seconds=SESSION_MAX_LIFETIME_SECONDS,
        labels=session_labels,
    )
    launched = subprocess.run(run_cmd, capture_output=True, text=True)  # nosec B603
    if launched.returncode != 0:
        raise RuntimeError(f"Failed to start session container: {launched.stderr}")

    # From here on the container exists; do not leave it behind on failure.
    try:
        if files:
            archive = self._create_tar_archive(files=files)
            self._upload_tar_to_container(session_name, archive)
    except Exception:
        self._kill_container(session_name)
        raise

    logger.info("Created session container %s", session_name)
    return SessionInfo(session_id=session_name)

def delete_session(self, session_id: str) -> bool:
    """Force-remove the session container named ``session_id``.

    Returns False without invoking Docker when the ID lacks
    ``SESSION_NAME_PREFIX``, and False when Docker's stderr reports the
    container as missing. Returns True when the removal command exits 0.

    Raises:
        RuntimeError: if the removal command fails for any other reason.
    """
    if not session_id.startswith(SESSION_NAME_PREFIX):
        return False

    removal = subprocess.run(  # nosec B603
        [self.docker_binary, "rm", "-f", session_id],
        capture_output=True,
        text=True,
    )

    # `docker rm -f <missing>` exits 0 on modern Docker, so the "missing"
    # messages in stderr are checked before the exit code is trusted.
    err_text = (removal.stderr or "").lower()
    missing_markers = ("no such container", "not found")
    if any(marker in err_text for marker in missing_markers):
        return False

    if removal.returncode != 0:
        raise RuntimeError(f"Failed to delete session {session_id}: {removal.stderr}")
    return True

def execute_python(
self,
*,
Expand Down
54 changes: 54 additions & 0 deletions code-interpreter/app/services/executor_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@
KUBERNETES_EXECUTOR_SERVICE_ACCOUNT,
)
from app.services.executor_base import (
SESSION_APP_LABEL,
SESSION_COMPONENT_LABEL,
SESSION_NAME_PREFIX,
BaseExecutor,
EntryKind,
ExecutionResult,
HealthCheck,
SessionInfo,
StreamChunk,
StreamEvent,
StreamResult,
Expand All @@ -45,6 +49,10 @@
POD_DELETE_RETRY_DELAY_SECONDS = 0.2
POD_DELETE_CONFIRM_TIMEOUT_SECONDS = 2.0

# Upper bound, in seconds (24 hours), on how long a session's idle pod is kept
# alive; used as the argument of the session pod's ``sleep`` command.
# A follow-up PR replaces this with a per-session TTL plus a reaper.
SESSION_MAX_LIFETIME_SECONDS = 24 * 60 * 60


def _parse_exit_code(error: str) -> int | None:
"""Parse the exit code from a Kubernetes exec error channel message."""
Expand Down Expand Up @@ -596,6 +604,52 @@ def _cleanup_pod(self, pod_name: str) -> None:
POD_DELETE_RETRIES,
)

def create_session(
    self,
    *,
    files: Sequence[tuple[str, bytes]] | None = None,
    cpu_time_limit_sec: int | None = None,
    memory_limit_mb: int | None = None,
) -> SessionInfo:
    """Create a session pod and optionally seed it with files.

    The pod gets a fresh ``SESSION_NAME_PREFIX`` + UUID-hex name, the session
    app/component labels, and runs ``sleep SESSION_MAX_LIFETIME_SECONDS``.
    After the pod is created this waits for it to become ready and, when
    ``files`` is non-empty, uploads them as a tar archive.

    Returns:
        ``SessionInfo`` whose ``session_id`` is the pod name.

    Raises:
        Exception: any error raised while waiting for readiness or staging
            files is re-raised after the pod has been cleaned up.
    """
    session_pod = SESSION_NAME_PREFIX + uuid.uuid4().hex
    idle_command = ["sleep", str(SESSION_MAX_LIFETIME_SECONDS)]
    session_labels = {
        "app": SESSION_APP_LABEL,
        "component": SESSION_COMPONENT_LABEL,
    }

    pod_spec = self._create_pod_manifest(
        pod_name=session_pod,
        command=idle_command,
        labels=session_labels,
        memory_limit_mb=memory_limit_mb,
        cpu_time_limit_sec=cpu_time_limit_sec,
    )

    logger.info("Creating session pod %s in namespace %s", session_pod, self.namespace)
    self.v1.create_namespaced_pod(namespace=self.namespace, body=pod_spec)

    # The pod now exists; clean it up if it cannot be made usable.
    try:
        self._wait_for_pod_ready(session_pod)
        if files:
            archive = self._create_tar_archive(files=files)
            self._upload_tar_to_pod(session_pod, archive)
    except Exception:
        self._cleanup_pod(session_pod)
        raise

    return SessionInfo(session_id=session_pod)

def delete_session(self, session_id: str) -> bool:
    """Delete the session pod named ``session_id`` with a zero grace period.

    Returns False without calling the API when the ID lacks
    ``SESSION_NAME_PREFIX``, and False when the API answers 404. Returns True
    once the delete request is accepted.

    Raises:
        ApiException: for any API failure other than 404.
    """
    if not session_id.startswith(SESSION_NAME_PREFIX):
        return False

    try:
        self.v1.delete_namespaced_pod(
            name=session_id,
            namespace=self.namespace,
            body=client.V1DeleteOptions(grace_period_seconds=0),
        )
    except ApiException as api_err:
        if api_err.status != 404:
            raise
        return False
    else:
        return True

def execute_python(
self,
*,
Expand Down
Loading
Loading