Skip to content

[WIP] Add async 1D HaloExchange API#947

Closed
rjzamora wants to merge 5 commits intorapidsai:mainfrom
rjzamora:halo-exchange
Closed

[WIP] Add async 1D HaloExchange API#947
rjzamora wants to merge 5 commits intorapidsai:mainfrom
rjzamora:halo-exchange

Conversation

@rjzamora
Copy link
Copy Markdown
Member

@rjzamora rjzamora commented Apr 7, 2026

Adds rapidsmpf::streaming::HaloExchange, a new streaming collective that
exchanges 1-dimensional boundary data between adjacent ranks in a linear rank topology. Each
call to exchange(send_left, send_right) performs one bidirectional hop and
returns (from_left, from_right).

The intended use is rolling window aggregations: after a global sort, ranks
loop on exchange() until every rank has accumulated enough lookback context,
propagating halos one hop further each round.

Condensed overview of changes (according to Claude)...

Design

Wire protocol: Each direction uses two tags per op_id (metadata then GPU
data, stages 0–3). Every rank always sends exactly one metadata message per
direction per round; a null sentinel (8-byte message, gpu_data_size == 0)
signals "nothing to send", keeping the polling logic simple. Successive rounds
reuse the same tags; ordering is guaranteed by the communicator's no-overtaking
property.

Progress thread: event_loop() is registered with the communicator's
progress thread. It polls for metadata, posts async GPU receives, and fires
event_.set(executor) once both directions complete. The destructor asserts
event_.is_set() (matching AllGather / AllReduce / ShufflerAsync).

Coroutine interface: exchange() resets per-round state, issues all
sends, and co_awaits a coro::event. On nranks == 1 the event is set
immediately (fast path, no progress thread involved). The Python binding uses
the same asyncio.Future + cpp_set_py_future + spawn_detached pattern as
allgather.pyx.

Files

File Change
cpp/include/rapidsmpf/streaming/coll/halo_exchange.hpp New class declaration
cpp/src/streaming/coll/halo_exchange.cpp New implementation
cpp/CMakeLists.txt Wire new source into RAPIDSMPF_HAVE_STREAMING
cpp/tests/streaming/test_halo_exchange.cpp New C++ GTest suite
cpp/tests/CMakeLists.txt Wire test source
python/.../streaming/coll/halo_exchange.pxd New Cython extern + cdef class
python/.../streaming/coll/halo_exchange.pyx New async Python binding
python/.../streaming/coll/CMakeLists.txt Add halo_exchange.pyx
python/.../streaming/coll/__init__.py Export HaloExchange
python/.../streaming/coll/halo_exchange.py Deleted (AllGather prototype)
python/.../tests/streaming/test_halo_exchange.py New single-rank Python tests

Testing

Python (pytest tests/streaming/test_halo_exchange.py): single-rank smoke
tests using single_process_comm — no-data round, round with data,
multi-round loop.

C++ (mpirun -np N gtests/ucxx_tests --gtest_filter="StreamingHaloExchange*"
for N ∈ {1, 2, 3, 4, 5, 8}): two_rank_exchange (basic GPU buffer round-trip)
and boundary_ranks (verifies nullopt for absent directions and correct
neighbor values for interior ranks). Multi-rank coverage is in C++ rather than
Python because the streaming Python conftest always provides single_process_comm,
matching the pattern for AllGather and AllReduce.

@rjzamora rjzamora self-assigned this Apr 7, 2026
@rjzamora rjzamora added feature request New feature or request non-breaking Introduces a non-breaking change labels Apr 7, 2026
@rjzamora rjzamora requested review from a team as code owners April 7, 2026 15:50
@@ -0,0 +1,150 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

surely not 2025

@@ -0,0 +1,292 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.

Comment on lines +165 to +166
std::lock_guard lock(mutex_);
co_return {std::move(from_left_), std::move(from_right_)};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can change thread, so this usage of the mutex is unsafe.

// ---------------------------------------------------------------------------

coro::task<std::pair<std::optional<PackedData>, std::optional<PackedData>>>
HaloExchange::exchange(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refactor so that this collective is like all the other ones. There's a "non-streaming" version that actually does all the work and has an optional callback. The streaming version just devolves to the non-streaming one and inserts a callback that waits for an event.

Comment on lines +208 to +214
// Allocate device buffer and post async GPU receive
auto buf = ctx_->br()->allocate(
ctx_->br()->stream_pool().get_stream(),
ctx_->br()->reserve_or_fail(left_data_size_, MEMORY_TYPES)
);
left_data_recv_future_ =
comm_->recv(rank - 1, Tag{op_id_, 1}, std::move(buf));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue. The recv can only be posted when the buffer is ready.

Comment on lines +39 to +48
* Multiple sequential calls on the same instance represent successive rounds of
* halo propagation (for multi-hop window coverage when the rolling window period
* exceeds a single rank's data range).
*
* ### Rolling window use case
*
* After a global sort, rank k owns a contiguous index interval. The rolling actor
* buffers all local chunks and then calls `exchange()` in a loop until every rank
* has accumulated enough lookback context:
*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has nothing to do with halo exchanges and is not relevant to the documentation of halo exchanges.

* if all_done: break
* send_right = from_left // propagate one hop further right
* send_left = from_right // propagate one hop further left
* ```
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If providing code examples in the C++ docs, please use C++

Comment on lines +60 to +68
* ### Tag encoding (uses 4 of the 8 available stage bits per op_id)
*
* | Stage | Direction | Content |
* |-------|------------|----------|
* | 0 | rightward | metadata |
* | 1 | rightward | GPU data |
* | 2 | leftward | metadata |
* | 3 | leftward | GPU data |
*
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this encoded in the docstirng at all. Also, we don't need four stages. The rightward and leftward sends are never to/from the same rank so we can use a single stage for metadata and a single stage for data.

OpID op_id_;

mutable std::mutex mutex_;
coro::event event_{true}; ///< initially set (no active round)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event is always true, so suspending on it will never suspend. So this can't be right.

* @param comm Communicator.
* @param op_id Pre-allocated operation ID. Uses stages 0–3.
*/
class HaloExchange {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if all of this would be simpler if we said that (like allreduce) a halo exchange is a one-and-done operation. So if you need multiple rounds you need to build a new object each time.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rapidsai/cudf#22046 is only a rough sketch at this point, but thinking through that high-level design is what gave me the impression that we wanted to allow multiple rounds. The multi-hop exchange logic in that PR is still a mess, but it seems like we really don't know how many rounds we need ahead of time (when the collective IDs are assigned).

@rjzamora rjzamora changed the title Add async 1D HaloExchange API [WIP] Add async 1D HaloExchange API Apr 8, 2026
@rjzamora
Copy link
Copy Markdown
Member Author

rjzamora commented Apr 8, 2026

Thanks for the review @wence-, but sorry you jumped into this so early. Most of this PR was generated by Claude, with only two high-level rounds of review from me. My only goal was to have something available to sketch out rapidsai/cudf#22046 (which is also just an initial sketch, and not really ready for review). I should have prbably used "draft" PRs, but I get annoyed that CI doesn't run automatically (I know - I should just deal with it). With that said, I think your comments are very valuable.

@rjzamora
Copy link
Copy Markdown
Member Author

Closing in favor of #959 (will re-open if necessary)

@rjzamora rjzamora closed this Apr 14, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

feature request New feature or request non-breaking Introduces a non-breaking change

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants