8 changes: 4 additions & 4 deletions README.md
@@ -11,13 +11,13 @@
- [Help](#help)

## Overview
-The Resiliency library provides resilient training code used to improve training goodput (aka training efficiency) of distributed workloads. For those unfamiliar with the term "goodput", please read [Google Cloud Goodput Blogpost](https://cloud.google.com/blog/products/ai-machine-learning/goodput-metric-as-measure-of-ml-productivity). This library provides resilency features built on-top of [NVIDIA NeMo 2.0](https://github.com/NVIDIA/NeMo) training recipes. Reproducible benchmarks demonstrating high training goodput using this library are defined in [AI-Hypercomputer/gpu-recipes](https://github.com/AI-Hypercomputer/gpu-recipes).
+The Resiliency library provides resilient training code used to improve training goodput (aka training efficiency) of distributed workloads. For those unfamiliar with the term "goodput", please read the [Google Cloud Goodput Blogpost](https://cloud.google.com/blog/products/ai-machine-learning/goodput-metric-as-measure-of-ml-productivity). This library provides resiliency features built on top of [NVIDIA NeMo 2.0](https://github.com/NVIDIA/NeMo) training recipes. Reproducible benchmarks demonstrating high training goodput using this library are defined in [AI-Hypercomputer/gpu-recipes](https://github.com/AI-Hypercomputer/gpu-recipes).

### Motivation
The scale of large distributed AI training jobs results in frequent failures and interruptions. This library exists to help minimize the impact of failures and interruptions on training, ultimately improving training goodput for training workloads on Google Cloud.

### Hierarchy of ML frameworks within this library
-This library intertwines several popular ML frameworks and adds features on top of them. The following diagram provides a visual representation of the ML resiliency stack and the benifits that come with each layer:
+This library intertwines several popular ML frameworks and adds features on top of them. The following diagram provides a visual representation of the ML resiliency stack and the benefits that come with each layer:

![Diagram of the ML Resiliency Stack](docs/media/ml_resiliency_stack_diagram.png)

@@ -45,7 +45,7 @@ Holistic cluster control is supported via the Supervisor. The supervisor compris

This library includes the client-facing portion of the Supervisor in [`supervisor`](supervisor). One instance of the sensor, controller, and actuator should be deployed on CPU hosts per cluster. Each training host (GPUs) adjacent to the training workload should also run [`host_daemon.py`](supervisor/host_daemon.py) as a daemon deployment. The host daemon deployment on each host is responsible for monitoring the training processes on the same host and will relay all signals to the sensor deployment.

-The Supervisor builds upon the resiliency support provided by [NVIDIA's NvRx library](https://github.com/NVIDIA/nvidia-resiliency-ext). In particular, it utilizes the heartbeating mechansisms defined in NvRx as a signal to whether each node is training as expected. When interruptions occur, the training job relies on the in-job restart support provided by NvRx.
+The Supervisor builds upon the resiliency support provided by [NVIDIA's NvRx library](https://github.com/NVIDIA/nvidia-resiliency-ext). In particular, it utilizes the heartbeating mechanisms defined in NvRx as a signal as to whether each node is training as expected. When interruptions occur, the training job relies on the in-job restart support provided by NvRx.

Once the Supervisor is deployed, please include the launcher flag `--use-supervisor` to attach the training job to the Supervisor.
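As a hedged sketch, attaching a job to a deployed Supervisor might look like the following; only `--use-supervisor` is confirmed by this README, while the entrypoint and the other flags are illustrative assumptions:

```shell
# Illustrative launch command only. --use-supervisor comes from this README;
# the entrypoint and the remaining flags are assumptions that will vary by recipe.
python3 benchmark/benchmark.py \
  --job-name my-training-job \
  --use-supervisor
```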
## Tools
@@ -86,4 +86,4 @@ python3 benchmark/benchmark.py --help
```

## Help
-If you run into any issues when using this library, please file an issue or create a pull request.
+If you run into any issues when using this library, please file an issue or create a pull request.
2 changes: 1 addition & 1 deletion benchmark/benchmark.py
@@ -481,7 +481,7 @@ def main():
# Add the new handler to the logger
pl_logger.addHandler(console_handler)

-# loging to /tmp folder by default
+# logging to /tmp folder by default
log_file_dir = f"/tmp/{args.job_name}/log"
if args.log_to_remote_storage:
log_file_dir = Path(args.log_dir) / args.job_name / "log"
2 changes: 1 addition & 1 deletion docs/weekend-exp-mt.md
@@ -1,4 +1,4 @@
-# Weekend Experment
+# Weekend Experiment

- Create a bug under this [component](https://b.corp.google.com/issues/new?component=1784216&template=0),
the bug should include the head commit hash of the repo, the cmd that started the exp, any cmd used to restart the exp, and the final goodput analysis report.
2 changes: 1 addition & 1 deletion examples/supervisor_quickstart/README.md
@@ -144,7 +144,7 @@ RUN pip install google-cloud-resiliency-supervisor
Note: This example assumes a Python 3.10 environment. The path to local_elastic_agent.py might differ slightly based on your Python version or base image. The ARG command dynamically finds the correct path.

## Step 3: Run Your PyTorch Workload
-With the patched Docker image, you can now run your training job. To activate the supervisor client, you must set the `GCP_HOST_DAEMON_PORT` environment variable when launching your workload. If `GCP_HOST_DAEMON_PORT`, the supervisor client will not be initated by default.
+With the patched Docker image, you can now run your training job. To activate the supervisor client, you must set the `GCP_HOST_DAEMON_PORT` environment variable when launching your workload. If `GCP_HOST_DAEMON_PORT` is not set, the supervisor client will not be initiated.

The client code uses this port to connect to the `supervisor` daemon running on the same node.
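The gating described above can be sketched in Python. This is a minimal illustration of the documented environment-variable check, not the library's actual client code, and `supervisor_client_port` is a hypothetical helper name:

```python
import os

def supervisor_client_port(env=None):
    """Hypothetical helper illustrating the documented gating: the
    supervisor client is only initiated when GCP_HOST_DAEMON_PORT is set.

    Returns the port the client would connect to, or None when the
    variable is absent (the client stays disabled).
    """
    env = os.environ if env is None else env
    port = env.get("GCP_HOST_DAEMON_PORT")
    return int(port) if port is not None else None
```

In the real workload the variable is set at launch time (for example via `docker run -e GCP_HOST_DAEMON_PORT=...`), and the client then uses that port to reach the host daemon on the same node.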

2 changes: 1 addition & 1 deletion resiliency/callbacks/comm_overlap.py
@@ -148,7 +148,7 @@ def _get_model_comm_overlap_cfgs(
if vp_size is None:
vp_size = 1

-# Optimizations disabled by default, can be overriden by user
+# Optimizations disabled by default, can be overridden by user
comm_overlap_cfg.tp_comm_overlap = False
comm_overlap_cfg.tp_comm_overlap_cfg = None
comm_overlap_cfg.tp_comm_bootstrap_backend = None
4 changes: 2 additions & 2 deletions resiliency/callbacks/model_checkpoint.py
@@ -46,7 +46,7 @@
class ModelCheckpoint(PTLModelCheckpoint):
"""Light wrapper around Lightning's ModelCheckpoint to force a saved checkpoint on train_end.

-Adds support for asyncronous checkpointing and provides some additional logic
+Adds support for asynchronous checkpointing and provides some additional logic
to clean up invalid checkpoints

Args:
@@ -69,7 +69,7 @@ class ModelCheckpoint(PTLModelCheckpoint):
save_optim_on_train_end: Whether to include the optimizer states in the
final checkpoint at the end of training. Only applicable when
save_weights_only is ``False``.
-always_save_context: Whether to dump the artifacts needed to reinintialize
+always_save_context: Whether to dump the artifacts needed to reinitialize
the current model, trainer, and dataloader to allow for reproducibility
of experiments.
save_context_on_train_end: Whether to dump the artifacts on_train_end
2 changes: 1 addition & 1 deletion resiliency/high_scale_ckpt_utils.py
@@ -136,7 +136,7 @@ def _extract_step(f):
_wait_config_file_disappear(replicator_config_path, timeout_s)
handle_replicator_fail_situation(directory)

-# TODO: will use this feature when enabling data laoder ckpt.
+# TODO: will use this feature when enabling data loader ckpt.
process_replicator_prerestore(directory)

for _ in range(timeout_s):
2 changes: 1 addition & 1 deletion resiliency/plugins/combined_functionality.py
@@ -618,7 +618,7 @@ def load(
pyt_state_dict = mcore_to_pyt_state_dict(sharded_state_dict, True)

# Define planner. If replication coordinator is present, use FlexibleLoadPlanner.
-# This will prevent errors from occuring if local checkpoint is missing.
+# This will prevent errors from occurring if local checkpoint is missing.
if self.replication_coordinator is not None:
planner = ReplicationLoadPlanner(
shapes_validation_sharded_tensors=flexible_shape_sharded_tensors,
2 changes: 1 addition & 1 deletion resiliency/plugins/in_cluster_local_ckpt.py
@@ -563,7 +563,7 @@ def load(
pyt_state_dict = mcore_to_pyt_state_dict(sharded_state_dict, True)

# Define planner. If replication coordinator is present, use FlexibleLoadPlanner.
-# This will prevent errors from occuring if local checkpoint is missing.
+# This will prevent errors from occurring if local checkpoint is missing.
if self.replication_coordinator is not None:
planner = ReplicationLoadPlanner(
shapes_validation_sharded_tensors=flexible_shape_sharded_tensors,
6 changes: 3 additions & 3 deletions resiliency/plugins/replication_utils.py
@@ -470,7 +470,7 @@ def read_data(
target_tensor.copy_(tensor)
planner.commit_tensor(req, target_tensor)
elif self.replication_coordinator is not None:
-# For every request, this rank will need to receive data before being able to laod it.
+# For every request, this rank will need to receive data before being able to load it.
# This is a no-op if rank does not have any data to receive.
for req in reqs:
if req.type == LoadItemType.BYTE_IO:
@@ -629,7 +629,7 @@ def get_replication_coordinator(
3) (1) and (2) are collected for all ranks.
4) Determine mapping of ranks <-> local checkpoint rank
5) Determine all other ranks have replicated state of this rank.
-6) With this information, we iterate throguh all ranks again. For each rank,
+6) With this information, we iterate through all ranks again. For each rank,
calculate:
a) if rank already has access to its own checkpoint: this rank does not
need to receive.
@@ -639,7 +639,7 @@
i) if this rank's checkpoint exists somewhere (using (4)), determine
that rank with checkpoint as source
ii) if this rank's checkpoint does not exist somewhere (using (4)),
-determine smallest rank with repliated state as source
+determine smallest rank with replicated state as source
smallest peer rank with replicated state will be source of
broadcast to all ranks that need replicated state
NOTE: if no replicated state has source, then we cannot rely on
4 changes: 2 additions & 2 deletions third_party/megatron/async_utils.py
@@ -430,8 +430,8 @@ def async_loop(
):
"""Main function for the persistent checkpoint worker

-The persisent worker is created once and terminated at exit or
-when application calls `close()` explictily
+The persistent worker is created once and terminated at exit or
+when the application calls `close()` explicitly

This routine receives `AsyncRequest` and does `preload_fn` first and
puts the integer value in `preload_q` to inform the trainer to proceed.
4 changes: 2 additions & 2 deletions third_party/nvidia-resiliency-ext/launcher.py
@@ -595,7 +595,7 @@ def _start_workers(self, worker_group: WorkerGroup) -> Dict[int, Any]:
return self._pcontext.pids()

def _patch_pcontext_close(self, pcontext: PContext) -> None:
-# replace PContext._close with our version that has cutomized timeout
+# replace PContext._close with our version that has a customized timeout
# this ensures that the workers have enough time between SIGTERM and SIGKILL
orig_close = pcontext._close

@@ -1978,7 +1978,7 @@ def get_use_env(args) -> bool:


def _get_logs_specs_class(logs_specs_name: Optional[str]) -> Type[LogsSpecs]:
-"""Attemps to load `torchrun.logs_spec` entrypoint with key of `logs_specs_name` param.
+"""Attempts to load `torchrun.logs_spec` entrypoint with key of `logs_specs_name` param.

Provides plugin mechanism to provide custom implementation of LogsSpecs.

4 changes: 2 additions & 2 deletions third_party/nvidia-resiliency-ext/v0.4.1/launcher.py
@@ -374,7 +374,7 @@ def _invoke_run_with_any_failed_policy(self, role: str = DEFAULT_ROLE) -> RunRes
else:
self._stop_workers(self._worker_group)
self._worker_group.state = WorkerState.FAILED
-# to preserve torchrun's behaviour, should not return WorkerState.UNHEALTHY.
+# to preserve torchrun's behavior, should not return WorkerState.UNHEALTHY.
# we use WorkerState.UNHEALTHY to denote a worker group that is still
# running but has some failed workers. torchrun does not use WorkerState.UNHEALTHY
run_result = self._monitor_workers(self._worker_group)
@@ -2017,7 +2017,7 @@ def get_use_env(args) -> bool:

def _get_logs_specs_class(logs_specs_name: Optional[str]) -> Type[LogsSpecs]:
"""
-Attemps to load `torchrun.logs_spec` entrypoint with key of `logs_specs_name` param.
+Attempts to load `torchrun.logs_spec` entrypoint with key of `logs_specs_name` param.
Provides plugin mechanism to provide custom implementation of LogsSpecs.

Returns `DefaultLogsSpecs` when logs_spec_name is None.
2 changes: 1 addition & 1 deletion third_party/torch/torchrun.py
@@ -1476,7 +1476,7 @@ def get_use_env(args) -> bool:

def _get_logs_specs_class(logs_specs_name: Optional[str]) -> type[LogsSpecs]:
"""
-Attemps to load `torchrun.logs_spec` entrypoint with key of `logs_specs_name` param.
+Attempts to load `torchrun.logs_spec` entrypoint with key of `logs_specs_name` param.
Provides plugin mechanism to provide custom implementation of LogsSpecs.

Returns `DefaultLogsSpecs` when logs_spec_name is None.
Expand Down