From fcdcd861e895288db1cb784969b46cf795c38459 Mon Sep 17 00:00:00 2001
From: ehsk
Date: Wed, 15 Apr 2026 16:18:55 +0000
Subject: [PATCH 1/4] RL on TerminalBench using CUBE

---
 conf/terminalbench.yaml                      |  72 ++++
 .../domains/terminalbench/environment.py     | 151 +++++++++
 .../terminalbench/environment_server.py      | 175 ++++++++++
 .../domains/terminalbench/load_tasks.py      |  51 +++
 pipelinerl/domains/terminalbench/rollouts.py | 319 ++++++++++++++++++
 pipelinerl/launch.py                         |  15 +
 pyproject.toml                               |  14 +
 7 files changed, 797 insertions(+)
 create mode 100644 conf/terminalbench.yaml
 create mode 100644 pipelinerl/domains/terminalbench/environment.py
 create mode 100644 pipelinerl/domains/terminalbench/environment_server.py
 create mode 100644 pipelinerl/domains/terminalbench/load_tasks.py
 create mode 100644 pipelinerl/domains/terminalbench/rollouts.py

diff --git a/conf/terminalbench.yaml b/conf/terminalbench.yaml
new file mode 100644
index 00000000..21c2e93c
--- /dev/null
+++ b/conf/terminalbench.yaml
@@ -0,0 +1,72 @@
+defaults:
+  - base
+  - override streams: redis
+  - _self_
+
+world:
+  actor_fraction: 4
+  preprocessor_fraction: 0
+  finetune_fraction: 4
+
+model_path: Qwen/Qwen2.5-7B
+
+finetune:
+  seq_length: 64000 # input + output tokens
+  max_train_steps: 1024
+  train_batch_size: 1
+  gradient_accumulation_passes: 1024
+  seq_parallel: 4
+  rl:
+    policy_loss: gspo
+    epsilon_high: 4e-4
+    epsilon_low: 3e-4
+
+eval_every_n_versions: 0
+
+llm:
+  parameters:
+    max_tokens: 16000 # output tokens
+    temperature: 1.0
+test_llm:
+  parameters:
+    max_tokens: ${...llm.parameters.max_tokens}
+    temperature: 1.0
+
+vllm_config:
+  vllm_kwargs:
+    max_model_len: 64000 # input + output tokens
+
+actor:
+  rollout_policy: pipelinerl.domains.terminalbench.rollouts.generate_rollout
+  shared_memory_entry_size: 100000000
+  llm_max_rollouts: 8 # container tasks are slow; keep concurrency low
+  max_steps: 100 # max LLM calls per rollout
+  max_obs_chars: 200000
+  max_history_tokens: 240000
+
+rollout_timeout: 1800 # seconds; covers up to max_steps * exec_timeout
+exec_timeout: 60 # seconds per shell command
+
+
+# ENVIRONMENT CONFIGURATION
+environment:
+  _target_: pipelinerl.domains.terminalbench.environment_server.ContainerEnvironmentServer
+  cube_backend:
+    _target_: cube.backends.toolkit.ToolkitContainerBackend
+    account: snow.research.adea
+    timeout_seconds: 1800
+  n_parallel_envs: 2
+  host: "0.0.0.0"
+  env_call_timeout: 120 # timeout per exec/evaluate call in seconds
+  start_task_timeout: 600 # timeout for container launch (queuing + running)
+
+# DATASET CONFIGURATION
+dataset_loader: pipelinerl.domains.terminalbench.load_tasks.load_tasks
+dataset_loader_params:
+  train_split: 0.6
+  max_train: 10
+  max_test: 4
+train_dataset_names:
+  - train
+test_dataset_names:
+  - test
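For orientation, the environment block above follows Hydra's _target_ convention. A minimal sketch of how a launcher would typically materialize it; hydra.utils.instantiate and the port value are assumptions for illustration, not shown in this patch:

    from hydra.utils import instantiate

    # Recursively builds ContainerEnvironmentServer, including the nested
    # ToolkitContainerBackend described by the cube_backend sub-config.
    server = instantiate(cfg.environment)
    server.launch(port=7777)  # blocking uvicorn loop; the port is hypothetical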
diff --git a/pipelinerl/domains/terminalbench/environment.py b/pipelinerl/domains/terminalbench/environment.py
new file mode 100644
index 00000000..5bce0099
--- /dev/null
+++ b/pipelinerl/domains/terminalbench/environment.py
@@ -0,0 +1,151 @@
+import base64
+import io
+import logging
+import shlex
+import tarfile
+import time
+from pathlib import Path
+
+from cube.container import Container, ContainerBackend, ContainerConfig
+
+logger = logging.getLogger(__name__)
+
+
+class ContainerEnvironment:
+    """
+    Wraps a single container as a terminal environment for one task.
+
+    A fresh container is launched in start_task() using the task's ContainerConfig
+    and stopped in release(). No container is held between tasks.
+    """
+
+    def __init__(self, backend: ContainerBackend) -> None:
+        self.backend = backend
+        self.container: Container | None = None
+        self._task_id: str = ""
+
+    def start_task(self, task_data: dict) -> dict:
+        """Launch a fresh container and prepare it for the task."""
+        self._task_id = task_data.get("id", "unknown")
+        container_config = ContainerConfig(**task_data.get("container_config", {}))
+        logger.info(f"[{self._task_id}] Launching container (image={container_config.image})")
+
+        t0 = time.monotonic()
+        self.container = self.backend.launch(container_config)
+        logger.info(
+            f"[{self._task_id}] Container {self.container.id} running "
+            f"(launch took {time.monotonic() - t0:.1f}s)"
+        )
+
+        self.container.exec("rm -rf /app && mkdir -p /app", timeout=30)
+
+        if archive_b64 := task_data.get("archive_b64"):
+            logger.debug(f"[{self._task_id}] Uploading task archive to /app")
+            self._upload_archive_subset(
+                archive_b64,
+                target_dir="/app",
+                skip_prefixes=("tests/", "solution/", "task.toml", "instruction.md"),
+            )
+
+        for cmd in task_data.get("setup_commands", []):
+            logger.debug(f"[{self._task_id}] Setup: {cmd}")
+            result = self.container.exec(cmd)
+            if result.exit_code != 0:
+                logger.warning(
+                    f"[{self._task_id}] Setup command failed (exit {result.exit_code}): {cmd}\n{result.stderr}"
+                )
+
+        logger.info(f"[{self._task_id}] Task ready in container {self.container.id}")
+        return {
+            "task_id": self._task_id,
+            "description": task_data.get("description", ""),
+        }
+
+    def exec(self, command: str, timeout: int | None = None) -> dict:
+        """Run a shell command in the container and return stdout/stderr/exit_code."""
+        assert self.container is not None, "No active container"
+        logger.debug(f"[{self._task_id}] exec: {command!r}")
+        result = self.container.exec(command, timeout=timeout)
+        logger.debug(
+            f"[{self._task_id}] exec done in {result.duration_seconds:.1f}s "
+            f"(exit={result.exit_code}): {command!r}"
+        )
+        return {
+            "stdout": result.stdout,
+            "stderr": result.stderr,
+            "exit_code": result.exit_code,
+            "duration_seconds": result.duration_seconds,
+        }
+
+    def evaluate(self, archive_b64: str, test_timeout_sec: int = 900) -> dict:
+        """Upload tests from the archive, run test.sh, and return the reward."""
+        assert self.container is not None, "No active container"
+        logger.info(f"[{self._task_id}] Running evaluation (timeout={test_timeout_sec}s)")
+
+        self.container.exec("mkdir -p /tests /logs/verifier")
+        logger.debug(f"[{self._task_id}] Uploading test archive")
+        self._upload_archive_subset(archive_b64, target_dir="/", include_prefix="tests/")
+        self.container.exec("chmod +x /tests/test.sh")
+
+        t0 = time.monotonic()
+        self.container.exec("cd /app && bash /tests/test.sh", timeout=test_timeout_sec)
+        logger.debug(f"[{self._task_id}] test.sh finished in {time.monotonic() - t0:.1f}s")
+
+        reward_result = self.container.exec("cat /logs/verifier/reward.txt 2>/dev/null || echo 0")
+        try:
+            reward = float(reward_result.stdout.strip().split()[0])
+        except (ValueError, IndexError):
+            reward = 0.0
+
+        logger.info(f"[{self._task_id}] Evaluation complete: reward={reward}")
+        return {"reward": reward}
+
+    def release(self) -> None:
+        """Stop the container. Called when the task session ends."""
+        if self.container is not None:
+            logger.info(f"[{self._task_id}] Stopping container {self.container.id}")
+            try:
+                self.container.stop()
+                logger.info(f"[{self._task_id}] Container {self.container.id} stopped")
+            except Exception as e:
+                logger.warning(f"[{self._task_id}] Error stopping container {self.container.id}: {e}")
+        self.container = None
+
+    # ── Private helpers ──────────────────────────────────────────
+
+    def _upload_archive_subset(
+        self,
+        archive_b64: str,
+        target_dir: str,
+        skip_prefixes: tuple[str, ...] = (),
+        include_prefix: str | None = None,
+    ) -> None:
+        archive_bytes = base64.b64decode(archive_b64)
+        with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tar:
+            for member in tar.getmembers():
+                if not member.isfile():
+                    continue
+
+                if include_prefix is not None:
+                    if not member.name.startswith(include_prefix):
+                        continue
+                    rel = member.name[len(include_prefix):]
+                else:
+                    if any(member.name.startswith(p) for p in skip_prefixes):
+                        continue
+                    rel = member.name
+
+                f = tar.extractfile(member)
+                if f is None:
+                    continue
+                content = f.read()
+
+                remote_path = f"{target_dir.rstrip('/')}/{rel}"
+                parent = shlex.quote(str(Path(remote_path).parent))
+                remote = shlex.quote(remote_path)
+                b64 = base64.b64encode(content).decode("ascii")
+                logger.debug(f"[{self._task_id}] Uploading {remote_path}")
+                self.container.exec(f"mkdir -p {parent}")  # type: ignore[union-attr]
+                self.container.exec(  # type: ignore[union-attr]
+                    f'printf "%s" {shlex.quote(b64)} | base64 -d > {remote}'
+                )
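To make the lifecycle above concrete, here is a minimal driver sketch. The backend arguments mirror the config in this patch, while the task fields are illustrative values shaped like the dict built in rollouts.py below:

    from cube.backends.toolkit import ToolkitContainerBackend
    from pipelinerl.domains.terminalbench.environment import ContainerEnvironment

    backend = ToolkitContainerBackend(account="snow.research.adea", timeout_seconds=1800)
    env = ContainerEnvironment(backend)
    task_data = {
        "id": "demo-task",                              # illustrative values
        "description": "Create /app/out.txt",
        "archive_b64": "...",                           # base64-encoded .tar.gz
        "container_config": {"image": "ubuntu:24.04"},
    }
    try:
        env.start_task(task_data)                       # fresh container per task
        print(env.exec("ls /app", timeout=30)["stdout"])
        print(env.evaluate(task_data["archive_b64"]))   # {"reward": ...}
    finally:
        env.release()                                   # container always stopped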
diff --git a/pipelinerl/domains/terminalbench/environment_server.py b/pipelinerl/domains/terminalbench/environment_server.py
new file mode 100644
index 00000000..5d136e1c
--- /dev/null
+++ b/pipelinerl/domains/terminalbench/environment_server.py
@@ -0,0 +1,175 @@
+import asyncio
+import logging
+import uuid
+
+import uvicorn
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+
+from cube.container import ContainerBackend
+from pipelinerl.domains.terminalbench.environment import ContainerEnvironment
+
+logger = logging.getLogger(__name__)
+
+
+class ContainerEnvironmentServer:
+    """
+    Manages on-demand container environments exposed via HTTP.
+
+    Each incoming /start_task request launches a fresh container using the
+    task's ContainerConfig. Concurrency is capped at n_parallel_envs; requests
+    beyond that receive a 503 immediately.
+    """
+
+    def __init__(
+        self,
+        cube_backend: ContainerBackend,
+        n_parallel_envs: int = 8,
+        host: str = "0.0.0.0",
+        env_call_timeout: int = 120,
+        start_task_timeout: int = 600,
+    ) -> None:
+        self.cube_backend = cube_backend
+        self.n_parallel_envs = n_parallel_envs
+        self.host = host
+        self.env_call_timeout = env_call_timeout
+        self.start_task_timeout = start_task_timeout
+
+        self.sessions: dict[str, ContainerEnvironment] = {}
+        self._active = 0
+        self._lock = asyncio.Lock()
+
+    async def _try_acquire(self) -> bool:
+        async with self._lock:
+            if self._active >= self.n_parallel_envs:
+                return False
+            self._active += 1
+            return True
+
+    async def _release_slot(self) -> None:
+        async with self._lock:
+            self._active = max(0, self._active - 1)
+
+    def create_app(self) -> FastAPI:
+        app = FastAPI(title="Container Environment Server")
+
+        class SessionRequest(BaseModel):
+            session_id: str
+
+        class ExecRequest(BaseModel):
+            session_id: str
+            command: str
+            timeout: int | None = None
+
+        class StartTaskRequest(BaseModel):
+            task_data: dict = {}
+
+        class EvaluateRequest(BaseModel):
+            session_id: str
+            archive_b64: str
+            test_timeout_sec: int = 900
+
+        def _get_env(session_id: str) -> ContainerEnvironment:
+            if session_id not in self.sessions:
+                raise HTTPException(status_code=400, detail=f"Invalid or expired session: {session_id}")
+            return self.sessions[session_id]
+
+        @app.post("/start_task")
+        async def start_task(request: StartTaskRequest):
+            if not await self._try_acquire():
+                raise HTTPException(status_code=503, detail="Server at capacity")
+
+            task_id = request.task_data.get("id", "unknown")
+            session_id = str(uuid.uuid4())
+            env = ContainerEnvironment(self.cube_backend)
+            self.sessions[session_id] = env
+            logger.info(
+                f"[{task_id}] Session {session_id[:8]} acquired "
+                f"(active={self._active}/{self.n_parallel_envs})"
+            )
+            loop = asyncio.get_running_loop()
+            try:
+                result = await asyncio.wait_for(
+                    loop.run_in_executor(None, env.start_task, request.task_data),
+                    timeout=self.start_task_timeout,
+                )
+            except Exception as e:
+                self.sessions.pop(session_id, None)
+                await self._release_slot()
+                if isinstance(e, asyncio.TimeoutError):
+                    logger.error(f"[{task_id}] start_task timed out for session {session_id[:8]}")
+                    raise HTTPException(status_code=503, detail="start_task timed out")
+                logger.exception(f"[{task_id}] start_task failed for session {session_id[:8]}: {e}")
+                raise HTTPException(status_code=500, detail=str(e))
+            return {"session_id": session_id, **result}
+
+        @app.post("/release")
+        async def release(request: SessionRequest):
+            env = _get_env(request.session_id)
+            task_id = env._task_id
+            self.sessions.pop(request.session_id, None)
+            loop = asyncio.get_running_loop()
+            try:
+                await asyncio.wait_for(
+                    loop.run_in_executor(None, env.release),
+                    timeout=self.env_call_timeout,
+                )
+            except Exception as e:
+                logger.warning(f"[{task_id}] Error releasing session {request.session_id[:8]}: {e}")
+            finally:
+                await self._release_slot()
+            logger.info(
+                f"[{task_id}] Session {request.session_id[:8]} released "
+                f"(active={self._active}/{self.n_parallel_envs})"
+            )
+            return {"status": "ok"}
+
+        @app.post("/exec")
+        async def exec_command(request: ExecRequest):
+            env = _get_env(request.session_id)
+            loop = asyncio.get_running_loop()
+            call_timeout = request.timeout + 5 if request.timeout else self.env_call_timeout
+            try:
+                result = await asyncio.wait_for(
+                    loop.run_in_executor(None, env.exec, request.command, request.timeout),
+                    timeout=call_timeout,
+                )
+            except asyncio.TimeoutError:
+                raise HTTPException(status_code=503, detail=f"exec timed out: {request.command!r}")
+            return result
+
+        @app.post("/evaluate")
+        async def evaluate(request: EvaluateRequest):
+            env = _get_env(request.session_id)
+            loop = asyncio.get_running_loop()
+            call_timeout = request.test_timeout_sec + 30
+            try:
+                result = await asyncio.wait_for(
+                    loop.run_in_executor(
+                        None, env.evaluate, request.archive_b64, request.test_timeout_sec
+                    ),
+                    timeout=call_timeout,
+                )
+            except asyncio.TimeoutError:
+                raise HTTPException(status_code=503, detail="evaluate timed out")
+            return result
+
+        @app.get("/health")
+        async def health():
+            return {
+                "status": "ok",
+                "active": self._active,
+                "n_parallel_envs": self.n_parallel_envs,
+                "free": self.n_parallel_envs - self._active,
+            }
+
+        return app
+
+    def launch(self, port: int) -> None:
+        """Start the HTTP server. Blocking."""
+        app = self.create_app()
+        logger.info(
+            f"Starting Container Environment Server at http://{self.host}:{port} "
+            f"(n_parallel_envs={self.n_parallel_envs})"
+        )
+        uvicorn.run(app, host=self.host, port=port, timeout_keep_alive=3600, log_level="info")
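Because /start_task answers 503 whenever all n_parallel_envs slots are busy, callers are expected to retry rather than queue. A minimal client sketch; the URL and backoff interval are illustrative:

    import asyncio
    import aiohttp

    async def acquire(env_url: str, task_data: dict) -> str:
        async with aiohttp.ClientSession() as session:
            while True:
                async with session.post(f"{env_url}/start_task", json={"task_data": task_data}) as resp:
                    if resp.status == 503:       # at capacity, or launch timed out
                        await asyncio.sleep(15)  # back off before retrying
                        continue
                    resp.raise_for_status()
                    return (await resp.json())["session_id"]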
diff --git a/pipelinerl/domains/terminalbench/load_tasks.py b/pipelinerl/domains/terminalbench/load_tasks.py
new file mode 100644
index 00000000..5da74516
--- /dev/null
+++ b/pipelinerl/domains/terminalbench/load_tasks.py
@@ -0,0 +1,51 @@
+import base64
+
+from terminalbench_cube.benchmark import TerminalBenchBenchmark
+
+
+def load_tasks(
+    dataset_names: list[str],
+    train_split: float = 0.6,
+    difficulty_filter: str | None = None,
+    max_train_examples: int | None = None,  # cap number of train tasks (intended for debugging)
+    max_test_examples: int | None = None,  # cap number of test tasks (intended for debugging)
+) -> list[dict]:
+    benchmark = TerminalBenchBenchmark(
+        shuffle=True,
+        shuffle_seed=42,
+        difficulty_filter=difficulty_filter,
+    )
+    benchmark.install()
+    benchmark._setup()
+
+    all_ids = list(TerminalBenchBenchmark.task_metadata.keys())
+    n_train = int(len(all_ids) * train_split)
+    train_ids = all_ids[:n_train]
+    test_ids = all_ids[n_train:]
+
+    if max_train_examples is not None:
+        train_ids = train_ids[:max_train_examples]
+    if max_test_examples is not None:
+        test_ids = test_ids[:max_test_examples]
+
+    problems = []
+    for name in dataset_names:
+        if name == "train":
+            problems.extend([_make_problem(tid, "terminalbench.train") for tid in train_ids])
+        elif name == "test":
+            problems.extend([_make_problem(tid, "terminalbench.test") for tid in test_ids])
+    return problems
+
+
+def _make_problem(task_id: str, dataset: str) -> dict:
+    meta = TerminalBenchBenchmark.task_metadata[task_id]
+    extra = meta.extra_info
+    return {
+        "task_id": task_id,
+        "instruction": extra["instruction"],
+        "archive_b64": base64.b64encode(extra["archive"]).decode("ascii"),
+        "dataset": dataset,
+        "difficulty": extra.get("difficulty", "unknown"),
+        "max_test_timeout_sec": extra.get("max_test_timeout_sec", 900),
+        "container_config": meta.container_config.model_dump(),
+    }
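A quick sketch of how the loader is called and what it returns; values are illustrative, the keys match _make_problem above:

    problems = load_tasks(["train", "test"], train_split=0.6, max_train_examples=2)
    first = problems[0]
    print(first["task_id"], first["dataset"])   # e.g. "<task-id> terminalbench.train"
    print(first["difficulty"], first["max_test_timeout_sec"])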
diff --git a/pipelinerl/domains/terminalbench/rollouts.py b/pipelinerl/domains/terminalbench/rollouts.py
new file mode 100644
index 00000000..3a6779e5
--- /dev/null
+++ b/pipelinerl/domains/terminalbench/rollouts.py
@@ -0,0 +1,319 @@
+
+import asyncio
+import logging
+import random
+import re
+import time
+import traceback
+from typing import Any
+
+import aiohttp
+from omegaconf import DictConfig
+
+from pipelinerl.llm import LLMCall, Prompt, TrainableLLM
+from pipelinerl.async_llm import llm_async_generate, make_training_text
+from pipelinerl.rollouts import BaseMetrics, RolloutResult
+from pipelinerl.utils import get_environment_jobs, resolve_environment_key
+
+
+logger = logging.getLogger(__name__)
+
+# From hello_terminalbench_toolkit.py, extended with a bash-block output format
+# since llm_async_generate works with text content, not tool-use function calls.
+DEFAULT_SYSTEM_PROMPT = """\
+You are an expert software engineer working in a Linux terminal.
+Work in the /app directory. Read existing files and test your solutions before declaring completion.
+
+To run a shell command, write it in a bash code block:
+```bash
+<command>
+```
+
+You will receive the command output after each step.
+When the task is complete, write TASK_COMPLETE on a line by itself."""
+
+
+class TerminalBenchMetrics(BaseMetrics):
+    reward: float
+    success: bool
+    no_error: bool
+    no_answer: bool
+    overflow: bool
+    n_llm_calls: int
+    n_step_errors: int
+    n_steps: int
+    total_execution_time: float
+
+
+# ── Environment server HTTP helpers ─────────────────────────────────────────
+
+async def _start_task(
+    env_url: str,
+    task_data: dict,
+    session: aiohttp.ClientSession,
+    timeout: int = 300,
+) -> dict:
+    """Acquire a free env and set up the task in one call. Returns session_id + task info."""
+    async with session.post(
+        f"{env_url}/start_task",
+        json={"task_data": task_data},
+        timeout=aiohttp.ClientTimeout(total=timeout),
+    ) as resp:
+        if resp.status != 200:
+            raise RuntimeError(f"start_task failed (HTTP {resp.status}): {await resp.text()}")
+        return await resp.json()
+
+
+async def _release(env_url: str, session_id: str, session: aiohttp.ClientSession) -> None:
+    try:
+        async with session.post(
+            f"{env_url}/release",
+            json={"session_id": session_id},
+            timeout=aiohttp.ClientTimeout(total=60),
+        ) as resp:
+            await resp.read()
+    except Exception as e:
+        logger.warning(f"release failed for session {session_id}: {e}")
+
+
+async def _exec(
+    env_url: str,
+    session_id: str,
+    command: str,
+    session: aiohttp.ClientSession,
+    timeout: int = 60,
+) -> dict:
+    async with session.post(
+        f"{env_url}/exec",
+        json={"session_id": session_id, "command": command, "timeout": timeout},
+        timeout=aiohttp.ClientTimeout(total=timeout + 10),
+    ) as resp:
+        return await resp.json()
+
+
+async def _evaluate(
+    env_url: str,
+    session_id: str,
+    archive_b64: str,
+    session: aiohttp.ClientSession,
+    test_timeout_sec: int = 900,
+) -> float:
+    async with session.post(
+        f"{env_url}/evaluate",
+        json={
+            "session_id": session_id,
+            "archive_b64": archive_b64,
+            "test_timeout_sec": test_timeout_sec,
+        },
+        timeout=aiohttp.ClientTimeout(total=test_timeout_sec + 60),
+    ) as resp:
+        data = await resp.json()
+        return float(data.get("reward", 0.0))
+
+
+# ── Rollout entry point ──────────────────────────────────────────────────────
+
+async def generate_rollout(
+    cfg: DictConfig,
+    llm: TrainableLLM,
+    problem: dict[str, Any],
+    session: aiohttp.ClientSession,
+) -> RolloutResult:
+    start_time = time.time()
+    rollout_timeout = getattr(cfg, "rollout_timeout", 600)
+
+    env_key = resolve_environment_key(cfg, default="terminalbench")
+    env_jobs = get_environment_jobs(cfg, env_key)
+    if not env_jobs:
+        raise RuntimeError("No environment servers available for terminalbench domain")
+
+    shuffled = list(env_jobs)
+    random.shuffle(shuffled)
+
+    for env_job in shuffled:
+        env_url = f"http://{env_job.hostname}:{env_job.port}"
+
+        # Health check
+        try:
+            async with session.get(
+                f"{env_url}/health", timeout=aiohttp.ClientTimeout(total=10)
+            ) as resp:
+                if resp.status != 200:
+                    logger.warning(f"Env server {env_url} unhealthy (status {resp.status})")
+                    continue
+        except Exception as e:
+            logger.warning(f"Health check failed for {env_url}: {e}")
+            continue
+
+        try:
+            return await asyncio.wait_for(
+                _run_rollout(cfg, llm, problem, session, env_url, start_time),
+                timeout=rollout_timeout,
+            )
+        except asyncio.TimeoutError:
+            logger.warning(
+                f"Rollout timed out after {rollout_timeout}s on {env_url} "
+                f"for task {problem.get('task_id', '?')}"
+            )
+        except Exception:
+            logger.warning(
+                f"Rollout failed on {env_url} for task {problem.get('task_id', '?')}:\n"
+                f"{traceback.format_exc()}"
+            )
+
+    logger.error(
+        f"All env servers failed for task {problem.get('task_id', '?')}. Returning failed rollout."
+    )
+    return _failed_rollout(problem, start_time)
+
+
+# ── Agent loop ───────────────────────────────────────────────────────────────
+
+async def _run_rollout(
+    cfg: DictConfig,
+    llm: TrainableLLM,
+    problem: dict,
+    session: aiohttp.ClientSession,
+    env_url: str,
+    start_time: float,
+) -> RolloutResult:
+    max_steps = getattr(cfg.actor, "max_steps", 100)
+    exec_timeout = getattr(cfg, "exec_timeout", 60)
+    test_timeout_sec = problem.get("max_test_timeout_sec", 900)
+    archive_b64: str | None = problem.get("archive_b64")
+
+    task_info = await _start_task(
+        env_url,
+        {
+            "id": problem.get("task_id", ""),
+            "description": problem.get("instruction", ""),
+            "archive_b64": archive_b64,
+            "container_config": problem.get("container_config", {}),
+        },
+        session,
+    )
+    session_id = task_info["session_id"]
+    description = task_info.get("description") or problem.get("instruction", "")
+
+    try:
+        system_prompt = getattr(cfg.actor, "system_prompt", None) or DEFAULT_SYSTEM_PROMPT
+        messages: list[dict] = [
+            {"role": "system", "content": system_prompt},
+            {"role": "user", "content": description},
+        ]
+
+        llm_calls: list[LLMCall] = []
+        n_step_errors = 0
+
+        for _step in range(max_steps):
+            prompt = Prompt(messages=list(messages))
+            llm_call = await llm_async_generate(llm, prompt, session)
+            llm_calls.append(llm_call)
+
+            response_text = llm_call.output.content or ""
+            messages.append({"role": "assistant", "content": response_text})
+
+            if "TASK_COMPLETE" in response_text:
+                break
+
+            command = _extract_bash_command(response_text)
+            if command is None:
+                n_step_errors += 1
+                messages.append({
+                    "role": "user",
+                    "content": "No bash command found. Please write a command in a ```bash block.",
+                })
+                continue
+
+            exec_result = await _exec(env_url, session_id, command, session, timeout=exec_timeout)
+            messages.append({"role": "user", "content": _format_exec_result(exec_result)})
+
+        # Compute reward by running the task's test suite inside the sandbox
+        reward = 0.0
+        if archive_b64:
+            try:
+                reward = await _evaluate(
+                    env_url, session_id, archive_b64, session, test_timeout_sec=test_timeout_sec
+                )
+            except Exception as e:
+                logger.warning(f"Evaluation failed for task {problem.get('task_id', '?')}: {e}")
+        else:
+            logger.warning(
+                f"No archive_b64 in problem dict for task {problem.get('task_id', '?')}; "
+                "reward defaults to 0."
+            )
+
+    finally:
+        await _release(env_url, session_id, session)
+
+    latency = time.time() - start_time
+    n_llm_calls = len(llm_calls)
+    max_tokens = int(llm.parameters.get("max_tokens", 16000))
+    overflow = any(c.output_length_tokens >= max_tokens for c in llm_calls)
+
+    training_texts = [make_training_text(llm, c) for c in llm_calls]
+    for t in training_texts:
+        t.reward = reward
+
+    metrics = TerminalBenchMetrics(
+        reward=reward,
+        success=reward > 0.5,
+        no_error=n_step_errors == 0,
+        no_answer=n_llm_calls == 0,
+        overflow=overflow,
+        n_llm_calls=n_llm_calls,
+        n_step_errors=n_step_errors,
+        n_steps=n_llm_calls,
+        total_execution_time=latency,
+    )
+
+    return RolloutResult(
+        training_texts=training_texts,
+        metrics=metrics,
+        latency=latency,
+        dataset_name=problem.get("dataset", "terminalbench"),
+        domain="terminalbench",
+    )
+
+
+# ── Helpers ──────────────────────────────────────────────────────────────────
+
+def _extract_bash_command(text: str) -> str | None:
+    match = re.search(r"```(?:bash|sh|shell)?\s*\n(.*?)```", text, re.DOTALL)
+    if match:
+        return match.group(1).strip()
+    return None
+
+
+def _format_exec_result(result: dict) -> str:
+    parts = []
+    if result.get("stdout"):
+        parts.append(result["stdout"])
+    if result.get("stderr"):
+        parts.append(f"[stderr]: {result['stderr']}")
+    exit_code = result.get("exit_code", 0)
+    if exit_code != 0:
+        parts.append(f"[exit code: {exit_code}]")
+    return "\n".join(parts) if parts else "(no output)"
+
+
+def _failed_rollout(problem: dict, start_time: float) -> RolloutResult:
+    latency = time.time() - start_time
+    metrics = TerminalBenchMetrics(
+        reward=0.0,
+        success=False,
+        no_error=False,
+        no_answer=True,
+        overflow=False,
+        n_llm_calls=0,
+        n_step_errors=0,
+        n_steps=0,
+        total_execution_time=latency,
+    )
+    return RolloutResult(
+        training_texts=[],
+        metrics=metrics,
+        latency=latency,
+        dataset_name=problem.get("dataset", "terminalbench"),
+        domain="terminalbench",
+    )
+ ) + + finally: + await _release(env_url, session_id, session) + + latency = time.time() - start_time + n_llm_calls = len(llm_calls) + max_tokens = int(llm.parameters.get("max_tokens", 16000)) + overflow = any(c.output_length_tokens >= max_tokens for c in llm_calls) + + training_texts = [make_training_text(llm, c) for c in llm_calls] + for t in training_texts: + t.reward = reward + + metrics = TerminalBenchMetrics( + reward=reward, + success=reward > 0.5, + no_error=n_step_errors == 0, + no_answer=n_llm_calls == 0, + overflow=overflow, + n_llm_calls=n_llm_calls, + n_step_errors=n_step_errors, + n_steps=n_llm_calls, + total_execution_time=latency, + ) + + return RolloutResult( + training_texts=training_texts, + metrics=metrics, + latency=latency, + dataset_name=problem.get("dataset", "terminalbench"), + domain="terminalbench", + ) + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def _extract_bash_command(text: str) -> str | None: + match = re.search(r"```(?:bash|sh|shell)?\s*\n(.*?)```", text, re.DOTALL) + if match: + return match.group(1).strip() + return None + + +def _format_exec_result(result: dict) -> str: + parts = [] + if result.get("stdout"): + parts.append(result["stdout"]) + if result.get("stderr"): + parts.append(f"[stderr]: {result['stderr']}") + exit_code = result.get("exit_code", 0) + if exit_code != 0: + parts.append(f"[exit code: {exit_code}]") + return "\n".join(parts) if parts else "(no output)" + + +def _failed_rollout(problem: dict, start_time: float) -> RolloutResult: + latency = time.time() - start_time + metrics = TerminalBenchMetrics( + reward=0.0, + success=False, + no_error=False, + no_answer=True, + overflow=False, + n_llm_calls=0, + n_step_errors=0, + n_steps=0, + total_execution_time=latency, + ) + return RolloutResult( + training_texts=[], + metrics=metrics, + latency=latency, + dataset_name=problem.get("dataset", "terminalbench"), + domain="terminalbench", + ) diff --git a/pipelinerl/launch.py b/pipelinerl/launch.py index b4dd9e08..9d9a8d27 100644 --- a/pipelinerl/launch.py +++ b/pipelinerl/launch.py @@ -584,6 +584,19 @@ def launch_jobs(cfg: DictConfig, world_map: WorldMap, job_kind_filter: list | No return processes +def setup_cube_cache(exp_dir: Path) -> None: + """Point cube's cache to the shared experiment directory. + + All nodes (actors, environment servers) inherit this via os.environ, + ensuring they all use the same dataset location rather than each node's + local ~/.cube directory. 
+ """ + cube_cache_dir = exp_dir / "cube_cache" + os.makedirs(cube_cache_dir, exist_ok=True) + os.environ["CUBE_CACHE_DIR"] = str(cube_cache_dir) + logger.info(f"Cube cache directory set to {cube_cache_dir}") + + def setup_logging(log_file: Path): file_handler = logging.FileHandler(log_file) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") @@ -604,6 +617,8 @@ def main(cfg: DictConfig): exp_dir = Path(cfg.output_dir) config_dir = exp_dir / "conf" + setup_cube_cache(exp_dir) + os.makedirs(exp_dir / "launcher", exist_ok=True) log_file = exp_dir / "launcher" / f"launcher_{os.environ.get('RANK', 0)}.log" setup_logging(log_file) diff --git a/pyproject.toml b/pyproject.toml index 5f646c3a..89af9381 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -74,12 +74,21 @@ ifeval = [ "langdetect", "absl-py", ] +terminalbench = [ + "cube-standard[toolkit] @ file:///home/toolkit/cube-standard", + "cube-harness @ file:///home/toolkit/cube-harness", + "terminalbench-cube @ file:///home/toolkit/cube-harness/cubes/terminalbench-cube", +] # Install all domain dependencies domains = [ "pipelinerl[coding,fn_calling,logic,ifeval]", ] [tool.uv] +conflicts = [ + [{ extra = "terminalbench" }, { extra = "tapeagents" }], + [{ extra = "lora" }, { extra = "tapeagents" }], +] # tapeagents==0.1.16 requires transformers<4.52 and accelerate<1.8, which conflict with the project's versions. # Overrides to allow resolution to proceed; tapeagents extra will be broken at runtime until it supports newer versions. override-dependencies = [ @@ -87,6 +96,11 @@ override-dependencies = [ "accelerate>=1.7.0", ] +[tool.uv.sources] +cube-standard = { path = "/home/toolkit/cube-standard", editable = true } +cube-harness = { path = "/home/toolkit/cube-harness", editable = true } +terminalbench-cube = { path = "/home/toolkit/cube-harness/cubes/terminalbench-cube", editable = true } + [tool.setuptools.packages.find] where = ["."] include = ["pipelinerl*"] From c9563d24cc017236d7291b6992fa4b6b26f9d143 Mon Sep 17 00:00:00 2001 From: ehsk Date: Wed, 15 Apr 2026 19:29:03 +0000 Subject: [PATCH 2/4] local paths removed --- pyproject.toml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 89af9381..c45c6014 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -75,9 +75,9 @@ ifeval = [ "absl-py", ] terminalbench = [ - "cube-standard[toolkit] @ file:///home/toolkit/cube-standard", - "cube-harness @ file:///home/toolkit/cube-harness", - "terminalbench-cube @ file:///home/toolkit/cube-harness/cubes/terminalbench-cube", + "cube-standard[toolkit] @ git+https://github.com/The-AI-Alliance/cube-standard.git@a5947a0", + "cube-harness @ git+https://github.com/The-AI-Alliance/cube-harness.git@064efc6", + "terminalbench-cube @ git+https://github.com/The-AI-Alliance/cube-harness.git@064efc6#subdirectory=cubes/terminalbench-cube", ] # Install all domain dependencies domains = [ @@ -97,9 +97,9 @@ override-dependencies = [ ] [tool.uv.sources] -cube-standard = { path = "/home/toolkit/cube-standard", editable = true } -cube-harness = { path = "/home/toolkit/cube-harness", editable = true } -terminalbench-cube = { path = "/home/toolkit/cube-harness/cubes/terminalbench-cube", editable = true } +cube-standard = { git = "https://github.com/The-AI-Alliance/cube-standard.git", rev = "a5947a0" } +cube-harness = { git = "https://github.com/The-AI-Alliance/cube-harness.git", rev = "064efc6" } +terminalbench-cube = { git = "https://github.com/The-AI-Alliance/cube-harness.git", 
rev = "064efc6", subdirectory = "cubes/terminalbench-cube" } [tool.setuptools.packages.find] where = ["."] From 7ca21ec6f05d6ddb60ab745538ce6a5b7f266d8c Mon Sep 17 00:00:00 2001 From: ehsk Date: Thu, 16 Apr 2026 03:15:31 +0000 Subject: [PATCH 3/4] length overflow issues resolved --- pipelinerl/domains/terminalbench/rollouts.py | 22 +++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/pipelinerl/domains/terminalbench/rollouts.py b/pipelinerl/domains/terminalbench/rollouts.py index 3a6779e5..517e6a0d 100644 --- a/pipelinerl/domains/terminalbench/rollouts.py +++ b/pipelinerl/domains/terminalbench/rollouts.py @@ -161,10 +161,7 @@ async def generate_rollout( f"{traceback.format_exc()}" ) - logger.error( - f"All env servers failed for task {problem.get('task_id', '?')}. Returning failed rollout." - ) - return _failed_rollout(problem, start_time) + raise RuntimeError(f"All env servers failed for task {problem.get('task_id', '?')}") # ── Agent loop ───────────────────────────────────────────────────────────────── @@ -207,12 +204,27 @@ async def _run_rollout( for _step in range(max_steps): prompt = Prompt(messages=list(messages)) - llm_call = await llm_async_generate(llm, prompt, session) + try: + llm_call = await llm_async_generate(llm, prompt, session) + except aiohttp.ClientResponseError as e: + if e.status == 400: + if not llm_calls: + raise RuntimeError(f"Context length exceeded on first LLM call for task {problem.get('task_id', '?')}") from e + logger.warning(f"Context length exceeded for task {problem.get('task_id', '?')}, ending rollout") + else: + if not llm_calls: + raise + logger.warning(f"LLM call failed (HTTP {e.status}) for task {problem.get('task_id', '?')}, ending rollout: {e}") + break llm_calls.append(llm_call) response_text = llm_call.output.content or "" messages.append({"role": "assistant", "content": response_text}) + if llm_call.llm_info.get("finish_reason") == "length": + logger.warning(f"Output truncated (finish_reason=length) for task {problem.get('task_id', '?')}, ending rollout") + break + if "TASK_COMPLETE" in response_text: break From db9aa257fa9a13e5546bb0e30f72516a8b82d24d Mon Sep 17 00:00:00 2001 From: ehsk Date: Thu, 16 Apr 2026 03:15:48 +0000 Subject: [PATCH 4/4] hyperparameters updated --- conf/terminalbench.yaml | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/conf/terminalbench.yaml b/conf/terminalbench.yaml index 21c2e93c..b8aba314 100644 --- a/conf/terminalbench.yaml +++ b/conf/terminalbench.yaml @@ -1,6 +1,6 @@ defaults: - base - - override streams: redis + - override streams: files - _self_ world: @@ -11,10 +11,10 @@ world: model_path: Qwen/Qwen2.5-7B finetune: - seq_length: 64000 # input + output tokens + seq_length: 100000 # input + output tokens max_train_steps: 1024 train_batch_size: 1 - gradient_accumulation_passes: 1024 + gradient_accumulation_passes: 8 seq_parallel: 4 rl: policy_loss: gspo @@ -34,18 +34,16 @@ test_llm: vllm_config: vllm_kwargs: - max_model_len: 64000 # input + output tokens + max_model_len: 100000 # input + output tokens actor: rollout_policy: pipelinerl.domains.terminalbench.rollouts.generate_rollout - shared_memory_entry_size: 100000000 - llm_max_rollouts: 8 # container tasks are slow; keep concurrency low + shared_memory_entry_size: 200000000 + llm_max_rollouts: 1 # container tasks are slow; keep concurrency low max_steps: 100 # max LLM calls per rollout - max_obs_chars: 200000 - max_history_tokens: 240000 -rollout_timeout: 1800 # seconds; covers up to 
From db9aa257fa9a13e5546bb0e30f72516a8b82d24d Mon Sep 17 00:00:00 2001
From: ehsk
Date: Thu, 16 Apr 2026 03:15:48 +0000
Subject: [PATCH 4/4] hyperparameters updated

---
 conf/terminalbench.yaml | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/conf/terminalbench.yaml b/conf/terminalbench.yaml
index 21c2e93c..b8aba314 100644
--- a/conf/terminalbench.yaml
+++ b/conf/terminalbench.yaml
@@ -1,6 +1,6 @@
 defaults:
   - base
-  - override streams: redis
+  - override streams: files
   - _self_
 
 world:
@@ -11,10 +11,10 @@ world:
 model_path: Qwen/Qwen2.5-7B
 
 finetune:
-  seq_length: 64000 # input + output tokens
+  seq_length: 100000 # input + output tokens
   max_train_steps: 1024
   train_batch_size: 1
-  gradient_accumulation_passes: 1024
+  gradient_accumulation_passes: 8
   seq_parallel: 4
   rl:
     policy_loss: gspo
@@ -34,18 +34,16 @@ test_llm:
 
 vllm_config:
   vllm_kwargs:
-    max_model_len: 64000 # input + output tokens
+    max_model_len: 100000 # input + output tokens
 
 actor:
   rollout_policy: pipelinerl.domains.terminalbench.rollouts.generate_rollout
-  shared_memory_entry_size: 100000000
-  llm_max_rollouts: 8 # container tasks are slow; keep concurrency low
+  shared_memory_entry_size: 200000000
+  llm_max_rollouts: 1 # container tasks are slow; keep concurrency low
   max_steps: 100 # max LLM calls per rollout
-  max_obs_chars: 200000
-  max_history_tokens: 240000
 
-rollout_timeout: 1800 # seconds; covers up to max_steps * exec_timeout
-exec_timeout: 60 # seconds per shell command
+preprocess:
+  shared_memory_entry_size: 200000000
 
 
 # ENVIRONMENT CONFIGURATION
@@ -55,7 +53,7 @@ environment:
     _target_: cube.backends.toolkit.ToolkitContainerBackend
     account: snow.research.adea
     timeout_seconds: 1800
-  n_parallel_envs: 2
+  n_parallel_envs: 8
   host: "0.0.0.0"
   env_call_timeout: 120 # timeout per exec/evaluate call in seconds
   start_task_timeout: 600 # timeout for container launch (queuing + running)
@@ -64,8 +62,8 @@ environment:
 dataset_loader: pipelinerl.domains.terminalbench.load_tasks.load_tasks
 dataset_loader_params:
   train_split: 0.6
-  max_train: 10
-  max_test: 4
+  max_train_examples: 32
+  max_test_examples: 4
 train_dataset_names:
   - train
 test_dataset_names: