Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions code-interpreter/app/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ def _stage_request_files(
req: ExecuteRequest,
storage: FileStorageService,
) -> tuple[list[tuple[str, bytes]], dict[str, bytes]]:
"""Resolve uploaded file IDs into content for the executor."""
"""Resolve uploaded file IDs into content for the executor.

Returns (staged_files, input_files_map).
"""
return _resolve_uploaded_files(req.files, storage)


Expand Down Expand Up @@ -267,16 +270,18 @@ def delete_file(file_id: str) -> Response:
status_code=status.HTTP_201_CREATED,
)
def create_session(req: CreateSessionRequest) -> CreateSessionResponse:
"""Create a long-lived code-executor pod.
"""Create a long-lived code-executor pod with the given TTL.

The session must be torn down explicitly via DELETE /v1/sessions/{id}.
The pod is guaranteed to be torn down at or before the TTL expires, even
if the API service crashes and restarts.
"""
settings = get_settings()
storage = get_file_storage()
staged_files, _ = _resolve_uploaded_files(req.files, storage)

try:
info = get_executor().create_session(
ttl_seconds=req.ttl_seconds,
files=staged_files,
cpu_time_limit_sec=settings.cpu_time_limit_sec,
memory_limit_mb=settings.memory_limit_mb,
Expand All @@ -292,7 +297,10 @@ def create_session(req: CreateSessionRequest) -> CreateSessionResponse:
detail=str(exc),
) from exc

return CreateSessionResponse(session_id=info.session_id)
return CreateSessionResponse(
session_id=info.session_id,
expires_at=info.expires_at,
)


@router.delete("/sessions/{session_id}", status_code=status.HTTP_204_NO_CONTENT)
Expand Down
34 changes: 31 additions & 3 deletions code-interpreter/app/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import annotations

import asyncio
import logging
import subprocess
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager, suppress
from shutil import which
from typing import Final

Expand All @@ -14,6 +15,8 @@
from app.models.schemas import HealthResponse
from app.services.executor_factory import get_executor

# Number of seconds the background session reaper sleeps between passes.
SESSION_REAPER_INTERVAL_SEC = 30

# Configure logging
logging.basicConfig(
level=logging.INFO,
Expand Down Expand Up @@ -78,6 +81,24 @@ def _ensure_docker_image_available() -> None:
) from e


async def _reap_expired_sessions_once() -> None:
    """Perform one reap pass against the configured executor.

    The executor's ``reap_expired_sessions`` is invoked on a worker thread via
    ``asyncio.to_thread``. Any ``Exception`` raised while obtaining the
    executor or running the pass is logged (with traceback) and swallowed, so
    a failed pass never propagates to the caller.
    """
    try:
        reaped = await asyncio.to_thread(get_executor().reap_expired_sessions)
    except Exception:
        # Best-effort: report the failure and let the next pass retry.
        logger.warning("Session reaper pass failed", exc_info=True)
    else:
        # Stay quiet when nothing was reaped to avoid log noise.
        if reaped > 0:
            logger.info("Reaped %d expired session(s)", reaped)


async def _session_reaper_loop() -> None:
    """Run reap passes forever, pausing between each one.

    Each iteration first sleeps for ``SESSION_REAPER_INTERVAL_SEC`` seconds
    and then runs a single reap pass. The loop has no exit condition of its
    own; it ends only when the surrounding task is cancelled.
    """
    while True:
        # Sleep first: the pass runs only after a full interval has elapsed.
        await asyncio.sleep(SESSION_REAPER_INTERVAL_SEC)
        await _reap_expired_sessions_once()


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""Manage application lifespan events."""
Expand All @@ -87,9 +108,16 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
_ensure_docker_image_available()
logger.info("Docker executor image is ready")

yield
# Reap any sessions whose TTL elapsed while the service was down.
await _reap_expired_sessions_once()
reaper_task = asyncio.create_task(_session_reaper_loop())

# Shutdown: Add any cleanup logic here if needed in the future
try:
yield
finally:
reaper_task.cancel()
with suppress(asyncio.CancelledError):
await reaper_task


def create_app() -> FastAPI:
Expand Down
16 changes: 16 additions & 0 deletions code-interpreter/app/models/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,28 @@ class HealthResponse(BaseModel):
message: StrictStr | None = None


