70 changes: 70 additions & 0 deletions conf/terminalbench.yaml
@@ -0,0 +1,70 @@
defaults:
- base
- override streams: files
- _self_

world:
actor_fraction: 4
preprocessor_fraction: 0
finetune_fraction: 4

model_path: Qwen/Qwen2.5-7B

finetune:
seq_length: 100000 # input + output tokens
max_train_steps: 1024
train_batch_size: 1
gradient_accumulation_passes: 8
seq_parallel: 4
rl:
policy_loss: gspo
epsilon_high: 4e-4
epsilon_low: 3e-4
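    # GSPO clips a length-normalized, sequence-level importance ratio, so this
    # asymmetric band [1 - epsilon_low, 1 + epsilon_high] around 1 is much
    # tighter than typical token-level PPO clipping ranges.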

eval_every_n_versions: 0

llm:
parameters:
max_tokens: 16000 # output tokens
temperature: 1.0
test_llm:
parameters:
max_tokens: ${...llm.parameters.max_tokens}
temperature: 1.0
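# The three leading dots in ${...llm.parameters.max_tokens} are an OmegaConf
# relative interpolation: the first dot anchors at the containing node
# (test_llm.parameters) and each extra dot walks one level up, so three dots
# reach the config root and test_llm always mirrors llm (see the sketch after
# this file).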

vllm_config:
vllm_kwargs:
max_model_len: 100000 # input + output tokens

actor:
rollout_policy: pipelinerl.domains.terminalbench.rollouts.generate_rollout
shared_memory_entry_size: 200000000
llm_max_rollouts: 1 # container tasks are slow; keep concurrency low
max_steps: 100 # max LLM calls per rollout

preprocess:
shared_memory_entry_size: 200000000


# ENVIRONMENT CONFIGURATION
environment:
_target_: pipelinerl.domains.terminalbench.environment_server.ContainerEnvironmentServer
cube_backend:
_target_: cube.backends.toolkit.ToolkitContainerBackend
account: snow.research.adea
timeout_seconds: 1800
n_parallel_envs: 8
host: "0.0.0.0"
env_call_timeout: 120 # timeout per exec/evaluate call in seconds
start_task_timeout: 600 # timeout for container launch (queuing + running)

# DATASET CONFIGURATION
dataset_loader: pipelinerl.domains.terminalbench.load_tasks.load_tasks
dataset_loader_params:
train_split: 0.6
max_train_examples: 32
max_test_examples: 4
train_dataset_names:
- train
test_dataset_names:
- test
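A quick sanity check of that interpolation, as a minimal OmegaConf sketch (not part of the PR; it only demonstrates how the reference resolves):

from omegaconf import OmegaConf

cfg = OmegaConf.create(
    """
    llm:
      parameters:
        max_tokens: 16000
    test_llm:
      parameters:
        max_tokens: ${...llm.parameters.max_tokens}
    """
)
# Resolves against the config root: test_llm inherits whatever llm uses.
assert cfg.test_llm.parameters.max_tokens == 16000
print(OmegaConf.to_yaml(cfg, resolve=True))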
151 changes: 151 additions & 0 deletions pipelinerl/domains/terminalbench/environment.py
@@ -0,0 +1,151 @@
import base64
import io
import logging
import shlex
import tarfile
import time
from pathlib import Path

from cube.container import Container, ContainerBackend, ContainerConfig

logger = logging.getLogger(__name__)


class ContainerEnvironment:
"""
Wraps a single container as a terminal environment for one task.

A fresh container is launched in start_task() using the task's ContainerConfig
and stopped in release(). No container is held between tasks.
"""

def __init__(self, backend: ContainerBackend) -> None:
self.backend = backend
self.container: Container | None = None
self._task_id: str = ""

def start_task(self, task_data: dict) -> dict:
"""Launch a fresh container and prepare it for the task."""
self._task_id = task_data.get("id", "unknown")
container_config = ContainerConfig(**task_data.get("container_config", {}))
logger.info(f"[{self._task_id}] Launching container (image={container_config.image})")

t0 = time.monotonic()
self.container = self.backend.launch(container_config)
logger.info(
f"[{self._task_id}] Container {self.container.id} running "
f"(launch took {time.monotonic() - t0:.1f}s)"
)

self.container.exec("rm -rf /app && mkdir -p /app", timeout=30)

if archive_b64 := task_data.get("archive_b64"):
logger.debug(f"[{self._task_id}] Uploading task archive to /app")
self._upload_archive_subset(
archive_b64,
target_dir="/app",
skip_prefixes=("tests/", "solution/", "task.toml", "instruction.md"),
)

for cmd in task_data.get("setup_commands", []):
logger.debug(f"[{self._task_id}] Setup: {cmd}")
result = self.container.exec(cmd)
if result.exit_code != 0:
logger.warning(
f"[{self._task_id}] Setup command failed (exit {result.exit_code}): {cmd}\n{result.stderr}"
)

logger.info(f"[{self._task_id}] Task ready in container {self.container.id}")
return {
"task_id": self._task_id,
"description": task_data.get("description", ""),
}

def exec(self, command: str, timeout: int | None = None) -> dict:
"""Run a shell command in the container and return stdout/stderr/exit_code."""
assert self.container is not None, "No active container"
logger.debug(f"[{self._task_id}] exec: {command!r}")
result = self.container.exec(command, timeout=timeout)
logger.debug(
f"[{self._task_id}] exec done in {result.duration_seconds:.1f}s "
f"(exit={result.exit_code}): {command!r}"
)
return {
"stdout": result.stdout,
"stderr": result.stderr,
"exit_code": result.exit_code,
"duration_seconds": result.duration_seconds,
}

def evaluate(self, archive_b64: str, test_timeout_sec: int = 900) -> dict:
"""Upload tests from the archive, run test.sh, and return the reward."""
assert self.container is not None, "No active container"
logger.info(f"[{self._task_id}] Running evaluation (timeout={test_timeout_sec}s)")

self.container.exec("mkdir -p /tests /logs/verifier")
logger.debug(f"[{self._task_id}] Uploading test archive")
        # _upload_archive_subset strips include_prefix from member names, so the
        # tests/ subtree must be re-rooted at /tests; with target_dir="/" the
        # files would land at / and /tests/test.sh would never exist.
        self._upload_archive_subset(archive_b64, target_dir="/tests", include_prefix="tests/")
self.container.exec("chmod +x /tests/test.sh")

t0 = time.monotonic()
self.container.exec("cd /app && bash /tests/test.sh", timeout=test_timeout_sec)
logger.debug(f"[{self._task_id}] test.sh finished in {time.monotonic() - t0:.1f}s")

reward_result = self.container.exec("cat /logs/verifier/reward.txt 2>/dev/null || echo 0")
try:
reward = float(reward_result.stdout.strip().split()[0])
except (ValueError, IndexError):
reward = 0.0

logger.info(f"[{self._task_id}] Evaluation complete: reward={reward}")
return {"reward": reward}

def release(self) -> None:
"""Stop the container. Called when the task session ends."""
if self.container is not None:
logger.info(f"[{self._task_id}] Stopping container {self.container.id}")
try:
self.container.stop()
logger.info(f"[{self._task_id}] Container {self.container.id} stopped")
except Exception as e:
logger.warning(f"[{self._task_id}] Error stopping container {self.container.id}: {e}")
self.container = None

# ── Private helpers ────────────────────────────────────────────

def _upload_archive_subset(
self,
archive_b64: str,
target_dir: str,
skip_prefixes: tuple[str, ...] = (),
include_prefix: str | None = None,
) -> None:
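        """Stream selected files from a base64-encoded .tar.gz into the container.

        With include_prefix set, only members under that prefix are uploaded and
        the prefix is stripped before joining with target_dir; otherwise every
        file not matching skip_prefixes is uploaded under its archive path.
        Files are shipped one by one as base64 through exec(), so very large
        files may hit the shell's argument-length limit.
        """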
archive_bytes = base64.b64decode(archive_b64)
with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tar:
for member in tar.getmembers():
if not member.isfile():
continue

if include_prefix is not None:
if not member.name.startswith(include_prefix):
continue
rel = member.name[len(include_prefix):]
else:
if any(member.name.startswith(p) for p in skip_prefixes):
continue
rel = member.name

f = tar.extractfile(member)
if f is None:
continue
content = f.read()

remote_path = f"{target_dir.rstrip('/')}/{rel}"
parent = shlex.quote(str(Path(remote_path).parent))
remote = shlex.quote(remote_path)
b64 = base64.b64encode(content).decode("ascii")
logger.debug(f"[{self._task_id}] Uploading {remote_path}")
self.container.exec(f"mkdir -p {parent}") # type: ignore[union-attr]
self.container.exec( # type: ignore[union-attr]
f'printf "%s" {shlex.quote(b64)} | base64 -d > {remote}'
)
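A minimal lifecycle sketch for ContainerEnvironment (the backend arguments and task payload below are illustrative, not part of this PR; the field names mirror what start_task() and evaluate() read above):

from cube.backends.toolkit import ToolkitContainerBackend
from pipelinerl.domains.terminalbench.environment import ContainerEnvironment

backend = ToolkitContainerBackend(account="snow.research.adea")  # account as in the yaml config
env = ContainerEnvironment(backend)
try:
    info = env.start_task(
        {
            "id": "demo-task",
            "container_config": {"image": "python:3.11"},  # hypothetical image
            "setup_commands": ["echo ready"],
        }
    )
    print(info["description"])
    result = env.exec("ls /app", timeout=30)
    print(result["exit_code"], result["stdout"])
    # evaluate() takes a base64-encoded .tar.gz whose tests/ subtree contains an
    # executable test.sh that writes a score to /logs/verifier/reward.txt:
    # reward = env.evaluate(archive_b64, test_timeout_sec=300)["reward"]
finally:
    env.release()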
175 changes: 175 additions & 0 deletions pipelinerl/domains/terminalbench/environment_server.py
@@ -0,0 +1,175 @@
import asyncio
import logging
import uuid

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from cube.container import ContainerBackend
from pipelinerl.domains.terminalbench.environment import ContainerEnvironment

logger = logging.getLogger(__name__)


class ContainerEnvironmentServer:
"""
Manages on-demand container environments exposed via HTTP.

Each incoming /start_task request launches a fresh container using the
task's ContainerConfig. Concurrency is capped at n_parallel_envs; requests
beyond that receive a 503 immediately.
"""

def __init__(
self,
cube_backend: ContainerBackend,
n_parallel_envs: int = 8,
host: str = "0.0.0.0",
env_call_timeout: int = 120,
start_task_timeout: int = 600,
) -> None:
self.cube_backend = cube_backend
self.n_parallel_envs = n_parallel_envs
self.host = host
self.env_call_timeout = env_call_timeout
self.start_task_timeout = start_task_timeout

self.sessions: dict[str, ContainerEnvironment] = {}
self._active = 0
self._lock = asyncio.Lock()

async def _try_acquire(self) -> bool:
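        # A plain counter behind a lock (rather than asyncio.Semaphore) lets
        # /start_task reject over-capacity requests immediately with a 503
        # instead of queuing callers behind a blocking acquire.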
async with self._lock:
if self._active >= self.n_parallel_envs:
return False
self._active += 1
return True

async def _release_slot(self) -> None:
async with self._lock:
self._active = max(0, self._active - 1)

def create_app(self) -> FastAPI:
app = FastAPI(title="Container Environment Server")

class SessionRequest(BaseModel):
session_id: str

class ExecRequest(BaseModel):
session_id: str
command: str
timeout: int | None = None

class StartTaskRequest(BaseModel):
task_data: dict = {}

class EvaluateRequest(BaseModel):
session_id: str
archive_b64: str
test_timeout_sec: int = 900

def _get_env(session_id: str) -> ContainerEnvironment:
if session_id not in self.sessions:
raise HTTPException(status_code=400, detail=f"Invalid or expired session: {session_id}")
return self.sessions[session_id]

@app.post("/start_task")
async def start_task(request: StartTaskRequest):
if not await self._try_acquire():
raise HTTPException(status_code=503, detail="Server at capacity")

task_id = request.task_data.get("id", "unknown")
session_id = str(uuid.uuid4())
env = ContainerEnvironment(self.cube_backend)
self.sessions[session_id] = env
logger.info(
f"[{task_id}] Session {session_id[:8]} acquired "
f"(active={self._active}/{self.n_parallel_envs})"
)
loop = asyncio.get_running_loop()
try:
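                # Note: on timeout, wait_for abandons but does not cancel the
                # executor thread; a container that finishes launching afterwards
                # is leaked until the backend cleans it up.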
result = await asyncio.wait_for(
loop.run_in_executor(None, env.start_task, request.task_data),
timeout=self.start_task_timeout,
)
except Exception as e:
self.sessions.pop(session_id, None)
await self._release_slot()
if isinstance(e, asyncio.TimeoutError):
logger.error(f"[{task_id}] start_task timed out for session {session_id[:8]}")
raise HTTPException(status_code=503, detail="start_task timed out")
logger.exception(f"[{task_id}] start_task failed for session {session_id[:8]}: {e}")
raise HTTPException(status_code=500, detail=str(e))
return {"session_id": session_id, **result}

@app.post("/release")
async def release(request: SessionRequest):
env = _get_env(request.session_id)
task_id = env._task_id
self.sessions.pop(request.session_id, None)
loop = asyncio.get_running_loop()
try:
await asyncio.wait_for(
loop.run_in_executor(None, env.release),
timeout=self.env_call_timeout,
)
except Exception as e:
logger.warning(f"[{task_id}] Error releasing session {request.session_id[:8]}: {e}")
finally:
await self._release_slot()
logger.info(
f"[{task_id}] Session {request.session_id[:8]} released "
f"(active={self._active}/{self.n_parallel_envs})"
)
return {"status": "ok"}

@app.post("/exec")
async def exec_command(request: ExecRequest):
env = _get_env(request.session_id)
loop = asyncio.get_running_loop()
            # Per-request timeouts get a 5s grace period on top of the container
            # call; without one, fall back to the server-wide default.
            call_timeout = request.timeout + 5 if request.timeout else self.env_call_timeout
try:
result = await asyncio.wait_for(
loop.run_in_executor(None, env.exec, request.command, request.timeout),
timeout=call_timeout,
)
except asyncio.TimeoutError:
raise HTTPException(status_code=503, detail=f"exec timed out: {request.command!r}")
return result

@app.post("/evaluate")
async def evaluate(request: EvaluateRequest):
env = _get_env(request.session_id)
loop = asyncio.get_running_loop()
call_timeout = request.test_timeout_sec + 30
try:
result = await asyncio.wait_for(
loop.run_in_executor(
None, env.evaluate, request.archive_b64, request.test_timeout_sec
),
timeout=call_timeout,
)
except asyncio.TimeoutError:
raise HTTPException(status_code=503, detail="evaluate timed out")
return result

@app.get("/health")
async def health():
return {
"status": "ok",
"active": self._active,
"n_parallel_envs": self.n_parallel_envs,
"free": self.n_parallel_envs - self._active,
}

return app

def launch(self, port: int) -> None:
"""Start the HTTP server. Blocking."""
app = self.create_app()
logger.info(
f"Starting Container Environment Server at http://{self.host}:{port} "
f"(n_parallel_envs={self.n_parallel_envs})"
)
uvicorn.run(app, host=self.host, port=port, timeout_keep_alive=3600, log_level="info")
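A minimal client sketch against these endpoints (assumes the server was launched on port 8000; the requests dependency is illustrative, not part of this PR):

import requests

BASE = "http://localhost:8000"
print(requests.get(f"{BASE}/health").json())

resp = requests.post(f"{BASE}/start_task", json={"task_data": {"id": "demo-task"}})
resp.raise_for_status()  # a 503 here means every environment slot is busy
session_id = resp.json()["session_id"]
try:
    out = requests.post(
        f"{BASE}/exec",
        json={"session_id": session_id, "command": "echo hello", "timeout": 30},
    ).json()
    print(out["exit_code"], out["stdout"])
finally:
    requests.post(f"{BASE}/release", json={"session_id": session_id})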