Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Environment

- **You are on a shared SLURM cluster.** Do NOT run anything GPU- or CPU-intensive yourself (no training, no eval, no large data conversions, no full dataset loads, no heavy `pytest` runs that spin up models or pull data). Defer to the user to actually execute those commands — your job is to prepare the command and explain it. Lightweight read-only work (lint, type checks, small unit tests, file edits, single-file syntax checks) is fine on the login node.
- Python 3.11. Activate the project venv before any Python tooling: `source emimic/bin/activate`.
- Package is installed editable as `egomimic` (see `pyproject.toml`). Linting is `ruff` via pre-commit.
- AWS/Cloudflare R2 credentials are required for SQL episode registry + data download. Bootstrap with `aws configure` then `./egomimic/utils/aws/setup_secret.sh` (writes `~/.egoverse_env`). `load_env()` from `egomimic.utils.aws.aws_data_utils` is called automatically at the top of `trainHydra.py`.

## Architecture

### Training entrypoint
`egomimic/trainHydra.py` is the only training entrypoint. It is Hydra + PyTorch Lightning, supports DDP, and is composed entirely via config groups under `egomimic/hydra_configs/`:

- `train_zarr_cartesian.yaml`, `train_zarr_cartesian_pi.yaml`, `train_zarr_keypoint*.yaml` — top-level configs that pick a default `data=`, `model=`, `trainer=`, `evaluator=`, `logger=`, `callbacks=`.
- `data/` — one YAML per dataset recipe. Each instantiates `MultiDataset._from_resolver` with an `S3EpisodeResolver` + `key_map` + `transform_list` + `filters`. Modify these (or override inline) to change which episodes are pulled.
- `model/` — model recipes. Two families: HPT-based (`hpt_*.yaml`) and Pi/Pi0.5 flow-matching (`pi0.5_*.yaml`). Cotrain configs combine a robot + human dataset into a shared/separate-head model.
- `hydra/launcher/submitit.yaml` — SLURM partition/GPUs/time. Edit to match the cluster.
- `logger=debug` and `trainer=debug` exist for fast iteration.

### Data pipeline (read this before touching any data code)
Code lives under `egomimic/rldb/`. Flow is:
1. **SQL filter** (`rldb/filters.py`) selects rows from the `app.episodes` Postgres table.
2. **`S3EpisodeResolver`** (`rldb/zarr/zarr_dataset_multi.py`) maps filtered rows → S3 Zarr URIs and lazily downloads to a local cache.
3. **`ZarrEpisode`** reads each `<episode_hash>.zarr` (Zarr v3, see `CONTRIBUTING_DATA.md` for the schema contract).
4. **Key map** (per-embodiment, `rldb/embodiment/{eva,human}.py`) renames raw zarr keys to pre-transform names.
5. **Transform list** (same files) applies frame conversions (SLAM world → head frame via `ActionChunkCoordinateFrameTransform`), action chunking, normalization, concatenation. All poses are stored in SLAM world frame and re-expressed to head frame at load time — do **not** pre-transform when writing.
6. **`MultiDataset`** virtually merges per-episode datasets; `mode` ∈ {`train`, `valid`, `percent`, `total`} controls sampling.
7. **`DataSchematic`** in the top-level config maps post-transform keys → batch keys consumed by the model.
8. **Norm stats** are computed on-the-fly over `norm_stats.sample_frac` of the data and cached to the run's `norm_stats/` dir. Lower `sample_frac` for large datasets; reuse via `norm_stats.precomputed_norm_path`.

`egomimic/rldb/zarr/zarr_writer.py` is the only supported way to *produce* Zarr stores. Don't roll a custom writer — sharding/chunking conventions must match the rest of the corpus.

### Algorithms and models
- `egomimic/algo/` — top-level policy classes: `act.py`, `hpt.py`, `pi.py` (Pi/Pi0.5). They wrap nets from `egomimic/models/`.
- `egomimic/models/` — `act_nets.py`, `hpt_nets.py`, `denoising_*`, `*_policy.py`, `preprocess_pi_obs.py`. `fm_policy.py` is flow-matching, `diffusion_policy.py` is DDIM.
- `egomimic/pl_utils/pl_model.py` — Lightning `ModelWrapper` used by `trainHydra.py`.
- `egomimic/eval/` — evaluators dispatched by `+evaluator=eval_act|eval_hpt|eval_pi|eval_video`.

