diff --git a/astraflow/dataflow/service.py b/astraflow/dataflow/service.py index 7896031..e2bf184 100644 --- a/astraflow/dataflow/service.py +++ b/astraflow/dataflow/service.py @@ -234,7 +234,14 @@ def _create_rollout_dataloader(self, agent_config: AgentConfig) -> Any: """ ds_cfg = agent_config.rollout_dataset tokenizer = self._load_tokenizer(agent_config) - dataset = _create_dataset_from_config(ds_cfg, tokenizer) + # For offline_dir derivation, the rollout's logical name is + # ``dataset_name`` if specified, else the dataset_fn module name + # (e.g. ``deepscaler`` from + # ``astraflow.dataflow.dataset.deepscaler:get_deepscaler_rl_dataset``). + name = ds_cfg.get("dataset_name") or _module_basename(ds_cfg.get("dataset_fn", "")) + dataset = _create_dataset_from_config( + ds_cfg, tokenizer, data_root=agent_config.data_root, name=name, + ) batch_size = ds_cfg.get("batch_size", 1) return _create_dataloader(dataset, batch_size=batch_size) @@ -294,7 +301,9 @@ def _create_eval_datasets( f"and no legacy eval_workflow_specs fallback is available" ) - dataset = _create_dataset_from_config(ds_cfg, tokenizer) + dataset = _create_dataset_from_config( + ds_cfg, tokenizer, data_root=agent_config.data_root, name=name, + ) eval_datasets[name] = (dataset, repeat, wf) return eval_datasets @@ -1542,13 +1551,30 @@ def _import_function(import_path: str) -> Any: return getattr(module, func_name) -def _create_dataset_from_config(ds_cfg: dict[str, Any], tokenizer: Any) -> Any: +def _module_basename(dataset_fn_path: str) -> str | None: + """Return the last module component of a ``module.path:fn`` import path.""" + module_path, _, _ = dataset_fn_path.rpartition(":") + if not module_path: + return None + return module_path.rsplit(".", 1)[-1] + + +def _create_dataset_from_config( + ds_cfg: dict[str, Any], + tokenizer: Any, + data_root: str | None = None, + name: str | None = None, +) -> Any: """Create a dataset from a config dict using ``dataset_fn``. The ``dataset_fn`` field is a Python import path like ``"astraflow.dataflow.dataset.deepscaler:get_deepscaler_rl_dataset"``. Extra fields in ``ds_cfg`` are forwarded as kwargs when supported by the target dataset function. + + If ``data_root`` is set and ``ds_cfg`` does not specify ``offline_dir``, + one is auto-derived as ``f"{data_root}/{name}"`` — making it easy to + flip a recipe between online and offline by setting a single env var. """ dataset_fn_path = ds_cfg.get("dataset_fn") if dataset_fn_path is None: @@ -1564,6 +1590,13 @@ def _create_dataset_from_config(ds_cfg: dict[str, Any], tokenizer: Any) -> Any: } kwargs.setdefault("tokenizer", tokenizer) + if data_root and name and "offline_dir" not in kwargs: + kwargs["offline_dir"] = f"{data_root}/{name}" + logger.info( + "Auto-derived offline_dir for dataset %r: %s", + name, kwargs["offline_dir"], + ) + sig = inspect.signature(dataset_fn) accepts_var_kwargs = any( p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values() @@ -1572,7 +1605,7 @@ def _create_dataset_from_config(ds_cfg: dict[str, Any], tokenizer: Any) -> Any: return dataset_fn(**kwargs) filtered_kwargs = { - name: kwargs[name] for name in sig.parameters if name in kwargs + pname: kwargs[pname] for pname in sig.parameters if pname in kwargs } return dataset_fn(**filtered_kwargs) diff --git a/astraflow/dataflow/service_config.py b/astraflow/dataflow/service_config.py index 2cbc191..d44969b 100644 --- a/astraflow/dataflow/service_config.py +++ b/astraflow/dataflow/service_config.py @@ -37,6 +37,19 @@ class AgentConfig: tokenizer_path: str | None = None """Path to tokenizer (HuggingFace model name or local path).""" + data_root: str | None = None + """Root directory for pre-downloaded datasets (offline mode). + + When set, every entry in ``rollout_dataset`` and ``eval_datasets`` + that does not already specify ``offline_dir`` gets one auto-derived + as ``f"{data_root}/{name}"`` — where ``name`` is the dict key for + eval datasets, and the value of ``dataset_name`` (falling back to + the dataset_fn module name) for the rollout dataset. + + Use ``examples/math/offline/download_math_datasets.py`` to populate + this directory. + """ + rollout_dataset: dict[str, Any] | None = None """Dataset config for rollout data acquisition. diff --git a/docs/en/index.rst b/docs/en/index.rst index 75e1f28..1f4fd9a 100644 --- a/docs/en/index.rst +++ b/docs/en/index.rst @@ -26,6 +26,7 @@ on distributed GPU clusters. :caption: Recipes recipes/math + recipes/math-offline recipes/code recipes/multi-agent recipes/agentbench diff --git a/docs/en/recipes/math-offline.md b/docs/en/recipes/math-offline.md new file mode 100644 index 0000000..26f10b4 --- /dev/null +++ b/docs/en/recipes/math-offline.md @@ -0,0 +1,74 @@ +# Math (Offline) + +Run the math RL recipe on a node with **no internet access** by pre-downloading every training and evaluation dataset to a local directory. + +**Recipe**: [`examples/math/offline/qwen3-8b-m2po-full-offline/`](https://github.com/Infini-AI-Lab/astraflow/tree/main/examples/math/offline/qwen3-8b-m2po-full-offline) + +**Downloader**: [`examples/math/offline/download_math_datasets.py`](https://github.com/Infini-AI-Lab/astraflow/tree/main/examples/math/offline/download_math_datasets.py) + +This is the same Qwen3-8B / M2PO / TCP recipe as [Math](math.md), with one difference: at startup the AstraFlow service loads every dataset from disk instead of fetching from the HuggingFace Hub. + +## 1. One-time prep — download datasets + +From the repo root: + +```bash +python examples/math/offline/download_math_datasets.py --root data-data/math +``` + +This writes 8 dataset directories under `data-data/math/` (~400 MB total) plus a `MANIFEST.json`: + +| Directory | HF source | Split | Use | +|-----------------|---------------------------------------------|-------|---------| +| `deepscaler` | `agentica-org/DeepScaleR-Preview-Dataset` | train | rollout | +| `dapo_filter` | `aaabiao/dapo_filter` | train | rollout | +| `aime24` | `HuggingFaceH4/aime_2024` | train | eval | +| `aime25` | `math-ai/aime25` | test | eval | +| `amc` | `rawsh/2024_AMC12` | train | eval | +| `math500` | `HuggingFaceH4/MATH-500` | test | eval | +| `minerva` | `math-ai/minervamath` | test | eval | +| `olympiadbench` | `math-ai/olympiadbench` | test | eval | + +Re-running is idempotent (skips populated dirs). Useful flags: + +- `--force` — re-download even if a directory exists +- `--only deepscaler,aime24` — partial subset +- `--verify` — skip download; just load each from disk and assert non-empty + +## 2. Run training + +```bash +bash examples/math/offline/qwen3-8b-m2po-full-offline/scripts/run_qwen3-8b-m2po-full-offline.sh +``` + +You can confirm the offline path is active by looking for these lines in the AstraFlow service log: + +```text +Auto-derived offline_dir for dataset 'deepscaler': data-data/math/deepscaler +Loading DeepScaleR dataset from offline path: data-data/math/deepscaler +Auto-derived offline_dir for dataset 'aime24': data-data/math/aime24 +... (same for aime25, amc, minerva, math500) +``` + +## How it works + +The recipe's `experiment.yaml` sets a single field under `dataflow`: + +```yaml +dataflow: + data_root: data-data/math +``` + +At startup `astraflow.dataflow.service` walks every entry in `rollout_dataset` and `eval_datasets`; for each one that does not already specify `offline_dir`, it auto-derives `offline_dir = f"{data_root}/{name}"`. The `name` is: + +- the **dict key** for eval datasets (`aime24`, `aime25`, `amc`, `minerva`, `math500`) +- the **`dataset_fn` module basename** for the rollout dataset (`deepscaler` from `astraflow.dataflow.dataset.deepscaler:get_deepscaler_rl_dataset`) + +The downloader uses the same naming convention, so the two sides stay in sync. To opt a single dataset out — e.g. point one eval at a different snapshot — just set `offline_dir:` explicitly on that entry; explicit values always win. + +To convert any other recipe to offline mode, add the same `dataflow.data_root` field; no other changes are required. + +## Caveats + +- **Model and tokenizer weights are *not* covered** by the dataset downloader. `model_path` / `tokenizer_path` still point at `Qwen/Qwen3-8B` and resolve via the HuggingFace cache. For a fully air-gapped run, pre-fetch them with `huggingface-cli download Qwen/Qwen3-8B --local-dir /local/models/Qwen3-8B` and edit the two paths in `experiment.yaml`. +- The downloader needs internet at prep time. Once `data-data/math/` is populated, training itself works with `HF_HUB_OFFLINE=1` / `HF_DATASETS_OFFLINE=1`. diff --git a/examples/math/offline/README.md b/examples/math/offline/README.md new file mode 100644 index 0000000..55971fb --- /dev/null +++ b/examples/math/offline/README.md @@ -0,0 +1,61 @@ +# Offline math datasets + +Pre-download every dataset used by the math recipes so training can run on +a node with no internet access. + +## 1. Download (one-time) + +From the repo root: + +```bash +python examples/math/offline/download_math_datasets.py --root data-data/math +``` + +This writes 8 dataset directories under `data-data/math/` and a +`MANIFEST.json` summary. Re-running is a no-op (skips populated dirs); +pass `--force` to re-download, or `--only deepscaler,aime24` for a subset. + +| dir | HF source | split | use | +|------------------|---------------------------------------------|-------|---------| +| `deepscaler` | `agentica-org/DeepScaleR-Preview-Dataset` | train | rollout | +| `dapo_filter` | `aaabiao/dapo_filter` | train | rollout | +| `aime24` | `HuggingFaceH4/aime_2024` | train | eval | +| `aime25` | `math-ai/aime25` | test | eval | +| `amc` | `rawsh/2024_AMC12` | train | eval | +| `math500` | `HuggingFaceH4/MATH-500` | test | eval | +| `minerva` | `math-ai/minervamath` | test | eval | +| `olympiadbench` | `math-ai/olympiadbench` | test | eval | + +## 2. Verify + +```bash +python examples/math/offline/download_math_datasets.py --verify +``` + +Loads every directory with `load_from_disk` and prints row counts; exits +non-zero if any dataset is missing or empty. + +## 3. Run training with offline data + +The matching recipe is `examples/math/offline/qwen3-8b-m2po-full-offline/`. Its +`experiment.yaml` sets `dataflow.data_root: data-data/math`, which causes +`astraflow.dataflow.service` to auto-derive each loader's `offline_dir` +as `data-data/math/` (the dict key for evals, or the `dataset_fn` +module name for the rollout). No per-entry edits required. + +```bash +bash examples/math/offline/qwen3-8b-m2po-full-offline/scripts/run_qwen3-8b-m2po-full-offline.sh +``` + +## Notes + +- **Model weights are *not* covered.** `model_path` / `tokenizer_path` + still point at `Qwen/Qwen3-8B` and will be pulled from HF Hub on first + use. Either let HF cache them once, or pre-fetch with + `huggingface-cli download Qwen/Qwen3-8B` and point the YAML at the + local snapshot for a fully air-gapped run. +- Convention: a dataset directory name in `--root` must match the + `name` used by `_create_dataset_from_config` (eval dict key, or + rollout `dataset_fn` module basename). The download script and the + service use the same `MATH_DATASETS` table / derivation, so they stay + in sync. diff --git a/examples/math/offline/download_math_datasets.py b/examples/math/offline/download_math_datasets.py new file mode 100644 index 0000000..58c7fae --- /dev/null +++ b/examples/math/offline/download_math_datasets.py @@ -0,0 +1,177 @@ +"""Download all math training + eval datasets for offline AstraFlow runs. + +Layout written to ``--root`` (default: ``./data-data/math``):: + + / + deepscaler/ # RL train (agentica-org/DeepScaleR-Preview-Dataset) + dapo_filter/ # RL train (aaabiao/dapo_filter) + aime24/ # eval (HuggingFaceH4/aime_2024) + aime25/ # eval (math-ai/aime25) + amc/ # eval (rawsh/2024_AMC12) + math500/ # eval (HuggingFaceH4/MATH-500) + minerva/ # eval (math-ai/minervamath) + olympiadbench/ # eval (math-ai/olympiadbench) + MANIFEST.json # source repo + split + row count for each + +The directory names align with the auto-derived ``offline_dir`` convention +used by ``astraflow.dataflow.service`` when ``dataflow.data_root`` is set +in the experiment YAML — so the matching recipe just needs:: + + dataflow: + data_root: ./data-data/math + +Example:: + + python examples/math/offline/download_math_datasets.py + python examples/math/offline/download_math_datasets.py --root /scratch/math + python examples/math/offline/download_math_datasets.py --only deepscaler,aime24 + python examples/math/offline/download_math_datasets.py --verify +""" + +from __future__ import annotations + +import argparse +import json +import logging +import sys +import time +from pathlib import Path + +# Each entry: (logical_name, module_path, hf_repo, split) +# logical_name MUST match the offline_dir convention used by +# astraflow.dataflow.service._create_dataset_from_config — i.e. for evals +# the YAML dict key, and for the rollout the dataset_fn module basename. +MATH_DATASETS: list[tuple[str, str, str, str]] = [ + # Training + ("deepscaler", "astraflow.dataflow.dataset.deepscaler", "agentica-org/DeepScaleR-Preview-Dataset", "train"), + ("dapo_filter", "astraflow.dataflow.dataset.dapo_filter", "aaabiao/dapo_filter", "train"), + # Eval + ("aime24", "astraflow.dataflow.dataset.aime24x4", "HuggingFaceH4/aime_2024", "train"), + ("aime25", "astraflow.dataflow.dataset.aime25x4", "math-ai/aime25", "test"), + ("amc", "astraflow.dataflow.dataset.amc24", "rawsh/2024_AMC12", "train"), + ("math500", "astraflow.dataflow.dataset.math500", "HuggingFaceH4/MATH-500", "test"), + ("minerva", "astraflow.dataflow.dataset.minervamath", "math-ai/minervamath", "test"), + ("olympiadbench", "astraflow.dataflow.dataset.olympiadbench", "math-ai/olympiadbench", "test"), +] + +logger = logging.getLogger("download_math_datasets") + + +def _is_populated(p: Path) -> bool: + """A HF save_to_disk directory contains a dataset_info.json.""" + return p.exists() and (p / "dataset_info.json").exists() + + +def _download_one( + name: str, + module_path: str, + hf_repo: str, + split: str, + out_dir: Path, + force: bool, +) -> dict: + import importlib + from datasets import load_from_disk + + if _is_populated(out_dir) and not force: + ds = load_from_disk(str(out_dir)) + logger.info("[skip] %-14s already populated (%d rows) at %s", name, len(ds), out_dir) + return {"name": name, "repo": hf_repo, "split": split, "path": str(out_dir), + "rows": len(ds), "status": "skipped"} + + mod = importlib.import_module(module_path) + download_fn = getattr(mod, "download_dataset", None) + if download_fn is None: + raise RuntimeError( + f"{module_path} has no download_dataset() helper — offline mode " + f"is not supported for this dataset." + ) + + out_dir.parent.mkdir(parents=True, exist_ok=True) + t0 = time.time() + logger.info("[start] %-14s %s [%s] -> %s", name, hf_repo, split, out_dir) + download_fn(offline_dir=str(out_dir), dataset_path=hf_repo, split=split) + ds = load_from_disk(str(out_dir)) + dt = time.time() - t0 + logger.info("[ok] %-14s %d rows in %.1fs", name, len(ds), dt) + return {"name": name, "repo": hf_repo, "split": split, "path": str(out_dir), + "rows": len(ds), "status": "downloaded", "elapsed_sec": round(dt, 2)} + + +def _verify_one(name: str, out_dir: Path) -> dict: + from datasets import load_from_disk + if not _is_populated(out_dir): + logger.error("[fail] %-14s missing or incomplete: %s", name, out_dir) + return {"name": name, "path": str(out_dir), "status": "missing"} + ds = load_from_disk(str(out_dir)) + n = len(ds) + if n == 0: + logger.error("[fail] %-14s loaded but empty: %s", name, out_dir) + return {"name": name, "path": str(out_dir), "status": "empty"} + logger.info("[ok] %-14s %d rows", name, n) + return {"name": name, "path": str(out_dir), "rows": n, "status": "ok"} + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--root", type=str, default="data-data/math", + help="Root directory for offline datasets (default: %(default)s)") + p.add_argument("--only", type=str, default=None, + help="Comma-separated subset of dataset names (e.g. deepscaler,aime24)") + p.add_argument("--force", action="store_true", + help="Re-download even if the directory already exists") + p.add_argument("--verify", action="store_true", + help="Skip download; just verify each dataset loads from disk") + args = p.parse_args() + + logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s") + + root = Path(args.root).resolve() + root.mkdir(parents=True, exist_ok=True) + logger.info("offline root: %s", root) + + selected = MATH_DATASETS + if args.only: + wanted = {s.strip() for s in args.only.split(",") if s.strip()} + selected = [d for d in MATH_DATASETS if d[0] in wanted] + missing = wanted - {d[0] for d in selected} + if missing: + logger.error("Unknown dataset names in --only: %s", sorted(missing)) + return 2 + if not selected: + logger.error("No datasets matched --only %s", args.only) + return 2 + + manifest: list[dict] = [] + failed: list[str] = [] + + for name, module_path, hf_repo, split in selected: + out_dir = root / name + try: + if args.verify: + entry = _verify_one(name, out_dir) + if entry["status"] != "ok": + failed.append(name) + else: + entry = _download_one(name, module_path, hf_repo, split, out_dir, args.force) + except Exception as e: + logger.exception("[fail] %-14s %s", name, e) + entry = {"name": name, "repo": hf_repo, "split": split, + "path": str(out_dir), "status": "failed", "error": str(e)} + failed.append(name) + manifest.append(entry) + + if not args.verify: + manifest_path = root / "MANIFEST.json" + manifest_path.write_text(json.dumps(manifest, indent=2)) + logger.info("wrote manifest: %s", manifest_path) + + if failed: + logger.error("FAILED: %s", failed) + return 1 + logger.info("All %d dataset(s) %s.", len(selected), "verified" if args.verify else "ready") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/1_astraflow.sh b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/1_astraflow.sh new file mode 100755 index 0000000..7ffeada --- /dev/null +++ b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/1_astraflow.sh @@ -0,0 +1,36 @@ +#!/bin/bash +set -euo pipefail +# [1/3] Launch AstraFlow HTTP service +# +# Usage (terminal 1): +# bash examples/math/offline/qwen3-8b-m2po-full-offline/scripts/1_astraflow.sh + +export CUDA_VISIBLE_DEVICES="" + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../.." && pwd)" +cd "${REPO_ROOT}" +export PYTHONPATH="${REPO_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" + +YAML_DIR="${SCRIPT_DIR}/yaml" +export EXPERIMENT_CONFIG="${EXPERIMENT_CONFIG:-${YAML_DIR}/experiment.yaml}" +source "${REPO_ROOT}/examples/_common/utils.sh" +# Export EXP_NAME and TRIAL_NAME from the experiment YAML. +astraflow_load_experiment_env + +export ASTRAFLOW_HOST="${ASTRAFLOW_HOST:-0.0.0.0}" +export ASTRAFLOW_PORT="${ASTRAFLOW_PORT:-8000}" + +# NCCL / PYTORCH / WANDB tweaks + LOG_DIR. Defined in examples/_common/utils.sh. +astraflow_setup_env + +echo "=== AstraFlow HTTP Service ===" +echo "Experiment config : ${EXPERIMENT_CONFIG}" +echo "Port : ${ASTRAFLOW_PORT}" +echo "===============================" + +python3 -u -m astraflow \ + --config "${EXPERIMENT_CONFIG}" \ + --port "${ASTRAFLOW_PORT}" \ + --host "${ASTRAFLOW_HOST}" \ + 2>&1 | tee "${LOG_DIR}/astraflow.log" diff --git a/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/2_raas.sh b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/2_raas.sh new file mode 100755 index 0000000..977f42f --- /dev/null +++ b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/2_raas.sh @@ -0,0 +1,44 @@ +#!/bin/bash +set -euo pipefail +# [2/3] Launch RaaS inference server (SGLang + TCP receiver) +# +# Usage (terminal 2, after AstraFlow is ready): +# bash examples/math/offline/qwen3-8b-m2po-full-offline/scripts/2_raas.sh + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../.." && pwd)" +cd "${REPO_ROOT}" +export PYTHONPATH="${REPO_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" + +YAML_DIR="${SCRIPT_DIR}/yaml" +export EXPERIMENT_CONFIG="${EXPERIMENT_CONFIG:-${YAML_DIR}/experiment.yaml}" +export RAAS_CONFIG="${RAAS_CONFIG:-${YAML_DIR}/raas.yaml}" +source "${REPO_ROOT}/examples/_common/utils.sh" +# Export EXP_NAME and TRIAL_NAME from the experiment YAML. +astraflow_load_experiment_env + +export CUDA_VISIBLE_DEVICES="${SERVICE_CUDA_VISIBLE_DEVICES:-0,1,2,3}" +export RAAS_HOST="${RAAS_HOST:-0.0.0.0}" +export RAAS_PORT="${RAAS_PORT:-19190}" +export ASTRAFLOW_PORT="${ASTRAFLOW_PORT:-8000}" +export ASTRAFLOW_URL="${ASTRAFLOW_URL:-http://127.0.0.1:${ASTRAFLOW_PORT}}" + +# NCCL / PYTORCH / WANDB tweaks + LOG_DIR. Defined in examples/_common/utils.sh. +astraflow_setup_env + +echo "=== RaaS Inference Server (SGLang + TCP receiver) ===" +echo "Experiment config : ${EXPERIMENT_CONFIG}" +echo "RaaS config : ${RAAS_CONFIG}" +echo "GPUs : ${CUDA_VISIBLE_DEVICES}" +echo "Port : ${RAAS_PORT}" +echo "AstraFlow URL : ${ASTRAFLOW_URL}" +echo "=======================================================" + +python3 -u -m astraflow.raas.server \ + --host "${RAAS_HOST}" \ + --port "${RAAS_PORT}" \ + --config "${EXPERIMENT_CONFIG}" \ + --config "${RAAS_CONFIG}" \ + --engine-id "${ENGINE_ID:-default}" \ + --astraflow-url "${ASTRAFLOW_URL}" \ + 2>&1 | tee "${LOG_DIR}/raas.log" diff --git a/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/3_trainer_model0.sh b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/3_trainer_model0.sh new file mode 100755 index 0000000..4f84a82 --- /dev/null +++ b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/3_trainer_model0.sh @@ -0,0 +1,47 @@ +#!/bin/bash +set -euo pipefail +# [3/3] Launch Trainer for model0 (TCP, sender_agent on local_rank 0) +# +# Usage (terminal 3, after AstraFlow and RaaS are ready): +# bash examples/math/offline/qwen3-8b-m2po-full-offline/scripts/3_trainer_model0.sh + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../.." && pwd)" +cd "${REPO_ROOT}" +export PYTHONPATH="${REPO_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" + +YAML_DIR="${SCRIPT_DIR}/yaml" +export EXPERIMENT_CONFIG="${EXPERIMENT_CONFIG:-${YAML_DIR}/experiment.yaml}" +source "${REPO_ROOT}/examples/_common/utils.sh" +# Export EXP_NAME and TRIAL_NAME from the experiment YAML. +astraflow_load_experiment_env + +export CUDA_VISIBLE_DEVICES="${TRAINER_MODEL0_GPUS:-4,5,6,7}" +TRAINER0_NPROC="$(echo "${CUDA_VISIBLE_DEVICES}" | awk -F',' '{print NF}')" + +export RAAS_PORT="${RAAS_PORT:-19190}" +export ASTRAFLOW_PORT="${ASTRAFLOW_PORT:-8000}" +export ASTRAFLOW_URL="http://127.0.0.1:${ASTRAFLOW_PORT}" +export ASTRAFLOW_RAAS_URL="http://127.0.0.1:${RAAS_PORT}" + +# sender_agent (in trainer) listens on this HTTP port +export WEIGHT_TRANSFER_HTTP_PORT="${WEIGHT_TRANSFER_HTTP_PORT_MODEL0:-19861}" + +# NCCL / PYTORCH / WANDB tweaks + LOG_DIR. Defined in examples/_common/utils.sh. +astraflow_setup_env + +echo "=== Trainer model0 (TCP) ===" +echo "Experiment config : ${EXPERIMENT_CONFIG}" +echo "GPUs : ${CUDA_VISIBLE_DEVICES} (FSDP dp${TRAINER0_NPROC})" +echo "AstraFlow : ${ASTRAFLOW_URL}" +echo "RaaS : ${ASTRAFLOW_RAAS_URL}" +echo "Sender HTTP : ${WEIGHT_TRANSFER_HTTP_PORT}" +echo "WANDB mode : ${WANDB_MODE:-online}" +echo "==========================================" + +torchrun --nnodes 1 --nproc-per-node "${TRAINER0_NPROC}" \ + --master-addr "${MASTER_ADDR:-127.0.0.1}" --master-port "${MASTER_PORT_MODEL0:-29541}" \ + examples/launch_trainer.py \ + --config "${EXPERIMENT_CONFIG}" \ + --trainer trainer_model0 \ + "$@" 2>&1 | tee "${LOG_DIR}/trainer_model0.log" diff --git a/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/run_qwen3-8b-m2po-full-offline.sh b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/run_qwen3-8b-m2po-full-offline.sh new file mode 100755 index 0000000..c89e29c --- /dev/null +++ b/examples/math/offline/qwen3-8b-m2po-full-offline/scripts/run_qwen3-8b-m2po-full-offline.sh @@ -0,0 +1,108 @@ +#!/bin/bash +set -euo pipefail +# All-in-one launcher for AstraFlow v2 math training (Qwen3-8B, M2PO, TCP) +# using locally pre-downloaded math datasets (offline data mode). +# +# Launches 3 processes: +# 1. AstraFlow HTTP service (CPU-only) — loads datasets from data-data/math +# 2. RaaS inference server (SGLang, SERVICE_CUDA_VISIBLE_DEVICES) +# 3. Trainer model0 (math, TRAINER_MODEL0_GPUS) +# +# One-time prep (downloads ~hundreds of MB): +# python examples/math/offline/download_math_datasets.py --root data-data/math +# +# Usage: +# bash examples/math/offline/qwen3-8b-m2po-full-offline/scripts/run_qwen3-8b-m2po-full-offline.sh + +# ============================================================================= +# Part 1: Load env and settings +# ============================================================================= +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +REPO_ROOT="$(cd "${SCRIPT_DIR}/../../../.." && pwd)" +cd "${REPO_ROOT}" +export PYTHONPATH="${REPO_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" + +YAML_DIR="${SCRIPT_DIR}/yaml" +export EXPERIMENT_CONFIG="${EXPERIMENT_CONFIG:-${YAML_DIR}/experiment.yaml}" +export RAAS_CONFIG="${RAAS_CONFIG:-${YAML_DIR}/raas.yaml}" +source "${REPO_ROOT}/examples/_common/utils.sh" +# Export EXP_NAME and TRIAL_NAME from the experiment YAML. +# Defined in examples/_common/utils.sh. +astraflow_load_experiment_env + +# ============================================================================= +# Part 2: Set up env +# ============================================================================= +# GPU assignments (default: 4 GPUs for inference, 4 for training) +export SERVICE_CUDA_VISIBLE_DEVICES="${SERVICE_CUDA_VISIBLE_DEVICES:-0,1,2,3}" +export TRAINER_MODEL0_GPUS="${TRAINER_MODEL0_GPUS:-4,5,6,7}" +# Ports / URLs (each component gets its own port) +export RAAS_HOST="${RAAS_HOST:-0.0.0.0}" +export RAAS_PORT="${RAAS_PORT:-19190}" +export ASTRAFLOW_HOST="${ASTRAFLOW_HOST:-0.0.0.0}" +export ASTRAFLOW_PORT="${ASTRAFLOW_PORT:-8000}" +export ASTRAFLOW_URL="http://127.0.0.1:${ASTRAFLOW_PORT}" +export WEIGHT_TRANSFER_HTTP_PORT_MODEL0="${WEIGHT_TRANSFER_HTTP_PORT_MODEL0:-19861}" + +TRAINER0_NPROC="$(echo "${TRAINER_MODEL0_GPUS}" | awk -F',' '{print NF}')" + +# NCCL / PYTORCH / WANDB tweaks + LOG_DIR. +# Defined in examples/_common/utils.sh. +astraflow_setup_env + +# ============================================================================= +# Part 3: Print info and clean up +# ============================================================================= +echo "=== AstraFlow v2 (Qwen3-8B, math, M2PO, ctx16k, TCP, OFFLINE DATA) ===" +echo "Experiment config : ${EXPERIMENT_CONFIG}" +echo "RaaS config : ${RAAS_CONFIG}" +echo "RaaS GPUs : ${SERVICE_CUDA_VISIBLE_DEVICES}" +echo "Trainer model0 GPUs : ${TRAINER_MODEL0_GPUS} (FSDP dp${TRAINER0_NPROC})" +echo "RaaS port : ${RAAS_PORT}" +echo "AstraFlow port : ${ASTRAFLOW_PORT}" +echo "Sender HTTP model0 : ${WEIGHT_TRANSFER_HTTP_PORT_MODEL0}" +echo "WANDB mode : ${WANDB_MODE:-online}" +echo "==========================================================" + +trap astraflow_cleanup_trap EXIT INT TERM + +# Kill leftover processes and shared memory from prior runs. +# Defined in examples/_common/utils.sh. +astraflow_kill_stale + +# ============================================================================= +# Part 4: Launch training +# ============================================================================= +echo "[1/3] Starting AstraFlow HTTP service..." +CUDA_VISIBLE_DEVICES="" \ + python3 -u -m astraflow \ + --config "${EXPERIMENT_CONFIG}" \ + --port "${ASTRAFLOW_PORT}" \ + --host "${ASTRAFLOW_HOST}" \ + 2>&1 | tee "${LOG_DIR}/astraflow.log" & +sleep 5 + +echo "[2/3] Starting RaaS inference server (SGLang + TCP receiver)..." +CUDA_VISIBLE_DEVICES="${SERVICE_CUDA_VISIBLE_DEVICES}" \ + python3 -u -m astraflow.raas.server \ + --host "${RAAS_HOST}" \ + --port "${RAAS_PORT}" \ + --config "${EXPERIMENT_CONFIG}" \ + --config "${RAAS_CONFIG}" \ + --engine-id "${ENGINE_ID:-default}" \ + --astraflow-url "${ASTRAFLOW_URL}" \ + 2>&1 | tee "${LOG_DIR}/raas.log" & +sleep 15 + +export ASTRAFLOW_RAAS_URL="http://127.0.0.1:${RAAS_PORT}" + +echo "[3/3] Starting trainer model0..." +CUDA_VISIBLE_DEVICES="${TRAINER_MODEL0_GPUS}" \ +WEIGHT_TRANSFER_HTTP_PORT="${WEIGHT_TRANSFER_HTTP_PORT_MODEL0}" \ + torchrun --nnodes 1 --nproc-per-node "${TRAINER0_NPROC}" \ + --master-addr "${MASTER_ADDR:-127.0.0.1}" --master-port "${MASTER_PORT_MODEL0:-29541}" \ + examples/launch_trainer.py \ + --config "${EXPERIMENT_CONFIG}" \ + --trainer trainer_model0 \ + "$@" \ + 2>&1 | tee "${LOG_DIR}/trainer_model0.log" diff --git a/examples/math/offline/qwen3-8b-m2po-full-offline/yaml/experiment.yaml b/examples/math/offline/qwen3-8b-m2po-full-offline/yaml/experiment.yaml new file mode 100644 index 0000000..dcfcff4 --- /dev/null +++ b/examples/math/offline/qwen3-8b-m2po-full-offline/yaml/experiment.yaml @@ -0,0 +1,175 @@ +# ============================================================================ +# Experiment config — AstraFlow service + Trainer (OFFLINE math datasets) +# Experiment: math / qwen3-8b-m2po-full-offline +# +# Identical to qwen3-8b-m2po-full, except dataflow.data_root points at a +# locally pre-downloaded dataset directory — so the AstraFlow service +# never hits the HuggingFace Hub for datasets at training time. +# +# Pre-download with: +# python examples/math/offline/download_math_datasets.py --root data-data/math +# +# (Model + tokenizer weights still come from HF — pre-fetch them too if +# you also need the trainer/RaaS to run with HF_HUB_OFFLINE=1.) +# +# Qwen3-8B math RL with M2PO, ctx 16k, lr 5e-6, full TCP weight transfer. +# GPU layout (default, 8 GPUs): +# SERVICE_CUDA_VISIBLE_DEVICES=0,1,2,3 -> RaaS (model0 dp=4) +# TRAINER_MODEL0_GPUS=4,5,6,7 -> Trainer model0 (FSDP, 4 GPUs) +# ============================================================================ + +# ── Experiment: identity, model, shared settings ── +experiment: + experiment_name: astraflow-math + trial_name: qwen3-8b-m2po-full-offline + fileroot: ./data-experiments/${experiment.experiment_name}/${experiment.trial_name} + + model_path: "Qwen/Qwen3-8B" + tokenizer_path: "Qwen/Qwen3-8B" + seed: 1 + dtype: bfloat16 + weight_transfer_mode: tcp + weight_transfer_strategies: full + +# ── RaaS: what to generate (inference-level config) ── +# model keys here also determine expected_model_ids for AstraFlow service +raas: + models: + model0: + backend: sglang + gconfig: + n_samples: 8 + temperature: 1.0 + max_new_tokens: 14000 + min_new_tokens: 0 + +# ── AstraFlow: data pipeline ── +# auto-derives: expected_model_ids from raas.models keys +# auto-derives: dump_dir from experiment.fileroot +dataflow: + host: "0.0.0.0" + port: 8000 + + # Offline mode: every rollout/eval dataset that omits ``offline_dir`` will + # auto-derive one as ``{data_root}/{name}`` — e.g. data-data/math/deepscaler, + # data-data/math/aime24, etc. Populate with + # ``python examples/math/offline/download_math_datasets.py --root data-data/math``. + # Resolved relative to the process CWD (repo root, set by the launch scripts). + data_root: data-data/math + + buffer: + size: 10000 + replay_size: 10000 + replay_ratio: 0 + max_staleness: 8 + filter_function: filter_zero_adv + + rollout_dataset: + dataset_fn: "astraflow.dataflow.dataset.deepscaler:get_deepscaler_rl_dataset" + max_length: 2000 + + workflow_spec: + workflow_cls: "rlvr" + reward_fn: "math_verify" + enable_thinking: false + + eval_workflows: + math_eval: + workflow_cls: "rlvr" + reward_fn: "math_verify" + enable_thinking: false + gconfig_overrides: + temperature: 0.6 + n_samples: 1 + + eval_datasets: + aime24: + dataset_fn: "astraflow.dataflow.dataset.aime24x4:get_aime_2024x4_test_dataset" + max_length: 2000 + repeat: 4 + eval_workflow: math_eval + aime25: + dataset_fn: "astraflow.dataflow.dataset.aime25x4:get_aime_2025x4_test_dataset" + max_length: 2000 + repeat: 4 + eval_workflow: math_eval + amc: + dataset_fn: "astraflow.dataflow.dataset.amc24:get_amc_2024x4_test_dataset" + max_length: 2000 + repeat: 4 + eval_workflow: math_eval + minerva: + dataset_fn: "astraflow.dataflow.dataset.minervamath:get_minerva_math_test_dataset" + max_length: 2000 + repeat: 4 + eval_workflow: math_eval + math500: + dataset_fn: "astraflow.dataflow.dataset.math500:get_math500_test_dataset" + max_length: 2000 + repeat: 4 + eval_workflow: math_eval + +# ── Trainer base: shared config ── +# auto-derives from experiment: experiment_name, trial_name, fileroot, +# tokenizer_path, seed, dtype, weight_transfer_mode +# auto-derives from raas.models.: actor.path, actor.max_new_tokens, +# ref.path +# auto-derives: saver, recover, stats_logger fields from experiment section +# auto-derives: cluster.name_resolve from experiment.fileroot +# auto-derives: trial_name suffix from model_id (e.g. trial_name-model0) +trainer_base: + total_train_steps: 800 + train_batch_size: 256 + n_samples: 8 + engine: + backend: fsdp + data_parallel_size: 4 + + actor: + gradient_checkpointing: true + mb_spec: + max_tokens_per_mb: 17408 + optimizer: + type: adam + lr: 5e-6 + weight_decay: 0.01 + beta1: 0.9 + beta2: 0.999 + eps: 1e-8 + lr_scheduler_type: constant + gradient_clipping: 1.0 + # PPO / M2PO algorithm + m2_threshold: 0.01 + eps_clip: 100.0 + eps_clip_higher: 100.0 + reward_scaling: 1 + reward_bias: 0 + kl_ctl: 0.00 + kl_penalty_coef: 0.001 + ppo_n_minibatches: 4 + reward_norm: { mean_level: group, std_level: group } + adv_norm: { mean_level: batch, std_level: batch } + + ref: + mb_spec: + max_tokens_per_mb: 17408 + + recover: + mode: auto + freq_steps: 25 + + evaluator: + eval_at_start: false + freq_steps: 25 + + stats_logger: + wandb: + mode: online + id_suffix: "uid" + +# ── Trainer for model0 — only overrides ── +trainer_model0: + model_id: model0 + stats_logger: + wandb: + tags: ["m2po", "math", "astraflow-v2", "qwen3-8b", "tcp", "ctx16k", "offline-data"] diff --git a/examples/math/offline/qwen3-8b-m2po-full-offline/yaml/raas.yaml b/examples/math/offline/qwen3-8b-m2po-full-offline/yaml/raas.yaml new file mode 100644 index 0000000..8a5dbe3 --- /dev/null +++ b/examples/math/offline/qwen3-8b-m2po-full-offline/yaml/raas.yaml @@ -0,0 +1,33 @@ +# ============================================================================ +# RaaS config — Inference serving instance (hardware/resources) +# Experiment: math / qwen3-8b-m2po-full +# +# Hardware: 4x GPU, TP=1 +# model0: DP=4, TP=1 +# +# Merged with experiment.yaml at launch (--config experiment.yaml --config raas.yaml) +# experiment.yaml provides: model_path, tokenizer_path, seed, dtype, models/gconfig +# ============================================================================ + +rollout: + max_concurrent_rollouts: 1024 + # Cap concurrent eval prefills to bound peak KV pressure during the + # ~3.5k-item eval burst (5 datasets x repeat=4) — default 128 OOMs sglang. + max_concurrent_evals: 64 + pause_grace_period: 3 + # Adaptive availability — drive /availability off sglang /get_load. + enable_adaptive_availability: true + target_waiting_queue_per_dp: 4 + adaptive_step_size: 4 + load_cache_ttl_ms: 100 + +engine: + model0: + backend: sglang + data_parallel_size: 4 + +sglang: + context_length: 16384 + mem_fraction_static: 0.8 + max_running_requests: null + skip_tokenizer_init: true