diff --git a/distributed/rpc/ddp_rpc/README.md b/distributed/rpc/ddp_rpc/README.md index 70f7ff231d..e4707a2544 100644 --- a/distributed/rpc/ddp_rpc/README.md +++ b/distributed/rpc/ddp_rpc/README.md @@ -1,18 +1,10 @@ Distributed DataParallel + Distributed RPC Framework Example -The example shows how to combine Distributed DataParallel with the Distributed -RPC Framework. There are two trainer nodes, 1 master node and 1 parameter -server in the example. +This example demonstrates how to combine Distributed DataParallel (DDP) with the Distributed RPC Framework. It requires two trainer nodes (each with a GPU), one master node, and one parameter server. -The master node creates an embedding table on the parameter server and drives -the training loop on the trainers. The model consists of a dense part -(nn.Linear) replicated on the trainers via Distributed DataParallel and a -sparse part (nn.EmbeddingBag) which resides on the parameter server. Each -trainer performs an embedding lookup on the parameter server (using the -Distributed RPC Framework) and then executes its local nn.Linear module. -During the backward pass, the gradients for the dense part are aggregated via -allreduce by DDP and the distributed backward pass updates the parameters for -the embedding table on the parameter server. +The master node initializes an embedding table on the parameter server and orchestrates the training loop across the trainers. The model is composed of a dense component (`nn.Linear`), which is replicated on the trainers using DDP, and a sparse component (`nn.EmbeddingBag`), which resides on the parameter server. + +Each trainer performs embedding lookups on the parameter server via RPC, then processes the results through its local `nn.Linear` module. During the backward pass, DDP aggregates gradients for the dense part using allreduce, while the distributed backward pass updates the embedding table parameters on the parameter server. ``` diff --git a/distributed/rpc/ddp_rpc/main.py b/distributed/rpc/ddp_rpc/main.py index 8ce476023c..0b4725756e 100644 --- a/distributed/rpc/ddp_rpc/main.py +++ b/distributed/rpc/ddp_rpc/main.py @@ -15,6 +15,11 @@ NUM_EMBEDDINGS = 100 EMBEDDING_DIM = 16 +def verify_min_gpu_count(min_gpus: int = 2) -> bool: + """ verification that we have at least 2 gpus to run dist examples """ + has_gpu = torch.accelerator.is_available() + gpu_count = torch.accelerator.device_count() + return has_gpu and gpu_count >= min_gpus class HybridModel(torch.nn.Module): r""" @@ -24,15 +29,15 @@ class HybridModel(torch.nn.Module): This remote model can get a Remote Reference to the embedding table on the parameter server. """ - def __init__(self, remote_emb_module, device): + def __init__(self, remote_emb_module, rank): super(HybridModel, self).__init__() self.remote_emb_module = remote_emb_module - self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device]) - self.device = device + self.fc = DDP(torch.nn.Linear(16, 8).to(rank)) + self.rank = rank def forward(self, indices, offsets): emb_lookup = self.remote_emb_module.forward(indices, offsets) - return self.fc(emb_lookup.cuda(self.device)) + return self.fc(emb_lookup.to(self.rank)) def _run_trainer(remote_emb_module, rank): @@ -83,7 +88,7 @@ def get_next_batch(rank): batch_size += 1 offsets_tensor = torch.LongTensor(offsets) - target = torch.LongTensor(batch_size).random_(8).cuda(rank) + target = torch.LongTensor(batch_size).random_(8).to(rank) yield indices, offsets_tensor, target # Train for 100 epochs @@ -145,9 +150,13 @@ def run_worker(rank, world_size): for fut in futs: fut.wait() elif rank <= 1: + acc = torch.accelerator.current_accelerator() + device = torch.device(acc) + backend = torch.distributed.get_default_backend_for_device(device) + torch.accelerator.device_index(rank) # Initialize process group for Distributed DataParallel on trainers. dist.init_process_group( - backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500" + backend=backend, rank=rank, world_size=2, init_method="tcp://localhost:29500" ) # Initialize RPC. @@ -172,9 +181,18 @@ def run_worker(rank, world_size): # block until all rpcs finish rpc.shutdown() + + # Clean up process group for trainers to avoid resource leaks + if rank <= 1: + dist.destroy_process_group() if __name__ == "__main__": # 2 trainers, 1 parameter server, 1 master. world_size = 4 + _min_gpu_count = 2 + if not verify_min_gpu_count(min_gpus=_min_gpu_count): + print(f"Unable to locate sufficient {_min_gpu_count} gpus to run this example. Exiting.") + exit() mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True) + print("Distributed RPC example completed successfully.") diff --git a/distributed/rpc/ddp_rpc/requirements.txt b/distributed/rpc/ddp_rpc/requirements.txt index 1b8580395e..c22ee832df 100644 --- a/distributed/rpc/ddp_rpc/requirements.txt +++ b/distributed/rpc/ddp_rpc/requirements.txt @@ -1 +1,2 @@ -torch>=1.6.0 +torch>=2.7.0 +numpy diff --git a/distributed/rpc/parameter_server/requirements.txt b/distributed/rpc/parameter_server/requirements.txt new file mode 100644 index 0000000000..351ad68146 --- /dev/null +++ b/distributed/rpc/parameter_server/requirements.txt @@ -0,0 +1,2 @@ +torch>=2.7.1 +numpy diff --git a/distributed/rpc/parameter_server/rpc_parameter_server.py b/distributed/rpc/parameter_server/rpc_parameter_server.py index e01c0cd7c5..79846e56aa 100644 --- a/distributed/rpc/parameter_server/rpc_parameter_server.py +++ b/distributed/rpc/parameter_server/rpc_parameter_server.py @@ -20,15 +20,19 @@ def __init__(self, num_gpus=0): super(Net, self).__init__() print(f"Using {num_gpus} GPUs to train") self.num_gpus = num_gpus - device = torch.device( - "cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu") + if torch.accelerator.is_available() and self.num_gpus > 0: + acc = torch.accelerator.current_accelerator() + device = torch.device(f'{acc}:0') + else: + device = torch.device("cpu") print(f"Putting first 2 convs on {str(device)}") - # Put conv layers on the first cuda device + # Put conv layers on the first accelerator device self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device) self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device) - # Put rest of the network on the 2nd cuda device, if there is one - if "cuda" in str(device) and num_gpus > 1: - device = torch.device("cuda:1") + # Put rest of the network on the 2nd accelerator device, if there is one + if torch.accelerator.is_available() and self.num_gpus > 0: + acc = torch.accelerator.current_accelerator() + device = torch.device(f'{acc}:1') print(f"Putting rest of layers on {str(device)}") self.dropout1 = nn.Dropout2d(0.25).to(device) @@ -72,21 +76,22 @@ def call_method(method, rref, *args, **kwargs): # .bar(arg1, arg2) on the remote node and getting the result # back. - def remote_method(method, rref, *args, **kwargs): args = [method, rref] + list(args) return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs) - # --------- Parameter Server -------------------- class ParameterServer(nn.Module): def __init__(self, num_gpus=0): super().__init__() model = Net(num_gpus=num_gpus) self.model = model - self.input_device = torch.device( - "cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu") - + if torch.accelerator.is_available() and num_gpus > 0: + acc = torch.accelerator.current_accelerator() + self.input_device = torch.device(f'{acc}:0') + else: + self.input_device = torch.device("cpu") + def forward(self, inp): inp = inp.to(self.input_device) out = self.model(inp) @@ -113,11 +118,9 @@ def get_param_rrefs(self): param_rrefs = [rpc.RRef(param) for param in self.model.parameters()] return param_rrefs - param_server = None global_lock = Lock() - def get_parameter_server(num_gpus=0): global param_server # Ensure that we get only one handle to the ParameterServer. @@ -197,8 +200,11 @@ def get_accuracy(test_loader, model): model.eval() correct_sum = 0 # Use GPU to evaluate if possible - device = torch.device("cuda:0" if model.num_gpus > 0 - and torch.cuda.is_available() else "cpu") + if torch.accelerator.is_available() and model.num_gpus > 0: + acc = torch.accelerator.current_accelerator() + device = torch.device(f'{acc}:0') + else: + device = torch.device("cpu") with torch.no_grad(): for i, (data, target) in enumerate(test_loader): out = model(data) diff --git a/distributed/rpc/rnn/requirements.txt b/distributed/rpc/rnn/requirements.txt index 12c6d5d5ea..351ad68146 100644 --- a/distributed/rpc/rnn/requirements.txt +++ b/distributed/rpc/rnn/requirements.txt @@ -1 +1,2 @@ -torch +torch>=2.7.1 +numpy diff --git a/distributed/rpc/rnn/rnn.py b/distributed/rpc/rnn/rnn.py index 053187d65e..ded3781711 100644 --- a/distributed/rpc/rnn/rnn.py +++ b/distributed/rpc/rnn/rnn.py @@ -43,13 +43,15 @@ def __init__(self, ntoken, ninp, dropout): super(EmbeddingTable, self).__init__() self.drop = nn.Dropout(dropout) self.encoder = nn.Embedding(ntoken, ninp) - if torch.cuda.is_available(): - self.encoder = self.encoder.cuda() + if torch.accelerator.is_available(): + device = torch.accelerator.current_accelerator() + self.encoder = self.encoder.to(device) nn.init.uniform_(self.encoder.weight, -0.1, 0.1) def forward(self, input): - if torch.cuda.is_available(): - input = input.cuda() + if torch.accelerator.is_available(): + device = torch.accelerator.current_accelerator() + input = input.to(device) return self.drop(self.encoder(input)).cpu() diff --git a/run_distributed_examples.sh b/run_distributed_examples.sh index 3307b57168..3ac2e59ebc 100755 --- a/run_distributed_examples.sh +++ b/run_distributed_examples.sh @@ -58,10 +58,20 @@ function distributed_minGPT-ddp() { uv run bash run_example.sh mingpt/main.py || error "minGPT example failed" } +function distributed_rpc_ddp_rpc() { + uv run main.py || error "ddp_rpc example failed" +} + +function distributed_rpc_rnn() { + uv run main.py || error "rpc_rnn example failed" +} + function run_all() { run distributed/tensor_parallelism run distributed/ddp run distributed/minGPT-ddp + run distributed/rpc/ddp_rpc + run distributed/rpc/rnn } # by default, run all examples diff --git a/utils.sh b/utils.sh index 72b6e44cd8..3dd3246f6c 100644 --- a/utils.sh +++ b/utils.sh @@ -48,7 +48,7 @@ function run() { if start $EXAMPLE; then # drop trailing slash (occurs due to auto completion in bash interactive mode) # replace slashes with underscores: this allows to call nested examples - EXAMPLE_FN=$(echo $EXAMPLE | sed "s@/\$@@" | sed 's@/@_@') + EXAMPLE_FN=$(echo $EXAMPLE | sed "s@/\$@@" | sed 's@/@_@g') $EXAMPLE_FN fi stop $EXAMPLE