### Embodiments
Defined as strings (see `CONTRIBUTING_DATA.md` §8 for the full list): `aria_bimanual`, `eva_bimanual`, `mecka_bimanual`, `scale_bimanual`, plus single-arm variants. The `embodiment` value in both the SQL row and `zarr.attrs` must match exactly. `rldb/embodiment/{eva,human}.py` expose `get_keymap()` and `get_transform_list()` per embodiment.
4 changes: 4 additions & 0 deletions egomimic/algo/pi.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ def process_batch_for_training(self, batch):
processed_batch[embodiment_id]["embodiment"] = torch.tensor(
[embodiment_id], device=self.device, dtype=torch.int64
)
# Forward per-sample task tags (list[str], length B) so EvalVideo's
# one_video_per_task bucketing can read batch[emb]["task"].
if "task" in _batch:
processed_batch[embodiment_id]["task"] = _batch["task"]

for key, value in processed_batch[embodiment_id].items():
if isinstance(value, torch.Tensor):
Expand Down
95 changes: 75 additions & 20 deletions egomimic/eval/eval_video.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
import re
from abc import abstractmethod
from collections import defaultdict

import torch
import torchvision.io as tvio
Expand All @@ -20,6 +22,8 @@ def __init__(
limit_val_batches: int = 400,
viz_func: dict = None,
transform_lists: dict | None = None,
one_video_per_task: bool = False,
max_frames_per_task: int | None = 1000,
):
super().__init__()
self.trainer = None
Expand All @@ -29,6 +33,13 @@ def __init__(
# the model's wrist-frame actions back into cam (head) frame. Reused for
# both cam-frame MSE and the viz video so we don't transform twice.
self.transform_lists = transform_lists or {}
# When True, key the buffer by (embodiment_id, task) and emit exactly
# one mp4 per (embodiment, task) at the end of validation. Used in
# eval-only mode when the valid filter spans multiple tasks.
self.one_video_per_task = one_video_per_task
# Cap each (embodiment, task) bucket at this many frames. Only takes
# effect when one_video_per_task=True. Set None to disable.
self.max_frames_per_task = max_frames_per_task
self.val_image_buffer = {}
self.val_counter = {}
self.override_dict = {
Expand Down Expand Up @@ -65,13 +76,26 @@ def on_validation_start(self):
exist_ok=True,
)

@staticmethod
def _sanitize_task(task: str) -> str:
# Filesystem-safe: collapse whitespace and replace path separators.
return re.sub(r"[^\w.-]+", "_", str(task)).strip("_") or "unknown"

def on_validation_end(self):
for key, buffer in self.val_image_buffer.items():
if self.one_video_per_task:
embodiment_id, task = key
emb_dir = str(get_embodiment(embodiment_id))
filename = f"{self._sanitize_task(task)}.mp4"
else:
emb_dir = str(get_embodiment(key))
filename = f"validation_video_{self.val_counter[key]}.mp4"

os.makedirs(
os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
str(get_embodiment(key)),
emb_dir,
),
exist_ok=True,
)
Expand All @@ -80,8 +104,8 @@ def on_validation_end(self):
path = os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
str(get_embodiment(key)),
f"validation_video_{self.val_counter[key]}.mp4",
emb_dir,
filename,
)
tvio.write_video(path, frames, fps=30, video_codec="h264")

Expand All @@ -98,29 +122,60 @@ def on_validation_step(self, batch, batch_idx, dataloader_idx=0):
}