# Session TTL applied when a create-session request omits ``ttl_seconds`` (15 minutes).
DEFAULT_SESSION_TTL_SEC = 15 * 60
# Largest ``ttl_seconds`` a create-session request may ask for (24 hours).
MAX_SESSION_TTL_SEC = 24 * 60 * 60


class CreateSessionRequest(BaseModel):
    # Request body for creating a session. Documented with comments rather
    # than a docstring so the generated schema description is unchanged.

    # Files placed in the session workspace at creation; defaults to none.
    files: list[ExecuteFile] = Field(
        description="Files to stage in the session workspace at create time.",
        default_factory=list,
    )
    # Requested lifetime, bounded to [1, MAX_SESSION_TTL_SEC] seconds.
    ttl_seconds: StrictInt = Field(
        default=DEFAULT_SESSION_TTL_SEC,
        le=MAX_SESSION_TTL_SEC,
        ge=1,
        description=(
            "Session lifetime in seconds. "
            "The session pod is automatically destroyed after this duration "
            "even if the API service crashes."
        ),
    )


class CreateSessionResponse(BaseModel):
    # Response body returned after a session is created. Documented with
    # comments rather than a docstring so the generated schema is unchanged.

    # Identifier the client uses to refer to the session afterwards.
    session_id: StrictStr = Field(
        ...,
        description="Identifier for the session pod/container.",
    )
    # Expiry expressed as a Unix timestamp.
    expires_at: float = Field(
        ...,
        description="Unix timestamp when the session is scheduled to expire.",
    )
12 changes: 10 additions & 2 deletions code-interpreter/app/services/executor_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,13 @@ class SessionInfo:
"""Identifying information for a long-lived session."""

session_id: str
expires_at: float


# Prefix that session identifiers start with.
SESSION_NAME_PREFIX = "code-session-"
# Value of the ``app`` label attached to session containers.
SESSION_APP_LABEL = "code-interpreter"
# Value of the ``component`` label attached to session containers.
SESSION_COMPONENT_LABEL = "session"
# Label key under which a session's expiry (Unix timestamp, as a string) is stored.
SESSION_EXPIRES_AT_KEY = "code-interpreter.expires-at"


