Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 70 additions & 64 deletions code-interpreter/app/services/executor_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import tarfile
import time
import uuid
from collections.abc import Generator, Sequence
from collections.abc import Generator, Mapping, Sequence
from contextlib import contextmanager, suppress
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -134,58 +134,51 @@ def _validate_relative_path(self, path_str: str) -> Path:

def _create_tar_archive(
self,
code: str,
code: str | None = None,
files: Sequence[tuple[str, bytes]] | None = None,
last_line_interactive: bool = True,
) -> bytes:
"""Create a tar archive containing the code and any additional files.
"""Create a tar archive optionally containing an entrypoint and files.

Args:
last_line_interactive: If True, wrap code so the last line prints its value
if it's a bare expression (only the last line is affected).
code: If provided, written as ``__main__.py`` at the archive root.
last_line_interactive: If True and code is provided, wrap the code so
the last line prints its value if it's a bare expression.
"""
tar_buffer = io.BytesIO()
with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
# Add __main__.py - optionally wrap in last-line-interactive mode
code_to_execute = code
if last_line_interactive:
# Wrap to make the last expression value print to stdout like Jupyter/REPL
code_to_execute = wrap_last_line_interactive(code)

code_bytes = code_to_execute.encode("utf-8")
code_info = tarfile.TarInfo(name="__main__.py")
code_info.size = len(code_bytes)
code_info.mode = 0o644
tar.addfile(code_info, io.BytesIO(code_bytes))

# Track directories we've created
created_dirs = set()

# Add any additional files
if files:
for file_path, content in files:
# Validate the path
validated_path = self._validate_relative_path(file_path)
if validated_path == Path("__main__.py"):
raise ValueError(
"File path '__main__.py' is reserved for the execution entrypoint."
)

# Create parent directories if needed
parent_parts = validated_path.parts[:-1]
for i in range(len(parent_parts)):
dir_path = "/".join(parent_parts[: i + 1])
if dir_path not in created_dirs:
dir_info = tarfile.TarInfo(name=dir_path + "/")
dir_info.type = tarfile.DIRTYPE
dir_info.mode = 0o755
tar.addfile(dir_info)
created_dirs.add(dir_path)

file_info = tarfile.TarInfo(name=validated_path.as_posix())
file_info.size = len(content)
file_info.mode = 0o644
tar.addfile(file_info, io.BytesIO(content))
if code is not None:
code_to_execute = (
wrap_last_line_interactive(code) if last_line_interactive else code
)
code_bytes = code_to_execute.encode("utf-8")
code_info = tarfile.TarInfo(name="__main__.py")
code_info.size = len(code_bytes)
code_info.mode = 0o644
tar.addfile(code_info, io.BytesIO(code_bytes))

created_dirs: set[str] = set()
for file_path, content in files or ():
validated_path = self._validate_relative_path(file_path)
if code is not None and validated_path == Path("__main__.py"):
raise ValueError(
"File path '__main__.py' is reserved for the execution entrypoint."
)

parent_parts = validated_path.parts[:-1]
for i in range(len(parent_parts)):
dir_path = "/".join(parent_parts[: i + 1])
if dir_path not in created_dirs:
dir_info = tarfile.TarInfo(name=dir_path + "/")
dir_info.type = tarfile.DIRTYPE
dir_info.mode = 0o755
tar.addfile(dir_info)
created_dirs.add(dir_path)

file_info = tarfile.TarInfo(name=validated_path.as_posix())
file_info.size = len(content)
file_info.mode = 0o644
tar.addfile(file_info, io.BytesIO(content))

return tar_buffer.getvalue()

Expand Down Expand Up @@ -245,11 +238,16 @@ def _build_run_command(
container_name: str,
cpu_time_limit_sec: int | None,
memory_limit_mb: int | None,
timeout_ms: int,
sleep_seconds: int,
labels: Mapping[str, str] | None = None,
) -> list[str]:
"""Build the ``docker run`` command for an ephemeral container."""
# Start the container in detached mode
# We need CAP_CHOWN to set up the workspace, but we'll drop privileges for execution
"""Build a detached ``docker run`` command.

