From de7db4c0989822aeca726a81b0204e423044f0f9 Mon Sep 17 00:00:00 2001 From: jafraustro Date: Mon, 14 Jul 2025 09:02:32 -0700 Subject: [PATCH 1/5] Add rpc/ddp_rpc and rpc/rnn examples to CI Signed-off-by: jafraustro --- run_distributed_examples.sh | 10 ++++++++++ utils.sh | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/run_distributed_examples.sh b/run_distributed_examples.sh index 3307b57168..3ac2e59ebc 100755 --- a/run_distributed_examples.sh +++ b/run_distributed_examples.sh @@ -58,10 +58,20 @@ function distributed_minGPT-ddp() { uv run bash run_example.sh mingpt/main.py || error "minGPT example failed" } +function distributed_rpc_ddp_rpc() { + uv run main.py || error "ddp_rpc example failed" +} + +function distributed_rpc_rnn() { + uv run main.py || error "rpc_rnn example failed" +} + function run_all() { run distributed/tensor_parallelism run distributed/ddp run distributed/minGPT-ddp + run distributed/rpc/ddp_rpc + run distributed/rpc/rnn } # by default, run all examples diff --git a/utils.sh b/utils.sh index 72b6e44cd8..3dd3246f6c 100644 --- a/utils.sh +++ b/utils.sh @@ -48,7 +48,7 @@ function run() { if start $EXAMPLE; then # drop trailing slash (occurs due to auto completion in bash interactive mode) # replace slashes with underscores: this allows to call nested examples - EXAMPLE_FN=$(echo $EXAMPLE | sed "s@/\$@@" | sed 's@/@_@') + EXAMPLE_FN=$(echo $EXAMPLE | sed "s@/\$@@" | sed 's@/@_@g') $EXAMPLE_FN fi stop $EXAMPLE From 9354458561f948ca6e3c7c188f6457b232e31174 Mon Sep 17 00:00:00 2001 From: jafraustro Date: Mon, 14 Jul 2025 09:31:30 -0700 Subject: [PATCH 2/5] Add accelerator API to RPC distributed examples: - ddp_rpc - parameter_server - rnn Signed-off-by: jafraustro --- distributed/rpc/ddp_rpc/main.py | 15 +++++--- distributed/rpc/ddp_rpc/requirements.txt | 2 +- .../parameter_server/rpc_parameter_server.py | 36 +++++++++++-------- distributed/rpc/rnn/rnn.py | 10 +++--- 4 files changed, 39 insertions(+), 24 deletions(-) diff --git a/distributed/rpc/ddp_rpc/main.py b/distributed/rpc/ddp_rpc/main.py index 8ce476023c..27dc35d57d 100644 --- a/distributed/rpc/ddp_rpc/main.py +++ b/distributed/rpc/ddp_rpc/main.py @@ -27,12 +27,13 @@ class HybridModel(torch.nn.Module): def __init__(self, remote_emb_module, device): super(HybridModel, self).__init__() self.remote_emb_module = remote_emb_module - self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device]) + self.fc = DDP(torch.nn.Linear(16, 8).to(device), device_ids=[device]) self.device = device def forward(self, indices, offsets): + device = torch.accelerator.current_accelerator() emb_lookup = self.remote_emb_module.forward(indices, offsets) - return self.fc(emb_lookup.cuda(self.device)) + return self.fc(emb_lookup.to(self.device)) def _run_trainer(remote_emb_module, rank): @@ -83,7 +84,7 @@ def get_next_batch(rank): batch_size += 1 offsets_tensor = torch.LongTensor(offsets) - target = torch.LongTensor(batch_size).random_(8).cuda(rank) + target = torch.LongTensor(batch_size).random_(8).to(rank) yield indices, offsets_tensor, target # Train for 100 epochs @@ -145,9 +146,15 @@ def run_worker(rank, world_size): for fut in futs: fut.wait() elif rank <= 1: + if torch.accelerator.is_available(): + acc = torch.accelerator.current_accelerator() + device = torch.device(acc) + else: + device = torch.device("cpu") + backend = torch.distributed.get_default_backend_for_device(device) # Initialize process group for Distributed DataParallel on trainers. dist.init_process_group( - backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500" + backend=backend, rank=rank, world_size=2, init_method="tcp://localhost:29500" ) # Initialize RPC. diff --git a/distributed/rpc/ddp_rpc/requirements.txt b/distributed/rpc/ddp_rpc/requirements.txt index 1b8580395e..2cc080103f 100644 --- a/distributed/rpc/ddp_rpc/requirements.txt +++ b/distributed/rpc/ddp_rpc/requirements.txt @@ -1 +1 @@ -torch>=1.6.0 +torch>=2.7.1 diff --git a/distributed/rpc/parameter_server/rpc_parameter_server.py b/distributed/rpc/parameter_server/rpc_parameter_server.py index e01c0cd7c5..79846e56aa 100644 --- a/distributed/rpc/parameter_server/rpc_parameter_server.py +++ b/distributed/rpc/parameter_server/rpc_parameter_server.py @@ -20,15 +20,19 @@ def __init__(self, num_gpus=0): super(Net, self).__init__() print(f"Using {num_gpus} GPUs to train") self.num_gpus = num_gpus - device = torch.device( - "cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu") + if torch.accelerator.is_available() and self.num_gpus > 0: + acc = torch.accelerator.current_accelerator() + device = torch.device(f'{acc}:0') + else: + device = torch.device("cpu") print(f"Putting first 2 convs on {str(device)}") - # Put conv layers on the first cuda device + # Put conv layers on the first accelerator device self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device) self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device) - # Put rest of the network on the 2nd cuda device, if there is one - if "cuda" in str(device) and num_gpus > 1: - device = torch.device("cuda:1") + # Put rest of the network on the 2nd accelerator device, if there is one + if torch.accelerator.is_available() and self.num_gpus > 0: + acc = torch.accelerator.current_accelerator() + device = torch.device(f'{acc}:1') print(f"Putting rest of layers on {str(device)}") self.dropout1 = nn.Dropout2d(0.25).to(device) @@ -72,21 +76,22 @@ def call_method(method, rref, *args, **kwargs): # .bar(arg1, arg2) on the remote node and getting the result # back. - def remote_method(method, rref, *args, **kwargs): args = [method, rref] + list(args) return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs) - # --------- Parameter Server -------------------- class ParameterServer(nn.Module): def __init__(self, num_gpus=0): super().__init__() model = Net(num_gpus=num_gpus) self.model = model - self.input_device = torch.device( - "cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu") - + if torch.accelerator.is_available() and num_gpus > 0: + acc = torch.accelerator.current_accelerator() + self.input_device = torch.device(f'{acc}:0') + else: + self.input_device = torch.device("cpu") + def forward(self, inp): inp = inp.to(self.input_device) out = self.model(inp) @@ -113,11 +118,9 @@ def get_param_rrefs(self): param_rrefs = [rpc.RRef(param) for param in self.model.parameters()] return param_rrefs - param_server = None global_lock = Lock() - def get_parameter_server(num_gpus=0): global param_server # Ensure that we get only one handle to the ParameterServer. @@ -197,8 +200,11 @@ def get_accuracy(test_loader, model): model.eval() correct_sum = 0 # Use GPU to evaluate if possible - device = torch.device("cuda:0" if model.num_gpus > 0 - and torch.cuda.is_available() else "cpu") + if torch.accelerator.is_available() and model.num_gpus > 0: + acc = torch.accelerator.current_accelerator() + device = torch.device(f'{acc}:0') + else: + device = torch.device("cpu") with torch.no_grad(): for i, (data, target) in enumerate(test_loader): out = model(data) diff --git a/distributed/rpc/rnn/rnn.py b/distributed/rpc/rnn/rnn.py index 053187d65e..ded3781711 100644 --- a/distributed/rpc/rnn/rnn.py +++ b/distributed/rpc/rnn/rnn.py @@ -43,13 +43,15 @@ def __init__(self, ntoken, ninp, dropout): super(EmbeddingTable, self).__init__() self.drop = nn.Dropout(dropout) self.encoder = nn.Embedding(ntoken, ninp) - if torch.cuda.is_available(): - self.encoder = self.encoder.cuda() + if torch.accelerator.is_available(): + device = torch.accelerator.current_accelerator() + self.encoder = self.encoder.to(device) nn.init.uniform_(self.encoder.weight, -0.1, 0.1) def forward(self, input): - if torch.cuda.is_available(): - input = input.cuda() + if torch.accelerator.is_available(): + device = torch.accelerator.current_accelerator() + input = input.to(device) return self.drop(self.encoder(input)).cpu() From a79054914de055e9d584900119944501a8111ca6 Mon Sep 17 00:00:00 2001 From: jafraustro Date: Tue, 15 Jul 2025 08:53:32 -0700 Subject: [PATCH 3/5] Update requirements for RPC examples to include numpy Signed-off-by: jafraustro --- distributed/rpc/ddp_rpc/requirements.txt | 1 + distributed/rpc/parameter_server/requirements.txt | 2 ++ distributed/rpc/rnn/requirements.txt | 3 ++- 3 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 distributed/rpc/parameter_server/requirements.txt diff --git a/distributed/rpc/ddp_rpc/requirements.txt b/distributed/rpc/ddp_rpc/requirements.txt index 2cc080103f..351ad68146 100644 --- a/distributed/rpc/ddp_rpc/requirements.txt +++ b/distributed/rpc/ddp_rpc/requirements.txt @@ -1 +1,2 @@ torch>=2.7.1 +numpy diff --git a/distributed/rpc/parameter_server/requirements.txt b/distributed/rpc/parameter_server/requirements.txt new file mode 100644 index 0000000000..351ad68146 --- /dev/null +++ b/distributed/rpc/parameter_server/requirements.txt @@ -0,0 +1,2 @@ +torch>=2.7.1 +numpy diff --git a/distributed/rpc/rnn/requirements.txt b/distributed/rpc/rnn/requirements.txt index 12c6d5d5ea..351ad68146 100644 --- a/distributed/rpc/rnn/requirements.txt +++ b/distributed/rpc/rnn/requirements.txt @@ -1 +1,2 @@ -torch +torch>=2.7.1 +numpy From a84f91c3030976428b21178fdb1fce0c0ea4cf91 Mon Sep 17 00:00:00 2001 From: jafraustro Date: Thu, 17 Jul 2025 16:08:36 -0700 Subject: [PATCH 4/5] Enhance GPU verification and cleanup in DDP RPC example - Added a function to verify minimum GPU count before execution. - Updated HybridModel initialization to use rank instead of device. - Ensured proper cleanup of the process group to avoid resource leaks. - Added exit message if insufficient GPUs are detected. Signed-off-by: jafraustro --- distributed/rpc/ddp_rpc/main.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/distributed/rpc/ddp_rpc/main.py b/distributed/rpc/ddp_rpc/main.py index 27dc35d57d..346d55c91b 100644 --- a/distributed/rpc/ddp_rpc/main.py +++ b/distributed/rpc/ddp_rpc/main.py @@ -15,6 +15,11 @@ NUM_EMBEDDINGS = 100 EMBEDDING_DIM = 16 +def verify_min_gpu_count(min_gpus: int = 2) -> bool: + """ verification that we have at least 2 gpus to run dist examples """ + has_gpu = torch.accelerator.is_available() + gpu_count = torch.accelerator.device_count() + return has_gpu and gpu_count >= min_gpus class HybridModel(torch.nn.Module): r""" @@ -24,16 +29,15 @@ class HybridModel(torch.nn.Module): This remote model can get a Remote Reference to the embedding table on the parameter server. """ - def __init__(self, remote_emb_module, device): + def __init__(self, remote_emb_module, rank): super(HybridModel, self).__init__() self.remote_emb_module = remote_emb_module - self.fc = DDP(torch.nn.Linear(16, 8).to(device), device_ids=[device]) - self.device = device + self.fc = DDP(torch.nn.Linear(16, 8).to(rank)) + self.rank = rank def forward(self, indices, offsets): - device = torch.accelerator.current_accelerator() emb_lookup = self.remote_emb_module.forward(indices, offsets) - return self.fc(emb_lookup.to(self.device)) + return self.fc(emb_lookup.to(self.rank)) def _run_trainer(remote_emb_module, rank): @@ -152,6 +156,7 @@ def run_worker(rank, world_size): else: device = torch.device("cpu") backend = torch.distributed.get_default_backend_for_device(device) + torch.accelerator.device_index(rank) # Initialize process group for Distributed DataParallel on trainers. dist.init_process_group( backend=backend, rank=rank, world_size=2, init_method="tcp://localhost:29500" @@ -179,9 +184,18 @@ def run_worker(rank, world_size): # block until all rpcs finish rpc.shutdown() + + # Clean up process group for trainers to avoid resource leaks + if rank <= 1: + dist.destroy_process_group() if __name__ == "__main__": # 2 trainers, 1 parameter server, 1 master. world_size = 4 + _min_gpu_count = 2 + if not verify_min_gpu_count(min_gpus=_min_gpu_count): + print(f"Unable to locate sufficient {_min_gpu_count} gpus to run this example. Exiting.") + exit() mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True) + print("Distributed RPC example completed successfully.") From e044bc7b52aeae9849172e8501a28782e9b1bb37 Mon Sep 17 00:00:00 2001 From: jafraustro Date: Wed, 23 Jul 2025 14:26:04 -0700 Subject: [PATCH 5/5] - Update torch version in requirements.txt - Remove CPU execution option since DDP requires 2 GPUs for this example. - Refine README.md for DDP RPC example clarity and detail Signed-off-by: jafraustro --- distributed/rpc/ddp_rpc/README.md | 16 ++++------------ distributed/rpc/ddp_rpc/main.py | 7 ++----- distributed/rpc/ddp_rpc/requirements.txt | 2 +- 3 files changed, 7 insertions(+), 18 deletions(-) diff --git a/distributed/rpc/ddp_rpc/README.md b/distributed/rpc/ddp_rpc/README.md index 70f7ff231d..e4707a2544 100644 --- a/distributed/rpc/ddp_rpc/README.md +++ b/distributed/rpc/ddp_rpc/README.md @@ -1,18 +1,10 @@ Distributed DataParallel + Distributed RPC Framework Example -The example shows how to combine Distributed DataParallel with the Distributed -RPC Framework. There are two trainer nodes, 1 master node and 1 parameter -server in the example. +This example demonstrates how to combine Distributed DataParallel (DDP) with the Distributed RPC Framework. It requires two trainer nodes (each with a GPU), one master node, and one parameter server. -The master node creates an embedding table on the parameter server and drives -the training loop on the trainers. The model consists of a dense part -(nn.Linear) replicated on the trainers via Distributed DataParallel and a -sparse part (nn.EmbeddingBag) which resides on the parameter server. Each -trainer performs an embedding lookup on the parameter server (using the -Distributed RPC Framework) and then executes its local nn.Linear module. -During the backward pass, the gradients for the dense part are aggregated via -allreduce by DDP and the distributed backward pass updates the parameters for -the embedding table on the parameter server. +The master node initializes an embedding table on the parameter server and orchestrates the training loop across the trainers. The model is composed of a dense component (`nn.Linear`), which is replicated on the trainers using DDP, and a sparse component (`nn.EmbeddingBag`), which resides on the parameter server. + +Each trainer performs embedding lookups on the parameter server via RPC, then processes the results through its local `nn.Linear` module. During the backward pass, DDP aggregates gradients for the dense part using allreduce, while the distributed backward pass updates the embedding table parameters on the parameter server. ``` diff --git a/distributed/rpc/ddp_rpc/main.py b/distributed/rpc/ddp_rpc/main.py index 346d55c91b..0b4725756e 100644 --- a/distributed/rpc/ddp_rpc/main.py +++ b/distributed/rpc/ddp_rpc/main.py @@ -150,11 +150,8 @@ def run_worker(rank, world_size): for fut in futs: fut.wait() elif rank <= 1: - if torch.accelerator.is_available(): - acc = torch.accelerator.current_accelerator() - device = torch.device(acc) - else: - device = torch.device("cpu") + acc = torch.accelerator.current_accelerator() + device = torch.device(acc) backend = torch.distributed.get_default_backend_for_device(device) torch.accelerator.device_index(rank) # Initialize process group for Distributed DataParallel on trainers. diff --git a/distributed/rpc/ddp_rpc/requirements.txt b/distributed/rpc/ddp_rpc/requirements.txt index 351ad68146..c22ee832df 100644 --- a/distributed/rpc/ddp_rpc/requirements.txt +++ b/distributed/rpc/ddp_rpc/requirements.txt @@ -1,2 +1,2 @@ -torch>=2.7.1 +torch>=2.7.0 numpy