diff --git a/CLAUDE.md b/CLAUDE.md index 5f0544413..26b457260 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Environment -- **You are on a shared SLURM cluster.** Do NOT run anything GPU- or CPU-intensive yourself (no training, no eval, no large data conversions, no full dataset loads, no heavy `pytest` runs that spin up models or pull data). Defer to the user to actually execute those commands — your job is to prepare the command and explain it. Lightweight read-only work (lint, type checks, small unit tests, file edits, single-file syntax checks) is fine on the login node. +- **You are on a shared SLURM cluster.** Do not run anything GPU- or CPU-intensive yourself unless told to (no training, no eval, no large data conversions, no full dataset loads, no heavy `pytest` runs that spin up models or pull data). Defer to the user to actually execute those commands — your job is to prepare the command and explain it. Lightweight read-only work (lint, type checks, small unit tests, file edits, single-file syntax checks) is fine on the login node. - Python 3.11. Activate the project venv before any Python tooling: `source emimic/bin/activate`. - Package is installed editable as `egomimic` (see `pyproject.toml`). Linting is `ruff` via pre-commit. - AWS/Cloudflare R2 credentials are required for SQL episode registry + data download. Bootstrap with `aws configure` then `./egomimic/utils/aws/setup_secret.sh` (writes `~/.egoverse_env`). `load_env()` from `egomimic.utils.aws.aws_data_utils` is called automatically at the top of `trainHydra.py`. diff --git a/egomimic/eval/eval_train_viz.py b/egomimic/eval/eval_train_viz.py new file mode 100644 index 000000000..e63cee828 --- /dev/null +++ b/egomimic/eval/eval_train_viz.py @@ -0,0 +1,59 @@ +"""Train-set visualization evaluator. + +Wraps a concrete EvalVideo (HPTEvalVideo, PIEvalVideo, ...) 
so the same +forward/metric/viz logic can run a second time against a separate +``train_viz`` dataloader. Videos go to ``{root_dir}/videos_train_viz/`` and +metric keys are prefixed with ``train_viz/`` so they don't collide with the +canonical ``Valid/...`` keys. + +Instantiated via Hydra from a config like +``hydra_configs/evaluator/train_viz_pi.yaml``. +""" + +from __future__ import annotations + +import os + +from egomimic.eval.eval_video import EvalVideo + + +class TrainVizEvalVideo(EvalVideo): + def __init__(self, base: EvalVideo, limit_val_batches: int = 50): + self.base = base + # Forward eval-affecting knobs from the wrapped evaluator so the + # wrapper's own buffering/flushing logic (inherited from EvalVideo) + # matches the base's intent. compute_metrics_and_viz is still + # delegated to base. + super().__init__( + limit_val_batches=limit_val_batches, + viz_func=base.viz_func, + transform_lists=base.transform_lists, + one_video_per_task=base.one_video_per_task, + max_frames_per_task=base.max_frames_per_task, + ) + + @property + def trainer(self): + return self._trainer + + @trainer.setter + def trainer(self, value): + self._trainer = value + self.base.trainer = value + + @property + def model(self): + return self._model + + @model.setter + def model(self, value): + self._model = value + self.base.model = value + + def video_dir(self): + return os.path.join(self.root_dir(), "videos_train_viz") + + def compute_metrics_and_viz(self, batch): + metrics, images_dict = self.base.compute_metrics_and_viz(batch) + metrics = {f"train_viz/{k}": v for k, v in metrics.items()} + return metrics, images_dict diff --git a/egomimic/hydra_configs/data/mecka_pi_10_hrs.yaml b/egomimic/hydra_configs/data/mecka_pi_10_hrs.yaml new file mode 100644 index 000000000..78e66d320 --- /dev/null +++ b/egomimic/hydra_configs/data/mecka_pi_10_hrs.yaml @@ -0,0 +1,48 @@ +_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper + +train_datasets: + mecka_bimanual: + _target_: 
egomimic.rldb.zarr.zarr_dataset_multi.MultiDataset._from_resolver + resolver: + _target_: egomimic.rldb.zarr.zarr_dataset_multi.S3EpisodeResolver + folder_path: ${paths.dataset_dir} + key_map: + _target_: egomimic.rldb.embodiment.human.Mecka.get_keymap + mode: cartesian_pi + annotation_key: annotations + transform_list: + _target_: egomimic.rldb.embodiment.human.Mecka.get_transform_list + mode: cartesian + filters: + _target_: egomimic.rldb.filters.DatasetFilter + filter_lambdas: + - "lambda row: row['lab'] == 'mecka' and row['task'] in {'packaging_coffee', 'wrapping_gifts', 'cleaning_tools', 'folding_napkins', 'repairing_electronics', 'polishing_jewelry', 'disassembling_phone', 'assembling_flowers', 'making_dumplings', 'peeling_vegetables'}" + mode: train + +valid_datasets: + mecka_bimanual: + _target_: ${data.train_datasets.mecka_bimanual._target_} + resolver: ${data.train_datasets.mecka_bimanual.resolver} + filters: ${data.train_datasets.mecka_bimanual.filters} + mode: valid + +train_dataloader_params: + mecka_bimanual: + batch_size: 64 + num_workers: 10 +valid_dataloader_params: + mecka_bimanual: + batch_size: 64 + num_workers: 10 + +# Train-set visualization loader, iterated as a second validation dataloader; pair with `+evaluator@train_viz_evaluator=train_viz_pi`. 
+train_viz_datasets: + mecka_bimanual: + _target_: ${data.train_datasets.mecka_bimanual._target_} + resolver: ${data.train_datasets.mecka_bimanual.resolver} + filters: ${data.train_datasets.mecka_bimanual.filters} + mode: train +train_viz_dataloader_params: + mecka_bimanual: + batch_size: 64 + num_workers: 10 diff --git a/egomimic/hydra_configs/data/mecka_pi_50_hrs.yaml b/egomimic/hydra_configs/data/mecka_pi_50_hrs.yaml new file mode 100644 index 000000000..307894211 --- /dev/null +++ b/egomimic/hydra_configs/data/mecka_pi_50_hrs.yaml @@ -0,0 +1,48 @@ +_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper + +train_datasets: + mecka_bimanual: + _target_: egomimic.rldb.zarr.zarr_dataset_multi.MultiDataset._from_resolver + resolver: + _target_: egomimic.rldb.zarr.zarr_dataset_multi.S3EpisodeResolver + folder_path: ${paths.dataset_dir} + key_map: + _target_: egomimic.rldb.embodiment.human.Mecka.get_keymap + mode: cartesian_pi + annotation_key: annotations + transform_list: + _target_: egomimic.rldb.embodiment.human.Mecka.get_transform_list + mode: cartesian + filters: + _target_: egomimic.rldb.filters.DatasetFilter + filter_lambdas: + - "lambda row: row['lab'] == 'mecka' and row['task'] in {'bottling_perfume', 'assembling_flowers', 'packaging_perfumes', 'making_dumplings', 'cleaning_tools', 'planting_seedlings', 'assembling_components', 'disassembling_phone', 'portioning_food', 'packaging_cutlery', 'stringing_beads', 'packaging_nuts', 'wrapping_gifts', 'arranging_flowers', 'folding_paper', 'packaging_gifts', 'peeling_garlic', 'disassembling_laptops', 'peeling_vegetables', 'cleaning_windows', 'packaging_masks', 'rinsing_dishes', 'making_paper_bags', 'packaging_coffee', 'crafting_decorations'}" + mode: train + +valid_datasets: + mecka_bimanual: + _target_: ${data.train_datasets.mecka_bimanual._target_} + resolver: ${data.train_datasets.mecka_bimanual.resolver} + filters: ${data.train_datasets.mecka_bimanual.filters} + mode: valid + +train_dataloader_params: + 
mecka_bimanual: + batch_size: 64 + num_workers: 10 +valid_dataloader_params: + mecka_bimanual: + batch_size: 64 + num_workers: 10 + +# Train-set visualization loader, iterated as a second validation dataloader; pair with `+evaluator@train_viz_evaluator=train_viz_pi`. +train_viz_datasets: + mecka_bimanual: + _target_: ${data.train_datasets.mecka_bimanual._target_} + resolver: ${data.train_datasets.mecka_bimanual.resolver} + filters: ${data.train_datasets.mecka_bimanual.filters} + mode: train +train_viz_dataloader_params: + mecka_bimanual: + batch_size: 64 + num_workers: 10 diff --git a/egomimic/hydra_configs/data/mecka_pi_eval.yaml b/egomimic/hydra_configs/data/mecka_pi_eval.yaml index 2177804bb..9ca0dbc44 100644 --- a/egomimic/hydra_configs/data/mecka_pi_eval.yaml +++ b/egomimic/hydra_configs/data/mecka_pi_eval.yaml @@ -22,7 +22,7 @@ train_datasets: filter_lambdas: - "lambda row: row['lab'] == 'mecka' and row['episode_hash'] in {'69b9a059c69a19757c9ec3e7', '69b979626e470e7c633d794d', '693a33831eb710720c250034', '696da1be2ddbc19f3c9b0899', '692eaa836974927fef249cfc'}" # cleaning_shoes, dishwashing, potting_plants, folding_clothes, cup_on_saucer - mode: train + mode: total valid_datasets: mecka_bimanual: @@ -33,7 +33,7 @@ valid_datasets: filter_lambdas: - "lambda row: row['lab'] == 'mecka' and row['episode_hash'] in {'69b9a059c69a19757c9ec3e7', '69b979626e470e7c633d794d', '693a33831eb710720c250034', '696da1be2ddbc19f3c9b0899', '692eaa836974927fef249cfc'}" # cleaning_shoes, dishwashing, potting_plants, folding_clothes, cup_on_saucer - mode: valid + mode: total train_dataloader_params: mecka_bimanual: @@ -43,3 +43,20 @@ valid_dataloader_params: mecka_bimanual: batch_size: 32 num_workers: 10 + +# Optional train-set visualization loader. When set, ModelWrapper drives a +# second pass through these datasets each validation epoch and routes them to +# the TrainVizEvalVideo wrapper (videos under `videos_train_viz/`, metrics +# prefixed with `train_viz/`). Uncomment together with +# `+evaluator@train_viz_evaluator=train_viz_pi`. 
+# +# train_viz_datasets: +# mecka_bimanual: +# _target_: ${data.train_datasets.mecka_bimanual._target_} +# resolver: ${data.train_datasets.mecka_bimanual.resolver} +# filters: ${data.train_datasets.mecka_bimanual.filters} +# mode: train +# train_viz_dataloader_params: +# mecka_bimanual: +# batch_size: 32 +# num_workers: 10 diff --git a/egomimic/hydra_configs/evaluator/train_viz_pi.yaml b/egomimic/hydra_configs/evaluator/train_viz_pi.yaml new file mode 100644 index 000000000..d69b0bf4d --- /dev/null +++ b/egomimic/hydra_configs/evaluator/train_viz_pi.yaml @@ -0,0 +1,19 @@ +# Wraps the canonical Pi evaluator (eval_pi) so it can run a second time +# against the `train_viz` dataloader. The wrapped `base` evaluator owns +# compute_metrics_and_viz; this wrapper writes videos to `videos_train_viz/` +# and prefixes metric keys with `train_viz/`. +# +# Enable via `+evaluator@train_viz_evaluator=train_viz_pi` together with a +# data config that defines `train_viz_datasets` and +# `train_viz_dataloader_params`. + +defaults: + - /evaluator@base: eval_pi + - _self_ + +_target_: egomimic.eval.eval_train_viz.TrainVizEvalVideo + +# Cap on validation batches consumed from the train_viz loader per epoch. +# Train data is iterated as a "spot check"; keep this small relative to the +# canonical valid loader's limit_val_batches. +limit_val_batches: 50 diff --git a/egomimic/hydra_configs/hydra/launcher/submitit_cpu_pace.yaml b/egomimic/hydra_configs/hydra/launcher/submitit_cpu_pace.yaml new file mode 100644 index 000000000..178b91c90 --- /dev/null +++ b/egomimic/hydra_configs/hydra/launcher/submitit_cpu_pace.yaml @@ -0,0 +1,17 @@ +defaults: + - submitit_slurm + +_target_: hydra_plugins.hydra_submitit_launcher.submitit_launcher.SlurmLauncher + +# CPU-only PACE launcher for norm-stats / preprocessing jobs (no GPU). 
+name: ${hydra.job.name} +partition: "cpu-small" # PACE CPU partition — confirm before first submit +account: "gts-dxu345-rl2" +cpus_per_task: 24 +nodes: 1 +tasks_per_node: 1 +qos: "inferno" +mem_per_cpu: 8G +timeout_min: 720 # 12h +additional_parameters: + requeue: true diff --git a/egomimic/hydra_configs/train_zarr_cartesian.yaml b/egomimic/hydra_configs/train_zarr_cartesian.yaml index 1d5d987ba..1e14c6647 100644 --- a/egomimic/hydra_configs/train_zarr_cartesian.yaml +++ b/egomimic/hydra_configs/train_zarr_cartesian.yaml @@ -15,6 +15,11 @@ description: test ckpt_path: null mode: train +# Optional second evaluator that runs against the train_viz dataloader. +# Set via override, e.g. `+evaluator@train_viz_evaluator=train_viz_pi`, together with a +# data config that defines `train_viz_datasets` and `train_viz_dataloader_params`. +train_viz_evaluator: null + hydra: run: # Dir should be experiment_name/description_{timestamp} diff --git a/egomimic/hydra_configs/trainer/ddp_pi.yaml b/egomimic/hydra_configs/trainer/ddp_pi.yaml index 731316953..6e786fc88 100644 --- a/egomimic/hydra_configs/trainer/ddp_pi.yaml +++ b/egomimic/hydra_configs/trainer/ddp_pi.yaml @@ -7,5 +7,5 @@ accelerator: gpu devices: ${eval:'${launch_params.gpus_per_node} * ${launch_params.nodes}'} num_nodes: ${launch_params.nodes} sync_batchnorm: True -check_val_every_n_epoch: 200 -num_sanity_val_steps: 0 \ No newline at end of file +check_val_every_n_epoch: 100 +num_sanity_val_steps: 0 diff --git a/egomimic/pl_utils/pl_data_utils.py b/egomimic/pl_utils/pl_data_utils.py index eac1f4fbf..28fec7e8b 100644 --- a/egomimic/pl_utils/pl_data_utils.py +++ b/egomimic/pl_utils/pl_data_utils.py @@ -60,6 +60,8 @@ def __init__( valid_datasets: dict, train_dataloader_params: dict, valid_dataloader_params: dict, + train_viz_datasets: dict | None = None, + train_viz_dataloader_params: dict | None = None, ): """ Args: @@ -67,6 +69,14 @@ def __init__( valid_datasets: dictionary of valid datasets train_dataloader_params: dictionary of train dataloader parameters 
valid_dataloader_params: dictionary of valid dataloader parameters + train_viz_datasets: optional dict of datasets iterated like a + second val loader. Used by TrainVizEvalVideo to visualize the + policy on training data alongside the canonical validation + pass. When set, ``val_dataloader()`` returns a list with the + train-viz CombinedLoader at index 1 so Lightning populates + ``dataloader_idx=1`` on validation_step. + train_viz_dataloader_params: dict of per-dataset DataLoader kwargs + for the train-viz loader (parallels valid_dataloader_params). Tokenization (sampling a prompt from per-sample annotation lists, splicing in embodiment / control-mode / proprio blocks, and running @@ -81,8 +91,12 @@ def __init__( # dataset defined in a base (e.g. `aria_bimanual: null`). self.train_datasets = {k: v for k, v in train_datasets.items() if v is not None} self.valid_datasets = {k: v for k, v in valid_datasets.items() if v is not None} + self.train_viz_datasets = { + k: v for k, v in (train_viz_datasets or {}).items() if v is not None + } self.train_dataloader_params = train_dataloader_params self.valid_dataloader_params = valid_dataloader_params + self.train_viz_dataloader_params = train_viz_dataloader_params or {} self.collate_fn = annotation_collate def train_dataloader(self): @@ -102,13 +116,13 @@ def train_dataloader(self): return CombinedLoader(iterables, "max_size_cycle") - def val_dataloader(self): + def _build_val_style_loader(self, datasets: dict, params: dict, kind: str): iterables = dict() - for dataset_name, dataset in self.valid_datasets.items(): - dataset_params = self.valid_dataloader_params.get(dataset_name) + for dataset_name, dataset in datasets.items(): + dataset_params = params.get(dataset_name) if dataset_params is None or len(dataset_params) == 0: raise ValueError( - f"No dataloader params found for dataset {dataset_name}. Please add {dataset_name} into your data config valid_dataloader_params." 
+ f"No dataloader params found for dataset {dataset_name}. Please add {dataset_name} into your data config {kind}_dataloader_params." ) dataset_params = dict(dataset_params) shuffle = dataset_params.pop("shuffle", False) @@ -118,9 +132,24 @@ def val_dataloader(self): collate_fn=self.collate_fn, **dataset_params, ) - return CombinedLoader(iterables, "max_size_cycle") + def val_dataloader(self): + valid_loader = self._build_val_style_loader( + self.valid_datasets, self.valid_dataloader_params, kind="valid" + ) + if not self.train_viz_datasets: + return valid_loader + # When train_viz_datasets is configured, return a list so Lightning + # populates dataloader_idx (0=valid, 1=train_viz) and ModelWrapper can + # dispatch to self.train_viz_evaluator. + train_viz_loader = self._build_val_style_loader( + self.train_viz_datasets, + self.train_viz_dataloader_params, + kind="train_viz", + ) + return [valid_loader, train_viz_loader] + class DualDataModuleWrapper(LightningDataModule): """ diff --git a/egomimic/pl_utils/pl_model.py b/egomimic/pl_utils/pl_model.py index 353af6d12..48247c776 100644 --- a/egomimic/pl_utils/pl_model.py +++ b/egomimic/pl_utils/pl_model.py @@ -35,6 +35,7 @@ def __init__( scheduler_interval="step", scheduler_frequency: int = 1, evaluator=None, + train_viz_evaluator=None, enable_grad_norm: bool = True, ): """ @@ -65,6 +66,10 @@ def __init__( self.epoch_memory_stats = [] # Store memory stats per epoch self.evaluator = evaluator + # Optional second evaluator that runs against a train-sampled + # dataloader during the same validation pass (dataloader_idx=1). + # See egomimic/eval/eval_train_viz.py. 
+ self.train_viz_evaluator = train_viz_evaluator @staticmethod def _as_config(cfg): @@ -196,29 +201,46 @@ def on_before_optimizer_step(self, optimizer): ) def on_validation_start(self): - if self.evaluator is None: + if self.evaluator is None and self.train_viz_evaluator is None: return self.model.device = self.device - self.evaluator.on_validation_start() + if self.evaluator is not None: + self.evaluator.on_validation_start() + if self.train_viz_evaluator is not None: + self.train_viz_evaluator.on_validation_start() def validation_step(self, batch, batch_idx, dataloader_idx=0): """ Run a validation step on the batch, and save that batch of images into the val_image_buffer. Once the buffer hits 1000 images, save that as a 30fps video using torchvision.io.write_video. + + ``dataloader_idx`` is populated by Lightning when ``val_dataloader()`` + returns a list. Index 0 routes to the canonical evaluator; index 1 + routes to the train-viz evaluator (if configured). """ - if self.evaluator is None: + active = self.train_viz_evaluator if dataloader_idx == 1 else self.evaluator + if active is None: return + # When val_dataloader returns a list of CombinedLoaders (valid + train_viz), + # Lightning wraps it in an outer sequential CombinedLoader. The outer iterator + # calls next() on each inner CombinedLoader, which itself yields a + # (batch_dict, batch_idx, dataloader_idx) triple — that triple lands here as + # `batch`. Unwrap to recover the dict. 
+ if isinstance(batch, tuple) and len(batch) == 3 and isinstance(batch[0], dict): + batch = batch[0] batch = self.model.process_batch_for_training(batch) print( - f"[VAL_STEP] rank={self.global_rank}, batch_idx={batch_idx}", + f"[VAL_STEP] rank={self.global_rank}, batch_idx={batch_idx}, dataloader_idx={dataloader_idx}", flush=True, ) - self.evaluator.on_validation_step(batch, batch_idx, dataloader_idx) + active.on_validation_step(batch, batch_idx, dataloader_idx) def on_validation_end(self): print(f"[ON_VALIDATION_END] rank={self.global_rank}", flush=True) if self.evaluator is not None: self.evaluator.on_validation_end() + if self.train_viz_evaluator is not None: + self.train_viz_evaluator.on_validation_end() print( f"Rank {self.global_rank} on validation end, waiting for all ranks to synchronize", diff --git a/egomimic/rldb/zarr/zarr_dataset_multi.py b/egomimic/rldb/zarr/zarr_dataset_multi.py index f2ac2533a..47eb2bce2 100644 --- a/egomimic/rldb/zarr/zarr_dataset_multi.py +++ b/egomimic/rldb/zarr/zarr_dataset_multi.py @@ -199,7 +199,12 @@ def __init__( self.key_map = key_map self.transform_list = transform_list - def _load_zarr_datasets(self, search_path: Path, valid_folder_names: set[str]): + def _load_zarr_datasets( + self, + search_path: Path, + valid_folder_names: set[str], + hash_to_task: dict[str, str] | None = None, + ): """ Loads multiple Zarr datasets from the specified folder path, filtering only those whose hashes are present in the valid_folder_names set. @@ -207,6 +212,12 @@ def _load_zarr_datasets(self, search_path: Path, valid_folder_names: set[str]): Args: search_path (Path): The root directory to search for Zarr datasets. valid_folder_names (set[str]): A set of valid folder names (episode hashes without ".zarr") to filter datasets. + hash_to_task (dict[str, str] | None): Optional override mapping + ``episode_hash`` → ``task`` (the SQL column). 
When provided, + each ``ZarrDataset`` receives the authoritative task name from + the episode registry, which can disagree with the in-zarr + ``task_name`` attribute (zarr writer defaults to ``"debug"`` + when the producer doesn't set it). Returns: dict[str, ZarrDataset]: a dictionary mapping string keys to constructed zarr datasets from valid filters. """ @@ -230,6 +241,7 @@ def _load_zarr_datasets(self, search_path: Path, valid_folder_names: set[str]): p, key_map=self.key_map, transform_list=self.transform_list, + task=(hash_to_task or {}).get(name), ) datasets[name] = ds_obj except Exception as e: @@ -286,7 +298,7 @@ def resolve( logger.info(f"Filters: {filters}") - filtered_paths = self.sync_from_filters( + filtered_paths, hash_to_task = self.sync_from_filters( bucket_name=self.bucket_name, filters=filters, local_dir=self.folder_path, @@ -303,6 +315,7 @@ def resolve( datasets = self._load_zarr_datasets( search_path=self.folder_path, valid_folder_names=valid_hashes, + hash_to_task=hash_to_task, ) return datasets @@ -335,7 +348,10 @@ def _get_filtered_paths( lambda row: filters.matches(_normalize_filter_row(row.to_dict())), axis=1, ) - output = df.loc[mask, ["zarr_processed_path", "episode_hash"]] + cols = ["zarr_processed_path", "episode_hash"] + if "task" in df.columns: + cols.append("task") + output = df.loc[mask, cols] n_matched_sql = len(output) output = output[ @@ -354,9 +370,21 @@ def _get_filtered_paths( logger.info("Debug mode: limiting to %d datasets.", k) output = output.iloc[:k] - paths = list(output.itertuples(index=False, name=None)) + hash_to_task: dict[str, str] = {} + if "task" in output.columns: + hash_to_task = { + str(h): str(t) + for h, t in zip(output["episode_hash"], output["task"]) + if t is not None and str(t) != "nan" + } + + paths = list( + output[["zarr_processed_path", "episode_hash"]].itertuples( + index=False, name=None + ) + ) logger.info(f"Paths: {paths}") - return paths + return paths, hash_to_task @classmethod def 
_sync_s3_to_local( @@ -464,15 +492,15 @@ def sync_from_filters( numworkers: Number of parallel workers for s5cmd. Returns: - List[(processed_path, episode_hash)] + tuple[list[(processed_path, episode_hash)], dict[hash, task]] """ filters = _ensure_dataset_filter(filters) # 1) Resolve episodes from DB - filtered_paths = cls._get_filtered_paths(filters, debug=debug) + filtered_paths, hash_to_task = cls._get_filtered_paths(filters, debug=debug) if not filtered_paths: logger.warning("No episodes matched filters.") - return [] + return [], {} # 2) Logging logger.info( @@ -487,7 +515,7 @@ def sync_from_filters( numworkers=numworkers, ) - return filtered_paths + return filtered_paths, hash_to_task # --------------------------------------------------------------------------- @@ -1520,18 +1548,23 @@ def __init__( Episode_path: Path, key_map: dict, transform_list: list | None = None, + task: str | None = None, ): """ Args: episode_path: just a path to the designated zarr episode key_map: dict mapping from dataset keys to zarr keys and horizon info, e.g. {"obs/image/front": {"zarr_key": "observations.images.front", "horizon": 4}, ...} transform_list: list of Transform objects to apply to the data after loading, e.g. for action chunk transformations. Should be in order of application. + task: optional authoritative task name (typically the SQL ``task`` + column). When set, it overrides the in-zarr ``task_name`` attr + — older episodes default that attr to ``"debug"``. 
""" self.episode_path = Episode_path self.metadata = None self._image_keys = None # Lazy-loaded set of JPEG-encoded keys self._json_keys = None # Lazy-loaded set of JSON-encoded keys self._annotations = None + self.task = task self.init_episode() self.key_map = key_map @@ -1724,7 +1757,7 @@ def _next(reason: str, key: str = "") -> int: data["embodiment"] = get_embodiment_id(self.embodiment) ep_name = Path(self.episode_path).name data["episode_hash"] = ep_name[:-5] if ep_name.endswith(".zarr") else ep_name - data["task"] = self.metadata.get("task_name", "unknown") + data["task"] = self.task or self.metadata.get("task_name", "unknown") _ = origin # preserved for symmetry with prior API return data diff --git a/egomimic/scripts/compute_norm_stats.py b/egomimic/scripts/compute_norm_stats.py new file mode 100644 index 000000000..d0ce0f8b5 --- /dev/null +++ b/egomimic/scripts/compute_norm_stats.py @@ -0,0 +1,97 @@ +"""Compute and cache norm stats for a data config — no model, no trainer. + +Accepts the same Hydra composition as ``trainHydra.py`` (``--config-name``, +``data=``, ``model=``, etc.) so existing run recipes can be reused. The +``model=`` override is harmless — the model is never instantiated. + +Writes ``norm_stats/norm_stats.json`` under ``cfg.norm_stats.save_cache_dir`` +(defaults to the Hydra run dir). Point a follow-up training run at that +directory via ``norm_stats.precomputed_norm_path=`` to skip recompute. 
+""" + +import copy +import os +from typing import Optional + +import hydra +import lightning as L +from omegaconf import DictConfig, OmegaConf + +from egomimic.rldb.zarr.utils import set_global_seed +from egomimic.rldb.zarr.zarr_dataset_multi import MultiDataset +from egomimic.utils.aws.aws_data_utils import load_env +from egomimic.utils.pylogger import RankedLogger +from egomimic.utils.utils import extras + +OmegaConf.register_new_resolver("eval", eval, replace=True) +log = RankedLogger(__name__, rank_zero_only=True) + + +def compute_norm_stats(cfg: DictConfig) -> None: + if cfg.get("seed"): + L.seed_everything(cfg.seed, workers=True) + set_global_seed(cfg.seed) + else: + raise ValueError("Seed must be provided in cfg for reproducibility!") + + load_env() + + train_datasets = {} + for dataset_name in cfg.data.train_datasets: + train_datasets[dataset_name] = hydra.utils.instantiate( + cfg.data.train_datasets[dataset_name] + ) + + norm_stats = MultiDataset( + state={}, + norm_mode=OmegaConf.select(cfg, "norm_stats.norm_mode", default="quantile"), + ) + norm_stats.populate_from_datasets(train_datasets) + + save_cache_dir = OmegaConf.select(cfg, "norm_stats.save_cache_dir", default=None) + if not save_cache_dir: + raise ValueError( + "norm_stats.save_cache_dir must be set so the computed stats are persisted." 
+ ) + + for dataset_name, dataset in train_datasets.items(): + log.info(f"Inferring shapes for dataset <{dataset_name}>") + norm_stats.infer_shapes_from_batch(dataset[0]) + + instantiate_copy = copy.deepcopy(cfg.data.train_datasets[dataset_name]) + keymap_cfg = instantiate_copy.resolver.key_map + km = OmegaConf.to_container(keymap_cfg, resolve=False) + km["norm_mode"] = True + instantiate_copy.resolver.key_map = km + norm_dataset = hydra.utils.instantiate(instantiate_copy) + + log.info(f"Computing norm stats for dataset <{dataset_name}>") + norm_stats.infer_norm_from_dataset( + norm_dataset, + dataset_name, + sample_frac=OmegaConf.select(cfg, "norm_stats.sample_frac", default=1.0), + num_workers=OmegaConf.select(cfg, "norm_stats.num_workers", default=4), + precomputed_norm_path=None, + ) + norm_stats.cache_stats(save_cache_dir=save_cache_dir) + + out_path = os.path.join(save_cache_dir, "norm_stats", "norm_stats.json") + log.info(f"Wrote norm stats to: {out_path}") + log.info( + f"To reuse, pass: norm_stats.precomputed_norm_path={os.path.join(save_cache_dir, 'norm_stats')}" + ) + + +@hydra.main( + version_base="1.3", + config_path="../hydra_configs", + config_name="train_zarr_cartesian.yaml", +) +def main(cfg: DictConfig) -> Optional[float]: + extras(cfg) + print(OmegaConf.to_yaml(cfg)) + compute_norm_stats(cfg) + + +if __name__ == "__main__": + main() diff --git a/egomimic/scripts/top_tasks_by_hours.py b/egomimic/scripts/top_tasks_by_hours.py index 28120e20e..3c956dfd5 100644 --- a/egomimic/scripts/top_tasks_by_hours.py +++ b/egomimic/scripts/top_tasks_by_hours.py @@ -30,7 +30,7 @@ def main(): default=[], help="Equality filters like lab=mecka. 
Multiple filters are ANDed.", ) - parser.add_argument("--top", type=int, default=10, help="Number of tasks to show.") + parser.add_argument("--top", type=int, default=100, help="Number of tasks to show.") args = parser.parse_args() filters = parse_filters(args.filter) diff --git a/egomimic/trainHydra.py b/egomimic/trainHydra.py index 57f864f5c..c93419cd0 100644 --- a/egomimic/trainHydra.py +++ b/egomimic/trainHydra.py @@ -95,12 +95,26 @@ def train(cfg: DictConfig) -> Tuple[Dict[str, Any], Dict[str, Any]]: cfg.data.valid_datasets[dataset_name] ) + train_viz_datasets = {} + if cfg.data.get("train_viz_datasets") is not None: + for dataset_name in cfg.data.train_viz_datasets: + cfg_entry = cfg.data.train_viz_datasets[dataset_name] + if cfg_entry is None: + train_viz_datasets[dataset_name] = None + continue + train_viz_datasets[dataset_name] = hydra.utils.instantiate(cfg_entry) + log.info(f"Instantiating datamodule <{cfg.data._target_}>") assert ( "MultiDataModuleWrapper" in cfg.data._target_ ), "cfg.data._target_ must be 'MultiDataModuleWrapper'" + datamodule_kwargs = dict( + train_datasets=train_datasets, valid_datasets=valid_datasets + ) + if train_viz_datasets: + datamodule_kwargs["train_viz_datasets"] = train_viz_datasets datamodule: LightningDataModule = hydra.utils.instantiate( - cfg.data, train_datasets=train_datasets, valid_datasets=valid_datasets + cfg.data, **datamodule_kwargs ) # Stats-only MultiDataset (no graph of its own; explicitly populated from @@ -148,6 +162,8 @@ def train(cfg: DictConfig) -> Tuple[Dict[str, Any], Dict[str, Any]]: ds.set_norm_stats_from(norm_stats) for ds in datamodule.valid_datasets.values(): ds.set_norm_stats_from(norm_stats) + for ds in getattr(datamodule, "train_viz_datasets", {}).values(): + ds.set_norm_stats_from(norm_stats) log.info(f"Instantiating model <{cfg.model._target_}>") model: LightningModule = ModelWrapper( @@ -230,6 +246,11 @@ def train(cfg: DictConfig) -> Tuple[Dict[str, Any], Dict[str, Any]]: eval_obj.trainer = 
trainer eval_obj.model = model.model model.evaluator = eval_obj + if cfg.get("train_viz_evaluator") is not None: + train_viz_eval_obj: Eval = hydra.utils.instantiate(cfg.train_viz_evaluator) + train_viz_eval_obj.trainer = trainer + train_viz_eval_obj.model = model.model + model.train_viz_evaluator = train_viz_eval_obj log.info("Starting training!") trainer.fit( model=model, @@ -241,6 +262,11 @@ def train(cfg: DictConfig) -> Tuple[Dict[str, Any], Dict[str, Any]]: eval_obj.trainer = trainer eval_obj.model = model.model model.evaluator = eval_obj + if cfg.get("train_viz_evaluator") is not None: + train_viz_eval_obj: Eval = hydra.utils.instantiate(cfg.train_viz_evaluator) + train_viz_eval_obj.trainer = trainer + train_viz_eval_obj.model = model.model + model.train_viz_evaluator = train_viz_eval_obj if hasattr(eval_obj, "run"): eval_obj.run(trainer, model, datamodule, cfg)