``sleep_seconds`` controls how long the container's idle ``sleep`` lasts;
callers must ensure it exceeds their work duration. ``labels`` are
attached for later filtering (e.g. by the session reaper).
"""
# We need CAP_CHOWN to set up the workspace, but drop privileges for execution
cmd: list[str] = [
self.docker_binary,
"run",
Expand All @@ -267,7 +265,6 @@ def _build_run_command(
"64",
"--security-opt",
"no-new-privileges",
# Keep CAP_CHOWN to allow setting up workspace permissions
"--cap-drop",
"ALL",
"--cap-add",
Expand All @@ -288,31 +285,26 @@ def _build_run_command(
"MPLCONFIGDIR=/tmp/matplotlib",
]

for key, value in (labels or {}).items():
cmd.extend(["--label", f"{key}={value}"])

if cpu_time_limit_sec is not None:
cpu_limit = max(int(cpu_time_limit_sec), 1)
cpu_limit = max(cpu_time_limit_sec, 1)
cmd.extend(["--ulimit", f"cpu={cpu_limit}:{cpu_limit}"])

if memory_limit_mb is not None:
memory_limit = max(int(memory_limit_mb), 16)
memory_limit = max(memory_limit_mb, 16)
mem_flag = f"{memory_limit}m"
cmd.extend(["--memory", mem_flag, "--memory-swap", mem_flag])

if self.run_args:
cmd.extend(shlex.split(self.run_args))

# Just sleep - workspace is already created as tmpfs with correct ownership
cmd.extend([self.image, "sleep", str((timeout_ms * 1000) + 10)])
cmd.extend([self.image, "sleep", str(sleep_seconds)])
return cmd

def _stage_files_in_container(
self,
container_name: str,
code: str,
files: Sequence[tuple[str, bytes]] | None,
last_line_interactive: bool,
) -> None:
"""Create a tar archive and stream it into the container workspace."""
tar_archive = self._create_tar_archive(code, files, last_line_interactive)
def _upload_tar_to_container(self, container_name: str, tar_archive: bytes) -> None:
"""Stream a tar archive into the container workspace."""
tar_cmd = [
self.docker_binary,
"exec",
Expand All @@ -331,6 +323,17 @@ def _stage_files_in_container(
f"Failed to extract files: {tar_proc.stderr.decode('utf-8', errors='replace')}"
)

def _stage_files_in_container(
    self,
    container_name: str,
    code: str,
    files: Sequence[tuple[str, bytes]] | None,
    last_line_interactive: bool,
) -> None:
    """Package the entrypoint and extra files, then upload them to the container.

    The archive is built by ``_create_tar_archive`` and the resulting bytes
    are handed to ``_upload_tar_to_container`` for ``container_name``.
    """
    archive_bytes = self._create_tar_archive(
        code=code,
        files=files,
        last_line_interactive=last_line_interactive,
    )
    self._upload_tar_to_container(
        container_name=container_name,
        tar_archive=archive_bytes,
    )

@contextmanager
def _run_in_container(
self,
Expand All @@ -351,7 +354,10 @@ def _run_in_container(
container_name = f"code-exec-{uuid.uuid4().hex}"

cmd = self._build_run_command(
container_name, cpu_time_limit_sec, memory_limit_mb, timeout_ms
container_name=container_name,
cpu_time_limit_sec=cpu_time_limit_sec,
memory_limit_mb=memory_limit_mb,
sleep_seconds=(timeout_ms * 1000) + 10,
)
start_proc = subprocess.run(cmd, capture_output=True, text=True) # nosec B603
if start_proc.returncode != 0:
Expand Down
124 changes: 64 additions & 60 deletions code-interpreter/app/services/executor_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import tarfile
import time
import uuid
from collections.abc import Generator, Sequence
from collections.abc import Generator, Mapping, Sequence
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -131,30 +131,36 @@ def check_health(self) -> HealthCheck:
def _create_pod_manifest(
self,
pod_name: str,
memory_limit_mb: int | None,
cpu_time_limit_sec: int | None,
*,
command: Sequence[str],
labels: Mapping[str, str],
annotations: Mapping[str, str] | None = None,
active_deadline_seconds: int | None = None,
memory_limit_mb: int | None = None,
cpu_time_limit_sec: int | None = None,
) -> V1Pod:
"""Create a Kubernetes pod manifest for code execution."""
"""Build a Kubernetes pod manifest for an isolated executor container.