## images is now a dict
for key, images in images_dict.items():
for embodiment_id, images in images_dict.items():
os.makedirs(
os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
str(get_embodiment(key)),
str(get_embodiment(embodiment_id)),
),
exist_ok=True,
)
if key not in self.val_image_buffer or self.val_image_buffer[key] is None:
self.val_image_buffer[key] = []
self.val_counter[key] = 0
self.val_image_buffer[key].extend(torch.from_numpy(images))
if len(self.val_image_buffer[key]) >= 1000:
frames = torch.stack(self.val_image_buffer[key])
path = os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
str(get_embodiment(key)),
f"validation_video_{self.val_counter[key]}.mp4",
)
tvio.write_video(path, frames, fps=30, video_codec="h264")
self.val_image_buffer[key].clear()
self.val_counter[key] += 1
frames_tensor = torch.from_numpy(images)

if self.one_video_per_task:
tasks = batch[embodiment_id].get("task")
if tasks is None:
raise KeyError(
"one_video_per_task=True requires 'task' in each batch "
"sample. Confirm ZarrDataset.__getitem__ attaches it."
)
# Group sample indices by task so each bucket only takes one
# extend call even when a batch straddles two tasks.
per_task = defaultdict(list)
for i, t in enumerate(tasks):
per_task[str(t)].append(i)
for task, idxs in per_task.items():
key = (embodiment_id, task)
if key not in self.val_image_buffer:
self.val_image_buffer[key] = []
self.val_counter[key] = 0
if self.max_frames_per_task is not None:
remaining = self.max_frames_per_task - len(
self.val_image_buffer[key]
)
if remaining <= 0:
continue
idxs = idxs[:remaining]
self.val_image_buffer[key].extend(frames_tensor[idxs])
# No mid-flush in per-task mode: clip length is bounded by
# limit_val_batches; final write happens in on_validation_end.
else:
key = embodiment_id
if key not in self.val_image_buffer:
self.val_image_buffer[key] = []
self.val_counter[key] = 0
self.val_image_buffer[key].extend(frames_tensor)
if len(self.val_image_buffer[key]) >= 1000:
frames = torch.stack(self.val_image_buffer[key])
path = os.path.join(
self.video_dir(),
f"epoch_{self.trainer.current_epoch}",
str(get_embodiment(embodiment_id)),
f"validation_video_{self.val_counter[key]}.mp4",
)
tvio.write_video(path, frames, fps=30, video_codec="h264")
self.val_image_buffer[key].clear()
self.val_counter[key] += 1

self.trainer.lightning_module.log_dict(metrics, sync_dist=True)
45 changes: 45 additions & 0 deletions egomimic/hydra_configs/data/mecka_pi_eval.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper

# Eval-only variant of mecka_pi: train side is unchanged (single task), but the
# valid filter is broadened to all mecka tasks so eval mode can score across
# them. Use together with evaluator.one_video_per_task=true.

train_datasets:
mecka_bimanual:
_target_: egomimic.rldb.zarr.zarr_dataset_multi.MultiDataset._from_resolver
resolver:
_target_: egomimic.rldb.zarr.zarr_dataset_multi.S3EpisodeResolver
folder_path: ${paths.dataset_dir}
key_map:
_target_: egomimic.rldb.embodiment.human.Mecka.get_keymap
mode: cartesian_pi
annotation_key: annotations
transform_list:
_target_: egomimic.rldb.embodiment.human.Mecka.get_transform_list
mode: cartesian
filters:
_target_: egomimic.rldb.filters.DatasetFilter
filter_lambdas:
- "lambda row: row['lab'] == 'mecka' and row['episode_hash'] in {'69b9a059c69a19757c9ec3e7', '69b979626e470e7c633d794d', '693a33831eb710720c250034', '696da1be2ddbc19f3c9b0899', '692eaa836974927fef249cfc'}"
# cleaning_shoes, dishwashing, potting_plants, folding_clothes, cup_on_saucer
mode: train

valid_datasets:
mecka_bimanual:
_target_: ${data.train_datasets.mecka_bimanual._target_}
resolver: ${data.train_datasets.mecka_bimanual.resolver}
filters:
_target_: egomimic.rldb.filters.DatasetFilter
filter_lambdas:
- "lambda row: row['lab'] == 'mecka' and row['episode_hash'] in {'69b9a059c69a19757c9ec3e7', '69b979626e470e7c633d794d', '693a33831eb710720c250034', '696da1be2ddbc19f3c9b0899', '692eaa836974927fef249cfc'}"
# cleaning_shoes, dishwashing, potting_plants, folding_clothes, cup_on_saucer
mode: valid

