From bdab0a920d900cc5c12047fd6d6b49b9cf14db69 Mon Sep 17 00:00:00 2001 From: amarrmb Date: Sat, 25 Apr 2026 12:09:31 -0700 Subject: [PATCH 1/7] feat(eval): structured JSON output, Wilson CIs, per-trial provenance, spatial breakdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds robosandbox.eval — the evaluation contract layer the rest of the project hangs evaluation off of. summarise_eval() turns per-trial outcomes into a v2 EvalSummary dict with success rate, Wilson 95% CI, optional spatial breakdown (cube-pose bins → per-bin success rate), and a provenance block (checkpoint sha256 + git rev + library versions). Two-proportion z-test helpers wired in for compare workflows. Makes "compare two checkpoints under the same task contract" a one-command operation rather than an ad hoc script per project. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/robosandbox/eval/__init__.py | 14 ++ .../src/robosandbox/eval/stats.py | 218 ++++++++++++++++++ .../robosandbox-core/tests/test_eval_stats.py | 157 +++++++++++++ 3 files changed, 389 insertions(+) create mode 100644 packages/robosandbox-core/src/robosandbox/eval/__init__.py create mode 100644 packages/robosandbox-core/src/robosandbox/eval/stats.py create mode 100644 packages/robosandbox-core/tests/test_eval_stats.py diff --git a/packages/robosandbox-core/src/robosandbox/eval/__init__.py b/packages/robosandbox-core/src/robosandbox/eval/__init__.py new file mode 100644 index 0000000..5d8a089 --- /dev/null +++ b/packages/robosandbox-core/src/robosandbox/eval/__init__.py @@ -0,0 +1,14 @@ +"""Eval-time utilities: success-rate stats, JSON output schema, comparisons. + +Kept out of ``robosandbox.policy`` so the policy module stays focused on +the runtime contract (``act`` / ``run_policy`` / ``run_eval_parallel``) +and doesn't acquire reporting concerns. 
+""" + +from robosandbox.eval.stats import ( + proportion_z_test, + summarise_eval, + wilson_ci, +) + +__all__ = ["proportion_z_test", "summarise_eval", "wilson_ci"] diff --git a/packages/robosandbox-core/src/robosandbox/eval/stats.py b/packages/robosandbox-core/src/robosandbox/eval/stats.py new file mode 100644 index 0000000..29a1dab --- /dev/null +++ b/packages/robosandbox-core/src/robosandbox/eval/stats.py @@ -0,0 +1,218 @@ +"""Binomial success-rate stats for policy eval. + +Stdlib only — no scipy. The Wilson score interval is the textbook +binomial CI for small ``n`` (Newton typically runs 32-128 worlds, where +the normal-approximation Wald CI breaks down near 0% and 100%). The +two-proportion z-test is the standard significance test for comparing +two policies' success rates. +""" + +from __future__ import annotations + +import math +from typing import TypedDict + +# 95% two-sided z-quantile. Hardcoded so callers don't need scipy/numpy +# just to ask for the default. Override via ``z`` for other confidence +# levels (e.g. z=2.576 for 99%). +_Z_95 = 1.959963984540054 + + +class EvalSummary(TypedDict, total=False): + """Structured eval result. Stable JSON schema for ``robo-sandbox eval --output``.""" + + schema_version: int + task: str + policy: str + sim_backend: str + n_trials: int + successes: int + rate: float + ci_low: float + ci_high: float + ci_level: float + success_per_trial: list[bool] + steps: int + wall_seconds: float + throughput_env_steps_per_s: float + + +def wilson_ci(successes: int, n: int, z: float = _Z_95) -> tuple[float, float]: + """Wilson score confidence interval for a binomial proportion. + + Returns ``(lo, hi)`` as fractions in [0, 1]. With ``n == 0`` the + rate is undefined; we return the trivial ``(0.0, 1.0)`` interval + rather than raise — callers can branch on ``n`` upstream if they + want to emit ``null``. 
+ + The Wilson interval is preferred over the textbook Wald interval + (``p ± z·√(p(1-p)/n)``) because Wald collapses to a zero-width + bound at ``p == 0`` or ``p == 1`` and undercovers for small ``n``. + Wilson stays well-defined and conservative across the whole range. + """ + if n <= 0: + return (0.0, 1.0) + if successes < 0 or successes > n: + raise ValueError(f"successes={successes} out of range for n={n}") + p = successes / n + z2 = z * z + denom = 1.0 + z2 / n + center = (p + z2 / (2.0 * n)) / denom + half = (z * math.sqrt(p * (1.0 - p) / n + z2 / (4.0 * n * n))) / denom + lo = max(0.0, center - half) + hi = min(1.0, center + half) + return (lo, hi) + + +def proportion_z_test( + s_a: int, n_a: int, s_b: int, n_b: int +) -> tuple[float, float]: + """Two-proportion z-test (pooled). Returns ``(z, p_value_two_sided)``. + + Tests H0: rate_A == rate_B. Use the returned p-value with the usual + α = 0.05 cutoff to decide significance. With either ``n`` zero the + test is undefined; we return ``(0.0, 1.0)`` (no evidence of any + difference) so callers can keep their JSON schema stable. + + The pooled-variance form is appropriate here because we are testing + equality, not estimating the difference — that's the standard choice + when the null hypothesis is ``p_A = p_B``. + """ + if n_a <= 0 or n_b <= 0: + return (0.0, 1.0) + p_a = s_a / n_a + p_b = s_b / n_b + pooled = (s_a + s_b) / (n_a + n_b) + var = pooled * (1.0 - pooled) * (1.0 / n_a + 1.0 / n_b) + if var <= 0.0: + # Both 0% or both 100%; no difference detectable. 
+ return (0.0, 1.0) + z = (p_a - p_b) / math.sqrt(var) + p_value = 2.0 * (1.0 - _normal_cdf(abs(z))) + return (z, p_value) + + +def summarise_eval( + *, + task: str, + policy: str, + sim_backend: str, + successes: int, + n_trials: int, + success_per_trial: list[bool] | None = None, + per_trial_details: list[dict] | None = None, + provenance: dict | None = None, + steps: int = 0, + wall_seconds: float = 0.0, + throughput: float = 0.0, + ci_level: float = 0.95, +) -> EvalSummary: + """Build the canonical EvalSummary dict from a finished eval run. + + Wraps ``wilson_ci`` and locks the JSON schema (``schema_version=2``) + so downstream tools (``robo-sandbox compare``, plotting scripts) + can rely on field names + presence. v2 adds ``per_trial_details`` + (per-trial cube pose, peak lift, EE-object min distance) and a + derived ``spatial_breakdown`` so spatial failure analysis is + available without re-running anything. + """ + z = _Z_95 if abs(ci_level - 0.95) < 1e-9 else _z_from_two_sided_level(ci_level) + lo, hi = wilson_ci(successes, n_trials, z=z) + out: EvalSummary = { + "schema_version": 2, + "task": task, + "policy": policy, + "sim_backend": sim_backend, + "n_trials": n_trials, + "successes": successes, + "rate": (successes / n_trials) if n_trials > 0 else 0.0, + "ci_low": lo, + "ci_high": hi, + "ci_level": ci_level, + "steps": steps, + "wall_seconds": wall_seconds, + "throughput_env_steps_per_s": throughput, + } + if success_per_trial is not None: + out["success_per_trial"] = list(success_per_trial) + if per_trial_details is not None: + out["per_trial_details"] = list(per_trial_details) + out["spatial_breakdown"] = _spatial_breakdown(per_trial_details) + if provenance is not None: + # Provenance is "what produced this result, exactly" — checkpoint + # content hash, robosandbox/lerobot/mujoco versions, git rev. 
Two + # eval JSONs with matching provenance are guaranteed-comparable; + # mismatching provenance means at least one of code/checkpoint/sim + # changed and any apples-to-apples comparison is suspect. + out["provenance"] = dict(provenance) + return out + + +def _spatial_breakdown(details: list[dict], n_bins: int = 5) -> dict: + """Group trials into x/y bins of the tracked object's initial position + and report success rate per bin. Skips trials without ``object_initial_xyz`` + (e.g. tasks whose success criterion has no .object). Returns an empty + dict when no trials carry pose info. + + The breakdown is the cheapest way to see *where* the policy fails — a + per-bin success-rate gradient is the signature of a generalization gap + (model trained uniformly but only succeeds in one region). + """ + valid = [d for d in details if d.get("object_initial_xyz") is not None] + if not valid: + return {} + xs = [d["object_initial_xyz"][0] for d in valid] + ys = [d["object_initial_xyz"][1] for d in valid] + def _bin(values: list[float], n: int) -> list[tuple[float, float]]: + lo, hi = min(values), max(values) + if hi - lo < 1e-9: + return [(lo, hi + 1e-9)] + step = (hi - lo) / n + return [(lo + i * step, lo + (i + 1) * step + (1e-9 if i == n - 1 else 0.0)) + for i in range(n)] + def _by(axis_index: int) -> list[dict]: + bins = _bin([d["object_initial_xyz"][axis_index] for d in valid], n_bins) + rows = [] + for blo, bhi in bins: + in_bin = [d for d in valid + if blo <= d["object_initial_xyz"][axis_index] < bhi] + n_in = len(in_bin) + n_succ = sum(1 for d in in_bin if d.get("success")) + rows.append({ + "lo": blo, + "hi": bhi, + "n": n_in, + "successes": n_succ, + "rate": (n_succ / n_in) if n_in else 0.0, + }) + return rows + return { + "by_object_x": _by(0), + "by_object_y": _by(1), + } + + +def _normal_cdf(x: float) -> float: + """Standard normal CDF Φ(x). 
Stdlib via ``math.erf``.""" + return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0))) + + +def _z_from_two_sided_level(level: float) -> float: + """Inverse normal for arbitrary two-sided confidence levels. + + Supports the common 0.90 / 0.95 / 0.99 levels without scipy via a + small lookup. Anything else falls back to a numeric inversion. + """ + table = {0.90: 1.6448536269514722, 0.95: _Z_95, 0.99: 2.5758293035489004} + if level in table: + return table[level] + # Bisect on Φ⁻¹((1+level)/2). Fine for any reasonable level. + target = (1.0 + level) / 2.0 + lo, hi = 0.0, 10.0 + for _ in range(80): + mid = 0.5 * (lo + hi) + if _normal_cdf(mid) < target: + lo = mid + else: + hi = mid + return 0.5 * (lo + hi) diff --git a/packages/robosandbox-core/tests/test_eval_stats.py b/packages/robosandbox-core/tests/test_eval_stats.py new file mode 100644 index 0000000..458b1d6 --- /dev/null +++ b/packages/robosandbox-core/tests/test_eval_stats.py @@ -0,0 +1,157 @@ +"""Stats correctness for robosandbox.eval. + +The Wilson CI and z-test math is load-bearing for any future "policy A vs B" +claim, so a few targeted regression tests beat trusting that the formulas +were transcribed correctly. +""" + +from __future__ import annotations + +import math + +from robosandbox.eval import proportion_z_test, summarise_eval, wilson_ci + + +def test_wilson_ci_matches_known_values() -> None: + # Reference values from R: prop.test(8, 10, correct=FALSE)$conf.int + # Wilson is conf.int when correct=FALSE. 
+ lo, hi = wilson_ci(8, 10) + assert math.isclose(lo, 0.4901624, abs_tol=1e-4) + assert math.isclose(hi, 0.9433178, abs_tol=1e-4) + + +def test_wilson_ci_handles_extremes() -> None: + # 0/n must produce a useful upper bound (not collapse to 0..0) + lo, hi = wilson_ci(0, 10) + assert lo == 0.0 + assert 0.20 < hi < 0.35 + # n/n symmetric + lo, hi = wilson_ci(10, 10) + assert hi == 1.0 or math.isclose(hi, 1.0, abs_tol=1e-9) + assert 0.65 < lo < 0.80 + # Degenerate n == 0 returns trivial interval, not crash + lo, hi = wilson_ci(0, 0) + assert (lo, hi) == (0.0, 1.0) + + +def test_proportion_z_test_significant_difference() -> None: + # 80/100 vs 50/100 is a textbook "highly significant" comparison. + z, p = proportion_z_test(80, 100, 50, 100) + assert z > 4.0 + assert p < 1e-4 + + +def test_proportion_z_test_no_difference() -> None: + z, p = proportion_z_test(50, 100, 50, 100) + assert z == 0.0 + # Two-sided p-value at z=0 is 1.0 by definition. + assert math.isclose(p, 1.0, abs_tol=1e-12) + + +def test_proportion_z_test_handles_empty_n() -> None: + # No silent crash; report "no evidence of difference" so callers + # can keep their JSON schema stable when one side has 0 trials. + assert proportion_z_test(0, 0, 5, 10) == (0.0, 1.0) + + +def test_summarise_eval_includes_required_fields() -> None: + out = summarise_eval( + task="pick_cube_franka", + policy="runs/abc", + sim_backend="newton", + successes=42, + n_trials=64, + success_per_trial=[True] * 42 + [False] * 22, + steps=38400, + wall_seconds=12.3, + throughput=3120.0, + ) + # Schema lock: anyone parsing eval JSON downstream relies on these keys. 
+ for key in ( + "schema_version", "task", "policy", "sim_backend", "n_trials", + "successes", "rate", "ci_low", "ci_high", "ci_level", + "steps", "wall_seconds", "throughput_env_steps_per_s", + "success_per_trial", + ): + assert key in out, f"missing key: {key}" + # Schema bumped to 2 when per_trial_details + spatial_breakdown were + # added — both fields are optional, so callers that don't pass details + # get the same shape as v1 minus the new keys (older parsers ignore + # the version bump and still find every field they relied on). + assert out["schema_version"] == 2 + assert math.isclose(out["rate"], 42 / 64) + assert 0.0 <= out["ci_low"] <= out["rate"] <= out["ci_high"] <= 1.0 + + +def test_summarise_eval_emits_spatial_breakdown_when_details_provided() -> None: + """When per_trial_details carries object_initial_xyz, the summary should + derive a per-bin success-rate breakdown so spatial failure analysis + works without re-running anything.""" + # 4 trials: 2 successes at high x, 2 failures at low x — extreme spatial + # gradient that the breakdown should surface clearly. 
+ details = [ + {"trial": 1, "seed": 1, "success": False, "steps": 500, + "peak_lift_mm": 5.0, "min_ee_object_dist_mm": 80.0, + "object_id": "red_cube", "object_initial_xyz": [0.36, 0.0, 0.05], + "object_initial_quat_xyzw": [0, 0, 0, 1]}, + {"trial": 2, "seed": 2, "success": False, "steps": 500, + "peak_lift_mm": 4.0, "min_ee_object_dist_mm": 75.0, + "object_id": "red_cube", "object_initial_xyz": [0.37, 0.0, 0.05], + "object_initial_quat_xyzw": [0, 0, 0, 1]}, + {"trial": 3, "seed": 3, "success": True, "steps": 400, + "peak_lift_mm": 120.0, "min_ee_object_dist_mm": 12.0, + "object_id": "red_cube", "object_initial_xyz": [0.44, 0.0, 0.05], + "object_initial_quat_xyzw": [0, 0, 0, 1]}, + {"trial": 4, "seed": 4, "success": True, "steps": 400, + "peak_lift_mm": 130.0, "min_ee_object_dist_mm": 10.0, + "object_id": "red_cube", "object_initial_xyz": [0.45, 0.0, 0.05], + "object_initial_quat_xyzw": [0, 0, 0, 1]}, + ] + out = summarise_eval( + task="pick_cube_franka_random", policy="ckpt", sim_backend="mujoco", + successes=2, n_trials=4, success_per_trial=[False, False, True, True], + per_trial_details=details, steps=1800, wall_seconds=20.0, throughput=90.0, + ) + assert "per_trial_details" in out and len(out["per_trial_details"]) == 4 + assert "spatial_breakdown" in out + bins_x = out["spatial_breakdown"]["by_object_x"] + # Lowest x bin should be 0% successful, highest 100% — the gradient + # signature we explicitly want callers to be able to read off. + assert bins_x[0]["rate"] == 0.0 + assert bins_x[-1]["rate"] == 1.0 + + +def test_summarise_eval_includes_provenance_when_provided() -> None: + """Provenance pins down what produced the result. 
Field shape is open + (callers may add their own keys), but if a caller passes one it must + flow through unchanged so downstream comparison tools can rely on it.""" + prov = { + "policy_path": "/tmp/ckpt", + "checkpoint_sha256": "deadbeef" * 8, + "robosandbox_git_rev": "abc123-dirty", + "lerobot_version": "0.4.4", + } + out = summarise_eval( + task="x", policy="/tmp/ckpt", sim_backend="mujoco", + successes=1, n_trials=1, provenance=prov, + steps=10, wall_seconds=1.0, throughput=10.0, + ) + assert out["provenance"] == prov + + +def test_summarise_eval_omits_spatial_breakdown_when_no_pose_info() -> None: + """Tasks whose success criterion has no .object (e.g. pure joint-state + targets) shouldn't produce a misleading empty/zero breakdown.""" + out = summarise_eval( + task="reach_pose", policy="ckpt", sim_backend="mujoco", + successes=1, n_trials=2, success_per_trial=[True, False], + per_trial_details=[ + {"trial": 1, "seed": 1, "success": True, "steps": 100, + "peak_lift_mm": 0.0, "min_ee_object_dist_mm": None}, + {"trial": 2, "seed": 2, "success": False, "steps": 500, + "peak_lift_mm": 0.0, "min_ee_object_dist_mm": None}, + ], + steps=600, wall_seconds=10.0, throughput=60.0, + ) + # spatial_breakdown is present but empty when no trial carries pose info. 
+ assert out["spatial_breakdown"] == {} From da429b56ae9c4b54830ed40c1d361645fe0dc2c4 Mon Sep 17 00:00:00 2001 From: amarrmb Date: Sat, 25 Apr 2026 12:09:45 -0700 Subject: [PATCH 2/7] feat(policy): per-trial reset/reload, action_repeat, LeRobot processor pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit run_policy: - success criterion is now LATCHED on first per-step match (not just final state) — chunked policies that briefly succeed and then fail still register - action_repeat: hold each policy action for N sim steps so 30 fps recordings can replay correctly in a 200 Hz sim - single observe per step (the post-step obs becomes next step's input) - success-detection uses tasks.runner._eval_criterion via a thin shim LeRobotPolicyAdapter: - preprocessor / postprocessor pipeline parameters (lerobot 0.4 moved per-feature normalization out of the model's forward()) - reset() forwards to the wrapped policy so chunked-action queues (ACT, diffusion) clear between trials — without this, eval results silently depended on trial order load_policy: - LeRobot-checkpoint branch loads the matching processor pipeline pair - visual-input keys auto-detected from policy.config.input_features so legacy diffusion_pusht (observation.image) and ACT (observation.images.) 
both work without per-checkpoint glue Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/robosandbox/policy/__init__.py | 266 ++++++++++++++++-- .../src/robosandbox/policy/lerobot_adapter.py | 64 ++++- 2 files changed, 296 insertions(+), 34 deletions(-) diff --git a/packages/robosandbox-core/src/robosandbox/policy/__init__.py b/packages/robosandbox-core/src/robosandbox/policy/__init__.py index 16630ff..901585d 100644 --- a/packages/robosandbox-core/src/robosandbox/policy/__init__.py +++ b/packages/robosandbox-core/src/robosandbox/policy/__init__.py @@ -42,6 +42,7 @@ "Policy", "ReplayTrajectoryPolicy", "load_policy", + "run_eval_parallel", "run_policy", ] @@ -153,21 +154,115 @@ def run_policy( *, success: Any = None, # SuccessCriterion | None — imported lazily to dodge cycles on_step: Callable[[Observation, np.ndarray], None] | None = None, + action_repeat: int = 1, ) -> dict: """Drive ``sim`` with ``policy`` for up to ``max_steps`` ticks. + ``action_repeat`` holds each ``policy.act`` output for N sim steps before + re-querying. Use this to match the policy's training cadence when the + sim runs faster than the dataset's record rate (e.g. dataset at 30 fps, + sim_dt=0.005 s ⇒ 200 Hz ⇒ action_repeat=6). Calling the policy at every + sim step when training was at 30 fps causes the model to "fast-forward" + its action chunk 6.7× faster than intended — the gripper closes before + the arm reaches the cube and the lift phase ends prematurely. + Returns a dict: ``{success, steps, final_obs, initial_obs}``. ``success`` is the boolean outcome of ``success`` evaluated against ``(initial_obs, final_obs)``, or ``None`` when no criterion is supplied. 
""" + if action_repeat < 1: + raise ValueError(f"action_repeat must be >= 1, got {action_repeat}") n_dof = getattr(sim, "n_dof", 6) - initial_obs = sim.observe() + from robosandbox.tasks.runner import _eval_criterion - last_obs = initial_obs + initial_obs = sim.observe() + obs = initial_obs steps_done = 0 + held_action: np.ndarray | None = None + # Latch success on first match — a policy that keeps emitting actions + # past success often perturbs the scene back into a failed final state. + success_ever: bool | None = None if success is None else False + success_step: int | None = None for step_i in range(max_steps): - obs = sim.observe() - action = np.asarray(policy.act(obs), dtype=np.float64).ravel() + if step_i % action_repeat == 0: + action = np.asarray(policy.act(obs), dtype=np.float64).ravel() + if action.shape != (n_dof + 1,): + raise ValueError( + f"policy.act must return shape ({n_dof + 1},), got {action.shape}" + ) + held_action = action + else: + action = held_action # type: ignore[assignment] + sim.step(target_joints=action[:n_dof], gripper=float(action[n_dof])) + post_obs = sim.observe() + steps_done = step_i + 1 + if on_step is not None: + on_step(obs, action) + if success is not None and not success_ever: + ok_step, _ = _eval_criterion(success, initial_obs, post_obs) + if ok_step: + success_ever = True + success_step = step_i + 1 + obs = post_obs + + final_obs = obs + + return { + "success": success_ever, + "success_step": success_step, + "steps": steps_done, + "initial_obs": initial_obs, + "final_obs": final_obs, + "last_obs_before_final": final_obs, + } + + +# ---- run_eval_parallel ------------------------------------------------- + + +def run_eval_parallel( + sim: Any, + policy: Policy, + max_steps: int = 500, + *, + success: Any = None, + settle_steps: int = 100, +) -> dict: + """GPU-parallel policy evaluation across all worlds in ``sim``. 
+ + Calls ``sim.observe_all()`` once per step, runs ``policy.act`` on + world-0's observation (broadcast to all worlds), then checks + ``success`` per world. Returns aggregated stats. + + ``sim`` must expose ``observe_all() -> list[Observation]`` and + ``n_worlds: int`` — i.e., a :class:`~robosandbox.sim.newton_backend.NewtonBackend` + with ``world_count > 1`` (also works with ``world_count=1``). + """ + import time + + n_worlds: int = getattr(sim, "n_worlds", 1) + n_dof: int = getattr(sim, "n_dof", 6) + + observe_all = getattr(sim, "observe_all", None) + if observe_all is None: + raise TypeError("sim must expose observe_all() for parallel eval") + + # Settle physics before recording initial state + for _ in range(settle_steps): + sim.step() + + initial_obs_all: list[Any] = observe_all() + + done = [False] * n_worlds + success_per_world = [False] * n_worlds + steps_done = 0 + + t0 = time.time() + for _ in range(max_steps): + obs_all: list[Any] = observe_all() + # Drive policy on world-0 observation; broadcast to all worlds + obs_0 = obs_all[0] + action = np.asarray(policy.act(obs_0), dtype=np.float64).ravel() if action.shape != (n_dof + 1,): raise ValueError( f"policy.act must return shape ({n_dof + 1},), got {action.shape}" @@ -175,28 +270,36 @@ def run_policy( target_joints = action[:n_dof] gripper = float(action[n_dof]) sim.step(target_joints=target_joints, gripper=gripper) - last_obs = obs - steps_done = step_i + 1 - if on_step is not None: - on_step(obs, action) + steps_done += 1 - final_obs = sim.observe() + # Evaluate success criterion per world + if success is not None: + from robosandbox.tasks.runner import _eval_criterion - success_ok: bool | None - if success is None: - success_ok = None - else: - # Lazy import to avoid tasks→policy cycles if any. 
- from robosandbox.tasks.runner import _eval_criterion + for w in range(n_worlds): + if done[w]: + continue + ok, _ = _eval_criterion(success, initial_obs_all[w], obs_all[w]) + if ok: + success_per_world[w] = True + done[w] = True - success_ok, _detail = _eval_criterion(success, initial_obs, final_obs) + if all(done): + break + + wall = time.time() - t0 + n_success = sum(success_per_world) + total_env_steps = steps_done * n_worlds + throughput = total_env_steps / wall if wall > 0 else 0.0 return { - "success": success_ok, + "n_worlds": n_worlds, + "successes": n_success, + "rate": n_success / n_worlds if n_worlds > 0 else 0.0, "steps": steps_done, - "initial_obs": initial_obs, - "final_obs": final_obs, - "last_obs_before_final": last_obs, + "wall": wall, + "throughput": throughput, + "success_per_world": success_per_world, } @@ -204,20 +307,88 @@ def run_policy( _BRING_YOUR_OWN_CHECKPOINT_HINT = ( - "No policy loader matched {path!r}. RoboSandbox ships a reference " - "ReplayTrajectoryPolicy but does not bundle a LeRobot/ACT/Diffusion-Policy " - "checkpoint loader. To run a trained checkpoint, either: (1) drop a " - "policy.json + events.jsonl under a directory and point --policy at it, " - "or (2) extend robosandbox.policy.load_policy to dispatch on your " - "checkpoint format (LeRobot, torchscript, onnx, ...)." + "No policy loader matched {path!r}. RoboSandbox recognises: " + "(1) a LeRobot checkpoint directory (config.json + model.safetensors), " + "(2) an episode directory containing events.jsonl (open-loop replay), " + "(3) a directory with policy.json (kind: replay_trajectory | ppo_neural). " + "To plug in a custom format, extend robosandbox.policy.load_policy." ) +def _lerobot_visual_input_keys(policy: Any) -> list[str]: + """Visual input keys expected by a LeRobot ``PreTrainedPolicy``. + + Reads ``policy.config.input_features`` and returns every key whose + feature spec is marked ``FeatureType.VISUAL``. 
Falls back to a name + prefix match when the typed marker is absent. Returns ``[]`` when + the config has no visual features (state-only policies). + """ + cfg = getattr(policy, "config", None) + feats = getattr(cfg, "input_features", None) if cfg is not None else None + if not feats: + return [] + visual: list[str] = [] + try: + items = list(feats.items()) + except AttributeError: + return [] + for key, spec in items: + if not isinstance(key, str): + continue + ftype = getattr(spec, "type", None) + type_name = getattr(ftype, "name", None) or getattr(ftype, "value", None) or str(ftype) + if isinstance(type_name, str) and type_name.upper() == "VISUAL": + visual.append(key) + continue + if key.startswith("observation.image"): + visual.append(key) + return visual + + +def _resolve_lerobot_device(cfg_device: str | None) -> str: + """Honor the checkpoint's device but downgrade `cuda` to `cpu` when + torch isn't compiled with CUDA — same fallback lerobot uses internally.""" + if not cfg_device or cfg_device != "cuda": + return cfg_device or "cpu" + try: + import torch + return "cuda" if torch.cuda.is_available() else "cpu" + except ImportError: + return "cpu" + + +def _load_lerobot_processors(p: Path, policy_config: Any) -> tuple[Any, Any]: + """Load lerobot's saved pre/post-processor pipelines for checkpoint ``p``. + + Returns ``(None, None)`` when the JSON files are absent (older + checkpoints) or when ``lerobot.processor`` isn't importable; the + adapter then falls back to its own minimal batch construction. 
+ """ + if not ((p / "policy_preprocessor.json").exists() + and (p / "policy_postprocessor.json").exists()): + return (None, None) + try: + from lerobot.processor import PolicyProcessorPipeline + except ImportError: + return (None, None) + cfg_device = getattr(policy_config, "device", None) if policy_config else None + overrides = {"device_processor": {"device": _resolve_lerobot_device(cfg_device)}} + return ( + PolicyProcessorPipeline.from_pretrained(str(p), "policy_preprocessor.json", overrides=overrides), + PolicyProcessorPipeline.from_pretrained(str(p), "policy_postprocessor.json", overrides=overrides), + ) + + def load_policy(path: str | Path) -> Policy: """Load a policy from a checkpoint-or-episode directory. - Currently handles one case: a directory containing ``policy.json`` - of the form ``{"kind": "replay_trajectory", "trajectory": ""}``. + Recognised layouts: + + - ``config.json`` + ``model.safetensors`` → LeRobot checkpoint, loaded + via ``lerobot.policies.factory.get_policy_class`` and wrapped with + :class:`LeRobotPolicyAdapter`. + - ``policy.json`` with ``kind: replay_trajectory`` → trajectory replay. + - Bare ``events.jsonl`` → open-loop trajectory replay. Raises :class:`ImportError` with an explicit bring-your-own-checkpoint message otherwise — this is the extension seam for real policies. @@ -225,6 +396,45 @@ def load_policy(path: str | Path) -> Policy: p = Path(path) if p.is_dir(): + # LeRobot checkpoint: directory containing a `config.json` (with a + # `type` field naming the policy class) and `model.safetensors`. + # This is the format produced by `lerobot train` and by + # `PreTrainedPolicy.save_pretrained()`. 
+ lerobot_cfg = p / "config.json" + lerobot_weights = p / "model.safetensors" + if lerobot_cfg.exists() and lerobot_weights.exists(): + try: + cfg = json.loads(lerobot_cfg.read_text()) + except json.JSONDecodeError as e: + raise ImportError( + f"Found LeRobot-shaped checkpoint at {p} but config.json is " + f"not valid JSON: {e}" + ) from e + policy_type = cfg.get("type") + if not policy_type: + raise ImportError( + f"Found LeRobot-shaped checkpoint at {p} but config.json has " + f"no 'type' field. Expected e.g. 'act', 'diffusion', 'tdmpc'." + ) + try: + from lerobot.policies.factory import get_policy_class + except ImportError as e: + raise ImportError( + f"Detected LeRobot checkpoint at {p} but the `lerobot` " + f"package is not installed. Install it with: " + f"`pip install lerobot` (see https://github.com/huggingface/lerobot)." + ) from e + policy_cls = get_policy_class(policy_type) + inner = policy_cls.from_pretrained(str(p)) + preproc, postproc = _load_lerobot_processors(p, getattr(inner, "config", None)) + # Visual input keys vary by policy class: legacy diffusion_pusht + # uses `observation.image`, ACT/pi0 use `observation.images.`. 
+ image_keys = _lerobot_visual_input_keys(inner) + kwargs: dict[str, Any] = {"preprocessor": preproc, "postprocessor": postproc} + if image_keys: + kwargs["image_keys"] = image_keys + return LeRobotPolicyAdapter(inner, **kwargs) + cfg_file = p / "policy.json" if cfg_file.exists(): cfg = json.loads(cfg_file.read_text()) diff --git a/packages/robosandbox-core/src/robosandbox/policy/lerobot_adapter.py b/packages/robosandbox-core/src/robosandbox/policy/lerobot_adapter.py index 632f8db..fe10d43 100644 --- a/packages/robosandbox-core/src/robosandbox/policy/lerobot_adapter.py +++ b/packages/robosandbox-core/src/robosandbox/policy/lerobot_adapter.py @@ -66,14 +66,24 @@ def __init__( policy: _LeRobotLike, *, camera_name: str = "scene", + image_keys: list[str] | None = None, action_dim: int | None = None, image_size: tuple[int, int] | None = None, + preprocessor: Any = None, + postprocessor: Any = None, ) -> None: """Wrap ``policy`` as an ``act()``-compatible adapter. :param policy: any object exposing ``select_action(batch) -> tensor``. - :param camera_name: key used under ``observation.images.``. - Defaults to ``"scene"`` to match the shipped MuJoCo camera. + :param camera_name: convenience for the common single-camera case; + translates to ``image_keys=["observation.images."]`` + when ``image_keys`` is not given. Ignored otherwise. + :param image_keys: explicit list of batch keys under which the + sim's RGB frame should be exposed. Use this when the wrapped + policy expects something other than ``observation.images.scene`` + — e.g. legacy ``"observation.image"`` (pusht-era checkpoints) + or multiple wrist/base cameras (the same frame is duplicated + into each key). When ``None``, falls back to ``camera_name``. :param action_dim: expected ``(n_dof + 1,)`` action length. Used to validate the policy's output without depending on the sim's n_dof at construction time. ``None`` = skip validation. 
@@ -84,8 +94,17 @@ def __init__( """ self._policy = policy self._camera_name = camera_name + self._image_keys = ( + list(image_keys) + if image_keys is not None + else [f"observation.images.{camera_name}"] + ) self._action_dim = action_dim self._image_size = image_size + # See _load_lerobot_processors in policy/__init__.py for why both + # are required (lerobot 0.4 moved normalization out of the model). + self._preprocessor = preprocessor + self._postprocessor = postprocessor # Lazily-allocated scratch buffer for the (1, n_dof + 1) state vector. # Re-used across act() calls so the per-frame hot path doesn't # allocate three intermediate arrays per step. Sized on first use @@ -101,9 +120,42 @@ def __init__( def camera_name(self) -> str: return self._camera_name + def reset(self) -> None: + """Reset wrapped policy state between eval trials. + + ACT (and most chunked policies) maintain a per-episode action queue + that's populated by ``predict_action_chunk`` every ``n_action_steps`` + calls. Without an explicit reset between trials, trial N starts with + whatever residual state trial N-1 left behind — most often a + partially-consumed chunk that gets popped on trial N's first + ``select_action`` call instead of a fresh chunk conditioned on + trial N's actual initial observation. The eval CLI calls + ``policy.reset()`` between trials; without this method that call + was silently a no-op (``getattr(adapter, 'reset', None)`` returned + ``None``), making per-trial outcomes depend on trial order. + """ + inner_reset = getattr(self._policy, "reset", None) + if callable(inner_reset): + inner_reset() + def act(self, obs: Observation) -> np.ndarray: batch = self._to_batch(obs) + if self._preprocessor is not None: + # The preprocessor pipeline expects observation keys at the + # transition's top level (NOT nested under an `observation` + # sub-dict): {observation.state: ..., observation.images.X: ...}. 
+ # Steps: rename + add-batch-dim (idempotent on already-batched + # tensors) + device transfer + per-feature normalization (state + # via dataset stats, images via ImageNet stats). + transition = self._preprocessor(dict(batch)) + batch = {k: transition[k] for k in batch if k in transition} raw = self._policy.select_action(batch) + if self._postprocessor is not None: + # Un-normalize the predicted action back into raw joint / + # gripper-command units. Postprocessor expects an `action` key + # in the transition. + out = self._postprocessor({"action": raw}) + raw = out["action"] action = _to_numpy_1d(raw) if self._action_dim is not None and action.shape[-1] != self._action_dim: raise ValueError( @@ -151,10 +203,10 @@ def _to_batch(self, obs: Observation) -> dict[str, Any]: img_val = chw state_val = self._state_buf - return { - f"observation.images.{self._camera_name}": img_val, - "observation.state": state_val, - } + batch: dict[str, Any] = {"observation.state": state_val} + for key in self._image_keys: + batch[key] = img_val + return batch def _policy_wants_torch(policy: Any) -> bool: From 7488fd70fc54dc701eec0ea3ee674a049b5604cb Mon Sep 17 00:00:00 2001 From: amarrmb Date: Sat, 25 Apr 2026 12:09:54 -0700 Subject: [PATCH 3/7] fix(export): LeRobot v3 layout + multi-episode + reject action=None frames MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit lerobot v3 expects parquet shards in a specific subdirectory layout (data/chunk-NNN/episode-NNNNNN.parquet); the old exporter wrote flat parquet which `lerobot train` rejected at load time. 
Also: - multi-episode export: pass a parent runs/ dir, get one dataset - preserve gripper as the last column of action (not in observation.state) so policies that emit (n_arm + 1)-dim actions can be trained directly - reject action=None frames at the column-builder seam — silently dropping them was producing datasets with phantom rows where the policy had no command to predict Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/robosandbox/export/lerobot.py | 424 ++++++++++++------ .../tests/test_export_lerobot.py | 77 +++- 2 files changed, 341 insertions(+), 160 deletions(-) diff --git a/packages/robosandbox-core/src/robosandbox/export/lerobot.py b/packages/robosandbox-core/src/robosandbox/export/lerobot.py index ec860cd..d530d11 100644 --- a/packages/robosandbox-core/src/robosandbox/export/lerobot.py +++ b/packages/robosandbox-core/src/robosandbox/export/lerobot.py @@ -1,45 +1,54 @@ -"""Export a recorded episode (`runs/-/`) to LeRobot v3 dataset layout. +"""Export recorded episodes (``runs/-/``) to a LeRobot v3.0 dataset. Consumes the artefacts written by :class:`robosandbox.recorder.local.LocalRecorder` -(`episode.json`, `events.jsonl`, `video.mp4`) and produces a LeRobot v3 dataset:: +(``episode.json``, ``events.jsonl``, ``video.mp4``) and produces the layout that +``lerobot`` >= 0.4 reads:: / meta/ info.json - tasks.jsonl - episodes.jsonl - data/chunk-000/episode_000000.parquet - videos/chunk-000/observation.images.scene/episode_000000.mp4 - -Schema choices (documented here because LeRobot v3's spec has wiggle room): - -* ``observation.state`` = concat(``robot_joints``, ``[gripper_width]``) as a single - float32 vector of length ``n_dof + 1``. This is the convention most - open-source LeRobot datasets (Aloha, Koch, SO-100) follow. -* ``action`` = frame's recorded action vector if present and numeric; otherwise - a copy of ``observation.state`` (the standard fallback for teleop-less - scripted demonstrations — action == next-state). 
-* ``observation.images.scene`` is stored as a video reference (not inlined - bytes). LeRobot's ``VideoFrame`` feature type handles this. -* Single-episode exports only: ``episode_index = 0``, one task, one chunk. - Multi-episode stitching is a separate concern. + tasks.parquet + episodes/chunk-000/file-000.parquet + data/chunk-000/file-000.parquet + data/chunk-000/file-001.parquet + ... + videos/observation.images.scene/chunk-000/file-000.mp4 + ... + +Key schema decisions: + +* ``observation.state`` = ``concat(robot_joints, [gripper_width])`` (float32). + This matches the Aloha / Koch / SO-100 convention. +* ``action`` = the frame's recorded action if numeric, otherwise a copy of + ``observation.state`` (scripted demos have no teleop → action == next-state). + Short recorded actions are padded to ``state_dim`` so policies see a + consistent action/state dim. +* Per-episode file layout: one parquet + one mp4 per episode, with + ``file_index == episode_index`` inside chunk 0. Fits the v3.0 chunk-file + contract (``DEFAULT_CHUNK_SIZE = 1000``) for any dataset up to 1000 + episodes — enough headroom for robosandbox's demo-gen use case. """ from __future__ import annotations import json import shutil +import subprocess from pathlib import Path from typing import Any -_CHUNK = "chunk-000" -_EPISODE_ID = 0 +_CHUNK_INDEX = 0 +_CHUNK_DIR = f"chunk-{_CHUNK_INDEX:03d}" _VIDEO_KEY = "observation.images.scene" +# LeRobot v3 templates (mirror lerobot.datasets.utils.DEFAULT_*). 
+_DATA_PATH_TMPL = "data/chunk-{chunk_index:03d}/file-{file_index:03d}.parquet" +_VIDEO_PATH_TMPL = "videos/{video_key}/chunk-{chunk_index:03d}/file-{file_index:03d}.mp4" + def _require_pyarrow(): # pragma: no cover - import guard try: - import pyarrow + import pyarrow # noqa: F401 import pyarrow.parquet # noqa: F401 except ImportError as e: raise RuntimeError( @@ -48,15 +57,52 @@ def _require_pyarrow(): # pragma: no cover - import guard ) from e -def _coerce_action(action: Any, fallback: list[float]) -> list[float]: - """Turn a recorded `action` field into a flat float vector. - - Falls back to `fallback` (typically the current state) if the action is - missing or not a numeric sequence. +def _probe_video_shape(video_path: Path) -> tuple[int, int, int]: + """Return (height, width, 3) of the first video stream, or (0, 0, 3) on failure.""" + if not video_path.exists(): + return (0, 0, 3) + try: + out = subprocess.check_output( + [ + "ffprobe", "-v", "error", + "-select_streams", "v:0", + "-show_entries", "stream=width,height", + "-of", "csv=p=0", + str(video_path), + ], + stderr=subprocess.DEVNULL, + text=True, + ).strip() + w_s, h_s = out.split(",")[:2] + return (int(h_s), int(w_s), 3) + except (subprocess.CalledProcessError, FileNotFoundError, ValueError): + return (0, 0, 3) + + +def _coerce_action( + action: Any, + fallback: list[float], + target_dim: int | None = None, +) -> list[float]: + """Turn a recorded ``action`` field into a flat float vector. + + Raises ``ValueError`` when ``action`` is missing or non-numeric — the + recorder must not emit frames without a commanded action (otherwise + settle/idle frames poison the dataset with state-as-action pairs). + + Padding from ``fallback`` only fixes dim alignment when the recorded + action has joints but no gripper field — it is NOT a missing-data + fallback. """ if action is None: - return list(fallback) - # Common shapes: {"joints": [...]}, {"qpos_target": [...]}, plain list. 
+ raise ValueError( + "recorded event has action=None — cannot export. The recorder " + "should not write frames during settle / idle phases. If you " + "deliberately want a no-op action, record it explicitly as " + "{'joints': [...current targets...], 'gripper': }." + ) + seq: list | None = None + gripper: float | None = None if isinstance(action, list): seq = action elif isinstance(action, dict): @@ -65,14 +111,26 @@ def _coerce_action(action: Any, fallback: list[float]) -> list[float]: if isinstance(val, list): seq = val break - else: - return list(fallback) - else: - return list(fallback) + if "gripper" in action: + try: + gripper = float(action["gripper"]) + except (TypeError, ValueError): + gripper = None + if seq is None: + raise ValueError( + f"recorded action has no numeric joint sequence: {action!r}" + ) try: - return [float(x) for x in seq] - except (TypeError, ValueError): - return list(fallback) + out = [float(x) for x in seq] + except (TypeError, ValueError) as e: + raise ValueError( + f"recorded action joint sequence is not all numeric: {seq!r}" + ) from e + if gripper is not None: + out.append(gripper) + if target_dim is not None and len(out) < target_dim: + out = out + list(fallback[len(out):target_dim]) + return out def _read_events(events_path: Path) -> list[dict]: @@ -90,124 +148,166 @@ def _read_events(events_path: Path) -> list[dict]: return events -def export_episode( - src_dir: Path, +def export_episodes( + src_dirs: list[Path], dst_dir: Path, *, task: str | None = None, fps: int = 30, ) -> Path: - """Convert a single recorded episode directory to LeRobot v3 format. - - Parameters - ---------- - src_dir: - Directory produced by :class:`LocalRecorder` — must contain - ``events.jsonl`` and ``episode.json``; ``video.mp4`` is optional but - strongly recommended. - dst_dir: - Output LeRobot dataset root. Created if missing. Existing files with - matching paths are overwritten. - task: - Task string to record in ``meta/tasks.jsonl``. 
Falls back to - ``episode.json[task]`` then ``"unknown"``. - fps: - Recording rate advertised in ``meta/info.json``. Should match the rate - used by the recorder (default 30). - - Returns - ------- - Path - The dataset root (``dst_dir``). + """Convert recorded episodes into a single LeRobot v3.0 dataset. + + Each ``src_dirs`` entry must be a directory produced by + :class:`LocalRecorder` (containing ``events.jsonl`` and ideally + ``video.mp4``). The output uses the v3.0 chunk/file layout so ``lerobot + train`` can load it directly. """ _require_pyarrow() + import pandas as pd import pyarrow as pa import pyarrow.parquet as pq - src_dir = Path(src_dir) - dst_dir = Path(dst_dir) - if not src_dir.exists() or not src_dir.is_dir(): - raise FileNotFoundError(f"source episode directory does not exist: {src_dir}") - - events = _read_events(src_dir / "events.jsonl") - - episode_meta: dict = {} - ep_json = src_dir / "episode.json" - if ep_json.exists(): - episode_meta = json.loads(ep_json.read_text()) - task_str = task or episode_meta.get("task") or "unknown" - - # --- build column arrays -------------------------------------------------- - n_frames = len(events) - state_dim = len(events[0]["robot_joints"]) + 1 # + gripper_width - - states: list[list[float]] = [] - actions: list[list[float]] = [] - timestamps: list[float] = [] - frame_indices: list[int] = [] - - t0 = float(events[0].get("t", 0.0)) - for i, ev in enumerate(events): - joints = [float(x) for x in ev["robot_joints"]] - gripper = float(ev.get("gripper_width", 0.0)) - state = joints + [gripper] - if len(state) != state_dim: - raise ValueError( - f"inconsistent state dim at frame {i}: got {len(state)}, expected {state_dim}" - ) - states.append(state) - actions.append(_coerce_action(ev.get("action"), fallback=state)) - timestamps.append(float(ev.get("t", i / float(fps))) - t0) - frame_indices.append(int(ev.get("frame_idx", i))) - - episode_index = [_EPISODE_ID] * n_frames - task_index = [0] * n_frames - # LeRobot v3 
uses an `index` column = global (multi-episode) frame index. - # For a single-episode export this coincides with frame_index. - global_index = list(range(n_frames)) - - table = pa.table( - { - "observation.state": pa.array(states, type=pa.list_(pa.float32())), - "action": pa.array(actions, type=pa.list_(pa.float32())), - "timestamp": pa.array(timestamps, type=pa.float32()), - "frame_index": pa.array(frame_indices, type=pa.int64()), - "episode_index": pa.array(episode_index, type=pa.int64()), - "index": pa.array(global_index, type=pa.int64()), - "task_index": pa.array(task_index, type=pa.int64()), - } - ) + if not src_dirs: + raise ValueError("export_episodes requires at least one source episode") + src_dirs = [Path(s) for s in src_dirs] + for s in src_dirs: + if not s.exists() or not s.is_dir(): + raise FileNotFoundError(f"source episode directory does not exist: {s}") + + if len(src_dirs) > 1000: + # One file per episode in a single chunk; chunk size = 1000 in v3.0. + # Sharding across chunks would require more bookkeeping than this + # exporter currently does. 
+ raise NotImplementedError( + f"more than 1000 episodes ({len(src_dirs)}) is not yet supported — " + "multi-chunk output not implemented" + ) - # --- lay out directories -------------------------------------------------- + dst_dir = Path(dst_dir) meta_dir = dst_dir / "meta" - data_dir = dst_dir / "data" / _CHUNK - video_dir = dst_dir / "videos" / _CHUNK / _VIDEO_KEY - for d in (meta_dir, data_dir, video_dir): + data_dir = dst_dir / "data" / _CHUNK_DIR + video_dir = dst_dir / "videos" / _VIDEO_KEY / _CHUNK_DIR + episodes_meta_dir = meta_dir / "episodes" / _CHUNK_DIR + for d in (meta_dir, data_dir, video_dir, episodes_meta_dir): d.mkdir(parents=True, exist_ok=True) - parquet_path = data_dir / "episode_000000.parquet" - pq.write_table(table, parquet_path) + state_dim: int | None = None + total_frames = 0 + total_videos = 0 + task_str = task or "unknown" + robot_type: str | None = None + first_video_shape: tuple[int, int, int] | None = None + + episode_rows: list[dict] = [] + + # Running stats for observation.state and action. lerobot's dataloader + # factory reads meta/stats.json and crashes on missing keys — we must + # emit at least mean/std/min/max/count per non-visual feature. 
+ state_stack: list[list[float]] = [] + action_stack: list[list[float]] = [] + + for ep_idx, src in enumerate(src_dirs): + events = _read_events(src / "events.jsonl") + ep_meta: dict = {} + ep_json = src / "episode.json" + if ep_json.exists(): + ep_meta = json.loads(ep_json.read_text()) + if task is None and ep_meta.get("task"): + task_str = ep_meta["task"] + if robot_type is None and ep_meta.get("robot_type"): + robot_type = ep_meta["robot_type"] + + n_frames = len(events) + ep_state_dim = len(events[0]["robot_joints"]) + 1 + if state_dim is None: + state_dim = ep_state_dim + elif state_dim != ep_state_dim: + raise ValueError( + f"episode {src} state_dim={ep_state_dim} != dataset state_dim={state_dim}" + ) - # --- copy video (if any) -------------------------------------------------- - src_video = src_dir / "video.mp4" - dst_video = video_dir / "episode_000000.mp4" - if src_video.exists(): - shutil.copyfile(src_video, dst_video) + states: list[list[float]] = [] + actions: list[list[float]] = [] + timestamps: list[float] = [] + frame_indices: list[int] = [] + # Timestamps MUST match the video's encoded rate, not the simulator's + # wall-clock. lerobot's dataloader queries frames by timestamp with a + # 0.1 ms default tolerance, and our video is muxed at exactly `fps` — + # so frame i sits at i/fps. Storing event `t` here would drift out of + # tolerance within a few seconds. 
+ for i, ev in enumerate(events): + joints = [float(x) for x in ev["robot_joints"]] + gripper = float(ev.get("gripper_width", 0.0)) + state = joints + [gripper] + states.append(state) + actions.append(_coerce_action(ev.get("action"), fallback=state, target_dim=state_dim)) + timestamps.append(i / float(fps)) + frame_indices.append(int(ev.get("frame_idx", i))) + + global_indices = list(range(total_frames, total_frames + n_frames)) + episode_index_col = [ep_idx] * n_frames + task_index_col = [0] * n_frames + table = pa.table( + { + "observation.state": pa.array(states, type=pa.list_(pa.float32())), + "action": pa.array(actions, type=pa.list_(pa.float32())), + "timestamp": pa.array(timestamps, type=pa.float32()), + "frame_index": pa.array(frame_indices, type=pa.int64()), + "episode_index": pa.array(episode_index_col, type=pa.int64()), + "index": pa.array(global_indices, type=pa.int64()), + "task_index": pa.array(task_index_col, type=pa.int64()), + } + ) + file_idx = ep_idx + pq.write_table(table, data_dir / f"file-{file_idx:03d}.parquet") + + src_video = src / "video.mp4" + if src_video.exists(): + dst_video = video_dir / f"file-{file_idx:03d}.mp4" + shutil.copyfile(src_video, dst_video) + total_videos += 1 + if first_video_shape is None: + first_video_shape = _probe_video_shape(dst_video) + + # In the packed v3.0 layout multiple episodes share one mp4 and each + # episode records its [from_timestamp, to_timestamp) slice. We emit + # one mp4 per episode, so from=0 and to=length/fps. 
+ episode_rows.append( + { + "episode_index": ep_idx, + "tasks": [task_str], + "length": n_frames, + "dataset_from_index": total_frames, + "dataset_to_index": total_frames + n_frames, + "data/chunk_index": _CHUNK_INDEX, + "data/file_index": file_idx, + f"videos/{_VIDEO_KEY}/chunk_index": _CHUNK_INDEX, + f"videos/{_VIDEO_KEY}/file_index": file_idx, + f"videos/{_VIDEO_KEY}/from_timestamp": 0.0, + f"videos/{_VIDEO_KEY}/to_timestamp": float(n_frames) / float(fps), + } + ) + total_frames += n_frames + state_stack.extend(states) + action_stack.extend(actions) + + assert state_dim is not None # at least one episode by precondition + + img_shape = list(first_video_shape) if first_video_shape is not None else [0, 0, 3] - # --- metadata ------------------------------------------------------------- info = { "codebase_version": "v3.0", - "robot_type": episode_meta.get("robot_type", "unknown"), - "total_episodes": 1, - "total_frames": n_frames, + "robot_type": robot_type or "unknown", + "total_episodes": len(src_dirs), + "total_frames": total_frames, "total_tasks": 1, - "total_videos": 1 if src_video.exists() else 0, + "total_videos": total_videos, "total_chunks": 1, "chunks_size": 1000, "fps": int(fps), - "splits": {"train": f"0:{n_frames}"}, - "data_path": "data/{episode_chunk:s}/episode_{episode_index:06d}.parquet", - "video_path": "videos/{episode_chunk:s}/{video_key}/episode_{episode_index:06d}.mp4", + "splits": {"train": f"0:{total_frames}"}, + "data_path": _DATA_PATH_TMPL, + "video_path": _VIDEO_PATH_TMPL, "features": { "observation.state": { "dtype": "float32", @@ -221,7 +321,7 @@ def export_episode( }, "observation.images.scene": { "dtype": "video", - "shape": [0, 0, 3], # filled by consumer / unknown at export time + "shape": img_shape, "names": ["height", "width", "channels"], "video_info": {"video.fps": float(fps), "video.codec": "h264"}, }, @@ -234,15 +334,57 @@ def export_episode( } (meta_dir / "info.json").write_text(json.dumps(info, indent=2)) - with (meta_dir / 
"tasks.jsonl").open("w") as f: - f.write(json.dumps({"task_index": 0, "task": task_str}) + "\n") + tasks_df = pd.DataFrame([{"task_index": 0, "task": task_str}]) + tasks_df.to_parquet(meta_dir / "tasks.parquet", index=False) + + episodes_df = pd.DataFrame(episode_rows) + episodes_df.to_parquet(episodes_meta_dir / "file-000.parquet", index=False) + + # meta/stats.json — required by lerobot's factory.py (crashes on None). + # For state/action we write real stats; for the video feature we write + # per-channel placeholders in [0,1] range (lerobot overwrites these with + # IMAGENET_STATS when dataset.use_imagenet_stats=True, which is the + # default for ACT and Diffusion policies). + import numpy as np + state_arr = np.asarray(state_stack, dtype=np.float64) + action_arr = np.asarray(action_stack, dtype=np.float64) + + def _stats(x: np.ndarray) -> dict[str, list[float] | int]: + return { + "mean": x.mean(axis=0).astype(np.float32).tolist(), + "std": (x.std(axis=0) + 1e-8).astype(np.float32).tolist(), + "min": x.min(axis=0).astype(np.float32).tolist(), + "max": x.max(axis=0).astype(np.float32).tolist(), + "count": [int(x.shape[0])], + } - episode_record = { - "episode_index": _EPISODE_ID, - "tasks": [task_str], - "length": n_frames, + stats = { + "observation.state": _stats(state_arr), + "action": _stats(action_arr), + _VIDEO_KEY: { + # Placeholder per-channel stats in [0,1] range, shape (3,1,1) as + # lerobot expects channel-first broadcast-compatible stats. 
+ "mean": [[[0.5]], [[0.5]], [[0.5]]], + "std": [[[0.25]], [[0.25]], [[0.25]]], + "min": [[[0.0]], [[0.0]], [[0.0]]], + "max": [[[1.0]], [[1.0]], [[1.0]]], + "count": [total_frames], + }, } - with (meta_dir / "episodes.jsonl").open("w") as f: - f.write(json.dumps(episode_record) + "\n") + (meta_dir / "stats.json").write_text(json.dumps(stats, indent=2)) return dst_dir + + +def export_episode( + src_dir: Path, + dst_dir: Path, + *, + task: str | None = None, + fps: int = 30, +) -> Path: + """Convert a single recorded episode directory to LeRobot v3.0 format. + + Thin wrapper over :func:`export_episodes` for the single-episode case. + """ + return export_episodes([Path(src_dir)], Path(dst_dir), task=task, fps=fps) diff --git a/packages/robosandbox-core/tests/test_export_lerobot.py b/packages/robosandbox-core/tests/test_export_lerobot.py index bb9089c..cb5af1b 100644 --- a/packages/robosandbox-core/tests/test_export_lerobot.py +++ b/packages/robosandbox-core/tests/test_export_lerobot.py @@ -45,8 +45,12 @@ def fake_episode(tmp_path: Path) -> Path: "ee_pose": {"xyz": [0.3, 0.0, 0.2], "quat_xyzw": [0, 0, 0, 1]}, "gripper_width": 0.04, "objects": {}, - # Alternate between None and a numeric action to exercise both paths. - "action": None if i % 2 == 0 else {"joints": [0.2 * j for j in range(n_dof)]}, + # Every frame carries a real commanded action. The exporter + # rejects action=None (would otherwise silently fall back to + # action=state and poison ~65% of training samples — see + # _coerce_action). The settle/idle case is covered by + # test_export_rejects_action_none below. + "action": {"joints": [0.2 * j for j in range(n_dof)]}, } f.write(json.dumps(ev) + "\n") @@ -67,8 +71,8 @@ def test_export_produces_valid_parquet(fake_episode: Path, tmp_path: Path) -> No out = export_episode(fake_episode, dst, task="pick_cube", fps=30) assert out == dst - # Parquet file exists at LeRobot v3 path. 
- parquet_path = dst / "data" / "chunk-000" / "episode_000000.parquet" + # Parquet file exists at LeRobot v3.0 path (file-NNN, not episode-NNN). + parquet_path = dst / "data" / "chunk-000" / "file-000.parquet" assert parquet_path.exists(), parquet_path table = pq.read_table(parquet_path) @@ -89,11 +93,17 @@ def test_export_produces_valid_parquet(fake_episode: Path, tmp_path: Path) -> No # State dim = 7 joints + 1 gripper. state_first = table.column("observation.state")[0].as_py() assert len(state_first) == 8 + # Each recorded action has 7 joints (no gripper key in this fixture). The + # exporter pads short actions up to state_dim (8 = 7 joints + gripper) + # using state[7] so policies trained on the dataset see consistent + # action/state dims — without this, ACT/Diffusion would learn 7-D + # actions while observing 8-D states (silent gripper-open-by-default + # bias). Pad value MUST come from state, not zero, because the dataset's + # state[7] is the actual gripper width at that frame. action_first = table.column("action")[0].as_py() - assert len(action_first) == 8 # fallback to state (i=0 action was None) - # Frame 1 had an explicit numeric action of length 7 — exporter coerces. + assert len(action_first) == 8 action_second = table.column("action")[1].as_py() - assert len(action_second) == 7 + assert len(action_second) == 8 def test_export_writes_meta_files(fake_episode: Path, tmp_path: Path) -> None: @@ -105,28 +115,57 @@ def test_export_writes_meta_files(fake_episode: Path, tmp_path: Path) -> None: assert info["total_frames"] == 30 assert info["fps"] == 30 assert "observation.state" in info["features"] + # Path templates must use v3.0 chunk_index/file_index variables so lerobot + # resolves data and video files from the episodes.parquet pointers. 
+ assert "{chunk_index" in info["data_path"] + assert "{file_index" in info["data_path"] - tasks = [ - json.loads(line) - for line in (dst / "meta" / "tasks.jsonl").read_text().splitlines() - if line.strip() - ] + tasks = pq.read_table(dst / "meta" / "tasks.parquet").to_pylist() assert tasks == [{"task_index": 0, "task": "pick_cube"}] - episodes = [ - json.loads(line) - for line in (dst / "meta" / "episodes.jsonl").read_text().splitlines() - if line.strip() - ] + episodes = pq.read_table( + dst / "meta" / "episodes" / "chunk-000" / "file-000.parquet" + ).to_pylist() assert episodes[0]["episode_index"] == 0 assert episodes[0]["length"] == 30 + assert episodes[0]["dataset_from_index"] == 0 + assert episodes[0]["dataset_to_index"] == 30 + assert episodes[0]["data/chunk_index"] == 0 + assert episodes[0]["data/file_index"] == 0 def test_export_task_falls_back_to_episode_json(fake_episode: Path, tmp_path: Path) -> None: dst = tmp_path / "dataset" export_episode(fake_episode, dst) # no `task=` override - tasks_line = (dst / "meta" / "tasks.jsonl").read_text().splitlines()[0] - assert json.loads(tasks_line)["task"] == "pick_cube" + tasks = pq.read_table(dst / "meta" / "tasks.parquet").to_pylist() + assert tasks[0]["task"] == "pick_cube" + + +def test_export_rejects_action_none(tmp_path: Path) -> None: + """Regression: action=None used to be silently filled with state. That + poisoned ~65% of training samples (the settle frames) and pulled all + skill-action targets toward the home baseline, so the trained policy + consistently undershot demo trajectories. The exporter now refuses + action=None to force the recorder to drop those frames upstream. 
+ """ + ep_dir = tmp_path / "ep-with-none-action" + ep_dir.mkdir() + (ep_dir / "episode.json").write_text(json.dumps({"task": "x"})) + n_dof = 7 + with (ep_dir / "events.jsonl").open("w") as f: + for i in range(5): + ev = { + "t": i / 30.0, + "frame_idx": i, + "robot_joints": [0.0] * n_dof, + "ee_pose": {"xyz": [0, 0, 0], "quat_xyzw": [0, 0, 0, 1]}, + "gripper_width": 0.04, + "objects": {}, + "action": None, # the bug-triggering case + } + f.write(json.dumps(ev) + "\n") + with pytest.raises(ValueError, match="action=None"): + export_episode(ep_dir, tmp_path / "out", task="x") def test_export_missing_src_raises(tmp_path: Path) -> None: From 39cd089b70171fa7558de560ffce799cd4990e92 Mon Sep 17 00:00:00 2001 From: amarrmb Date: Sat, 25 Apr 2026 12:10:04 -0700 Subject: [PATCH 4/7] fix(scene+sim): strip MJCF keyframes; record last commanded action on MuJoCo backend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scene/robot_loader: robot MJCFs ship with blocks sized to the robot in isolation. Once inject_scene_objects adds free-joint objects (each adds 7 qpos), MuJoCo refuses to compile the model with "keyframe N: invalid qpos size, expected length M". Strip them at load time — robot home pose flows through the sidecar (home_qpos) anyway. sim/mujoco_backend: capture the last (target_joints, gripper) commanded via step() and expose via last_action(). Lets the recorder populate the JSONL `action` field uniformly without each skill having to thread the target through ctx.on_step. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/robosandbox/scene/robot_loader.py | 10 +++++++ .../src/robosandbox/sim/mujoco_backend.py | 26 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/packages/robosandbox-core/src/robosandbox/scene/robot_loader.py b/packages/robosandbox-core/src/robosandbox/scene/robot_loader.py index 41269af..dc0a53b 100644 --- a/packages/robosandbox-core/src/robosandbox/scene/robot_loader.py +++ b/packages/robosandbox-core/src/robosandbox/scene/robot_loader.py @@ -310,6 +310,16 @@ def load_robot( except (ValueError, RuntimeError) as e: raise RobotModelCompileError(urdf_path, e) from e + # Strip any blocks the robot MJCF ships with. They reference a + # qpos vector sized to the robot in isolation; once `inject_scene_objects` + # adds free-joint objects (each adds 7 qpos), the keyframe rows become + # the wrong length and MuJoCo refuses to compile with + # ``keyframe N: invalid qpos size, expected length M``. The robot's home + # pose flows through the sidecar (home_qpos) and we set ctrl explicitly, + # so the in-MJCF keyframe table carries no information we depend on. + for key in list(spec.keys): + key.delete() + # Inject ee_site on the specified body if requested. if parsed.ee_site_mode == "inject": assert parsed.ee_site_attach_body is not None diff --git a/packages/robosandbox-core/src/robosandbox/sim/mujoco_backend.py b/packages/robosandbox-core/src/robosandbox/sim/mujoco_backend.py index 97976c7..585d7cf 100644 --- a/packages/robosandbox-core/src/robosandbox/sim/mujoco_backend.py +++ b/packages/robosandbox-core/src/robosandbox/sim/mujoco_backend.py @@ -65,6 +65,11 @@ def __init__(self, render_size: tuple[int, int] = (480, 640), camera: str = "sce self._gripper_qpos_adr: int = -1 self._arm_ctrl_adr: list[int] = [] self._gripper_ctrl_adr: int = -1 + # Last action commanded via step(). 
Recorded so write_frame() can + # populate the JSONL `action` field without each skill having to + # thread the target through ctx.on_step. + self._last_target_joints: np.ndarray | None = None + self._last_gripper: float | None = None self._ee_site_id: int = -1 self._obj_body_ids: dict[str, int] = {} self._t: float = 0.0 @@ -147,7 +152,9 @@ def step( ) for adr, q in zip(self._arm_ctrl_adr, arr): self._data.ctrl[adr] = float(q) + self._last_target_joints = arr.copy() if gripper is not None: + self._last_gripper = float(np.clip(gripper, 0.0, 1.0)) # Semantic input: 0.0 == open, 1.0 == closed. # Lerp form (open*(1-t) + closed*t) — collapses to OLD's # `open*(1-t)` when closed==0, preserving bit-exact ctrl values. @@ -162,6 +169,25 @@ def step( mujoco.mj_step(self._model, self._data) self._t += self._model.opt.timestep + def last_action(self) -> dict | None: + """Most recent (target_joints, gripper) commanded via :meth:`step`. + + Returns ``None`` until the first :meth:`step` call sets either field. + Used by :class:`~robosandbox.recorder.local.LocalRecorder` to populate + the JSONL ``action`` field without each skill having to thread the + target through ``ctx.on_step``. 
+ """ + if self._last_target_joints is None and self._last_gripper is None: + return None + return { + "joints": ( + self._last_target_joints.tolist() + if self._last_target_joints is not None + else None + ), + "gripper": self._last_gripper, + } + # ---- observation ----------------------------------------------------- def observe(self) -> Observation: assert self._model is not None and self._data is not None and self._renderer is not None From 9414a29c63815a1fec4727c87726ae013eb3c7a1 Mon Sep 17 00:00:00 2001 From: amarrmb Date: Sat, 25 Apr 2026 12:10:13 -0700 Subject: [PATCH 5/7] feat(tasks): supported_backends, randomize spec, criterion target-object helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - task definitions can now declare supported_backends (mujoco/newton/...) so the CLI can fail clearly when a task is asked to run on an incompatible sim - pick_cube_franka_random.yaml: a randomize block (cube xyz jitter) — gives evals real per-trial variability so success rate is meaningful instead of measuring one fixed scene - tasks/runner: criterion_target_object() centralizes the "what cube does this success criterion track" lookup that used to be duplicated across the eval CLI and per-trial details collector Co-Authored-By: Claude Opus 4.7 (1M context) --- .../tasks/definitions/pick_cube_franka.yaml | 3 ++ .../definitions/pick_cube_franka_random.yaml | 29 +++++++++++++++++++ .../src/robosandbox/tasks/runner.py | 27 +++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka_random.yaml diff --git a/packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka.yaml b/packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka.yaml index 2bb3d61..edfd80a 100644 --- a/packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka.yaml +++ 
b/packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka.yaml @@ -1,6 +1,9 @@ name: pick_cube_franka prompt: "pick up the red cube" seed_note: "Franka Panda from bundled menagerie assets; acceptance test for URDF/MJCF import path. Cube placed in Franka's reach (Franka base at x=-0.3)." +supported_backends: + - mujoco + - newton scene: robot_urdf: "@builtin:robots/franka_panda/panda.xml" robot_config: "@builtin:robots/franka_panda/panda.robosandbox.yaml" diff --git a/packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka_random.yaml b/packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka_random.yaml new file mode 100644 index 0000000..4e91ebf --- /dev/null +++ b/packages/robosandbox-core/src/robosandbox/tasks/definitions/pick_cube_franka_random.yaml @@ -0,0 +1,29 @@ +name: pick_cube_franka_random +prompt: "pick up the red cube" +seed_note: | + Same Franka pick scene as `pick_cube_franka`, with a randomize block + so each trial draws a fresh xy and yaw for the cube. Used by the + eval harness to report a real success rate (not a single-pose + anecdote) and to generate diverse scripted demonstrations for + policy training. 
+supported_backends: + - mujoco + - newton +scene: + robot_urdf: "@builtin:robots/franka_panda/panda.xml" + robot_config: "@builtin:robots/franka_panda/panda.robosandbox.yaml" + objects: + - id: red_cube + kind: box + size: [0.012, 0.012, 0.012] + pose: + xyz: [0.4, 0.0, 0.06] + rgba: [0.85, 0.2, 0.2, 1.0] + mass: 0.05 +randomize: + xy_jitter: 0.05 # ± 5 cm in xy per trial + yaw_jitter: 0.5 # ± 0.5 rad about z per trial +success: + kind: lifted + object: red_cube + min_mm: 50 diff --git a/packages/robosandbox-core/src/robosandbox/tasks/runner.py b/packages/robosandbox-core/src/robosandbox/tasks/runner.py index 37698ff..3db1bfb 100644 --- a/packages/robosandbox-core/src/robosandbox/tasks/runner.py +++ b/packages/robosandbox-core/src/robosandbox/tasks/runner.py @@ -60,10 +60,37 @@ class TaskResult: # ---------- success evaluation ----------------------------------------- +_OBJECT_BEARING_KINDS = frozenset({"lifted", "moved_above", "displaced"}) + + def _eval_criterion(c: SuccessCriterion, initial: Observation, final: Observation) -> tuple[bool, dict]: return _eval_check(c.data, initial, final) +def criterion_target_object(c: SuccessCriterion | None) -> str | None: + """Return the primary object id that ``c`` evaluates against, or None. + + Used by eval harnesses that want to track per-step distance / lift + metrics for the same object the success check operates on. 
+ """ + if c is None: + return None + data = c.data if hasattr(c, "data") else (c if isinstance(c, dict) else {}) + return _check_target_object(data) + + +def _check_target_object(check: dict) -> str | None: + kind = check.get("kind") + if kind in _OBJECT_BEARING_KINDS: + return check.get("object") + if kind == "all": + for sub in check.get("checks", []): + target = _check_target_object(sub) + if target is not None: + return target + return None + + def _eval_check(check: dict, initial: Observation, final: Observation) -> tuple[bool, dict]: kind = check.get("kind") if kind == "lifted": From 472ccae035611bb41cc4491a8f82db17aa2c4a24 Mon Sep 17 00:00:00 2001 From: amarrmb Date: Sat, 25 Apr 2026 12:10:29 -0700 Subject: [PATCH 6/7] feat(sim): Newton integration backend (state + optional batched RGB) + sim-check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit RoboSandbox is the integration layer; Newton is one of the simulators it plays with. This lands the Newton integration as a parallel backend choice behind the same sim API: - sim/factory: create_sim_backend(name, **kw) — registry + entry-point dispatch so the CLI and downstream code stay backend-agnostic - sim/newton_backend: state-only by default (joint_q + body_q across N parallel worlds via newton.ModelBuilder + MuJoCo-Warp solver). Opt-in RGB via enable_camera=True wires newton.sensors.SensorTiledCamera to raytrace per-world RGB on the GPU. Lazy import of warp/newton so the module is safe to import on CPU-only machines. - eval/sim_check: drive the same policy through MuJoCo and Newton, report trajectory agreement — the reproducibility check for any cross-sim integration claim. Importantly: Newton runs require warp + newton ≥ 1.1 in the active env. RoboSandbox does not bundle them; users opt in via `pip install warp newton` or use the newton venv on a GPU host. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/robosandbox/eval/sim_check.py | 189 +++++ .../src/robosandbox/sim/__init__.py | 8 +- .../src/robosandbox/sim/factory.py | 58 ++ .../src/robosandbox/sim/newton_backend.py | 715 ++++++++++++++++++ .../robosandbox-core/tests/test_sim_check.py | 74 ++ 5 files changed, 1043 insertions(+), 1 deletion(-) create mode 100644 packages/robosandbox-core/src/robosandbox/eval/sim_check.py create mode 100644 packages/robosandbox-core/src/robosandbox/sim/factory.py create mode 100644 packages/robosandbox-core/src/robosandbox/sim/newton_backend.py create mode 100644 packages/robosandbox-core/tests/test_sim_check.py diff --git a/packages/robosandbox-core/src/robosandbox/eval/sim_check.py b/packages/robosandbox-core/src/robosandbox/eval/sim_check.py new file mode 100644 index 0000000..719a205 --- /dev/null +++ b/packages/robosandbox-core/src/robosandbox/eval/sim_check.py @@ -0,0 +1,189 @@ +"""Cross-backend agreement check. + +The product claim "we run your policy in MuJoCo or Newton" is only +meaningful if both backends agree on what the policy *did*. This +runs the same policy through both backends (MuJoCo single-world, +Newton world_count=1) and reports whether they agree on: + +1. Final task success outcome — the load-bearing claim. +2. End-of-episode joint positions — bounded by ``joint_tol_rad``. +3. End-of-episode object positions — bounded by ``xy_tol_m``. + +Per-step state divergence is tolerated by design: MuJoCo's CG/Newton +solvers and Warp-based Newton are entirely different numerical +methods, so mid-trajectory drift is expected (especially during +contact). Outcome agreement at the end is what matters. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +import numpy as np + +from robosandbox.policy import load_policy, run_eval_parallel, run_policy +from robosandbox.tasks.loader import load_builtin_task +from robosandbox.types import Observation + + +@dataclass +class BackendOutcome: + backend: str + success: bool | None + steps: int + final_joints: np.ndarray + final_objects: dict[str, np.ndarray] # id -> xyz (3,) + + +@dataclass +class SimCheckReport: + schema_version: int = 1 + task: str = "" + policy: str = "" + tolerances: dict[str, float] = field(default_factory=dict) + mujoco: dict[str, Any] = field(default_factory=dict) + newton: dict[str, Any] = field(default_factory=dict) + outcome_match: bool = False + joint_max_abs_diff_rad: float = float("nan") + object_max_xy_diff_m: float = float("nan") + object_diffs: dict[str, float] = field(default_factory=dict) + verdict: str = "UNKNOWN" # PASS | OUTCOME_MISMATCH | STATE_DRIFT | RUN_FAILED + + +def run_sim_check( + *, + task_name: str, + policy_path: str, + max_steps: int = 600, + settle_steps: int = 100, + joint_tol_rad: float = 0.087, # ~5 deg + xy_tol_m: float = 0.005, # 5 mm + device: str = "cuda:0", +) -> SimCheckReport: + """Drive the same policy through MuJoCo and Newton; report agreement.""" + from robosandbox.sim import create_sim_backend + + task = load_builtin_task(task_name) + rep = SimCheckReport( + task=task_name, + policy=policy_path, + tolerances={"joint_rad": joint_tol_rad, "xy_m": xy_tol_m}, + ) + + # ---- MuJoCo --------------------------------------------------------- + try: + policy_m = load_policy(policy_path) + sim_m = create_sim_backend("mujoco", render_size=(240, 320), camera="scene") + sim_m.load(task.scene) + try: + res_m = run_policy(sim_m, policy_m, max_steps=max_steps, success=task.success) + outcome_m = _capture_outcome("mujoco", res_m["final_obs"], res_m) + finally: + sim_m.close() + except Exception as e: + rep.mujoco 
= {"error": f"{type(e).__name__}: {e}"} + rep.verdict = "RUN_FAILED" + return rep + + # ---- Newton (world_count=1 to apple-to-apple compare) -------------- + try: + policy_n = load_policy(policy_path) + sim_n = create_sim_backend( + "newton", viewer="null", device=device, world_count=1, + render_size=(240, 320), camera="scene", + ) + sim_n.load(task.scene) + try: + res_n = run_eval_parallel( + sim_n, policy_n, max_steps=max_steps, success=task.success, + settle_steps=settle_steps, + ) + obs_n = sim_n.observe_all()[0] + outcome_n = _capture_outcome( + "newton", obs_n, + {"steps": res_n["steps"], + "success": (res_n["success_per_world"][0] if res_n["success_per_world"] else None)}, + ) + finally: + sim_n.close() + except Exception as e: + rep.newton = {"error": f"{type(e).__name__}: {e}"} + rep.mujoco = _outcome_to_dict(outcome_m) + rep.verdict = "RUN_FAILED" + return rep + + rep.mujoco = _outcome_to_dict(outcome_m) + rep.newton = _outcome_to_dict(outcome_n) + rep.outcome_match = (bool(outcome_m.success) == bool(outcome_n.success)) + + # Joint diff: max absolute element-wise difference across the n_dof joints. + if outcome_m.final_joints.shape == outcome_n.final_joints.shape: + rep.joint_max_abs_diff_rad = float( + np.max(np.abs(outcome_m.final_joints - outcome_n.final_joints)) + ) + else: + rep.joint_max_abs_diff_rad = float("nan") + + # Per-object xy diff. Only report objects present in both. 
+ common_ids = sorted(set(outcome_m.final_objects) & set(outcome_n.final_objects)) + diffs: dict[str, float] = {} + for oid in common_ids: + a = outcome_m.final_objects[oid][:2] + b = outcome_n.final_objects[oid][:2] + diffs[oid] = float(np.linalg.norm(a - b)) + rep.object_diffs = diffs + rep.object_max_xy_diff_m = max(diffs.values()) if diffs else 0.0 + + if not rep.outcome_match: + rep.verdict = "OUTCOME_MISMATCH" + elif ( + rep.joint_max_abs_diff_rad == rep.joint_max_abs_diff_rad # not NaN + and rep.joint_max_abs_diff_rad > joint_tol_rad + ) or rep.object_max_xy_diff_m > xy_tol_m: + rep.verdict = "STATE_DRIFT" + else: + rep.verdict = "PASS" + return rep + + +def _capture_outcome(backend: str, obs: Observation, res: dict) -> BackendOutcome: + final_objects = { + oid: np.asarray(p.xyz, dtype=np.float64) for oid, p in obs.scene_objects.items() + } + return BackendOutcome( + backend=backend, + success=res.get("success"), + steps=int(res.get("steps", 0)), + final_joints=np.asarray(obs.robot_joints, dtype=np.float64).copy(), + final_objects=final_objects, + ) + + +def _outcome_to_dict(o: BackendOutcome) -> dict[str, Any]: + return { + "backend": o.backend, + "success": o.success, + "steps": o.steps, + "final_joints": o.final_joints.tolist(), + "final_objects": {k: v.tolist() for k, v in o.final_objects.items()}, + } + + +def report_to_dict(rep: SimCheckReport) -> dict[str, Any]: + """Serialize a SimCheckReport for `--output result.json`.""" + return { + "schema_version": rep.schema_version, + "task": rep.task, + "policy": rep.policy, + "tolerances": rep.tolerances, + "mujoco": rep.mujoco, + "newton": rep.newton, + "agreement": { + "outcome_match": rep.outcome_match, + "joint_max_abs_diff_rad": rep.joint_max_abs_diff_rad, + "object_max_xy_diff_m": rep.object_max_xy_diff_m, + "object_diffs": rep.object_diffs, + }, + "verdict": rep.verdict, + } diff --git a/packages/robosandbox-core/src/robosandbox/sim/__init__.py 
b/packages/robosandbox-core/src/robosandbox/sim/__init__.py index 1a57cd4..e9b65bc 100644 --- a/packages/robosandbox-core/src/robosandbox/sim/__init__.py +++ b/packages/robosandbox-core/src/robosandbox/sim/__init__.py @@ -1,3 +1,9 @@ +from robosandbox.sim.factory import create_sim_backend, get_sim_backend_class, list_sim_backends from robosandbox.sim.mujoco_backend import MuJoCoBackend -__all__ = ["MuJoCoBackend"] +__all__ = [ + "MuJoCoBackend", + "create_sim_backend", + "get_sim_backend_class", + "list_sim_backends", +] diff --git a/packages/robosandbox-core/src/robosandbox/sim/factory.py b/packages/robosandbox-core/src/robosandbox/sim/factory.py new file mode 100644 index 0000000..9a6d466 --- /dev/null +++ b/packages/robosandbox-core/src/robosandbox/sim/factory.py @@ -0,0 +1,58 @@ +"""Sim backend registry helpers. + +This keeps backend selection in one place so CLI and future plugins do +not need to import concrete sim implementations directly. +""" + +from __future__ import annotations + +import inspect +from importlib import import_module +from importlib.metadata import entry_points +from typing import Any + + +_BUILTIN_BACKENDS: dict[str, str] = { + "mujoco": "robosandbox.sim.mujoco_backend:MuJoCoBackend", + "newton": "robosandbox.sim.newton_backend:NewtonBackend", +} + + +def _load_dotted(ref: str) -> type: + module_name, _, attr = ref.partition(":") + if not module_name or not attr: + raise ValueError(f"invalid backend reference {ref!r}") + module = import_module(module_name) + return getattr(module, attr) + + +def _iter_entry_points(): + for ep in entry_points(group="robosandbox.sim"): + yield ep + + +def list_sim_backends() -> list[str]: + names = set(_BUILTIN_BACKENDS) + for ep in _iter_entry_points(): + names.add(ep.name) + return sorted(names) + + +def get_sim_backend_class(name: str) -> type: + for ep in _iter_entry_points(): + if ep.name == name: + return ep.load() + ref = _BUILTIN_BACKENDS.get(name) + if ref is None: + available = ", 
".join(list_sim_backends()) + raise ValueError(f"unknown sim backend {name!r}; available: {available}") + return _load_dotted(ref) + + +def create_sim_backend(name: str, **kwargs: Any) -> Any: + cls = get_sim_backend_class(name) + params = inspect.signature(cls).parameters + accepted = { + key: value for key, value in kwargs.items() if key in params + } + return cls(**accepted) diff --git a/packages/robosandbox-core/src/robosandbox/sim/newton_backend.py b/packages/robosandbox-core/src/robosandbox/sim/newton_backend.py new file mode 100644 index 0000000..e3a86af --- /dev/null +++ b/packages/robosandbox-core/src/robosandbox/sim/newton_backend.py @@ -0,0 +1,715 @@ +"""Newton implementation of the SimBackend protocol. + +This is intentionally narrower than the MuJoCo backend today: + +- scene loading supports the Franka-style MJCF + primitive box objects +- observations expose robot/object state plus optional batched RGB + (opt-in via ``enable_camera=True``; uses ``newton.sensors.SensorTiledCamera`` + to raytrace one shaded image per world on the GPU) +- the motion-planner stack remains MuJoCo-specific, so planner-driven + agent runs should stay on MuJoCo for now + +The point of this backend is to make Newton a first-class integration +surface for scene loading, policy replay, and interactive viewer demos. + +Multi-world (world_count > 1) +------------------------------ +Pass ``world_count=N`` to run N identical scenes in parallel on one GPU. +Newton tiles the worlds in a 2-D grid with 2.5 m spacing so they don't +interfere physically. ``observe_all()`` returns one Observation per world; +``step()`` broadcasts the same joint targets to every world. This is +exactly what GPU-parallel policy evaluation needs. 
+""" + +from __future__ import annotations + +import copy +import math +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import numpy as np +import yaml + +from robosandbox.types import Observation, Pose, Scene + + +def _quat_mul( + a: tuple[float, float, float, float], + b: tuple[float, float, float, float], +) -> tuple[float, float, float, float]: + ax, ay, az, aw = a + bx, by, bz, bw = b + return ( + aw * bx + ax * bw + ay * bz - az * by, + aw * by - ax * bz + ay * bw + az * bx, + aw * bz + ax * by - ay * bx + az * bw, + aw * bw - ax * bx - ay * by - az * bz, + ) + + +def _quat_conj(q: tuple[float, float, float, float]) -> tuple[float, float, float, float]: + x, y, z, w = q + return (-x, -y, -z, w) + + +def _rotate_vec(q: tuple[float, float, float, float], v: tuple[float, float, float]) -> np.ndarray: + qv = (v[0], v[1], v[2], 0.0) + out = _quat_mul(_quat_mul(q, qv), _quat_conj(q)) + return np.array(out[:3], dtype=np.float64) + + +def _body_pose_from_row(row: np.ndarray) -> Pose: + return Pose( + xyz=(float(row[0]), float(row[1]), float(row[2])), + quat_xyzw=(float(row[3]), float(row[4]), float(row[5]), float(row[6])), + ) + + +@dataclass(frozen=True) +class _RobotConfig: + arm_joint_names: tuple[str, ...] + gripper_joint_names: tuple[str, ...] + home_qpos: tuple[float, ...] + gripper_open_qpos: float + gripper_closed_qpos: float + ee_attach_body: str + ee_offset_xyz: tuple[float, float, float] + + +def _look_at_quat(eye: np.ndarray, target: np.ndarray, up: np.ndarray) -> Any: + """OpenGL-style look-at quat (xyzw) used by Newton SensorTiledCamera. + + Camera looks down -Z in its own frame, +Y up. Returns a wp.quatf — caller + supplies wp via the lazy import on the backend instance. 
+ """ + import warp as wp # local: only when camera is enabled + + forward = target - eye + forward /= np.linalg.norm(forward) + 1e-9 + right = np.cross(forward, up) + right /= np.linalg.norm(right) + 1e-9 + cam_up = np.cross(right, forward) + R = np.column_stack([right, cam_up, -forward]) + tr = R[0, 0] + R[1, 1] + R[2, 2] + if tr > 0: + s = math.sqrt(tr + 1.0) * 2 + qw = 0.25 * s + qx = (R[2, 1] - R[1, 2]) / s + qy = (R[0, 2] - R[2, 0]) / s + qz = (R[1, 0] - R[0, 1]) / s + elif (R[0, 0] > R[1, 1]) and (R[0, 0] > R[2, 2]): + s = math.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2]) * 2 + qw = (R[2, 1] - R[1, 2]) / s + qx = 0.25 * s + qy = (R[0, 1] + R[1, 0]) / s + qz = (R[0, 2] + R[2, 0]) / s + elif R[1, 1] > R[2, 2]: + s = math.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2]) * 2 + qw = (R[0, 2] - R[2, 0]) / s + qx = (R[0, 1] + R[1, 0]) / s + qy = 0.25 * s + qz = (R[1, 2] + R[2, 1]) / s + else: + s = math.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1]) * 2 + qw = (R[1, 0] - R[0, 1]) / s + qx = (R[0, 2] + R[2, 0]) / s + qy = (R[1, 2] + R[2, 1]) / s + qz = 0.25 * s + return wp.quatf(float(qx), float(qy), float(qz), float(qw)) + + +def _grid_offsets(n: int, spacing: float = 2.5) -> list[tuple[float, float, float]]: + """Lay out N worlds in a square-ish grid with ``spacing`` metres between centres.""" + cols = int(math.ceil(math.sqrt(n))) + rows = int(math.ceil(n / cols)) + offsets: list[tuple[float, float, float]] = [] + for i in range(n): + r, c = i // cols, i % cols + x = (c - (cols - 1) * 0.5) * spacing + y = (r - (rows - 1) * 0.5) * spacing + offsets.append((x, y, 0.0)) + return offsets + + +class NewtonBackend: + """Newton rigid-body sim backend. 
Supports ``world_count`` ≥ 1 for + GPU-parallel policy evaluation.""" + + def __init__( + self, + render_size: tuple[int, int] = (480, 640), + camera: str = "scene", + viewer: str = "null", + port: int = 8080, + device: str = "cuda:0", + dt: float = 1.0 / 240.0, + world_count: int = 1, + enable_camera: bool = False, + camera_pos: tuple[float, float, float] = (1.1, -1.4, 0.9), + camera_look_at: tuple[float, float, float] = (0.4, 0.0, 0.1), + camera_fov_deg: float = 45.0, + ): + self._render_h, self._render_w = render_size + self._camera = camera + self._viewer_kind = viewer + self._port = int(port) + self._device = device + self._dt = float(dt) + self._world_count = max(1, int(world_count)) + # When enabled, observe()/observe_all() raytrace one RGB image per + # world via newton.sensors.SensorTiledCamera. Off by default to keep + # state-only callers (RL, headless eval) free of GPU render cost. + self._camera_enabled = bool(enable_camera) + self._cam_pos = tuple(float(v) for v in camera_pos) + self._cam_look_at = tuple(float(v) for v in camera_look_at) + self._cam_fov_deg = float(camera_fov_deg) + self._sensor: Any = None + self._cam_rays: Any = None + self._cam_color_buf: Any = None + self._cam_xforms: Any = None + + self._scene: Scene | None = None + self._robot: _RobotConfig | None = None + self._model: Any = None + self._state_0: Any = None + self._state_1: Any = None + self._control: Any = None + self._contacts: Any = None + self._solver: Any = None + self._viewer: Any = None + self._wp: Any = None + self._newton: Any = None + self._joint_target_mode: Any = None + self._viewer_null_cls: Any = None + self._viewer_viser_cls: Any = None + + # Per-world layout (set during load). + # _dof_per_world: stride for the FULL state vector (joint_q), + # includes the cube freejoint's quaternion (7 coords). + # _actuators_per_world: stride for the ACTUATOR target vector + # (joint_target_pos), sized by joint_qd which uses 6 coords for + # freejoints. 
They differ by 1 per freejoint and conflating them + # produces an OOB into joint_target_pos at world_count >= 16. + self._bodies_per_world: int = 0 + self._dof_per_world: int = 0 + self._actuators_per_world: int = 0 + # Indices *within one world* (0-indexed from world start) + self._w_arm_q: list[int] = [] # relative joint_q indices for arm + self._w_gripper_q: list[int] = [] # relative joint_q indices for fingers + self._w_ee_body: int = -1 # relative body index for EE + self._w_obj_body: dict[str, int] = {} # obj_id -> relative body index + + self._arm_joint_names: list[str] = [] + self._ee_offset_xyz: tuple[float, float, float] = (0.0, 0.0, 0.0) + self._t: float = 0.0 + + # ------------------------------------------------------------------ + # Runtime bootstrap + # ------------------------------------------------------------------ + + def _ensure_runtime(self) -> None: + if self._newton is not None: + return + try: + import warp as wp + import newton + from newton import JointTargetMode + from newton.viewer import ViewerNull, ViewerViser + except ImportError as e: + raise ImportError( + "Newton backend requires `warp` and `newton` installed in the active environment" + ) from e + self._wp = wp + self._newton = newton + self._joint_target_mode = JointTargetMode + self._viewer_null_cls = ViewerNull + self._viewer_viser_cls = ViewerViser + + def _load_robot_config(self, path: Path) -> _RobotConfig: + with path.open("r", encoding="utf-8") as fh: + raw = yaml.safe_load(fh) + return _RobotConfig( + arm_joint_names=tuple(str(v) for v in raw["arm"]["joints"]), + gripper_joint_names=tuple(str(v) for v in raw["gripper"]["joints"]), + home_qpos=tuple(float(v) for v in raw["arm"]["home_qpos"]), + gripper_open_qpos=float(raw["gripper"]["open_qpos"]), + gripper_closed_qpos=float(raw["gripper"]["closed_qpos"]), + ee_attach_body=str(raw["ee_site"]["inject"]["attach_body"]), + ee_offset_xyz=tuple(float(v) for v in raw["ee_site"]["inject"]["xyz"]), + ) + + def 
_create_viewer(self) -> Any: + if self._viewer_kind == "null": + return self._viewer_null_cls(num_frames=1_000_000) + if self._viewer_kind == "viser": + return self._viewer_viser_cls(port=self._port) + raise ValueError(f"unknown Newton viewer {self._viewer_kind!r}") + + # ------------------------------------------------------------------ + # Model building + # ------------------------------------------------------------------ + + def _build_single_builder(self, scene: Scene) -> Any: + """Build an unfinalized ModelBuilder for one world.""" + assert scene.robot_urdf is not None + assert self._robot is not None + + wp = self._wp + newton = self._newton + builder = newton.ModelBuilder() + builder.add_mjcf( + str(scene.robot_urdf), + floating=False, + enable_self_collisions=False, + parse_mujoco_options=True, + ) + + x0, y0, _ = scene.workspace_aabb[0] + x1, y1, _ = scene.workspace_aabb[1] + table_thickness = 0.02 + hx = max((x1 - x0) * 0.5, 0.01) + hy = max((y1 - y0) * 0.5, 0.01) + table_center = wp.vec3( + float((x0 + x1) * 0.5), + float((y0 + y1) * 0.5), + float(scene.table_height - table_thickness), + ) + builder.add_shape_box( + body=-1, + hx=hx, hy=hy, hz=table_thickness, + xform=wp.transform(table_center, wp.quat_identity()), + ) + builder.add_ground_plane() + + for obj in scene.objects: + if obj.kind != "box": + raise NotImplementedError( + f"Newton backend currently supports box objects only, got {obj.kind!r}" + ) + x, y, z = obj.pose.xyz + qx, qy, qz, qw = obj.pose.quat_xyzw + sx, sy, sz = obj.size + builder.add_body( + xform=wp.transform(wp.vec3(x, y, z), wp.quat(qx, qy, qz, qw)), + mass=float(obj.mass), + label=obj.id, + ) + builder.add_shape_box(body=builder.body_count - 1, hx=sx, hy=sy, hz=sz) + + target_q = [*self._robot.home_qpos, self._robot.gripper_open_qpos, self._robot.gripper_open_qpos] + builder.joint_q[: len(target_q)] = target_q + builder.joint_target_pos[: len(target_q)] = target_q + builder.joint_target_ke[: len(target_q)] = [4500, 4500, 3500, 
3500, 2000, 2000, 2000, 100, 100] + builder.joint_target_kd[: len(target_q)] = [450, 450, 350, 350, 200, 200, 200, 10, 10] + for i in range(len(target_q)): + builder.joint_target_mode[i] = int(self._joint_target_mode.POSITION) + return builder + + def _build_model( + self, + scene: Scene, + *, + per_world_scenes: list[Scene] | None = None, + ) -> Any: + # `single` defines the per-world topology used for index discovery. + # When `per_world_scenes` is set, every entry must have matching + # topology — `load()` already validated that. + single = self._build_single_builder(scene) + + # Record per-world layout sizes from the unfinalized builder + self._bodies_per_world = single.body_count + self._dof_per_world = single.joint_coord_count + + # Discover joint/body indices from a finalized single-world model + ref_model = copy.deepcopy(single).finalize() + self._w_arm_q = [self._find_joint_q_index_in(ref_model, n) for n in self._robot.arm_joint_names] + self._w_gripper_q = [self._find_joint_q_index_in(ref_model, n) for n in self._robot.gripper_joint_names] + self._w_ee_body = self._find_body_index_in(ref_model, self._robot.ee_attach_body) + self._w_obj_body = {obj.id: self._find_body_index_in(ref_model, obj.id) for obj in scene.objects} + + if self._world_count == 1: + # World 0 honours the per-world scene if one was passed. + if per_world_scenes is not None: + return self._build_single_builder(per_world_scenes[0]).finalize() + return single.finalize() + + # Tile N worlds in a grid. Each world gets its own builder when + # per_world_scenes is set so randomization actually lands in the + # finalized model (object xform is baked at add_body time). 
+ wp = self._wp + newton = self._newton + multi = newton.ModelBuilder() + for w, (ox, oy, oz) in enumerate(_grid_offsets(self._world_count)): + world_scene = per_world_scenes[w] if per_world_scenes is not None else scene + world_builder = ( + self._build_single_builder(world_scene) + if per_world_scenes is not None + else single + ) + multi.add_world( + world_builder, + xform=wp.transform(wp.vec3(ox, oy, oz), wp.quat_identity()), + ) + return multi.finalize() + + # ------------------------------------------------------------------ + # Index helpers + # ------------------------------------------------------------------ + + def _find_joint_q_index_in(self, model: Any, joint_name: str) -> int: + labels = list(model.joint_label) + starts = model.joint_q_start.numpy() + for i, label in enumerate(labels): + if label.endswith(f"/{joint_name}") or label == joint_name: + return int(starts[i]) + raise KeyError(f"joint {joint_name!r} not found in Newton model") + + def _find_body_index_in(self, model: Any, body_name: str) -> int: + labels = list(model.body_label) + for i, label in enumerate(labels): + if label.endswith(f"/{body_name}") or label == body_name: + return i + raise KeyError(f"body {body_name!r} not found in Newton model") + + # absolute index for world w + def _arm_q_abs(self, w: int) -> list[int]: + off = w * self._dof_per_world + return [off + i for i in self._w_arm_q] + + def _gripper_q_abs(self, w: int) -> list[int]: + off = w * self._dof_per_world + return [off + i for i in self._w_gripper_q] + + def _ee_body_abs(self, w: int) -> int: + return w * self._bodies_per_world + self._w_ee_body + + def _obj_body_abs(self, obj_id: str, w: int) -> int: + return w * self._bodies_per_world + self._w_obj_body[obj_id] + + # ------------------------------------------------------------------ + # Viewer + # ------------------------------------------------------------------ + + def _log_viewer_state(self) -> None: + if self._viewer is None or self._state_0 is None: + return + 
self._viewer.begin_frame(self._t) + self._viewer.log_state(self._state_0) + self._viewer.end_frame() + + # ------------------------------------------------------------------ + # SimBackend protocol + # ------------------------------------------------------------------ + + def load(self, scene: Scene, *, per_world_scenes: list[Scene] | None = None) -> None: + """Build the multi-world model. + + ``per_world_scenes``: optional list of length ``world_count``. When + provided, each world is built from the corresponding Scene (so e.g. + each world can hold a different randomized object pose). The base + ``scene`` still defines the robot URDF / sidecar / workspace; only + the per-world ``objects`` differ. Topology must match across the + list (same number and kinds of objects) — that is the contract + ``robosandbox.tasks.randomize.jitter_scene`` already satisfies. + """ + self._ensure_runtime() + self._wp.set_device(self._device) + if scene.robot_urdf is None or scene.robot_config is None: + raise NotImplementedError( + "Newton backend requires an explicit robot_urdf and robot_config" + ) + if per_world_scenes is not None: + if len(per_world_scenes) != self._world_count: + raise ValueError( + f"per_world_scenes has length {len(per_world_scenes)} but " + f"world_count={self._world_count}" + ) + base_kinds = tuple((o.id, o.kind) for o in scene.objects) + for w, ws in enumerate(per_world_scenes): + ws_kinds = tuple((o.id, o.kind) for o in ws.objects) + if ws_kinds != base_kinds: + raise ValueError( + f"per_world_scenes[{w}] topology differs from base scene " + f"({ws_kinds} vs {base_kinds}); only pose/mass/size/rgba " + f"may be randomized" + ) + self._scene = scene + self._robot = self._load_robot_config(scene.robot_config) + self._arm_joint_names = list(self._robot.arm_joint_names) + self._ee_offset_xyz = self._robot.ee_offset_xyz + + self._model = self._build_model(scene, per_world_scenes=per_world_scenes) + # Newton stores actuator targets (joint_target_pos / 
joint_target_ke / + # …) on a vector sized to ACTUATED joints only; free joints (the + # cube) have no actuator slot. So the per-world stride for + # control vectors is *smaller* than the per-world stride for the + # full state vector (joint_q) by the freejoint's contribution. + # Authoritative source: the freshly-built model itself. + try: + ref_control = self._model.control() + ref_state = self._model.state() + total_targets = int(ref_control.joint_target_pos.shape[0]) + total_q = int(ref_state.joint_q.shape[0]) + if total_targets % self._world_count == 0: + self._actuators_per_world = total_targets // self._world_count + else: + self._actuators_per_world = self._dof_per_world + if total_q % self._world_count == 0: + self._dof_per_world = total_q // self._world_count + except Exception: + # Don't gate model creation on this defensive recompute — fall + # back to the unfinalized builder counts. + self._actuators_per_world = self._dof_per_world + self._viewer = self._create_viewer() + self._viewer.set_model(self._model) + if hasattr(self._viewer, "set_camera"): + self._viewer.set_camera( + pos=self._wp.vec3(1.1, -1.4, 0.9), pitch=-18.0, yaw=45.0 + ) + if self._camera_enabled: + self._setup_tiled_camera() + self.reset() + + def _setup_tiled_camera(self) -> None: + """Build SensorTiledCamera + per-world camera transforms (one camera per world). + + Newton's pinhole rays follow the OpenGL convention (camera looks down + -Z, +Y up). Each world's camera follows the world's grid offset so all + worlds see their own scene framed identically. 
+ """ + from newton.sensors import SensorTiledCamera + + wp = self._wp + self._sensor = SensorTiledCamera(model=self._model) + self._sensor.utils.create_default_light(enable_shadows=True) + self._sensor.utils.assign_checkerboard_material_to_all_shapes() + + self._cam_rays = self._sensor.utils.compute_pinhole_camera_rays( + self._render_w, self._render_h, math.radians(self._cam_fov_deg) + ) + self._cam_color_buf = self._sensor.utils.create_color_image_output( + self._render_w, self._render_h, camera_count=1 + ) + + eye0 = np.asarray(self._cam_pos, dtype=np.float64) + target0 = np.asarray(self._cam_look_at, dtype=np.float64) + up = np.array([0.0, 0.0, 1.0]) + per_world = [] + for ox, oy, oz in _grid_offsets(self._world_count): + offset = np.array([ox, oy, oz]) + eye_w = eye0 + offset + q = _look_at_quat(eye_w, target0 + offset, up) + per_world.append(wp.transformf(wp.vec3f(*eye_w.astype(np.float32)), q)) + # Newton expects shape (camera_count, world_count); one camera per world. + self._cam_xforms = wp.array([per_world], dtype=wp.transformf) + + def reset(self) -> None: + assert self._model is not None + self._state_0 = self._model.state() + self._state_1 = self._model.state() + self._control = self._model.control() + self._contacts = self._model.contacts() + self._solver = self._newton.solvers.SolverMuJoCo(self._model) + self._t = 0.0 + self._log_viewer_state() + + def close(self) -> None: + if self._viewer is not None: + try: + self._viewer.close() + except Exception: + pass + self._viewer = None + + def step( + self, + target_joints: np.ndarray | None = None, + gripper: float | None = None, + ) -> None: + assert self._state_0 is not None + assert self._control is not None + + if target_joints is not None: + arr = np.asarray(target_joints, dtype=np.float64).ravel() + n_arm = len(self._w_arm_q) + if arr.shape != (n_arm,): + raise ValueError(f"target_joints must have shape ({n_arm},), got {arr.shape}") + target = self._control.joint_target_pos.numpy() + stride = 
self._actuators_per_world or self._dof_per_world + for w in range(self._world_count): + for local_q, q in zip(self._w_arm_q, arr): + target[w * stride + local_q] = float(q) + arr_wp = self._wp.array(target, dtype=self._control.joint_target_pos.dtype) + self._wp.copy(self._control.joint_target_pos, arr_wp) + + if gripper is not None: + t = float(np.clip(gripper, 0.0, 1.0)) + finger_q = ( + self._robot.gripper_open_qpos * (1.0 - t) + + self._robot.gripper_closed_qpos * t + ) + target = self._control.joint_target_pos.numpy() + stride = self._actuators_per_world or self._dof_per_world + for w in range(self._world_count): + for local_q in self._w_gripper_q: + target[w * stride + local_q] = finger_q + arr_wp = self._wp.array(target, dtype=self._control.joint_target_pos.dtype) + self._wp.copy(self._control.joint_target_pos, arr_wp) + + self._state_0.clear_forces() + self._model.collide(self._state_0, self._contacts) + self._solver.step(self._state_0, self._state_1, self._control, self._contacts, self._dt) + self._state_0, self._state_1 = self._state_1, self._state_0 + self._t += self._dt + self._log_viewer_state() + + def _render_all_rgb(self) -> np.ndarray | None: + """Return (W, H, W_pix, 3) uint8 RGB across worlds, or None if disabled.""" + if not self._camera_enabled or self._sensor is None: + return None + from newton.sensors import SensorTiledCamera + + self._sensor.update( + self._state_0, + self._cam_xforms, + self._cam_rays, + color_image=self._cam_color_buf, + clear_data=SensorTiledCamera.GRAY_CLEAR_DATA, + ) + # uint32 RGBA per pixel → unpack to uint8 (W, 1, H, W_pix, 4) → drop alpha + img32 = self._cam_color_buf.numpy() + rgba = img32.view(np.uint8).reshape(*img32.shape, 4) + return rgba[:, 0, :, :, :3].copy() # (W, H, W_pix, 3) + + def _obs_for_world( + self, + w: int, + q: np.ndarray, + body_q: np.ndarray, + rgb_all: np.ndarray | None = None, + ) -> Observation: + arm_joints = np.array([q[self._dof_per_world * w + i] for i in self._w_arm_q], 
dtype=np.float64) + finger_positions = [float(q[self._dof_per_world * w + i]) for i in self._w_gripper_q] + gripper_width = float(sum(abs(v) for v in finger_positions)) + ee_row = body_q[self._ee_body_abs(w)] + ee_body_pose = _body_pose_from_row(ee_row) + rotated = _rotate_vec(ee_body_pose.quat_xyzw, self._ee_offset_xyz) + ee_xyz = np.asarray(ee_body_pose.xyz, dtype=np.float64) + rotated + ee_pose = Pose( + xyz=(float(ee_xyz[0]), float(ee_xyz[1]), float(ee_xyz[2])), + quat_xyzw=ee_body_pose.quat_xyzw, + ) + objects = { + oid: _body_pose_from_row(body_q[self._obj_body_abs(oid, w)]) + for oid in self._w_obj_body + } + rgb = ( + rgb_all[w] + if rgb_all is not None + else np.zeros((self._render_h, self._render_w, 3), dtype=np.uint8) + ) + return Observation( + rgb=rgb, + depth=None, + robot_joints=arm_joints, + ee_pose=ee_pose, + gripper_width=gripper_width, + scene_objects=objects, + timestamp=self._t, + camera_intrinsics=None, + camera_extrinsics=None, + ) + + def step_all( + self, + targets: np.ndarray, + grippers: np.ndarray, + ) -> None: + """Per-world joint targets for RL training. 
+ + Args: + targets: (N, n_arm) — absolute joint positions per world + grippers: (N,) — gripper command ∈ [0, 1] per world + """ + assert self._state_0 is not None + assert self._control is not None + + targets = np.asarray(targets, dtype=np.float64) + grippers_arr = np.asarray(grippers, dtype=np.float64) + N = self._world_count + n_arm = len(self._w_arm_q) + + if targets.shape != (N, n_arm): + raise ValueError(f"targets must be ({N}, {n_arm}), got {targets.shape}") + if grippers_arr.shape != (N,): + raise ValueError(f"grippers must be ({N},), got {grippers_arr.shape}") + + target = self._control.joint_target_pos.numpy() + for w in range(N): + for local_q, q in zip(self._w_arm_q, targets[w]): + target[w * self._dof_per_world + local_q] = float(q) + t = float(np.clip(grippers_arr[w], 0.0, 1.0)) + finger_q = ( + self._robot.gripper_open_qpos * (1.0 - t) + + self._robot.gripper_closed_qpos * t + ) + for local_q in self._w_gripper_q: + target[w * self._dof_per_world + local_q] = finger_q + + arr_wp = self._wp.array(target, dtype=self._control.joint_target_pos.dtype) + self._wp.copy(self._control.joint_target_pos, arr_wp) + + self._state_0.clear_forces() + self._model.collide(self._state_0, self._contacts) + self._solver.step(self._state_0, self._state_1, self._control, self._contacts, self._dt) + self._state_0, self._state_1 = self._state_1, self._state_0 + self._t += self._dt + self._log_viewer_state() + + def observe(self) -> Observation: + """World-0 observation (backward-compatible single-world interface).""" + assert self._state_0 is not None + q = self._state_0.joint_q.numpy() + body_q = self._state_0.body_q.numpy() + rgb_all = self._render_all_rgb() + return self._obs_for_world(0, q, body_q, rgb_all) + + def observe_all(self) -> list[Observation]: + """One Observation per parallel world.""" + assert self._state_0 is not None + q = self._state_0.joint_q.numpy() + body_q = self._state_0.body_q.numpy() + rgb_all = self._render_all_rgb() + return 
[self._obs_for_world(w, q, body_q, rgb_all) for w in range(self._world_count)] + + def get_object_pose(self, object_id: str) -> Pose | None: + if object_id not in self._w_obj_body or self._state_0 is None: + return None + body_q = self._state_0.body_q.numpy() + return _body_pose_from_row(body_q[self._obj_body_abs(object_id, 0)]) + + def set_object_pose(self, object_id: str, pose: Pose) -> None: + raise NotImplementedError("Newton backend does not support teleporting objects in-place") + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + + @property + def n_dof(self) -> int: + return len(self._arm_joint_names) + + @property + def joint_names(self) -> list[str]: + return list(self._arm_joint_names) + + @property + def home_qpos(self) -> np.ndarray: + assert self._robot is not None + return np.asarray(self._robot.home_qpos, dtype=np.float64) + + @property + def n_worlds(self) -> int: + return self._world_count diff --git a/packages/robosandbox-core/tests/test_sim_check.py b/packages/robosandbox-core/tests/test_sim_check.py new file mode 100644 index 0000000..e4bc6e9 --- /dev/null +++ b/packages/robosandbox-core/tests/test_sim_check.py @@ -0,0 +1,74 @@ +"""sim-check structural tests. + +The end-to-end MuJoCo↔Newton agreement assertion lives on DGX (Newton +needs CUDA + warp). These tests cover the module's structural +contracts that we can validate on any machine: report dataclass +shape, JSON serialization, error handling when a backend is +unavailable. 
+""" + +from __future__ import annotations + +import json + +import pytest + +from robosandbox.eval.sim_check import ( + SimCheckReport, + report_to_dict, + run_sim_check, +) + + +def test_report_dict_has_stable_top_level_keys() -> None: + rep = SimCheckReport( + task="pick_cube_franka", + policy="runs/x", + tolerances={"joint_rad": 0.087, "xy_m": 0.005}, + ) + out = report_to_dict(rep) + for key in ( + "schema_version", "task", "policy", "tolerances", + "mujoco", "newton", "agreement", "verdict", + ): + assert key in out + for key in ( + "outcome_match", "joint_max_abs_diff_rad", + "object_max_xy_diff_m", "object_diffs", + ): + assert key in out["agreement"] + # Schema version locked so external consumers can rely on it. + assert out["schema_version"] == 1 + # JSON-serializable as written. + json.dumps(out) + + +def test_run_sim_check_reports_run_failed_when_newton_missing() -> None: + """Without warp installed, Newton creation must surface as RUN_FAILED + (exit 2 in the CLI) rather than crashing the run or pretending to PASS. + + This test runs only when a real episode exists in runs/. Using the + bundled scripted-Pick episode is overkill for a structural test, so + skip cleanly if there's nothing to point at. + """ + from pathlib import Path + + runs = sorted(Path("runs").glob("20*"), reverse=True) + if not runs: + pytest.skip("no episode in runs/ to use as policy") + try: + import warp # noqa: F401 + pytest.skip("warp present — this test needs a no-warp environment") + except ImportError: + pass + + rep = run_sim_check( + task_name="pick_cube_franka", + policy_path=str(runs[0]), + max_steps=20, # short — we just want to reach the Newton step + settle_steps=10, + ) + # MuJoCo should have run cleanly; Newton failed at backend creation. 
+ assert "error" not in rep.mujoco + assert "error" in rep.newton + assert rep.verdict == "RUN_FAILED" From 3fc36bed8a7930bfbe6e9e2b23afab604cfff3f3 Mon Sep 17 00:00:00 2001 From: amarrmb Date: Sat, 25 Apr 2026 12:10:41 -0700 Subject: [PATCH 7/7] feat(cli+viewer): eval / compare / sim-check / simulate subcommands; pluggable viewer backend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CLI gains four user-facing eval-loop verbs: - eval: run a policy against a task; n-trials with deterministic per-trial reload, settle-steps, action-repeat, and structured JSON output (the EvalSummary v2 shape). Single-world MuJoCo or N parallel Newton worlds via --sim-backend / --world-count. - compare: two-proportion z-test on two prior eval JSONs ("did checkpoint B beat checkpoint A?") so you don't eyeball CIs. - sim-check: drive the same policy through both backends, report trajectory agreement — the reproducibility hook for any cross-sim claim. - simulate: drop a task into a backend and step it; useful for debugging scene loading without running a full eval. viewer/server: route through sim.factory.create_sim_backend so the live viewer can pick MuJoCo or Newton just like the CLI eval can. The PPO `train` subcommand is intentionally left off `main` — RoboSandbox is not a training framework. PPO lives on the experimental/newton-eval branch as research, not product. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../robosandbox-core/src/robosandbox/cli.py | 792 +++++++++++++++++- .../src/robosandbox/viewer/server.py | 118 ++- 2 files changed, 858 insertions(+), 52 deletions(-) diff --git a/packages/robosandbox-core/src/robosandbox/cli.py b/packages/robosandbox-core/src/robosandbox/cli.py index 2f4064d..74e1887 100644 --- a/packages/robosandbox-core/src/robosandbox/cli.py +++ b/packages/robosandbox-core/src/robosandbox/cli.py @@ -3,8 +3,11 @@ from __future__ import annotations import argparse +import math import sys +import numpy as np + def main(argv: list[str] | None = None) -> int: p = argparse.ArgumentParser(prog="robo-sandbox") @@ -27,6 +30,11 @@ def main(argv: list[str] | None = None) -> int: default="runs", help="Directory to write recorded episodes (default: ./runs)", ) + viewer_p.add_argument("--sim-backend", default="mujoco", choices=["mujoco", "newton"]) + viewer_p.add_argument("--viser-port", type=int, default=8090, + help="Port for Newton's Viser 3D viewer (Newton mode only)") + viewer_p.add_argument("--device", default="cuda:0", + help="Compute device for Newton backend (Newton mode only)") run_p = sub.add_parser("run", help="Agent-driven run (stub / openai / ollama / custom)") run_p.add_argument( @@ -58,16 +66,120 @@ def main(argv: list[str] | None = None) -> int: run_p.add_argument("--api-key-env", default=None) run_p.add_argument("--perception", choices=["vlm", "ground_truth"], default=None) run_p.add_argument("--max-replans", type=int, default=3) + run_p.add_argument("--sim-backend", default="mujoco") + run_p.add_argument("--sim-viewer", default="null") + run_p.add_argument("--viewer-port", type=int, default=8080) + run_p.add_argument("--device", default="cuda:0") run_p.add_argument("--log-level", default="INFO") + sim_p = sub.add_parser( + "simulate", + help="Load a built-in task in a sim backend and step it forward", + ) + sim_p.add_argument("--task", default="pick_cube_franka") + 
sim_p.add_argument("--sim-backend", default="mujoco") + sim_p.add_argument("--steps", type=int, default=240) + sim_p.add_argument("--render-height", type=int, default=480) + sim_p.add_argument("--render-width", type=int, default=640) + sim_p.add_argument("--sim-viewer", default="null") + sim_p.add_argument("--viewer-port", type=int, default=8080) + sim_p.add_argument("--device", default="cuda:0") + sim_p.add_argument("--hold", action="store_true") + exp_p = sub.add_parser( "export-lerobot", help="Convert a recorded episode directory to LeRobot v3 dataset format", ) - exp_p.add_argument("src", help="Source episode dir (e.g. runs/20260101-120000-abcd1234)") + exp_p.add_argument( + "src", + help=( + "Source: either a single episode dir (containing events.jsonl) " + "or a parent dir containing many episode subdirs — auto-detected. " + "Use --src repeatedly to combine an explicit list." + ), + ) exp_p.add_argument("dst", help="Destination LeRobot dataset dir") exp_p.add_argument("--task", default=None, help="Override task string") exp_p.add_argument("--fps", type=int, default=30) + exp_p.add_argument( + "--src", + dest="extra_src", + action="append", + default=[], + help="Additional source episode dirs (repeatable). Combined with the positional src.", + ) + + eval_p = sub.add_parser( + "eval", + help="GPU-parallel policy evaluation via Newton multi-world backend", + ) + eval_p.add_argument("--task", required=True, help="Built-in task name (e.g. 
pick_cube_franka)") + eval_p.add_argument("--policy", required=True, help="Path to policy checkpoint or episode dir") + eval_p.add_argument("--sim-backend", default="newton", choices=["mujoco", "newton"], + help="mujoco = single world + RGB; newton = N parallel worlds, state only") + eval_p.add_argument("--world-count", type=int, default=64, + help="Parallel worlds (newton only, ignored for mujoco)") + eval_p.add_argument("--max-steps", type=int, default=500, help="Max sim steps per world") + eval_p.add_argument("--settle-steps", type=int, default=100, help="Physics settle steps before eval") + eval_p.add_argument("--device", default="cuda:0", help="GPU device for Newton") + eval_p.add_argument("--n-trials", type=int, default=1, + help="MuJoCo only: repeat the eval N times. Newton uses --world-count.") + eval_p.add_argument("--output", default=None, + help="Write structured eval result as JSON to this path.") + eval_p.add_argument("--seed", type=int, default=None, + help="Seed for any randomization (item 3 will use this).") + eval_p.add_argument("--reload-policy", action=argparse.BooleanOptionalAction, default=True, + help=( + "Reload the policy fresh per trial (default true) so per-trial " + "outcomes are deterministic and independent of trial order. The " + "alternative (--no-reload-policy) loads once and only calls " + "policy.reset() between trials, which is faster but leaks state " + "(BatchNorm running buffers, register_buffer tensors, RNG, " + "compile/autograd cache) across trials — observed seed-level " + "outcomes change with trial order. Reload cost is ~1-2 s/trial." + )) + eval_p.add_argument("--newton-rgb", action=argparse.BooleanOptionalAction, default=None, + help=( + "Newton only: raytrace per-world RGB via SensorTiledCamera. " + "Default = auto (on iff the policy declares image inputs). " + "--no-newton-rgb forces zero frames (faster, useful only for " + "state-only policies)." 
+ )) + eval_p.add_argument("--action-repeat", type=int, default=1, + help=( + "Hold each policy action for N sim steps. Set this " + "to sim_dt/dataset_dt (e.g. 6 when training data " + "was recorded at 30 fps and sim_dt=0.005s = 200 Hz) " + "so the policy sees the same temporal cadence at " + "eval as it did during training. Default 1 (call " + "policy every step) is correct only when training " + "and eval cadences match." + )) + + sc_p = sub.add_parser( + "sim-check", + help="Drive the same policy through MuJoCo and Newton; report agreement", + ) + sc_p.add_argument("--task", required=True) + sc_p.add_argument("--policy", required=True) + sc_p.add_argument("--max-steps", type=int, default=600) + sc_p.add_argument("--settle-steps", type=int, default=100) + sc_p.add_argument("--joint-tol-rad", type=float, default=0.087, + help="Per-joint tolerance for end-state agreement (~5 deg).") + sc_p.add_argument("--xy-tol-m", type=float, default=0.005, + help="Per-object xy tolerance for end-state agreement (5 mm).") + sc_p.add_argument("--device", default="cuda:0") + sc_p.add_argument("--output", default=None, + help="Write the full report as JSON to this path.") + + cmp_p = sub.add_parser( + "compare", + help="Statistical comparison of two eval JSON outputs (two-proportion z-test)", + ) + cmp_p.add_argument("a", help="Path to first eval JSON (from `eval --output`)") + cmp_p.add_argument("b", help="Path to second eval JSON") + cmp_p.add_argument("--alpha", type=float, default=0.05, + help="Significance threshold for the difference (default 0.05)") viz_p = sub.add_parser( "download-franka-visuals", @@ -90,8 +202,15 @@ def main(argv: list[str] | None = None) -> int: elif args.cmd == "viewer": from robosandbox.viewer.server import run as viewer_run - print(f"RoboSandbox viewer — open http://{args.host}:{args.port}") - viewer_run(host=args.host, port=args.port, initial_task=args.task, runs_dir=args.runs_dir) + backend = args.sim_backend + print(f"RoboSandbox viewer [{backend}] 
— open http://{args.host}:{args.port}") + if backend == "newton": + print(f"Newton Viser 3D viewer — http://127.0.0.1:{args.viser_port}") + viewer_run( + host=args.host, port=args.port, initial_task=args.task, + runs_dir=args.runs_dir, sim_backend=backend, viser_port=args.viser_port, + device=getattr(args, "device", "cuda:0"), + ) return 0 elif args.cmd == "run": if args.policy is not None: @@ -99,6 +218,11 @@ def main(argv: list[str] | None = None) -> int: if args.task is None: run_p.error("task is required unless --policy is set") + if args.sim_backend != "mujoco": + run_p.error( + "planner-driven runs currently require --sim-backend mujoco; " + "use `simulate` or `run --policy` for Newton" + ) from robosandbox.agentic_demo import main as agentic_main forwarded = [ @@ -119,19 +243,80 @@ def main(argv: list[str] | None = None) -> int: if args.perception is not None: forwarded += ["--perception", args.perception] return agentic_main(forwarded) + elif args.cmd == "simulate": + return _simulate_cli(args) elif args.cmd == "export-lerobot": from pathlib import Path - from robosandbox.export.lerobot import export_episode + from robosandbox.export.lerobot import export_episode, export_episodes - out = export_episode( - Path(args.src), - Path(args.dst), - task=args.task, - fps=args.fps, + # Resolve sources: positional + any --src flags. If a single source + # is supplied and it directly contains events.jsonl, use the + # single-episode fast path. Otherwise the source is treated as a + # parent directory and every immediate subdir with events.jsonl + # becomes one episode in the combined dataset. + positional = Path(args.src) + extras = [Path(p) for p in (getattr(args, "extra_src", None) or [])] + if not extras and (positional / "events.jsonl").exists(): + sources = [positional] + elif extras: + sources = [positional, *extras] + else: + # Parent-dir auto-detect: any immediate subdir with events.jsonl. 
+ subs = sorted( + p for p in positional.iterdir() + if p.is_dir() and (p / "events.jsonl").exists() + ) + if not subs: + print( + f"[export-lerobot] no episodes found under {positional} " + f"(no events.jsonl in {positional} and no immediate " + f"subdirs containing it)", + file=sys.stderr, + ) + return 2 + sources = subs + + if len(sources) == 1: + out = export_episode(sources[0], Path(args.dst), task=args.task, fps=args.fps) + print(f"Exported LeRobot dataset to: {out} (1 episode)") + else: + out = export_episodes(sources, Path(args.dst), task=args.task, fps=args.fps) + print(f"Exported LeRobot dataset to: {out} ({len(sources)} episodes)") + out_abs = Path(out).resolve() + ckpt_dir = out_abs.parent / f"{out_abs.name}-checkpoint" + print() + print("Next steps:") + print() + print(" # 1. Train an ACT policy on this dataset:") + print( + f" python -m lerobot.scripts.train \\\n" + f" --dataset.repo_id=local \\\n" + f" --dataset.root={out_abs} \\\n" + f" --policy.type=act \\\n" + f" --output_dir={ckpt_dir}" + ) + print() + print(" # 2. Evaluate the trained checkpoint:") + print( + f" robo-sandbox eval \\\n" + f" --task {args.task or ''} \\\n" + f" --policy {ckpt_dir} \\\n" + f" --sim-backend mujoco" + ) + print() + print( + " Note: vision policies (ACT, Diffusion) work on either backend. " + "Newton auto-renders per-world RGB via SensorTiledCamera when the " + "policy declares image inputs (override with --no-newton-rgb)." 
) - print(f"Exported LeRobot dataset to: {out}") return 0 + elif args.cmd == "eval": + return _eval_parallel_cli(args) + elif args.cmd == "compare": + return _compare_cli(args) + elif args.cmd == "sim-check": + return _sim_check_cli(args) elif args.cmd == "download-franka-visuals": from robosandbox.assets.franka_visuals import cli as download_cli @@ -148,7 +333,7 @@ def _run_policy_cli(args: argparse.Namespace) -> int: from pathlib import Path from robosandbox.policy import load_policy, run_policy - from robosandbox.sim.mujoco_backend import MuJoCoBackend + from robosandbox.sim import create_sim_backend from robosandbox.tasks.loader import load_builtin_task logging.basicConfig( @@ -173,7 +358,14 @@ def _run_policy_cli(args: argparse.Namespace) -> int: print(f"[run --policy] failed to load policy: {e}", file=sys.stderr) return 2 - sim = MuJoCoBackend(render_size=(240, 320), camera="scene") + sim = create_sim_backend( + args.sim_backend, + render_size=(240, 320), + camera="scene", + viewer=args.sim_viewer, + port=args.viewer_port, + device=args.device, + ) sim.load(task.scene) try: # Settle gravity so the initial observation is at rest. 
@@ -196,6 +388,7 @@ def _run_policy_cli(args: argparse.Namespace) -> int: ) print(f"[run --policy] task: {task.name}") print(f"[run --policy] policy: {args.policy}") + print(f"[run --policy] sim_backend: {args.sim_backend}") print(f"[run --policy] verdict: {verdict}") print(f"[run --policy] steps: {result['steps']}") print(f"[run --policy] final_reason: {final_reason}") @@ -203,5 +396,580 @@ def _run_policy_cli(args: argparse.Namespace) -> int: return 0 if success_ok else 1 +def _simulate_cli(args: argparse.Namespace) -> int: + import time + + from robosandbox.sim import create_sim_backend + from robosandbox.tasks.loader import load_builtin_task + + try: + task = load_builtin_task(args.task) + except FileNotFoundError as e: + print(f"[simulate] {e}", file=sys.stderr) + return 2 + + try: + sim = create_sim_backend( + args.sim_backend, + render_size=(args.render_height, args.render_width), + camera="scene", + viewer=args.sim_viewer, + port=args.viewer_port, + device=args.device, + ) + except (ImportError, ValueError) as e: + print(f"[simulate] failed to create backend: {e}", file=sys.stderr) + return 2 + + sim.load(task.scene) + try: + for _ in range(args.steps): + sim.step() + obs = sim.observe() + print(f"[simulate] task: {task.name}") + print(f"[simulate] sim_backend: {args.sim_backend}") + print(f"[simulate] steps: {args.steps}") + print(f"[simulate] joints: {np.array2string(obs.robot_joints, precision=4)}") + for oid, pose in obs.scene_objects.items(): + print( + "[simulate] object: " + f"{oid} xyz=({pose.xyz[0]:.4f}, {pose.xyz[1]:.4f}, {pose.xyz[2]:.4f})" + ) + if args.sim_backend == "newton" and args.sim_viewer == "viser": + print(f"[simulate] viewer_url: http://127.0.0.1:{args.viewer_port}") + if args.hold: + try: + while True: + time.sleep(1.0) + except KeyboardInterrupt: + pass + finally: + sim.close() + return 0 + + +def _policy_image_inputs(policy) -> set[str]: + """Return the set of visual input feature keys a policy expects, if any. 
+ + LeRobot policies expose this via ``policy.config.input_features``. The + :class:`~robosandbox.policy.LeRobotPolicyAdapter` wraps the real policy + on its private ``_policy`` attribute, so we unwrap when needed. + + Detection prefers the feature spec's ``.type == FeatureType.VISUAL`` + marker (works for both the legacy ``observation.image`` key and the + namespaced ``observation.images.`` family). Falls back to a + name-prefix match when the type field is absent. Returns an empty + set for any non-LeRobot policy — no false-positive warnings. + """ + inner = getattr(policy, "_policy", policy) + cfg = getattr(inner, "config", None) + feats = getattr(cfg, "input_features", None) if cfg is not None else None + if not feats: + return set() + try: + items = list(feats.items()) + except AttributeError: + return set() + visual = set() + for key, spec in items: + if not isinstance(key, str): + continue + # Prefer the typed marker when present. + ftype = getattr(spec, "type", None) + type_name = getattr(ftype, "name", None) or getattr(ftype, "value", None) or str(ftype) + if isinstance(type_name, str) and type_name.upper() == "VISUAL": + visual.add(key) + continue + # Fallback for shapes without a typed feature spec. 
+ if key.startswith("observation.image"): + visual.add(key) + return visual + + +def _print_eval_summary(summary: dict) -> None: + """Pretty-print a finished EvalSummary dict to stdout.""" + n = int(summary["n_trials"]) + s = int(summary["successes"]) + rate_pct = float(summary["rate"]) * 100.0 + lo_pct = float(summary["ci_low"]) * 100.0 + hi_pct = float(summary["ci_high"]) * 100.0 + ci_pct = int(round(float(summary["ci_level"]) * 100)) + wall = float(summary["wall_seconds"]) + thr = float(summary["throughput_env_steps_per_s"]) + print(f"[eval] successes: {s} / {n} ({rate_pct:.1f}%)") + print(f"[eval] {ci_pct}% CI (Wilson): [{lo_pct:.1f}%, {hi_pct:.1f}%]") + print(f"[eval] steps: {summary['steps']}") + print(f"[eval] wall: {wall:.1f}s") + print(f"[eval] throughput: {thr:,.0f} env-steps/s") + + +def _maybe_write_json(path: str | None, summary: dict) -> None: + """Write the EvalSummary as JSON when ``--output`` was supplied.""" + if not path: + return + import json + from pathlib import Path + + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps(summary, indent=2)) + print(f"[eval] wrote JSON: {out}") + + +def _finalize_eval( + args: argparse.Namespace, + *, + task_name: str, + sim_backend: str, + successes: int, + n_trials: int, + success_per_trial: list[bool], + per_trial_details: list[dict] | None, + steps: int, + wall_seconds: float, + throughput: float, +) -> int: + """Build summary, print it, optionally write JSON, return exit code.""" + from robosandbox.eval import summarise_eval + summary = summarise_eval( + task=task_name, + policy=str(args.policy), + sim_backend=sim_backend, + successes=successes, + n_trials=n_trials, + success_per_trial=success_per_trial, + per_trial_details=per_trial_details, + provenance=_build_provenance(str(args.policy)), + steps=steps, + wall_seconds=wall_seconds, + throughput=throughput, + ) + _print_eval_summary(summary) + _maybe_write_json(getattr(args, "output", None), summary) + return 0 
if successes > 0 else 1 + + +def _build_provenance(policy_path: str) -> dict: + """Collect the bits that pin down an eval's reproducibility: + + - SHA-256 of the model weights (``model.safetensors``) — content hash + that survives renames and path changes; two evals with matching + hashes used the same checkpoint bytes. + - robosandbox + lerobot + mujoco version strings (``importlib.metadata``). + - Git rev of the working repo, with a ``-dirty`` suffix when there are + uncommitted changes — so a "clean" rev guarantees the code is the + committed version. + + All fields are best-effort: missing dependencies, non-git checkouts, + or a missing model file just produce ``None``/absent keys instead of + failing the eval. + """ + import hashlib + import subprocess + from pathlib import Path + p = Path(policy_path) + prov: dict = {"policy_path": str(p)} + + # Checkpoint content hash. Use the lerobot canonical filename when + # present; fall back to events.jsonl for replay policies. + weight_file = None + for candidate in ("model.safetensors", "events.jsonl"): + f = p / candidate if p.is_dir() else p + if f.exists() and f.is_file(): + weight_file = f + break + if weight_file is not None: + h = hashlib.sha256() + with weight_file.open("rb") as fh: + for chunk in iter(lambda: fh.read(1 << 20), b""): + h.update(chunk) + prov["checkpoint_file"] = weight_file.name + prov["checkpoint_sha256"] = h.hexdigest() + prov["checkpoint_bytes"] = weight_file.stat().st_size + + # Versions of the libraries that produced the eval result. + try: + from importlib.metadata import PackageNotFoundError, version + for pkg in ("robosandbox-core", "lerobot", "mujoco", "torch", "warp-lang"): + try: + prov[f"{pkg}_version"] = version(pkg) + except PackageNotFoundError: + pass + except ImportError: + pass + + # Git rev of the robosandbox checkout (with -dirty suffix for any + # uncommitted changes — anything but a clean tag is a flag for "the + # eval was run against modified code"). 
+ try: + repo = Path(__file__).resolve().parent.parent.parent + rev = subprocess.run( + ["git", "rev-parse", "HEAD"], cwd=repo, + capture_output=True, text=True, timeout=2, + ) + if rev.returncode == 0: + sha = rev.stdout.strip() + dirty = subprocess.run( + ["git", "status", "--porcelain"], cwd=repo, + capture_output=True, text=True, timeout=2, + ) + if dirty.returncode == 0 and dirty.stdout.strip(): + sha += "-dirty" + prov["robosandbox_git_rev"] = sha + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + + return prov + + +def _sim_check_cli(args: argparse.Namespace) -> int: + """Run the cross-backend agreement check; print the report.""" + import json + from pathlib import Path + + from robosandbox.eval.sim_check import report_to_dict, run_sim_check + + rep = run_sim_check( + task_name=args.task, + policy_path=args.policy, + max_steps=int(args.max_steps), + settle_steps=int(args.settle_steps), + joint_tol_rad=float(args.joint_tol_rad), + xy_tol_m=float(args.xy_tol_m), + device=args.device, + ) + + print(f"[sim-check] task: {rep.task}") + print(f"[sim-check] policy: {rep.policy}") + print(f"[sim-check] tolerances: joint <= {rep.tolerances['joint_rad']:.3f} rad, xy <= {rep.tolerances['xy_m'] * 1000:.1f} mm") + + if "error" in rep.mujoco: + print(f"[sim-check] MuJoCo FAILED: {rep.mujoco['error']}", file=sys.stderr) + else: + print(f"[sim-check] MuJoCo: success={rep.mujoco['success']} steps={rep.mujoco['steps']}") + if "error" in rep.newton: + print(f"[sim-check] Newton FAILED: {rep.newton['error']}", file=sys.stderr) + elif rep.newton: + print(f"[sim-check] Newton: success={rep.newton['success']} steps={rep.newton['steps']}") + + if rep.verdict != "RUN_FAILED": + print(f"[sim-check] outcome match: {rep.outcome_match}") + print(f"[sim-check] joint max diff: {rep.joint_max_abs_diff_rad:.4f} rad") + print(f"[sim-check] object max xy: {rep.object_max_xy_diff_m * 1000:.2f} mm") + for oid, d in sorted(rep.object_diffs.items()): + print(f"[sim-check] {oid}: {d 
* 1000:.2f} mm") + print(f"[sim-check] verdict: {rep.verdict}") + + if args.output: + out = Path(args.output) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps(report_to_dict(rep), indent=2)) + print(f"[sim-check] wrote JSON: {out}") + + # Exit code: 0 on PASS, 1 on STATE_DRIFT (the backends agreed on outcome + # but drifted in state — a softer failure), 2 on OUTCOME_MISMATCH or + # RUN_FAILED (the load-bearing claim broke). + if rep.verdict == "PASS": + return 0 + if rep.verdict == "STATE_DRIFT": + return 1 + return 2 + + +def _compare_cli(args: argparse.Namespace) -> int: + """Two-proportion z-test comparison of two EvalSummary JSON files.""" + import json + from pathlib import Path + + from robosandbox.eval import proportion_z_test + + try: + a = json.loads(Path(args.a).read_text()) + b = json.loads(Path(args.b).read_text()) + except (FileNotFoundError, json.JSONDecodeError) as e: + print(f"[compare] failed to read input: {e}", file=sys.stderr) + return 2 + + for label, doc in [("A", a), ("B", b)]: + for required in ("successes", "n_trials", "task", "policy", "sim_backend"): + if required not in doc: + print(f"[compare] {label} is missing required field {required!r}", file=sys.stderr) + return 2 + if a["task"] != b["task"]: + print( + f"[compare] WARNING: comparing across tasks ({a['task']} vs {b['task']}). 
" + f"The z-test result is mathematically valid but semantically suspect.", + file=sys.stderr, + ) + + s_a, n_a = int(a["successes"]), int(a["n_trials"]) + s_b, n_b = int(b["successes"]), int(b["n_trials"]) + rate_a = (s_a / n_a) if n_a else 0.0 + rate_b = (s_b / n_b) if n_b else 0.0 + z, p_value = proportion_z_test(s_a, n_a, s_b, n_b) + significant = p_value < float(args.alpha) + if rate_a > rate_b: + winner = "A" + elif rate_b > rate_a: + winner = "B" + else: + winner = "tie" + + print(f"A: {a['policy']} ({a['sim_backend']})") + print(f" successes: {s_a}/{n_a} ({rate_a * 100:.1f}%) CI: [{a.get('ci_low', 0) * 100:.1f}%, {a.get('ci_high', 1) * 100:.1f}%]") + print(f"B: {b['policy']} ({b['sim_backend']})") + print(f" successes: {s_b}/{n_b} ({rate_b * 100:.1f}%) CI: [{b.get('ci_low', 0) * 100:.1f}%, {b.get('ci_high', 1) * 100:.1f}%]") + print(f"diff: {(rate_a - rate_b) * 100:+.1f}pp") + print(f"z: {z:+.3f}") + print(f"p-value: {p_value:.4g}") + if winner == "tie": + print(f"verdict: tie (rates equal)") + elif significant: + print(f"verdict: {winner} > other (p<{args.alpha:g}, significant)") + else: + print(f"verdict: {winner} > other but NOT significant at α={args.alpha:g}") + # Exit 0 either way — `compare` is a report tool, not a gate. Callers + # who want a gate should check the printed p-value themselves. 
+ return 0 + + +def _eval_parallel_cli(args: argparse.Namespace) -> int: + """Evaluate a policy in sim — MuJoCo (single world) or Newton (N parallel worlds).""" + from pathlib import Path + + from robosandbox.policy import load_policy, run_eval_parallel, run_policy + from robosandbox.sim import create_sim_backend + from robosandbox.tasks.loader import load_builtin_task + + backend = getattr(args, "sim_backend", "newton") + + try: + task = load_builtin_task(args.task) + except FileNotFoundError as e: + print(f"[eval] {e}", file=sys.stderr) + return 2 + + if backend not in task.supported_backends: + print( + f"[eval] task {args.task!r} does not list '{backend}' as a supported backend " + f"(supported: {', '.join(task.supported_backends)})", + file=sys.stderr, + ) + return 2 + + try: + policy = load_policy(Path(args.policy)) + except (ImportError, FileNotFoundError, ValueError) as e: + print(f"[eval] failed to load policy: {e}", file=sys.stderr) + return 2 + + print(f"[eval] task: {task.name}") + print(f"[eval] policy: {args.policy}") + print(f"[eval] sim_backend: {backend}") + + # ---- MuJoCo: single world, RGB available ---------------------------- + if backend == "mujoco": + import time as _time + + from robosandbox.eval import summarise_eval + from robosandbox.tasks.randomize import jitter_scene + + n_trials = max(1, int(getattr(args, "n_trials", 1) or 1)) + base_seed = getattr(args, "seed", None) + randomize_active = bool(task.randomize) and n_trials > 1 + print(f"[eval] n_trials: {n_trials}") + if task.randomize and n_trials == 1: + print( + "[eval] note: task has randomize spec but --n-trials=1; " + "using base scene (seed=0). 
Pass --n-trials N to randomize.", + file=sys.stderr, + ) + if randomize_active: + seed_origin = "user-provided" if base_seed is not None else "auto-derived" + print(f"[eval] randomize: {sorted(task.randomize.keys())} ({seed_origin} seeds)") + from robosandbox.tasks.runner import criterion_target_object + per_trial_details: list[dict] = [] + success_per_trial: list[bool] = [] + target_obj_id = criterion_target_object(task.success) + total_steps = 0 + t0 = _time.time() + for trial in range(n_trials): + try: + sim = create_sim_backend( + "mujoco", render_size=(240, 320), camera="scene" + ) + except (ImportError, ValueError) as e: + print(f"[eval] failed to create MuJoCo backend: {e}", file=sys.stderr) + return 2 + # Per-trial seed: 0 returns the base scene unchanged, so we shift + # the trial index by 1 in the auto-derived path. With --seed S, + # trial t uses S + t (still skipping 0 -> identity). + if randomize_active: + trial_seed = (int(base_seed) + trial + 1) if base_seed is not None else (trial + 1) + trial_scene = jitter_scene(task.scene, task.randomize, trial_seed) + else: + trial_scene = task.scene + sim.load(trial_scene) + # Match the settle the recorder did before its first frame — + # without it the policy sees mid-fall, OOD vs training. 
+ for _ in range(int(getattr(args, "settle_steps", 100) or 0)): + sim.step() + if getattr(args, "reload_policy", True): + trial_policy = load_policy(Path(args.policy)) + else: + trial_policy = policy + initial_cube_pose = sim.get_object_pose(target_obj_id) if target_obj_id else None + initial_cube_z = initial_cube_pose.xyz[2] if initial_cube_pose else None + peak_lift_mm = 0.0 + min_ee_dist_mm = float("inf") + def _track(obs: object, action: object, + _z0=initial_cube_z, _oid=target_obj_id) -> None: + nonlocal peak_lift_mm, min_ee_dist_mm + if _oid is None or _z0 is None: + return + p = sim.get_object_pose(_oid) + if p is None: + return + lift_mm = (p.xyz[2] - _z0) * 1000.0 + if lift_mm > peak_lift_mm: + peak_lift_mm = lift_mm + ee_xyz = obs.ee_pose.xyz # type: ignore[attr-defined] + d_mm = math.sqrt(sum((a - b) ** 2 for a, b in zip(ee_xyz, p.xyz))) * 1000.0 + if d_mm < min_ee_dist_mm: + min_ee_dist_mm = d_mm + try: + # Reset replay-style policies so each trial starts at step 0. + reset = getattr(trial_policy, "reset", None) + if callable(reset): + reset() + result = run_policy( + sim, trial_policy, + max_steps=args.max_steps, + success=task.success, + action_repeat=int(getattr(args, "action_repeat", 1) or 1), + on_step=_track, + ) + finally: + sim.close() + ok = bool(result["success"]) if result["success"] is not None else False + success_per_trial.append(ok) + total_steps += int(result["steps"]) + trial_seed_recorded = ( + ((int(base_seed) + trial + 1) if base_seed is not None else (trial + 1)) + if randomize_active else 0 + ) + detail = { + "trial": trial + 1, + "seed": trial_seed_recorded, + "success": ok, + "success_step": result.get("success_step"), + "steps": int(result["steps"]), + "peak_lift_mm": float(peak_lift_mm), + "min_ee_object_dist_mm": ( + float(min_ee_dist_mm) if min_ee_dist_mm != float("inf") else None + ), + } + if initial_cube_pose is not None: + detail["object_id"] = target_obj_id + detail["object_initial_xyz"] = list(initial_cube_pose.xyz) + 
detail["object_initial_quat_xyzw"] = list(initial_cube_pose.quat_xyzw) + per_trial_details.append(detail) + if n_trials > 1: + print(f"[eval] trial {trial + 1}/{n_trials}: {'success' if ok else 'failure'} ({result['steps']} steps)") + wall = _time.time() - t0 + successes = sum(success_per_trial) + return _finalize_eval( + args, task_name=task.name, sim_backend="mujoco", + successes=successes, n_trials=n_trials, + success_per_trial=success_per_trial, + per_trial_details=per_trial_details, + steps=total_steps, wall_seconds=wall, + throughput=(total_steps / wall) if wall > 0 else 0.0, + ) + + # ---- Newton: N parallel worlds, state-only -------------------------- + world_count: int = args.world_count + print(f"[eval] world_count: {world_count}") + print(f"[eval] device: {args.device}") + print(f"[eval] max_steps: {args.max_steps}") + + image_keys = _policy_image_inputs(policy) + # Auto-enable RGB if the policy needs it; users can force on/off via + # --newton-rgb / --no-newton-rgb. Camera-on costs ~ms per step but is + # required for vision policies; off keeps state-only RL fast. + if getattr(args, "newton_rgb", None) is None: + enable_camera = bool(image_keys) + else: + enable_camera = bool(args.newton_rgb) + if image_keys and not enable_camera: + print( + f"[eval] WARNING: policy expects image inputs ({', '.join(sorted(image_keys))}) " + f"but --no-newton-rgb was set — frames will be zeros. Drop the flag to " + f"raytrace per-world RGB via SensorTiledCamera.", + file=sys.stderr, + ) + elif enable_camera: + print(f"[eval] camera: SensorTiledCamera (1 cam/world @ 240x320)") + + # Per-world randomization: build N different scenes when the task's + # randomize spec is set. Topology is invariant under jitter_scene so + # the Newton backend's per-world layout assumptions still hold. 
+ per_world_scenes = None + if task.randomize and world_count > 1: + from robosandbox.tasks.randomize import jitter_scene as _jitter + base_seed = getattr(args, "seed", None) + per_world_scenes = [ + _jitter( + task.scene, + task.randomize, + seed=((int(base_seed) + w + 1) if base_seed is not None else (w + 1)), + ) + for w in range(world_count) + ] + seed_origin = "user-provided" if base_seed is not None else "auto-derived" + print(f"[eval] randomize: {sorted(task.randomize.keys())} ({seed_origin} seeds, per-world)") + elif task.randomize and world_count == 1: + print( + "[eval] note: task has randomize spec but --world-count=1; " + "using base scene. Increase --world-count to randomize.", + file=sys.stderr, + ) + + try: + sim = create_sim_backend( + "newton", + render_size=(240, 320), + camera="scene", + viewer="null", + device=args.device, + world_count=world_count, + enable_camera=enable_camera, + ) + sim.load(task.scene, per_world_scenes=per_world_scenes) + except (ImportError, ValueError) as e: + print(f"[eval] failed to create Newton backend: {e}", file=sys.stderr) + return 2 + + try: + result = run_eval_parallel( + sim, + policy, + max_steps=args.max_steps, + success=task.success, + settle_steps=args.settle_steps, + ) + finally: + sim.close() + + return _finalize_eval( + args, task_name=task.name, sim_backend="newton", + successes=int(result["successes"]), n_trials=int(result["n_worlds"]), + success_per_trial=list(result.get("success_per_world", [])), + per_trial_details=None, # Newton parallel path doesn't track per-trial pose + steps=int(result["steps"]), wall_seconds=float(result["wall"]), + throughput=float(result["throughput"]), + ) + + if __name__ == "__main__": sys.exit(main()) diff --git a/packages/robosandbox-core/src/robosandbox/viewer/server.py b/packages/robosandbox-core/src/robosandbox/viewer/server.py index 1f6b002..2a45057 100644 --- a/packages/robosandbox-core/src/robosandbox/viewer/server.py +++ 
b/packages/robosandbox-core/src/robosandbox/viewer/server.py
@@ -1,17 +1,22 @@
 """FastAPI server exposing the sim as a browser-viewable live stream.
 
 Architecture:
-  - SimThread: owns the MuJoCoBackend in a single dedicated thread
+  - SimThread: owns the sim backend in a single dedicated thread
     (MuJoCo renderer is not thread-safe, so all sim access is serialized
     through this thread's command queue). Publishes JPEG frames + status
     events to thread-safe queues.
   - Async FastAPI app: serves the SPA, broadcasts frames + events to
     connected WebSocket clients, relays client commands to SimThread.
 
+Backend modes:
+  mujoco (default): renders JPEG frames streamed to the browser.
+  newton: Viser owns the 3D render; JPEG queue stays empty. The browser
+    embeds Viser as an