diff --git a/data_quality_review/folding_metric_vs_decision_correlation.md b/data_quality_review/folding_metric_vs_decision_correlation.md new file mode 100644 index 000000000..3e5c1ae21 --- /dev/null +++ b/data_quality_review/folding_metric_vs_decision_correlation.md @@ -0,0 +1,60 @@ +# Folding-clothes: val metrics vs. accept/reject correlation + +**Date:** 2026-06-18 +**Sources:** +- Per-episode val metrics: `logs/mecka_pi_folding_per_episode/folding_sharded/merged/per_episode_stats/episodes.jsonl` (462 episodes) +- Human review decisions: `data_quality_review/mecka_folding_clothes_decisions.jsonl` (477 rows) + +## Setup + +- All 462 scored episodes matched 1:1 to a review decision. +- 13 episodes were reviewed more than once (477 rows → 462 unique hashes); deduped by **latest timestamp**. +- Dedup also resolves the single `uncertain` row → its later `reject`. +- Final labels: **283 reject / 179 accept** (61.3% baseline reject rate). +- Checkpoint scored: `mecka_pi_fold_clothes_2026-05-27_12-01-39` (`HumanBimanualCartesianEuler`, cartesian transform). + +## Headline + +**There is a real but modest correlation: the model's action-prediction error is systematically higher on rejected episodes. The flow-matching losses carry no signal.** Direction below: positive r / AUC > 0.5 means a higher metric associates with `reject`. + +## Per-metric correlation with `reject` + +| metric | Pearson r | AUC | p (Mann–Whitney) | mean accept → reject | +|---|---:|---:|---:|---| +| `..._final_mse_avg` | **+0.27** | **0.67** | 1.4e-09 | 0.227 → 0.323 | +| `..._frechet_gauss_min` | +0.23 | 0.66 | 4.9e-09 | 0.334 → 0.386 | +| `..._frechet_gauss_avg` | +0.23 | 0.64 | 7.1e-07 | 0.987 → 1.158 | +| `..._ypr_paired_mse_avg` | +0.23 | 0.63 | 3.1e-06 | 0.274 → 0.391 | +| `..._paired_mse_avg` | +0.23 | 0.63 | 3.9e-06 | 0.140 → 0.199 | +| `..._frechet_gauss_max` | +0.15 | 0.57 | 8.0e-03 | 2.71 → 2.96 | +| `mecka_bimanual_loss` / `action_loss` | +0.09 | 0.54 | 0.16 (n.s.) | 0.083 → 0.088 | +| `..._xyz_paired_mse_avg` | **−0.07** | 0.44 | 0.036 | 0.0064 → 0.0061 | +| `n_frames_total` | +0.04 | 0.52 | 0.42 (n.s.) | — | +| `n_resampled_frames` | +0.03 | 0.48 | 0.37 (n.s.) | — | + +(`n_frames`, `n_batches`, `seed`, `embodiment_id` are constant across episodes — excluded.) +(All metric names prefixed `Valid/mecka_bimanual_actions_cartesian_`.) + +## Interpretation + +1. **The signal is almost entirely orientation error.** `paired_mse` and `ypr_paired_mse` correlate **1.00**; `final_mse`, `frechet_gauss_avg`, `ypr`, `paired` all sit at 0.96–0.97 with each other — one underlying signal: the policy reproduces the *rotation* trajectory worse on rejected episodes. + +2. **Translation error (`xyz`) is the opposite** — nearly independent of the rest (r≈0.25) and *weakly negative* with reject. Position tracking does not flag bad episodes; orientation does. + +3. **The training-style losses carry no signal** (`*_loss`, p=0.16). Only the geometric/decoded-action metrics separate the classes. + +4. **Effect is modest, not a clean separator.** Best single metric `final_mse_avg` → AUC 0.67. A 5-fold-CV logistic regression on all six action-error metrics reaches only **0.69** (vs 0.66 for `final_mse` alone — the others are redundant). + +### Reject rate by `final_mse_avg` quintile + +| quintile (range) | reject rate | n | +|---|---:|---:| +| Q1 [0.042, 0.152] | 43.5% | 92 | +| Q2 [0.153, 0.216] | 51.1% | 92 | +| Q3 [0.217, 0.281] | 60.2% | 93 | +| Q4 [0.281, 0.395] | 68.5% | 92 | +| Q5 [0.396, 1.384] | **82.8%** | 93 | + +## Takeaway + +A high-error episode (`final_mse_avg` / orientation MSE / Fréchet distance) is ~2× more likely to be a human reject than a low-error one — consistent with reviewers rejecting inconsistent demos the policy fits poorly. But at AUC ≈ 0.66–0.69 this is a **prioritization signal, not an auto-filter**: useful for surfacing likely-bad episodes for review, not for replacing the manual pass. To use as a screen, rank by `final_mse_avg` (≡ `ypr_paired_mse_avg`); ignore `xyz` and the raw losses. diff --git a/egomimic/eval/eval_pi_per_episode.py b/egomimic/eval/eval_pi_per_episode.py new file mode 100644 index 000000000..50b13a2e8 --- /dev/null +++ b/egomimic/eval/eval_pi_per_episode.py @@ -0,0 +1,363 @@ +import csv +import json +import logging +import os + +import numpy as np +import torch +from omegaconf import OmegaConf +from torch.utils.data import DataLoader, Subset + +from egomimic.eval.eval_pi import PIEvalVideo +from egomimic.pl_utils.pl_data_utils import annotation_collate +from egomimic.rldb.embodiment.embodiment import get_embodiment +from egomimic.rldb.zarr.utils import set_global_seed + +logger = logging.getLogger(__name__) + + +class PIEvalVideoPerEpisode(PIEvalVideo): + """ + Per-episode validation for Pi/Pi0.5 models. + + Where the parent ``PIEvalVideo`` produces a single dataset-level aggregate + pushed to wandb, this evaluator iterates the validation ``MultiDataset`` one + episode at a time and writes **one record per episode** to disk. It reuses + the parent's ``compute_metrics_and_viz`` unchanged: when a DataLoader yields + only one episode's frames, the batch-reduced metrics ARE that episode's + metrics (accumulated frame-weighted across the episode's batches). + + Invoked via the ``mode=eval`` ``run()`` hook in ``trainHydra.py`` — it does + NOT call ``trainer.validate``, so Lightning's ``on_validation_*`` hooks (and + the unconditional ``torch.distributed.barrier()`` in + ``ModelWrapper.on_validation_end``) never fire. Eval mode forces + ``devices=1``, so this is strictly single-process and file writes need no + rank guard. + + Designed for an OLD euler checkpoint: pair with ``transform_lists: {}`` (so + the 6D cam-frame revert branch in ``compute_metrics_and_viz`` is skipped) and + a ``norm_stats.precomputed_norm_path`` pointing at the checkpoint's own + euler norm stats. + """ + + def __init__( + self, + *args, + dataset_name: str = "mecka_bimanual", + output_subdir: str = "per_episode_stats", + seed: int = 42, + max_frames_per_episode: int | None = None, + num_shards: int = 1, + shard_index: int = 0, + **kwargs, + ): + super().__init__(*args, **kwargs) + # Which entry of ``datamodule.valid_datasets`` to iterate. Also the + # embodiment-name key expected by ``process_batch_for_training``. + self.dataset_name = dataset_name + self.output_subdir = output_subdir + self.seed = seed + # Episode sharding for multi-GPU runs: each process scores + # ``sorted(episodes)[shard_index::num_shards]`` and writes its own + # ``episodes.shard{i}of{n}.jsonl``. Merge the shard files afterwards. + # num_shards=1 -> single ``episodes.jsonl`` + summary (no merge needed). + if not (0 <= shard_index < num_shards): + raise ValueError( + f"shard_index {shard_index} out of range for num_shards {num_shards}" + ) + self.num_shards = num_shards + self.shard_index = shard_index + # Cap frames scored per episode (evenly spaced across the episode) to + # bound runtime. None = score every frame. The per-episode metric is a + # mean over frames, and consecutive frames are highly autocorrelated, so + # an evenly-spaced subset of a few hundred frames is an ample estimate + # for ranking/filtering episodes. + self.max_frames_per_episode = max_frames_per_episode + + # ------------------------------------------------------------------ + # helpers + # ------------------------------------------------------------------ + @staticmethod + def _to_float(v): + if torch.is_tensor(v): + return float(v.detach().cpu().item()) + return float(v) + + def _episode_dir(self): + return os.path.join(self.root_dir(), self.output_subdir) + + @staticmethod + def _cfg_get(cfg, path): + try: + v = OmegaConf.select(cfg, path) + return None if v is None else str(v) + except Exception: + return None + + # ------------------------------------------------------------------ + # entrypoint (called by trainHydra.py mode=eval) + # ------------------------------------------------------------------ + def run(self, trainer, model, datamodule, cfg): + # ``model`` is the LightningModule (ModelWrapper); ``model.model`` is the + # algo. trainHydra already sets self.trainer / self.model, but be explicit. + self.trainer = trainer + self.model = model.model + algo = self.model + + # --- load checkpoint weights into the wrapper (mirrors eval_latent.run) --- + ckpt_path = cfg.get("ckpt_path") + if ckpt_path: + checkpoint = torch.load(ckpt_path, map_location="cpu", weights_only=False) + model.load_state_dict(checkpoint["state_dict"], strict=False) + logger.info("Loaded weights from %s", ckpt_path) + else: + logger.warning( + "No ckpt_path provided — evaluating current (untrained) weights." + ) + + # --- device setup (on_validation_start won't run, so do it by hand) --- + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model.to(device) + model.eval() + # The algo reads ``self.device`` when building pad masks / sampling. + algo.device = device + + # Disable the max-autotune torch.compile on sample_actions (set in + # PI0Pytorch.__init__). Compiling it costs >20 min on first call and + # would re-trigger on every new batch size (each episode's short final + # batch differs), so eager is both faster end-to-end and stable here. + # Mirrors egomimic/robot/rollout.py's rollout-inference unwrap. + pi0 = algo.nets["policy"] + if "sample_actions" in vars(pi0): + del pi0.sample_actions + logger.info("Disabled torch.compile on sample_actions (eager eval).") + + set_global_seed(self.seed) + + valid_md = datamodule.valid_datasets[self.dataset_name] + gibd = valid_md._global_indices_by_dataset # {episode_hash: [global idx]} + + dl_params = dict(datamodule.valid_dataloader_params.get(self.dataset_name, {})) + batch_size = int(dl_params.get("batch_size", 32)) + num_workers = int(dl_params.get("num_workers", 0)) + + out_dir = self._episode_dir() + os.makedirs(out_dir, exist_ok=True) + if self.num_shards > 1: + jsonl_path = os.path.join( + out_dir, f"episodes.shard{self.shard_index}of{self.num_shards}.jsonl" + ) + else: + jsonl_path = os.path.join(out_dir, "episodes.jsonl") + + provenance = { + "ckpt_path": str(ckpt_path), + "transform_mode": self._cfg_get( + cfg, + f"data.valid_datasets.{self.dataset_name}.resolver.transform_list.mode", + ), + "precomputed_norm_path": self._cfg_get( + cfg, "norm_stats.precomputed_norm_path" + ), + "seed": self.seed, + "dataset_name": self.dataset_name, + } + + records = [] + all_episodes = sorted(gibd.items()) + episodes = all_episodes[self.shard_index :: self.num_shards] + n_eps = len(episodes) + logger.info( + "Per-episode eval: %d episodes (shard %d/%d of %d total, " + "dataset=%s, batch_size=%d, num_workers=%d)", + n_eps, + self.shard_index, + self.num_shards, + len(all_episodes), + self.dataset_name, + batch_size, + num_workers, + ) + + with open(jsonl_path, "w") as jf: + for ep_i, (episode_hash, global_idxs) in enumerate(episodes): + # Re-seed per episode so numbers don't depend on episode ordering. + set_global_seed(self.seed + ep_i) + + idxs = list(global_idxs) + n_frames_total = len(idxs) + if ( + self.max_frames_per_episode + and n_frames_total > self.max_frames_per_episode + ): + sel = np.linspace( + 0, n_frames_total - 1, self.max_frames_per_episode + ) + sel = sorted(set(int(round(x)) for x in sel)) + idxs = [idxs[i] for i in sel] + + subset = Subset(valid_md, idxs) + loader = DataLoader( + subset, + batch_size=batch_size, + shuffle=False, + collate_fn=annotation_collate, + num_workers=num_workers, + ) + + agg = {} # metric_key -> frame-weighted running sum + total_frames = 0 + n_batches = 0 + n_resampled = 0 + task_label = None + emb_id = None + action_converter = None + + for collated in loader: + batch = algo.process_batch_for_training( + {self.dataset_name: collated} + ) + metrics, _ = self.compute_metrics_and_viz(batch, do_viz=False) + + emb_id = next(iter(batch.keys())) + hashes = batch[emb_id].get("episode_hash") + if hashes is None: + hashes = collated.get("episode_hash", []) + b = len(hashes) + n_resampled += sum(1 for h in hashes if str(h) != str(episode_hash)) + + if task_label is None: + tlist = batch[emb_id].get("task") or collated.get("task") + if tlist is not None and len(tlist) > 0: + task_label = str(tlist[0]) + if action_converter is None: + try: + ac_key = algo.ac_keys[emb_id] + action_converter = type( + algo.action_registry.get(emb_id, ac_key) + ).__name__ + except Exception: + action_converter = None + + for k, v in metrics.items(): + agg[k] = agg.get(k, 0.0) + self._to_float(v) * b + total_frames += b + n_batches += 1 + + if total_frames == 0: + logger.warning( + "Episode %s yielded 0 frames; skipping.", episode_hash + ) + continue + + ep_metrics = {k: (s / total_frames) for k, s in agg.items()} + if n_resampled: + logger.warning( + "Episode %s: %d/%d frames resampled to a different episode " + "(MultiDataset bounds fallback); metrics include them.", + episode_hash, + n_resampled, + total_frames, + ) + + rec = { + "episode_hash": str(episode_hash), + "task": task_label, + "embodiment_id": int(emb_id) if emb_id is not None else None, + "embodiment": ( + get_embodiment(emb_id) if emb_id is not None else None + ), + "n_frames": int(total_frames), + "n_frames_total": int(n_frames_total), + "n_batches": int(n_batches), + "n_resampled_frames": int(n_resampled), + "action_converter": action_converter, + **provenance, + "metrics": ep_metrics, + } + jf.write(json.dumps(rec) + "\n") + jf.flush() + records.append(rec) + logger.info( + "[%d/%d] %s task=%s frames=%d action_loss=%.5f", + ep_i + 1, + n_eps, + episode_hash, + task_label, + total_frames, + ep_metrics.get("Valid/action_loss", float("nan")), + ) + + if self.num_shards == 1: + self._write_summary(out_dir, records) + else: + logger.info( + "Shard %d/%d done — wrote %s. Merge shard jsonls + build summary " + "after all shards finish.", + self.shard_index, + self.num_shards, + jsonl_path, + ) + logger.info( + "Per-episode eval complete: %d episodes (shard %d/%d) -> %s", + len(records), + self.shard_index, + self.num_shards, + out_dir, + ) + + # ------------------------------------------------------------------ + # roll-up + # ------------------------------------------------------------------ + def _write_summary(self, out_dir, records): + if not records: + logger.warning("No episode records to summarize.") + return + + metric_keys = sorted({k for r in records for k in r["metrics"]}) + + # Per-episode CSV (one row per episode, flattened metrics). + base_cols = [ + "episode_hash", + "task", + "embodiment", + "n_frames", + "n_batches", + "n_resampled_frames", + ] + csv_path = os.path.join(out_dir, "summary.csv") + with open(csv_path, "w", newline="") as f: + w = csv.writer(f) + w.writerow(base_cols + metric_keys) + for r in records: + w.writerow( + [r.get(c) for c in base_cols] + + [r["metrics"].get(k) for k in metric_keys] + ) + + # Aggregate over episodes (unweighted stats + frame-weighted mean) so the + # frame-weighted mean is directly comparable to the old wandb aggregate. + agg = {} + for k in metric_keys: + vals = np.array( + [r["metrics"][k] for r in records if k in r["metrics"]], dtype=float + ) + fw = np.array( + [r["n_frames"] for r in records if k in r["metrics"]], dtype=float + ) + agg[k] = { + "episode_mean": float(vals.mean()), + "episode_median": float(np.median(vals)), + "episode_std": float(vals.std()), + "frame_weighted_mean": ( + float((vals * fw).sum() / fw.sum()) if fw.sum() else None + ), + } + + summary = { + "n_episodes": len(records), + "total_frames": int(sum(r["n_frames"] for r in records)), + "metrics": agg, + } + with open(os.path.join(out_dir, "summary.json"), "w") as f: + json.dump(summary, f, indent=2) diff --git a/egomimic/eval/eval_train_viz.py b/egomimic/eval/eval_train_viz.py index e63cee828..fc1013209 100644 --- a/egomimic/eval/eval_train_viz.py +++ b/egomimic/eval/eval_train_viz.py @@ -53,7 +53,7 @@ def model(self, value): def video_dir(self): return os.path.join(self.root_dir(), "videos_train_viz") - def compute_metrics_and_viz(self, batch): - metrics, images_dict = self.base.compute_metrics_and_viz(batch) + def compute_metrics_and_viz(self, batch, do_viz=True): + metrics, images_dict = self.base.compute_metrics_and_viz(batch, do_viz=do_viz) metrics = {f"train_viz/{k}": v for k, v in metrics.items()} return metrics, images_dict diff --git a/egomimic/hydra_configs/data/mecka_pi_per_episode_eval.yaml b/egomimic/hydra_configs/data/mecka_pi_per_episode_eval.yaml new file mode 100644 index 000000000..faaf55375 --- /dev/null +++ b/egomimic/hydra_configs/data/mecka_pi_per_episode_eval.yaml @@ -0,0 +1,53 @@ +_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper + +# Per-episode eval variant of mecka_pi for the OLD EULER checkpoint. +# +# Differences vs mecka_pi.yaml: +# - transform_list.mode = cartesian (12-dim EULER, NOT cartesian_6d) so the +# data matches the euler checkpoint + its euler norm stats. +# - filter = mecka folding_clothes. +# - valid mode = total -> ALL matching episodes (not the held-out split). +# - train mode = percent (tiny) -> kept non-empty only for shape inference +# (trainHydra.py reads train_datasets[name][0]); norm values come from +# norm_stats.precomputed_norm_path, so train size doesn't affect them. +# +# Still required on the CLI (lives in the model config, not here): +# model.robomimic_model.action_converters.rules.MECKA_BIMANUAL._target_=\ +# egomimic.utils.action_utils.HumanBimanualCartesianEuler +# norm_stats.precomputed_norm_path=/norm_stats/norm_stats.json + +train_datasets: + mecka_bimanual: + _target_: egomimic.rldb.zarr.zarr_dataset_multi.MultiDataset._from_resolver + resolver: + _target_: egomimic.rldb.zarr.zarr_dataset_multi.S3EpisodeResolver + folder_path: ${paths.dataset_dir} + key_map: + _target_: egomimic.rldb.embodiment.human.Mecka.get_keymap + mode: cartesian_pi + annotation_key: annotations + transform_list: + _target_: egomimic.rldb.embodiment.human.Mecka.get_transform_list + mode: cartesian + filters: + _target_: egomimic.rldb.filters.DatasetFilter + filter_lambdas: + - "lambda row: row['lab'] == 'mecka' and row['task'] == 'folding_clothes'" + mode: percent + percent: 0.02 + +valid_datasets: + mecka_bimanual: + _target_: ${data.train_datasets.mecka_bimanual._target_} + resolver: ${data.train_datasets.mecka_bimanual.resolver} + filters: ${data.train_datasets.mecka_bimanual.filters} + mode: total + +train_dataloader_params: + mecka_bimanual: + batch_size: 32 + num_workers: 10 +valid_dataloader_params: + mecka_bimanual: + batch_size: 32 + num_workers: 10 diff --git a/egomimic/hydra_configs/evaluator/eval_pi_per_episode.yaml b/egomimic/hydra_configs/evaluator/eval_pi_per_episode.yaml new file mode 100644 index 000000000..d6537e79d --- /dev/null +++ b/egomimic/hydra_configs/evaluator/eval_pi_per_episode.yaml @@ -0,0 +1,40 @@ +defaults: + - _self_ + +# Per-episode eval: iterates the validation MultiDataset one episode at a time +# and writes one stats record per episode (per_episode_stats/episodes.jsonl + +# summary.{json,csv}). Driven by the mode=eval `run()` hook in trainHydra.py; +# does NOT use trainer.validate. See egomimic/eval/eval_pi_per_episode.py. +_target_: egomimic.eval.eval_pi_per_episode.PIEvalVideoPerEpisode + +# Which datamodule.valid_datasets entry to iterate (also the embodiment-name key +# consumed by process_batch_for_training). +dataset_name: mecka_bimanual + +# Output dir (under the eval run's hydra output_dir) + per-episode seed base. +output_subdir: per_episode_stats +seed: 42 + +# Cap frames scored per episode (evenly spaced). null = every frame. Set this to +# bound runtime: folding_clothes averages ~2440 frames/episode (1.13M total), so +# scoring every frame on 1 GPU is ~33h. A few hundred frames/episode gives an +# ample per-episode estimate (frames are highly autocorrelated). +max_frames_per_episode: null + +# Episode sharding for multi-GPU runs. Each process scores +# sorted(episodes)[shard_index::num_shards] and writes +# episodes.shard{i}of{n}.jsonl; merge the shard files after all finish. +# num_shards=1 -> single episodes.jsonl + summary (no merge). +num_shards: 1 +shard_index: 0 + +# No viz: run() always calls compute_metrics_and_viz(do_viz=False), so no +# viz_func is needed and no videos are written. +viz_every_n_epochs: 0 +one_video_per_task: false + +# CRITICAL: empty so compute_metrics_and_viz skips the cam-frame revert+MSE +# branch (eval_pi.py:142-171). The default eval_pi.yaml wires a 6D revert +# transform, which is WRONG for a 12-dim euler checkpoint. Native-frame metrics +# (paired/final/xyz/ypr/Frechet) handle 12-dim euler correctly. +transform_lists: {} diff --git a/egomimic/hydra_configs/model/pi0.5_base.yaml b/egomimic/hydra_configs/model/pi0.5_base.yaml index 31f83d579..37977d48a 100644 --- a/egomimic/hydra_configs/model/pi0.5_base.yaml +++ b/egomimic/hydra_configs/model/pi0.5_base.yaml @@ -58,7 +58,7 @@ robomimic_model: size: 224 interpolation: 3 -enable_grad_norm: false +enable_grad_norm: true optimizer: _target_: torch.optim.AdamW @@ -72,7 +72,7 @@ scheduler: _target_: transformers.optimization.get_cosine_with_min_lr_schedule_with_warmup _partial_: true num_warmup_steps: 1000 - num_training_steps: 200000 + num_training_steps: 150000 num_cycles: 0.5 min_lr: 1e-5