From 34561b61ad3bca537ffc45403a277c5b2bbd47c6 Mon Sep 17 00:00:00 2001 From: Ana Gainaru Date: Mon, 2 Mar 2026 10:10:47 -0500 Subject: [PATCH 1/5] Include the AERIS dataset --- examples/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/examples/utils.py b/examples/utils.py index 1a1c413..fbd83e6 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -15,6 +15,10 @@ def get_example(cfg: Config) -> BaseModelHarness: from examples.imagenet.model import IMAGENET_VISION return IMAGENET_VISION(cfg=cfg) + elif cfg.data.name == "aeris_dataset.csv": + from examples.aeris.model import AERIS + + return AERIS(cfg=cfg) else: raise NotImplementedError( f"Example for dataset {cfg.data.name} is not implemented." From ec30d45ba1443b0816eae198684b64479fa7afad Mon Sep 17 00:00:00 2001 From: Ana Gainaru Date: Mon, 2 Mar 2026 10:16:15 -0500 Subject: [PATCH 2/5] AERIS model harness --- examples/aeris/__init__.py | 0 examples/aeris/aeris.toml | 52 ++++++ examples/aeris/model.py | 228 +++++++++++++++++++++++ examples/aeris/utils.py | 357 +++++++++++++++++++++++++++++++++++++ 4 files changed, 637 insertions(+) create mode 100644 examples/aeris/__init__.py create mode 100644 examples/aeris/aeris.toml create mode 100644 examples/aeris/model.py create mode 100644 examples/aeris/utils.py diff --git a/examples/aeris/__init__.py b/examples/aeris/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/aeris/aeris.toml b/examples/aeris/aeris.toml new file mode 100644 index 0000000..7d16c7c --- /dev/null +++ b/examples/aeris/aeris.toml @@ -0,0 +1,52 @@ +# aeris.toml — AERIS continuous-learning +seed = 42 +device = "auto" +multi_gpu = false +verbosity = "INFO" + +[model] +name = "aeris_model.pt" +pretrained_path = "examples/aeris/model" + +[data] +name = "aeris_dataset.csv" +path = "examples/aeris/data" + +[train] +batch_size = 500 +num_workers = 4 +init_lr = 1e-6 +max_iter = 4000 +grad_accumulation_steps = 1 + +[continual_learning] +update_mode = "base" + +# 
JVP regularization (used when update_mode = "jvp_reg") +jvp_lambda = 10 +jvp_deltax_norm = 1 + +# EWC (used when update_mode = "ewc_online") +ewc_lambda = 1000.0 +ewc_ema_decay = 0.95 + +# KFAC (used when update_mode = "kfac_online") +kfac_lambda = 1e-2 +kfac_ema_decay = 0.95 + +[drift_detection] +detector_name = "ADWINDetector" +detection_interval = 10 +aggregation = "mean" +metric_index = 0 +reset_after_learning = false +max_stream_updates = 20 + +# ADWIN hyperparameters +adwin_delta = 0.002 +adwin_minor_threshold = 0.3 +adwin_moderate_threshold = 0.6 + +[logging] +backend = "wandb" +experiment_name = "aeris-cl" diff --git a/examples/aeris/model.py b/examples/aeris/model.py new file mode 100644 index 0000000..2dfdb80 --- /dev/null +++ b/examples/aeris/model.py @@ -0,0 +1,228 @@ +# examples/aeris/model.py +"""AERIS model harness for the BaseSim continuous-learning framework. + +This harness wraps a 8-layer neural network trained to predict enthalpy per atom from a given fuel material.""" + +import gc +import torch +from typing import Tuple, Optional, List, Any, Mapping, cast +from torch import nn, Tensor +from torch.optim import Optimizer +from torch.utils.data import DataLoader, ConcatDataset, TensorDataset + +from model.torch_model_harness import BaseModelHarness +from config.configuration import Config + +from examples.aeris.utils import ( + load_datasets, + make_loader, + load_pretrained_model, + split_into_windows, +) + + +# Aeris model architecture used for prediction +class AerisFullStructure(nn.Module): + def __init__(self, input_dim, dropout=0.3): + super().__init__() + first_layer = min(1024, max(512, input_dim * 2)) + self.layers = nn.Sequential( + nn.Linear(input_dim, first_layer), + nn.ReLU(), + nn.BatchNorm1d(first_layer), + nn.Linear(first_layer, first_layer), + nn.ReLU(), + nn.Dropout(dropout), + nn.Linear(first_layer, 512), + nn.ReLU(), + nn.BatchNorm1d(512), + nn.Linear(512, 512), + nn.ReLU(), + nn.Dropout(dropout), + nn.Linear(512, 256), + 
nn.ReLU(), + nn.BatchNorm1d(256), + nn.Linear(256, 256), + nn.ReLU(), + nn.Dropout(dropout), + nn.Linear(256, 128), + nn.ReLU(), + nn.BatchNorm1d(128), + nn.Linear(128, 64), + nn.ReLU(), + nn.Dropout(dropout), + nn.Linear(64, 32), + nn.ReLU(), + nn.Linear(32, 1), + ) + + def forward(self, x): + return self.layers(x) + + +# Fraction of each time window reserved for validation +_VAL_FRACTION: float = 0.2 + + +class AERIS(BaseModelHarness): + """ + Continuous-learning harness for the AERIS prediction model. + """ + + def __init__(self, cfg: Config): + # ----- build model --------------------------------------------------- + ckpt = load_pretrained_model( + cfg.model.pretrained_path, cfg.model.name, device=cfg.device + ) + + # Checkpoint is a dict saved via torch.save(model_info, ...) + input_dim_raw = ckpt.get("input_dim") + if input_dim_raw is None: + raise KeyError("Checkpoint missing required key: 'input_dim'") + input_dim = int(cast(int, input_dim_raw)) + + feature_names_raw = ckpt.get("feature_names") + if feature_names_raw is None: + raise KeyError("Checkpoint missing required key: 'feature_names'") + feature_names = cast(List[str], feature_names_raw) + + scaler_raw = ckpt.get("scaler") + if scaler_raw is None: + raise KeyError("Checkpoint missing required key: 'scaler'") + scaler = cast(Any, scaler_raw) + + state_raw = ckpt.get("model_state_dict") + if state_raw is None: + raise KeyError("Checkpoint missing required key: 'model_state_dict'") + state = cast(Mapping[str, Any], state_raw) + + model = AerisFullStructure(input_dim=input_dim) + model.load_state_dict(state) + model.to(cfg.device) + + super().__init__(cfg=cfg, model=model) + + # ----- eval metrics (prediction) ------------------------------------- + self.eval_metrics = {"mae": self.mae_metric(), "loss": self.get_criterion()} + self.higher_is_better = {"accuracy": False, "loss": False} + + # ----- data loaders ------------------------------------- + X, y = load_datasets(cfg.data.path, cfg.data.name, 
feature_names) + X_raw = torch.tensor(X, dtype=torch.float32) + X_scaled: Tensor = scaler.transform(X_raw) + y_raw = torch.tensor(y, dtype=torch.float32) + # y is a 1D array of shape (N,), but model outputs (N, 1): + if y_raw.ndim == 1: + y_raw = y_raw.unsqueeze(1) + + self.windows = split_into_windows(X_scaled, y_raw) + + # ----- streaming state ----------------------------------------------- + self.window_idx: int = 0 + self.history_windows: List[Tuple[Tensor, Tensor]] = [] + + self._cur_train_loader: Optional[DataLoader] = None + self._cur_val_loader: Optional[DataLoader] = None + + def get_optmizer(self) -> Optimizer: # noqa: D102 (spelling kept for ABC) + return torch.optim.Adam(self.model.parameters(), lr=self.cfg.train.init_lr) + + def get_criterion(self): # noqa: D102 + return nn.MSELoss() + + def mae_metric(self): + return nn.L1Loss() + + def get_cur_data_loaders(self) -> Tuple[DataLoader, DataLoader]: # noqa: D102 + assert self._cur_train_loader is not None and self._cur_val_loader is not None + return self._cur_train_loader, self._cur_val_loader + + def get_hist_data_loaders( + self, + ) -> Tuple[Optional[DataLoader], Optional[DataLoader]]: + """Return loaders over all previously-seen time windows. + + Returns ``(None, None)`` until at least two windows have been served. 
+ """ + if self.window_idx <= 1: + return None, None + + # Concatenate all history windows + hist_train_views: List[TensorDataset] = [] + hist_val_views: List[TensorDataset] = [] + + for X_w, y_w in self.history_windows: + n = X_w.shape[0] + n_val = max(1, int(n * _VAL_FRACTION)) + n_train = n - n_val + hist_train_views.append(TensorDataset(X_w[:n_train], y_w[:n_train])) + hist_val_views.append(TensorDataset(X_w[n_train:], y_w[n_train:])) + + ds_hist_train: ConcatDataset[Any] = ConcatDataset(hist_train_views) + ds_hist_val: ConcatDataset[Any] = ConcatDataset(hist_val_views) + + bs = self.cfg.train.batch_size + nw = self.cfg.train.num_workers + pin = torch.cuda.is_available() + return ( + make_loader( + ds_hist_train, bs, shuffle=True, num_workers=nw, pin_memory=pin + ), + make_loader(ds_hist_val, bs, shuffle=False, num_workers=nw, pin_memory=pin), + ) + + def update_data_stream(self) -> None: + """Advance to the next chronological time window. + + The current window is added to the history, and new train/val loaders + are built from the upcoming window. + """ + self._dispose_current_loaders() + + if self.window_idx >= len(self.windows): + print( + f"Warning: All {len(self.windows)} time windows exhausted; " + "wrapping around to the first window." 
+ ) + self.window_idx = 0 + + X_w, y_w = self.windows[self.window_idx] + + # Archive previous window in history (skip the very first call) + if self.window_idx > 0: + prev_X, prev_y = self.windows[self.window_idx - 1] + # Only add if not already stored (idempotency guard) + if len(self.history_windows) < self.window_idx: + self.history_windows.append((prev_X, prev_y)) + # Train / val split (last _VAL_FRACTION chronologically) + n = X_w.shape[0] + n_val = max(1, int(n * _VAL_FRACTION)) + n_train = n - n_val + + ds_train = TensorDataset(X_w[:n_train], y_w[:n_train]) + ds_val = TensorDataset(X_w[n_train:], y_w[n_train:]) + + bs = self.cfg.train.batch_size + nw = self.cfg.train.num_workers + pin = torch.cuda.is_available() + + self._cur_train_loader = make_loader( + ds_train, bs, shuffle=True, num_workers=nw, pin_memory=pin + ) + self._cur_val_loader = make_loader( + ds_val, bs, shuffle=False, num_workers=nw, pin_memory=pin + ) + + self.window_idx += 1 + + # --------------------------------------------------------------------- # + # Helpers + # --------------------------------------------------------------------- # + def _dispose_current_loaders(self) -> None: + if self._cur_train_loader is not None: + del self._cur_train_loader + self._cur_train_loader = None + if self._cur_val_loader is not None: + del self._cur_val_loader + self._cur_val_loader = None + gc.collect() diff --git a/examples/aeris/utils.py b/examples/aeris/utils.py new file mode 100644 index 0000000..ac2b253 --- /dev/null +++ b/examples/aeris/utils.py @@ -0,0 +1,357 @@ +# examples/aeris/utils.py +"""Utility functions for the AERIS continuous-learning example. 
+ +Expected directory layout (pointed to by ``cfg.data.path``):: + + / + dataset.csv # data that will be parsed by the SIM framework + aeris_model.pt # AERIS pre-trained model +""" + +import os +import glob +import re +from typing import Dict, List, Tuple, Any + +import numpy as np +import pandas as pd +import torch +from torch import Tensor +from torch.utils.data import DataLoader, Dataset + +from pymatgen.core.composition import Composition +from matminer.featurizers.base import MultipleFeaturizer +from matminer.featurizers import composition as cf + + +def load_pretrained_model( + data_path: str, model_name: str, device: str = "cpu" +) -> dict[str, Any]: + """Load the pretrained AERIS model. + + Parameters + ---------- + data_path: + Directory containing the model. + model_name: + The name of the pretrained model. + device: + Device to map the scalers to. + + Returns + ------- + model_info = { + 'model_state_dict': model.state_dict(), + 'input_dim': input_dim, + 'feature_names': feature_names, + 'scaler': scaler, + 'metrics': {'mae': mae, 'rmse': rmse, 'r2': r2}, + 'history': history + } + """ + ckpt = None + if os.path.exists(data_path): + ckpt = torch.load( + os.path.join(data_path, model_name), map_location=device, weights_only=False + ) + if ckpt is None: + raise FileNotFoundError("No model found at path: " + data_path) + return ckpt + + +def _parse_formula(s: str) -> Dict[str, float]: + parts = re.findall(r"([A-Z][a-z]?)([0-9]*\.?[0-9]*)", str(s).strip()) + if not parts: + raise ValueError(f"Could not parse formula: {s}") + comp: Dict[str, float] = {} + for el, num in parts: + comp[el] = float(num) if num else 1.0 + return comp + + +def _apply_df_parse_formula_num(val): + try: + if pd.isna(val): + return None + parsed = _parse_formula(str(val)) + return int(sum(parsed.values())) + except Exception: + return None + + +def _apply_df_parse_formula_str(val): + try: + if pd.isna(val): + return None + parsed = _parse_formula(str(val)) + return "".join(f"{k}{v}" 
for k, v in sorted(parsed.items())) + except Exception: + return None + + +def _parse_structure_string(struct_str: str) -> Dict[str, float]: + # minimal lattice extractor (compatible with training utils) + result = { + "lattice_a": np.nan, + "lattice_b": np.nan, + "lattice_c": np.nan, + "lattice_alpha": np.nan, + "lattice_beta": np.nan, + "lattice_gamma": np.nan, + "volume": np.nan, + "density": np.nan, + "nsites": np.nan, + "spacegroup_number": np.nan, + } + if struct_str is None: + return result + s = str(struct_str) + abc_pattern = r"abc\s*:\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)" + angles_pattern = r"angles\s*:\s*([\d.]+)\s+([\d.]+)\s+([\d.]+)" + abc = re.search(abc_pattern, s) + ang = re.search(angles_pattern, s) + if abc: + result["lattice_a"] = float(abc.group(1)) + result["lattice_b"] = float(abc.group(2)) + result["lattice_c"] = float(abc.group(3)) + if ang: + result["lattice_alpha"] = float(ang.group(1)) + result["lattice_beta"] = float(ang.group(2)) + result["lattice_gamma"] = float(ang.group(3)) + # try volume + vol_match = re.search(r"volume\s*[:=]\s*([\d.]+)", s) + if vol_match: + result["volume"] = float(vol_match.group(1)) + dens_match = re.search(r"density\s*[:=]\s*([\d.]+)", s) + if dens_match: + result["density"] = float(dens_match.group(1)) + sg_match = re.search(r"spacegroup(?:_number)?\s*[:=]\s*(\d+)", s) + if sg_match: + result["spacegroup_number"] = int(sg_match.group(1)) + nsites_match = re.search(r"nsites\s*[:=]\s*(\d+)", s) + if nsites_match: + result["nsites"] = int(nsites_match.group(1)) + return result + + +def _build_feature_vector( + composition: str, features: Dict, feature_names: List[str] +) -> np.ndarray: + comp = _parse_formula(composition) + total_atoms = float(sum(comp.values())) + # prepare composition fractions + elem_frac = {k: v / total_atoms for k, v in comp.items()} + + # parse structure if string/dict + struct_vals = {} + if features is not None: + for feature in features: + if feature not in feature_names: + continue + 
struct_vals[feature] = features[feature] + if "structure" in features: + parsed_struct = _parse_structure_string(features["structure"]) + struct_vals.update(parsed_struct) + + # magpie + feature_calculators = MultipleFeaturizer( + [ + cf.Stoichiometry(), + cf.ElementProperty.from_preset("magpie"), + cf.ValenceOrbital(props=["avg"]), + cf.IonProperty(fast=True), + ] + ) + + comp_obj = Composition(composition) + data = pd.DataFrame([{"comp_obj": comp_obj, "composition_reduced": composition}]) + + # Calculate Magpie features. + # IMPORTANT: when running under MPI, do NOT let matminer spawn multiprocessing pools + # inside each rank (oversubscription/hangs). Force single-process. + magpie_features_dict = {} + try: + # Some matminer versions support n_jobs; if yours does, keep it at 1. + magpie_features = feature_calculators.featurize_dataframe( + data, col_id="comp_obj", ignore_errors=True, pbar=False, n_jobs=1 + ) + magpie_features_dict = magpie_features.iloc[0].to_dict() + except Exception: + try: + feats = feature_calculators.featurize_many( + [Composition(composition)], n_jobs=1 + ) + magpie_features = pd.DataFrame(feats) + magpie_features.index = [0] + magpie_features_dict = magpie_features.iloc[0].to_dict() + except Exception as e: + print("Magpie featurizer failed, falling back to empty features:", repr(e)) + magpie_features_dict = {} + + vec = np.zeros(len(feature_names), dtype=np.float32) + for i, name in enumerate(feature_names): + # elemental features (assume single element name) + if re.match(r"^[A-Z][a-z]?$", name) and name in elem_frac: + vec[i] = float(elem_frac.get(name, 0.0)) + continue + + # structural features + if name in struct_vals: + vec[i] = float(struct_vals[name]) + continue + + # magpie features + if name in magpie_features_dict: + vec[i] = float(magpie_features_dict[name]) + continue + + # try numeric keys in struct_vals + val = struct_vals.get(name) + if val is None: + v = struct_vals.get(name, 0.0) + try: + vec[i] = float(v) + except 
Exception: + vec[i] = 0.0 + else: + try: + vec[i] = float(val) + except Exception: + vec[i] = 0.0 + + X = vec.reshape(1, -1) + # if there are nan values in the feature vector + return np.nan_to_num(X, nan=0.0, posinf=1e6, neginf=-1e6) + + +def load_datasets(data_path: str, dataset_name: str, feature_names: List[str]): + """Load the dataset that will be parsed, return features and ground truth. + + Parameters + ---------- + data_path: + Directory containing the datasets. + dataset_name: + The name or regular expression for the datasets + feature_names: + The features used by the model for prediction + + Returns + ------- + input festures, output target values + """ + dfs = [] + dataset_pattern = os.path.join(data_path, dataset_name) + dataset_files: List[str] = glob.glob(dataset_pattern) + if not dataset_files: + raise FileNotFoundError(f"No dataset files matched pattern: {dataset_pattern}") + for file_path in dataset_files: + dfs.append(pd.read_csv(file_path, low_memory=False)) + dataset: pd.DataFrame = pd.concat(dfs, ignore_index=True) + + # Filter all entries that do not have a target value + dataset = dataset.dropna(subset=["formation_energy_per_atom"]).copy() + + # Replace NaN/+inf/-inf in numeric columns (keep DataFrame type) + num_cols = dataset.select_dtypes(include=[np.number]).columns + dataset[num_cols] = dataset[num_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0) + + y = dataset["formation_energy_per_atom"].values.astype(np.float32) + X = [] + for _, row in dataset.iterrows(): + composition = row["composition_reduced"] + features = { + "composition": row["composition"], + "structure": row["structure"], + "spacegroup_number": row["spacegroup_number"], + "density_atomic": row["density_atomic"], + "CN_max": row["CN_max"], + "CN_min": row["CN_min"], + "CN_avg": row["CN_avg"], + } + X.append(_build_feature_vector(composition, features, feature_names)) + + assert len(X) == len(y), ( + "The feature and target vectors do not have the same lenght" + ) + 
return X, y + + +# Default number of samples per time window. Can be overridden by the caller. +DEFAULT_WINDOW_SIZE: int = 5000 + + +def split_into_windows( + X: Tensor, + y: Tensor, + window_size: int = DEFAULT_WINDOW_SIZE, +) -> List[Tuple[Tensor, Tensor]]: + """Split chronologically-ordered tensors into non-overlapping windows. + + Any leftover samples that don't fill a complete window are appended as + a final (smaller) window so no data is discarded. + + Parameters + ---------- + X: + Input features ``[N, D]``. + y: + Targets ``[N, T]``. + window_size: + Number of samples per window. + + Returns + ------- + List of ``(X_chunk, y_chunk)`` tuples. + """ + n = X.shape[0] + windows: List[Tuple[Tensor, Tensor]] = [] + for start in range(0, n, window_size): + end = min(start + window_size, n) + windows.append((X[start:end], y[start:end])) + return windows + + +def make_loader( + ds: Dataset, + batch_size: int, + shuffle: bool, + num_workers: int = 4, + pin_memory: bool = True, + persistent_workers: bool = True, + prefetch_factor: int = 2, +) -> DataLoader: + """Build a ``DataLoader`` from a ``Dataset``. + + Parameters + ---------- + ds: + The base dataset. + batch_size: + Batch size. + shuffle: + Whether to shuffle. + num_workers: + Number of data-loading workers. + pin_memory: + Pin CUDA memory for faster transfers. + persistent_workers: + Keep worker processes alive between iterations. + prefetch_factor: + Samples to prefetch per worker. 
+ + Returns + ------- + DataLoader + """ + kwargs: dict = dict(batch_size=batch_size, shuffle=shuffle, drop_last=False) + if num_workers > 0: + kwargs.update( + dict( + num_workers=num_workers, + pin_memory=pin_memory, + persistent_workers=persistent_workers, + prefetch_factor=prefetch_factor, + ) + ) + return DataLoader(ds, **kwargs) # type: ignore[arg-type] From adfd00dc70784b909b4c31eb3ed6bd4c9949699d Mon Sep 17 00:00:00 2001 From: Ana Gainaru Date: Mon, 2 Mar 2026 16:52:59 -0500 Subject: [PATCH 3/5] wip --- examples/aeris/model.py | 18 ++++--- examples/aeris/utils.py | 115 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 118 insertions(+), 15 deletions(-) diff --git a/examples/aeris/model.py b/examples/aeris/model.py index 2dfdb80..4ef6619 100644 --- a/examples/aeris/model.py +++ b/examples/aeris/model.py @@ -5,6 +5,7 @@ import gc import torch +import numpy as np from typing import Tuple, Optional, List, Any, Mapping, cast from torch import nn, Tensor from torch.optim import Optimizer @@ -108,14 +109,19 @@ def __init__(self, cfg: Config): # ----- data loaders ------------------------------------- X, y = load_datasets(cfg.data.path, cfg.data.name, feature_names) - X_raw = torch.tensor(X, dtype=torch.float32) - X_scaled: Tensor = scaler.transform(X_raw) + # X shape: (n_samples, 1, 245) y shape: (n_samples,1) + + # apply scaler if present + if scaler is not None: + try: + X = scaler.transform(X) + except Exception: + pass + with torch.no_grad(): + X_scaled = torch.FloatTensor(X).to(cfg.device) y_raw = torch.tensor(y, dtype=torch.float32) - # y is a 1D array of shape (N,), but model outputs (N, 1): - if y_raw.ndim == 1: - y_raw = y_raw.unsqueeze(1) - self.windows = split_into_windows(X_scaled, y_raw) + self.windows = split_into_windows(X_scaled, y_raw, cfg.train.batch_size) # ----- streaming state ----------------------------------------------- self.window_idx: int = 0 diff --git a/examples/aeris/utils.py b/examples/aeris/utils.py index ac2b253..78cb86a 
100644 --- a/examples/aeris/utils.py +++ b/examples/aeris/utils.py @@ -178,9 +178,8 @@ def _build_feature_vector( magpie_features_dict = magpie_features.iloc[0].to_dict() except Exception: try: - feats = feature_calculators.featurize_many( - [Composition(composition)], n_jobs=1 - ) + feature_calculators.set_n_jobs(1) + feats = feature_calculators.featurize_many([Composition(composition)]) magpie_features = pd.DataFrame(feats) magpie_features.index = [0] magpie_features_dict = magpie_features.iloc[0].to_dict() @@ -219,12 +218,106 @@ def _build_feature_vector( except Exception: vec[i] = 0.0 - X = vec.reshape(1, -1) - # if there are nan values in the feature vector - return np.nan_to_num(X, nan=0.0, posinf=1e6, neginf=-1e6) + # Return a 1D feature vector (D,) instead of (1, D) + vec = np.nan_to_num(vec, nan=0.0, posinf=1e6, neginf=-1e6) + return vec def load_datasets(data_path: str, dataset_name: str, feature_names: List[str]): + """Load the dataset used by the model. + + This function attempts to *prefer* loading the exact columns listed in + `feature_names` (in the same order). If those columns are present in the + CSV(s), they are used directly (fast, deterministic). If not all feature + columns are present, the function falls back to building feature vectors + row-by-row using _build_feature_vector to preserve compatibility with older + or alternate CSV formats. + + The function returns: + X: numpy.ndarray of shape (n_samples, n_features) dtype float32 + y: numpy.ndarray of shape (n_samples,) dtype float32 + + Note: scaling is intentionally NOT applied here. The caller (model harness) + will apply the saved scaler from the checkpoint (if any) via scaler.transform(). 
+ """ + + # collect files + dataset_pattern = os.path.join(data_path, dataset_name) + dataset_files: List[str] = glob.glob(dataset_pattern) + if not dataset_files: + raise FileNotFoundError(f"No dataset files matched pattern: {dataset_pattern}") + + # read & concatenate CSV files + dfs = [] + for file_path in dataset_files: + dfs.append(pd.read_csv(file_path, low_memory=False)) + dataset: pd.DataFrame = pd.concat(dfs, ignore_index=True) + + # Ensure target present + if "formation_energy_per_atom" not in dataset.columns: + raise KeyError("Required target column 'formation_energy_per_atom' not found in dataset") + + # Drop rows missing target + dataset = dataset.dropna(subset=["formation_energy_per_atom"]).copy() + + # If all feature_names are present as columns, take that branch (preferred) + all_present = all((fn in dataset.columns) for fn in feature_names) + + if all_present: + # Select columns in the exact saved order + X = dataset[feature_names].to_numpy(dtype=np.float32) + + # Replace infs / NaNs in numeric columns with column means (same as training) + # (do not change dtype or drop rows here; keep alignment with model) + numeric_mask = np.isfinite(X) + # For each column replace non-finite with column mean (computed over finite rows) + col_means = np.nanmean(np.where(np.isfinite(X), X, np.nan), axis=0) + # Where a column is completely NaN/inf, set mean to 0.0 + col_means = np.where(np.isnan(col_means), 0.0, col_means) + inds = np.where(~np.isfinite(X)) + if inds[0].size > 0: + X[inds] = np.take(col_means, inds[1]) + + else: + # Fall back to building feature vectors row-by-row using the helper + # This creates the same ordering as feature_names when possible (elemental + # names are interpreted by _build_feature_vector). 
+ X_rows = [] + # Build a minimal features dict per row (this mirrors training's inputs) + for _, row in dataset.iterrows(): + comp = row.get("composition_reduced", row.get("composition", None)) + features = { + "composition": row.get("composition", None), + "structure": row.get("structure", None), + "spacegroup_number": row.get("spacegroup_number", None), + "density_atomic": row.get("density_atomic", None), + "CN_max": row.get("CN_max", None), + "CN_min": row.get("CN_min", None), + "CN_avg": row.get("CN_avg", None), + } + try: + vec = _build_feature_vector(comp, features, feature_names) + except Exception: + # on failure, append zeros to avoid mismatched shapes + vec = np.zeros(len(feature_names), dtype=np.float32) + X_rows.append(vec) + X = np.vstack(X_rows).astype(np.float32) + + # Replace any remaining nan/inf with column means + col_means = np.nanmean(np.where(np.isfinite(X), X, np.nan), axis=0) + col_means = np.where(np.isnan(col_means), 0.0, col_means) + inds = np.where(~np.isfinite(X)) + if inds[0].size > 0: + X[inds] = np.take(col_means, inds[1]) + + # Prepare target vector shape (N,) + y = dataset["formation_energy_per_atom"].to_numpy(dtype=np.float32).reshape(-1, 1) + assert X.shape[0] == y.shape[0], "Feature matrix and target vector must have same number of rows" + + return X, y + + +def load_datasets2(data_path: str, dataset_name: str, feature_names: List[str]): """Load the dataset that will be parsed, return features and ground truth. 
Parameters @@ -256,7 +349,11 @@ def load_datasets(data_path: str, dataset_name: str, feature_names: List[str]): num_cols = dataset.select_dtypes(include=[np.number]).columns dataset[num_cols] = dataset[num_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0) - y = dataset["formation_energy_per_atom"].values.astype(np.float32) + y = ( + dataset["formation_energy_per_atom"] + .values.astype(np.float32) + .reshape(-1, 1) + ) X = [] for _, row in dataset.iterrows(): composition = row["composition_reduced"] @@ -271,6 +368,7 @@ def load_datasets(data_path: str, dataset_name: str, feature_names: List[str]): } X.append(_build_feature_vector(composition, features, feature_names)) + print("X shape:", np.array(X).shape, "y shape:", y.shape) assert len(X) == len(y), ( "The feature and target vectors do not have the same lenght" ) @@ -278,8 +376,7 @@ def load_datasets(data_path: str, dataset_name: str, feature_names: List[str]): # Default number of samples per time window. Can be overridden by the caller. 
-DEFAULT_WINDOW_SIZE: int = 5000 - +DEFAULT_WINDOW_SIZE: int = 100 def split_into_windows( X: Tensor, From ccdcb0f157e16b30609e21f8ed4fa2786c89af9d Mon Sep 17 00:00:00 2001 From: Ana Gainaru Date: Tue, 3 Mar 2026 15:46:24 -0500 Subject: [PATCH 4/5] Updates to build the features in the same order as the training --- examples/aeris/model.py | 88 +++------- examples/aeris/utils.py | 355 +++++++++++++++------------------------- 2 files changed, 159 insertions(+), 284 deletions(-) diff --git a/examples/aeris/model.py b/examples/aeris/model.py index 4ef6619..ed1c767 100644 --- a/examples/aeris/model.py +++ b/examples/aeris/model.py @@ -21,40 +21,21 @@ split_into_windows, ) - # Aeris model architecture used for prediction class AerisFullStructure(nn.Module): def __init__(self, input_dim, dropout=0.3): super().__init__() first_layer = min(1024, max(512, input_dim * 2)) self.layers = nn.Sequential( - nn.Linear(input_dim, first_layer), - nn.ReLU(), - nn.BatchNorm1d(first_layer), - nn.Linear(first_layer, first_layer), - nn.ReLU(), - nn.Dropout(dropout), - nn.Linear(first_layer, 512), - nn.ReLU(), - nn.BatchNorm1d(512), - nn.Linear(512, 512), - nn.ReLU(), - nn.Dropout(dropout), - nn.Linear(512, 256), - nn.ReLU(), - nn.BatchNorm1d(256), - nn.Linear(256, 256), - nn.ReLU(), - nn.Dropout(dropout), - nn.Linear(256, 128), - nn.ReLU(), - nn.BatchNorm1d(128), - nn.Linear(128, 64), - nn.ReLU(), - nn.Dropout(dropout), - nn.Linear(64, 32), - nn.ReLU(), - nn.Linear(32, 1), + nn.Linear(input_dim, first_layer), nn.ReLU(), nn.BatchNorm1d(first_layer), + nn.Linear(first_layer, first_layer), nn.ReLU(), nn.Dropout(dropout), + nn.Linear(first_layer, 512), nn.ReLU(), nn.BatchNorm1d(512), + nn.Linear(512, 512), nn.ReLU(), nn.Dropout(dropout), + nn.Linear(512, 256), nn.ReLU(), nn.BatchNorm1d(256), + nn.Linear(256, 256), nn.ReLU(), nn.Dropout(dropout), + nn.Linear(256, 128), nn.ReLU(), nn.BatchNorm1d(128), + nn.Linear(128, 64), nn.ReLU(), nn.Dropout(dropout), + nn.Linear(64, 32), nn.ReLU(), 
nn.Linear(32, 1) ) def forward(self, x): @@ -75,31 +56,14 @@ def __init__(self, cfg: Config): ckpt = load_pretrained_model( cfg.model.pretrained_path, cfg.model.name, device=cfg.device ) - - # Checkpoint is a dict saved via torch.save(model_info, ...) - input_dim_raw = ckpt.get("input_dim") - if input_dim_raw is None: - raise KeyError("Checkpoint missing required key: 'input_dim'") - input_dim = int(cast(int, input_dim_raw)) - - feature_names_raw = ckpt.get("feature_names") - if feature_names_raw is None: - raise KeyError("Checkpoint missing required key: 'feature_names'") - feature_names = cast(List[str], feature_names_raw) - - scaler_raw = ckpt.get("scaler") - if scaler_raw is None: - raise KeyError("Checkpoint missing required key: 'scaler'") - scaler = cast(Any, scaler_raw) - - state_raw = ckpt.get("model_state_dict") - if state_raw is None: - raise KeyError("Checkpoint missing required key: 'model_state_dict'") - state = cast(Mapping[str, Any], state_raw) + feature_names: List[str] = ckpt["feature_names"] + scaler = ckpt["scaler"] + input_dim = int(ckpt["input_dim"]) model = AerisFullStructure(input_dim=input_dim) - model.load_state_dict(state) + model.load_state_dict(ckpt["model_state_dict"]) model.to(cfg.device) + model.eval() super().__init__(cfg=cfg, model=model) @@ -108,20 +72,16 @@ def __init__(self, cfg: Config): self.higher_is_better = {"accuracy": False, "loss": False} # ----- data loaders ------------------------------------- - X, y = load_datasets(cfg.data.path, cfg.data.name, feature_names) - # X shape: (n_samples, 1, 245) y shape: (n_samples,1) - - # apply scaler if present - if scaler is not None: - try: - X = scaler.transform(X) - except Exception: - pass - with torch.no_grad(): - X_scaled = torch.FloatTensor(X).to(cfg.device) - y_raw = torch.tensor(y, dtype=torch.float32) - - self.windows = split_into_windows(X_scaled, y_raw, cfg.train.batch_size) + X, y = load_datasets(cfg.data.path, cfg.data.name, feature_names, input_dim) + # X shape: 
(n_samples, 245) y shape: (n_samples,1) + + # scale (must match training) + X_scaled = scaler.transform(X).astype(np.float32) + X_tensor = torch.tensor(X_scaled, dtype=torch.float32) + y_tensor = torch.tensor(y, dtype=torch.float32) + + self.windows = split_into_windows(X_tensor, y_tensor, cfg.train.batch_size) + #print(f"Prepared {len(self.windows)} time windows for streaming. Each window has ~{self.windows[0][0].shape[0]} samples.") # ----- streaming state ----------------------------------------------- self.window_idx: int = 0 diff --git a/examples/aeris/utils.py b/examples/aeris/utils.py index 78cb86a..d02eeb3 100644 --- a/examples/aeris/utils.py +++ b/examples/aeris/utils.py @@ -11,7 +11,7 @@ import os import glob import re -from typing import Dict, List, Tuple, Any +from typing import Dict, List, Tuple, Any, Optional import numpy as np import pandas as pd @@ -69,26 +69,6 @@ def _parse_formula(s: str) -> Dict[str, float]: return comp -def _apply_df_parse_formula_num(val): - try: - if pd.isna(val): - return None - parsed = _parse_formula(str(val)) - return int(sum(parsed.values())) - except Exception: - return None - - -def _apply_df_parse_formula_str(val): - try: - if pd.isna(val): - return None - parsed = _parse_formula(str(val)) - return "".join(f"{k}{v}" for k, v in sorted(parsed.items())) - except Exception: - return None - - def _parse_structure_string(struct_str: str) -> Dict[str, float]: # minimal lattice extractor (compatible with training utils) result = { @@ -134,96 +114,144 @@ def _parse_structure_string(struct_str: str) -> Dict[str, float]: return result -def _build_feature_vector( - composition: str, features: Dict, feature_names: List[str] -) -> np.ndarray: - comp = _parse_formula(composition) - total_atoms = float(sum(comp.values())) - # prepare composition fractions - elem_frac = {k: v / total_atoms for k, v in comp.items()} - - # parse structure if string/dict - struct_vals = {} - if features is not None: - for feature in features: - if 
feature not in feature_names: - continue - struct_vals[feature] = features[feature] - if "structure" in features: - parsed_struct = _parse_structure_string(features["structure"]) - struct_vals.update(parsed_struct) - - # magpie - feature_calculators = MultipleFeaturizer( - [ - cf.Stoichiometry(), - cf.ElementProperty.from_preset("magpie"), - cf.ValenceOrbital(props=["avg"]), - cf.IonProperty(fast=True), - ] - ) - - comp_obj = Composition(composition) - data = pd.DataFrame([{"comp_obj": comp_obj, "composition_reduced": composition}]) - - # Calculate Magpie features. - # IMPORTANT: when running under MPI, do NOT let matminer spawn multiprocessing pools - # inside each rank (oversubscription/hangs). Force single-process. - magpie_features_dict = {} +# ----------------------------- +# Build X,y in *checkpoint feature order* +# ----------------------------- +# optional numeric columns (if present in CSV) that we will include as features +OPTIONAL_NUMERIC_COLS = [ + 'density_atomic', 'CN_max', 'CN_min', 'CN_avg', + # add more if you know they exist & are useful +] + +def _make_magpie_featurizer() -> MultipleFeaturizer: + return MultipleFeaturizer([ + cf.Stoichiometry(), + cf.ElementProperty.from_preset("magpie"), + cf.ValenceOrbital(props=['avg']), + cf.IonProperty(fast=True), + ]) + +def _compute_magpie_df(compositions: pd.Series) -> pd.DataFrame: + featurizer = _make_magpie_featurizer() + + comp_objs = [] + for s in compositions.astype(str).tolist(): + try: + comp_objs.append(Composition(s)) + except Exception: + comp_objs.append(None) + + base = pd.DataFrame({"comp_obj": comp_objs}, index=compositions.index) + try: - # Some matminer versions support n_jobs; if yours does, keep it at 1. 
- magpie_features = feature_calculators.featurize_dataframe( - data, col_id="comp_obj", ignore_errors=True, pbar=False, n_jobs=1 + feat_df = featurizer.featurize_dataframe( + base, col_id="comp_obj", ignore_errors=True, pbar=False, n_jobs=1 ) - magpie_features_dict = magpie_features.iloc[0].to_dict() - except Exception: + except TypeError: try: - feature_calculators.set_n_jobs(1) - feats = feature_calculators.featurize_many([Composition(composition)]) - magpie_features = pd.DataFrame(feats) - magpie_features.index = [0] - magpie_features_dict = magpie_features.iloc[0].to_dict() - except Exception as e: - print("Magpie featurizer failed, falling back to empty features:", repr(e)) - magpie_features_dict = {} - - vec = np.zeros(len(feature_names), dtype=np.float32) - for i, name in enumerate(feature_names): - # elemental features (assume single element name) - if re.match(r"^[A-Z][a-z]?$", name) and name in elem_frac: - vec[i] = float(elem_frac.get(name, 0.0)) - continue - - # structural features - if name in struct_vals: - vec[i] = float(struct_vals[name]) - continue - - # magpie features - if name in magpie_features_dict: - vec[i] = float(magpie_features_dict[name]) - continue - - # try numeric keys in struct_vals - val = struct_vals.get(name) - if val is None: - v = struct_vals.get(name, 0.0) + featurizer.set_n_jobs(1) + except Exception: + pass + feat_df = featurizer.featurize_dataframe( + base, col_id="comp_obj", ignore_errors=True, pbar=False + ) + + feat_df = feat_df.drop(columns=[c for c in feat_df.columns if c == "comp_obj"], errors="ignore") + return feat_df + +def _build_X_y_in_ckpt_order( + df: pd.DataFrame, + feature_names: List[str], + target_col: Optional[str], +) -> Tuple[np.ndarray, Optional[np.ndarray]]: + required = ["composition", "structure"] + for c in required: + if c not in df.columns: + raise KeyError(f"Missing required column '{c}'") + + if target_col is not None and target_col not in df.columns: + raise KeyError(f"Missing target column 
'{target_col}'") + + magpie_df = _compute_magpie_df(df["composition"]) + n = len(df) + X = np.zeros((n, len(feature_names)), dtype=np.float32) + y: Optional[np.ndarray] = None + if target_col is not None: + y = np.zeros((n, 1), dtype=np.float32) + + df2 = df.reset_index(drop=True) + + for i, row in df2.iterrows(): + comp_str = str(row["composition"]) + + # 1) element fractions + try: + parsed = _parse_formula(comp_str) + total = float(sum(parsed.values())) if parsed else 0.0 + except Exception: + parsed, total = {}, 0.0 + + elem_frac: Dict[str, float] = {} + if total > 0: + for el, cnt in parsed.items(): + elem_frac[el] = float(cnt) / total + + # 2) structure features + struct_vals = _parse_structure_string(row.get("structure")) + + # 3) optional numeric cols + opt_vals: Dict[str, float] = {} + for c in OPTIONAL_NUMERIC_COLS: + if c in df2.columns: + v = row.get(c) + try: + opt_vals[c] = float(v) + except Exception: + opt_vals[c] = np.nan + + # 4) magpie row + magpie_row = magpie_df.iloc[i].to_dict() + + # single lookup dict, then assemble in EXACT feature_names order + value_by_name: Dict[str, float] = {} + for el, frac in elem_frac.items(): + value_by_name[el] = float(frac) + for k, v in struct_vals.items(): + try: + value_by_name[k] = float(v) + except Exception: + pass + for k, v in opt_vals.items(): + try: + value_by_name[k] = float(v) + except Exception: + pass + for k, v in magpie_row.items(): try: - vec[i] = float(v) + value_by_name[k] = float(v) except Exception: - vec[i] = 0.0 - else: + pass + + X[i, :] = np.array([value_by_name.get(name, 0.0) for name in feature_names], dtype=np.float32) + + if y is not None: try: - vec[i] = float(val) + y[i, 0] = float(row[target_col]) # type: ignore[arg-type] except Exception: - vec[i] = 0.0 + y[i, 0] = np.nan - # Return a 1D feature vector (D,) instead of (1, D) - vec = np.nan_to_num(vec, nan=0.0, posinf=1e6, neginf=-1e6) - return vec + X = np.nan_to_num(X, nan=0.0, posinf=1e6, neginf=-1e6).astype(np.float32) + if y 
is not None: + y = y.astype(np.float32) + # drop rows where y is nan + mask = ~np.isnan(y[:, 0]) + X = X[mask] + y = y[mask] + return X, y -def load_datasets(data_path: str, dataset_name: str, feature_names: List[str]): + +def load_datasets(data_path: str, dataset_name: str, feature_names: List[str], input_dim: int) -> Tuple[np.ndarray, Optional[np.ndarray]]: """Load the dataset used by the model. This function attempts to *prefer* loading the exact columns listed in @@ -253,127 +281,14 @@ def load_datasets(data_path: str, dataset_name: str, feature_names: List[str]): dfs.append(pd.read_csv(file_path, low_memory=False)) dataset: pd.DataFrame = pd.concat(dfs, ignore_index=True) - # Ensure target present - if "formation_energy_per_atom" not in dataset.columns: - raise KeyError("Required target column 'formation_energy_per_atom' not found in dataset") - - # Drop rows missing target - dataset = dataset.dropna(subset=["formation_energy_per_atom"]).copy() - - # If all feature_names are present as columns, take that branch (preferred) - all_present = all((fn in dataset.columns) for fn in feature_names) - - if all_present: - # Select columns in the exact saved order - X = dataset[feature_names].to_numpy(dtype=np.float32) - - # Replace infs / NaNs in numeric columns with column means (same as training) - # (do not change dtype or drop rows here; keep alignment with model) - numeric_mask = np.isfinite(X) - # For each column replace non-finite with column mean (computed over finite rows) - col_means = np.nanmean(np.where(np.isfinite(X), X, np.nan), axis=0) - # Where a column is completely NaN/inf, set mean to 0.0 - col_means = np.where(np.isnan(col_means), 0.0, col_means) - inds = np.where(~np.isfinite(X)) - if inds[0].size > 0: - X[inds] = np.take(col_means, inds[1]) - - else: - # Fall back to building feature vectors row-by-row using the helper - # This creates the same ordering as feature_names when possible (elemental - # names are interpreted by _build_feature_vector). 
- X_rows = [] - # Build a minimal features dict per row (this mirrors training's inputs) - for _, row in dataset.iterrows(): - comp = row.get("composition_reduced", row.get("composition", None)) - features = { - "composition": row.get("composition", None), - "structure": row.get("structure", None), - "spacegroup_number": row.get("spacegroup_number", None), - "density_atomic": row.get("density_atomic", None), - "CN_max": row.get("CN_max", None), - "CN_min": row.get("CN_min", None), - "CN_avg": row.get("CN_avg", None), - } - try: - vec = _build_feature_vector(comp, features, feature_names) - except Exception: - # on failure, append zeros to avoid mismatched shapes - vec = np.zeros(len(feature_names), dtype=np.float32) - X_rows.append(vec) - X = np.vstack(X_rows).astype(np.float32) - - # Replace any remaining nan/inf with column means - col_means = np.nanmean(np.where(np.isfinite(X), X, np.nan), axis=0) - col_means = np.where(np.isnan(col_means), 0.0, col_means) - inds = np.where(~np.isfinite(X)) - if inds[0].size > 0: - X[inds] = np.take(col_means, inds[1]) - - # Prepare target vector shape (N,) - y = dataset["formation_energy_per_atom"].to_numpy(dtype=np.float32).reshape(-1, 1) - assert X.shape[0] == y.shape[0], "Feature matrix and target vector must have same number of rows" - - return X, y - - -def load_datasets2(data_path: str, dataset_name: str, feature_names: List[str]): - """Load the dataset that will be parsed, return features and ground truth. - - Parameters - ---------- - data_path: - Directory containing the datasets. 
- dataset_name: - The name or regular expression for the datasets - feature_names: - The features used by the model for prediction - - Returns - ------- - input festures, output target values - """ - dfs = [] - dataset_pattern = os.path.join(data_path, dataset_name) - dataset_files: List[str] = glob.glob(dataset_pattern) - if not dataset_files: - raise FileNotFoundError(f"No dataset files matched pattern: {dataset_pattern}") - for file_path in dataset_files: - dfs.append(pd.read_csv(file_path, low_memory=False)) - dataset: pd.DataFrame = pd.concat(dfs, ignore_index=True) + target_col = 'formation_energy_per_atom' + X_raw, y = _build_X_y_in_ckpt_order(dataset, feature_names=feature_names, target_col=target_col) + #print("Prepared X:", X_raw.shape, "y:", None if y is None else y.shape, "num_features:", len(feature_names)) - # Filter all entries that do not have a target value - dataset = dataset.dropna(subset=["formation_energy_per_atom"]).copy() - - # Replace NaN/+inf/-inf in numeric columns (keep DataFrame type) - num_cols = dataset.select_dtypes(include=[np.number]).columns - dataset[num_cols] = dataset[num_cols].replace([np.inf, -np.inf], np.nan).fillna(0.0) - - y = ( - dataset["formation_energy_per_atom"] - .values.astype(np.float32) - .reshape(-1, 1) - ) - X = [] - for _, row in dataset.iterrows(): - composition = row["composition_reduced"] - features = { - "composition": row["composition"], - "structure": row["structure"], - "spacegroup_number": row["spacegroup_number"], - "density_atomic": row["density_atomic"], - "CN_max": row["CN_max"], - "CN_min": row["CN_min"], - "CN_avg": row["CN_avg"], - } - X.append(_build_feature_vector(composition, features, feature_names)) - - print("X shape:", np.array(X).shape, "y shape:", y.shape) - assert len(X) == len(y), ( - "The feature and target vectors do not have the same lenght" - ) - return X, y + if X_raw.shape[1] != input_dim: + raise ValueError(f"Checkpoint input_dim={input_dim} but built X has {X_raw.shape[1]} 
features.") + return X_raw, y # Default number of samples per time window. Can be overridden by the caller. DEFAULT_WINDOW_SIZE: int = 100 From 77a5d0d65d7dca717149a2143fba335d6197c3ab Mon Sep 17 00:00:00 2001 From: Ana Gainaru Date: Wed, 4 Mar 2026 18:10:22 -0500 Subject: [PATCH 5/5] Tune AERIS continual-learning hyperparameters and streaming window size --- examples/aeris/aeris.toml | 17 +++++++++-------- examples/aeris/model.py | 6 ++++-- examples/aeris/utils.py | 2 +- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/examples/aeris/aeris.toml b/examples/aeris/aeris.toml index 7d16c7c..99bc171 100644 --- a/examples/aeris/aeris.toml +++ b/examples/aeris/aeris.toml @@ -2,7 +2,7 @@ seed = 42 device = "auto" multi_gpu = false -verbosity = "INFO" +verbosity = "INFO:2" [model] name = "aeris_model.pt" pretrained_path = "examples/aeris/model" [data] name = "aeris_dataset.csv" path = "examples/aeris/data" +batch_size = 1 [train] -batch_size = 500 +batch_size = 256 num_workers = 4 -init_lr = 1e-6 -max_iter = 4000 +init_lr = 1e-3 +max_iter = 100 grad_accumulation_steps = 1 [continual_learning] update_mode = "base" @@ -36,15 +37,15 @@ kfac_ema_decay = 0.95 [drift_detection] detector_name = "ADWINDetector" -detection_interval = 10 +detection_interval = 1 aggregation = "mean" metric_index = 0 reset_after_learning = false -max_stream_updates = 20 +max_stream_updates = 250 # ADWIN hyperparameters -adwin_delta = 0.002 -adwin_minor_threshold = 0.3 +adwin_delta = 0.2 +adwin_minor_threshold = 0.1 adwin_moderate_threshold = 0.6 [logging] diff --git a/examples/aeris/model.py b/examples/aeris/model.py index ed1c767..4de7fc4 100644 --- a/examples/aeris/model.py +++ b/examples/aeris/model.py @@ -80,8 +80,8 @@ def __init__(self, cfg: Config): X_tensor = torch.tensor(X_scaled, dtype=torch.float32) y_tensor = torch.tensor(y, dtype=torch.float32) - self.windows = split_into_windows(X_tensor, y_tensor, cfg.train.batch_size) - #print(f"Prepared {len(self.windows)} time windows for streaming. 
Each window has ~{self.windows[0][0].shape[0]} samples.") + self.windows = split_into_windows(X_tensor, y_tensor) + print(f"Prepared {len(self.windows)} time windows for streaming. Each window has ~{self.windows[0][0].shape[0]} samples.") # ----- streaming state ----------------------------------------------- self.window_idx: int = 0 @@ -167,6 +167,8 @@ def update_data_stream(self) -> None: ds_train = TensorDataset(X_w[:n_train], y_w[:n_train]) ds_val = TensorDataset(X_w[n_train:], y_w[n_train:]) + #print(f"Window {self.window_idx}: {n_train} train samples, {n_val} val samples.") + #print(len(ds_train), len(ds_val)) bs = self.cfg.train.batch_size nw = self.cfg.train.num_workers diff --git a/examples/aeris/utils.py b/examples/aeris/utils.py index d02eeb3..dc86135 100644 --- a/examples/aeris/utils.py +++ b/examples/aeris/utils.py @@ -291,7 +291,7 @@ def load_datasets(data_path: str, dataset_name: str, feature_names: List[str], i return X_raw, y # Default number of samples per time window. Can be overridden by the caller. -DEFAULT_WINDOW_SIZE: int = 100 +DEFAULT_WINDOW_SIZE: int = 10 def split_into_windows( X: Tensor,