class ExecutorProtocol(Protocol):
Expand Down Expand Up @@ -183,21 +185,27 @@ def execute_python_streaming(
def create_session(
self,
*,
ttl_seconds: int,
files: Sequence[tuple[str, bytes]] | None = None,
cpu_time_limit_sec: int | None = None,
memory_limit_mb: int | None = None,
) -> SessionInfo:
"""Create a long-lived execution environment.

Returns identifying information for the session. The caller is
responsible for invoking ``delete_session`` when finished.
Returns identifying information for the session. The session is
guaranteed to be torn down at or before ``expires_at`` even if this
process crashes.
"""
raise NotImplementedError(f"{type(self).__name__} does not support sessions")

def delete_session(self, session_id: str) -> bool:
    """Tear down the session identified by ``session_id``.

    Implementations return True when the session was found and deleted.
    This default implementation does not support sessions and always raises
    ``NotImplementedError`` naming the concrete executor class.
    """
    executor_name = type(self).__name__
    raise NotImplementedError(f"{executor_name} does not support sessions")

def reap_expired_sessions(self) -> int:
    """Delete sessions whose TTL has elapsed and return how many were reaped.

    This default implementation reaps nothing and reports zero.
    """
    return 0

@staticmethod
def truncate_output(stream: bytes, max_bytes: int) -> str:
if len(stream) <= max_bytes:
Expand Down
71 changes: 60 additions & 11 deletions code-interpreter/app/services/executor_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from app.services.executor_base import (
SESSION_APP_LABEL,
SESSION_COMPONENT_LABEL,
SESSION_EXPIRES_AT_KEY,
SESSION_NAME_PREFIX,
BaseExecutor,
EntryKind,
Expand All @@ -38,10 +39,6 @@

logger = logging.getLogger(__name__)

# Sessions keep their idle container alive for at most this many seconds; a
# follow-up PR replaces this with a per-session TTL plus a reaper.
SESSION_MAX_LIFETIME_SECONDS = 24 * 60 * 60


@dataclass
class _ExecContext:
Expand Down Expand Up @@ -405,20 +402,23 @@ def _run_in_container(
def create_session(
self,
*,
ttl_seconds: int,
files: Sequence[tuple[str, bytes]] | None = None,
cpu_time_limit_sec: int | None = None,
memory_limit_mb: int | None = None,
) -> SessionInfo:
container_name = f"{SESSION_NAME_PREFIX}{uuid.uuid4().hex}"
expires_at = time.time() + ttl_seconds

cmd = self._build_run_command(
container_name=container_name,
cpu_time_limit_sec=cpu_time_limit_sec,
memory_limit_mb=memory_limit_mb,
sleep_seconds=SESSION_MAX_LIFETIME_SECONDS,
sleep_seconds=ttl_seconds,
labels={
"app": SESSION_APP_LABEL,
"component": SESSION_COMPONENT_LABEL,
SESSION_EXPIRES_AT_KEY: str(expires_at),
},
)
start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603
Expand All @@ -433,8 +433,8 @@ def create_session(
self._kill_container(container_name)
raise

logger.info("Created session container %s", container_name)
return SessionInfo(session_id=container_name)
logger.info("Created session container %s (expires at %s)", container_name, expires_at)
return SessionInfo(session_id=container_name, expires_at=expires_at)

def delete_session(self, session_id: str) -> bool:
if not session_id.startswith(SESSION_NAME_PREFIX):
Expand All @@ -444,15 +444,64 @@ def delete_session(self, session_id: str) -> bool:
capture_output=True,
text=True,
)
# `docker rm -f <missing>` exits 0 on modern Docker, so check stderr
# for the "not found" message regardless of exit code.
if result.returncode == 0:
return True
# docker rm -f exits non-zero only when the container does not exist.
stderr = (result.stderr or "").lower()
if "no such container" in stderr or "not found" in stderr:
return False
if result.returncode == 0:
return True
raise RuntimeError(f"Failed to delete session {session_id}: {result.stderr}")

def reap_expired_sessions(self) -> int:
    """Force-remove session containers whose expiry label is in the past.

    Lists all containers (running or exited) that carry the session ``app``
    and ``component`` labels, reads each one's ``SESSION_EXPIRES_AT_KEY``
    label as a Unix timestamp, and runs ``docker rm -f`` on those whose
    timestamp is earlier than the current time. Containers with a missing
    or unparsable expiry label are left alone.

    The pass is best-effort: a failed or timed-out listing reaps nothing,
    and a failed or timed-out removal is logged and skipped so the
    remaining containers are still processed.

    Returns:
        The number of containers successfully removed in this pass.
    """
    # Bound every docker invocation so a hung CLI/daemon cannot block the
    # reaper indefinitely.
    list_timeout_sec = 10
    rm_timeout_sec = 30

    list_cmd = [
        self.docker_binary,
        "ps",
        "-a",
        "--filter",
        f"label=app={SESSION_APP_LABEL}",
        "--filter",
        f"label=component={SESSION_COMPONENT_LABEL}",
        "--format",
        # Emits "<name>\t<expires-at label>" per container.
        f'{{{{.Names}}}}\t{{{{.Label "{SESSION_EXPIRES_AT_KEY}"}}}}',
    ]
    try:
        list_result = subprocess.run(  # nosec B603
            list_cmd, capture_output=True, text=True, timeout=list_timeout_sec
        )
    except subprocess.TimeoutExpired:
        logger.warning("Timed out listing session containers for reap")
        return 0

    if list_result.returncode != 0:
        logger.warning("Failed to list session containers: %s", list_result.stderr)
        return 0

    now = time.time()
    reaped = 0
    for line in list_result.stdout.splitlines():
        name, _, expires_str = line.partition("\t")
        name = name.strip()
        expires_str = expires_str.strip()
        if not name or not expires_str:
            # No expiry label: not something this reaper should touch.
            continue
        try:
            expires_at = float(expires_str)
        except ValueError:
            continue
        if expires_at >= now:
            continue
        try:
            rm_result = subprocess.run(  # nosec B603
                [self.docker_binary, "rm", "-f", name],
                capture_output=True,
                text=True,
                timeout=rm_timeout_sec,
            )
        except subprocess.TimeoutExpired:
            # Skip this container; it will be retried on the next pass.
            logger.warning("Timed out reaping session container %s", name)
            continue
        if rm_result.returncode == 0:
            reaped += 1
            logger.info("Reaped expired session container %s", name)
        else:
            logger.warning("Failed to reap session container %s: %s", name, rm_result.stderr)
    return reaped

def execute_python(
self,
*,
Expand Down
Loading
Loading