resources: dict[str, dict[str, Any]] = {
"limits": {},
"requests": {},
}
``command`` is the executor container's command (e.g. ``["sleep", "3600"]``).
``active_deadline_seconds``, when set, instructs kubelet to stop the pod
at that age — used by sessions to enforce TTL even if the API is down.
"""
resources: dict[str, dict[str, Any]] = {"limits": {}, "requests": {}}

if memory_limit_mb is not None:
memory_limit = max(int(memory_limit_mb), 16)
memory_limit = max(memory_limit_mb, 16)
resources["limits"]["memory"] = f"{memory_limit}Mi"
resources["requests"]["memory"] = f"{min(memory_limit, 64)}Mi"

if cpu_time_limit_sec is not None:
cpu_limit = max(int(cpu_time_limit_sec), 1)
cpu_limit = max(cpu_time_limit_sec, 1)
resources["limits"]["cpu"] = str(cpu_limit)
resources["requests"]["cpu"] = "100m"

container = V1Container(
name="executor",
image=self.image,
command=["sleep", "3600"],
command=list(command),
working_dir="/workspace",
resources=resources if resources["limits"] else None,
security_context={
Expand Down Expand Up @@ -204,6 +210,7 @@ def _create_pod_manifest(
init_containers=[network_lockdown_container],
containers=[container],
restart_policy="Never",
active_deadline_seconds=active_deadline_seconds,
service_account_name=self.service_account if self.service_account else None,
volumes=[
{"name": "workspace", "emptyDir": {"sizeLimit": "100Mi"}},
Expand All @@ -218,70 +225,65 @@ def _create_pod_manifest(
metadata = V1ObjectMeta(
name=pod_name,
namespace=self.namespace,
labels={
"app": "code-interpreter",
"component": "executor",
},
labels=dict(labels),
annotations=dict(annotations) if annotations else None,
)

return V1Pod(api_version="v1", kind="Pod", metadata=metadata, spec=spec)

def _create_tar_archive(
self,
code: str,
code: str | None = None,
files: Sequence[tuple[str, bytes]] | None = None,
last_line_interactive: bool = True,
) -> bytes:
"""Create a tar archive containing the code and any additional files.
"""Create a tar archive optionally containing an entrypoint and files.

Args:
last_line_interactive: If True, wrap code so the last line prints its value
if it's a bare expression (only the last line is affected).
code: If provided, written as ``__main__.py`` at the archive root.
last_line_interactive: If True and code is provided, wrap the code so
the last line prints its value if it's a bare expression.
"""
tar_buffer = io.BytesIO()
with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
# Add __main__.py - optionally wrap in last-line-interactive mode
code_to_execute = code
if last_line_interactive:
# Wrap to make the last expression value print to stdout like Jupyter/REPL
code_to_execute = wrap_last_line_interactive(code)

code_bytes = code_to_execute.encode("utf-8")
code_info = tarfile.TarInfo(name="__main__.py")
code_info.size = len(code_bytes)
code_info.mode = 0o644
code_info.uid = 65532
code_info.gid = 65532
tar.addfile(code_info, io.BytesIO(code_bytes))

created_dirs = set()

if files:
for file_path, content in files:
validated_path = self._validate_relative_path(file_path)
if validated_path == Path("__main__.py"):
raise ValueError(
"File path '__main__.py' is reserved for the execution entrypoint."
)
if code is not None:
code_to_execute = (
wrap_last_line_interactive(code) if last_line_interactive else code
)
code_bytes = code_to_execute.encode("utf-8")
code_info = tarfile.TarInfo(name="__main__.py")
code_info.size = len(code_bytes)
code_info.mode = 0o644
code_info.uid = 65532
code_info.gid = 65532
tar.addfile(code_info, io.BytesIO(code_bytes))

created_dirs: set[str] = set()
for file_path, content in files or ():
validated_path = self._validate_relative_path(file_path)
if code is not None and validated_path == Path("__main__.py"):
raise ValueError(
"File path '__main__.py' is reserved for the execution entrypoint."
)

parent_parts = validated_path.parts[:-1]
for i in range(len(parent_parts)):
dir_path = "/".join(parent_parts[: i + 1])
if dir_path not in created_dirs:
dir_info = tarfile.TarInfo(name=dir_path + "/")
dir_info.type = tarfile.DIRTYPE
dir_info.mode = 0o755
dir_info.uid = 65532
dir_info.gid = 65532
tar.addfile(dir_info)
created_dirs.add(dir_path)

file_info = tarfile.TarInfo(name=validated_path.as_posix())
file_info.size = len(content)
file_info.mode = 0o644
file_info.uid = 65532
file_info.gid = 65532
tar.addfile(file_info, io.BytesIO(content))
parent_parts = validated_path.parts[:-1]
for i in range(len(parent_parts)):
dir_path = "/".join(parent_parts[: i + 1])
if dir_path not in created_dirs:
dir_info = tarfile.TarInfo(name=dir_path + "/")
dir_info.type = tarfile.DIRTYPE
dir_info.mode = 0o755
dir_info.uid = 65532
dir_info.gid = 65532
tar.addfile(dir_info)
created_dirs.add(dir_path)

file_info = tarfile.TarInfo(name=validated_path.as_posix())
file_info.size = len(content)
file_info.mode = 0o644
file_info.uid = 65532
file_info.gid = 65532
tar.addfile(file_info, io.BytesIO(content))

return tar_buffer.getvalue()

Expand Down Expand Up @@ -404,6 +406,8 @@ def _run_in_pod(

pod_manifest = self._create_pod_manifest(
pod_name=pod_name,
command=["sleep", "3600"],
labels={"app": "code-interpreter", "component": "executor"},
memory_limit_mb=memory_limit_mb,
cpu_time_limit_sec=cpu_time_limit_sec,
)
Expand Down
Loading