train_dataloader_params:
mecka_bimanual:
batch_size: 32
num_workers: 10
valid_dataloader_params:
mecka_bimanual:
batch_size: 32
num_workers: 10
8 changes: 8 additions & 0 deletions egomimic/hydra_configs/evaluator/eval_pi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ defaults:

_target_: egomimic.eval.eval_pi.PIEvalVideo

# When true, key the buffer by (embodiment, task) and emit one mp4 per task
# in on_validation_end. Opt-in for eval-only runs that span multiple tasks.
one_video_per_task: false

# Cap each (embodiment, task) video at this many frames. Only takes effect
# when one_video_per_task=true. Set null to disable.
max_frames_per_task: 1000

# Per-embodiment revert transform. Applied once during validation to project
# the model's wrist-frame action chunks back to cam (head) frame, then reused
# for both the cam-frame MSE and the viz video. Each value resolves to a
Expand Down
1 change: 0 additions & 1 deletion egomimic/hydra_configs/train_zarr_cartesian.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,4 @@ norm_stats:
num_workers: 6
save_cache_dir: ${hydra:runtime.output_dir}
precomputed_norm_path: null
# precomputed_norm_path: /storage/project/r-dxu345-0/rco3/EgoVerse/logs/norm_stats
reject_outliers: true
1 change: 1 addition & 0 deletions egomimic/rldb/zarr/zarr_dataset_multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,7 @@ def _next(reason: str, key: str = "") -> int:
data["embodiment"] = get_embodiment_id(self.embodiment)
ep_name = Path(self.episode_path).name
data["episode_hash"] = ep_name[:-5] if ep_name.endswith(".zarr") else ep_name
data["task"] = self.metadata.get("task_name", "unknown")
_ = origin # preserved for symmetry with prior API
return data

Expand Down
66 changes: 66 additions & 0 deletions egomimic/scripts/top_tasks_by_hours.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Print the top 10 tasks by total hours of episode data, optionally filtered.

Example:
python -m egomimic.scripts.top_tasks_by_hours --filter lab=mecka
python -m egomimic.scripts.top_tasks_by_hours --filter lab=mecka embodiment=human
"""

import argparse

from egomimic.utils.aws.aws_sql import create_default_engine, episode_table_to_df

FPS = 30


def parse_filters(filter_args):
filters = {}
for arg in filter_args:
if "=" not in arg:
raise ValueError(f"Filter must be in 'column=value' form, got: {arg}")
col, val = arg.split("=", 1)
filters[col.strip()] = val.strip()
return filters


def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--filter",
nargs="*",
default=[],
help="Equality filters like lab=mecka. Multiple filters are ANDed.",
)
parser.add_argument("--top", type=int, default=10, help="Number of tasks to show.")
args = parser.parse_args()

filters = parse_filters(args.filter)

engine = create_default_engine()
df = episode_table_to_df(engine)

df = df[~df["is_deleted"]]
df = df[df["num_frames"] > 0]

for col, val in filters.items():
if col not in df.columns:
raise ValueError(f"Unknown column '{col}'. Available: {list(df.columns)}")
df = df[df[col].astype(str) == val]

if df.empty:
print(f"No episodes match filters {filters}.")
return

hours = df.groupby("task")["num_frames"].sum() / FPS / 3600
top = hours.sort_values(ascending=False).head(args.top)

filter_desc = ", ".join(f"{k}={v}" for k, v in filters.items()) or "(no filter)"
print(f"\nTop {args.top} tasks by hours [{filter_desc}] @ {FPS} fps:\n")
print(f"{'task':<40} {'hours':>10} {'episodes':>10}")
print("-" * 64)
counts = df.groupby("task").size()
for task, hrs in top.items():
print(f"{task:<40} {hrs:>10.2f} {counts[task]:>10}")


if __name__ == "__main__":
main()
Loading