[WIP] Add async 1D HaloExchange API #947
Conversation
```cpp
/**
 * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```

Suggested change:

```diff
- * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+ * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
```
```cpp
std::lock_guard lock(mutex_);
co_return {std::move(from_left_), std::move(from_right_)};
```

This can change thread (the coroutine may resume on a different thread), so this usage of the mutex is unsafe: a `std::mutex` must be unlocked on the thread that locked it.
```cpp
// ---------------------------------------------------------------------------

coro::task<std::pair<std::optional<PackedData>, std::optional<PackedData>>>
HaloExchange::exchange(
```

Please refactor so that this collective is like all the other ones. There's a "non-streaming" version that actually does all the work and has an optional callback. The streaming version just devolves to the non-streaming one and inserts a callback that waits for an event.
```cpp
// Allocate device buffer and post async GPU receive
auto buf = ctx_->br()->allocate(
    ctx_->br()->stream_pool().get_stream(),
    ctx_->br()->reserve_or_fail(left_data_size_, MEMORY_TYPES)
);
left_data_recv_future_ =
    comm_->recv(rank - 1, Tag{op_id_, 1}, std::move(buf));
```

Issue: the recv can only be posted when the buffer is ready.
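A tiny model of the ordering issue (all names hypothetical): a stream-ordered allocation is not immediately safe to hand to the network layer, so the recv must only be posted once the allocation's stream work has completed, e.g. after waiting on a stream event.

```cpp
#include <cstddef>
#include <vector>

// Stand-in for a device buffer produced by a stream-ordered allocator.
struct Buffer {
    std::vector<std::byte> data;
    bool ready = false;                  // "stream work finished" flag
    void wait_ready() { ready = true; }  // stand-in for an event/stream wait
};

// Posting a recv hands the buffer to the communicator, which may begin
// writing into it immediately; doing so before the allocation's stream
// work completes would race with it. Returns whether the post was safe.
bool post_recv(Buffer const& buf) {
    return buf.ready;
}
```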
```cpp
 * Multiple sequential calls on the same instance represent successive rounds of
 * halo propagation (for multi-hop window coverage when the rolling window period
 * exceeds a single rank's data range).
 *
 * ### Rolling window use case
 *
 * After a global sort, rank k owns a contiguous index interval. The rolling actor
 * buffers all local chunks and then calls `exchange()` in a loop until every rank
 * has accumulated enough lookback context:
 *
```

This has nothing to do with halo exchanges and is not relevant to the documentation of halo exchanges.
```cpp
 * if all_done: break
 * send_right = from_left // propagate one hop further right
 * send_left = from_right // propagate one hop further left
 * ```
```

If providing code examples in the C++ docs, please use C++.
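For reference, the quoted pseudocode could be rendered in C++ along these lines (names hypothetical; this is only a sketch of the per-round hop, not the real rapidsmpf code):

```cpp
#include <optional>
#include <string>
#include <utility>

// One round of halo propagation: whatever arrived from the left neighbor
// is forwarded one hop further right, and vice versa.
void propagate_one_hop(
    std::optional<std::string>& send_left,
    std::optional<std::string>& send_right,
    std::optional<std::string> from_left,
    std::optional<std::string> from_right
) {
    send_right = std::move(from_left);  // propagate one hop further right
    send_left = std::move(from_right);  // propagate one hop further left
}
```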
```cpp
 * ### Tag encoding (uses 4 of the 8 available stage bits per op_id)
 *
 * | Stage | Direction | Content  |
 * |-------|-----------|----------|
 * | 0     | rightward | metadata |
 * | 1     | rightward | GPU data |
 * | 2     | leftward  | metadata |
 * | 3     | leftward  | GPU data |
 *
```

Why is this encoded in the docstring at all? Also, we don't need four stages: the rightward and leftward sends are never to/from the same rank, so we can use a single stage for metadata and a single stage for data.
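The reviewer's two-stage scheme could look roughly like this (hypothetical names and tag layout): since the rightward and leftward messages of a round never involve the same peer, the receiver can infer direction from the sender's rank, leaving one stage for metadata and one for data.

```cpp
#include <cstdint>

// Hypothetical stage encoding: one stage per content type, shared by both
// directions. Direction is implied by which neighbor the message comes from.
enum class Stage : std::uint8_t {
    metadata = 0,  // size header, both directions
    gpu_data = 1,  // GPU payload, both directions
};

// Hypothetical tag layout: op_id in the high byte, stage in the low byte.
std::uint16_t make_tag(std::uint8_t op_id, Stage stage) {
    return static_cast<std::uint16_t>(op_id) << 8
         | static_cast<std::uint16_t>(stage);
}
```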
```cpp
OpID op_id_;

mutable std::mutex mutex_;
coro::event event_{true};  ///< initially set (no active round)
```

The event is always true, so suspending on it will never suspend. So this can't be right.
```cpp
 * @param comm Communicator.
 * @param op_id Pre-allocated operation ID. Uses stages 0–3.
 */
class HaloExchange {
```

I wonder if all of this would be simpler if we said that (like allreduce) a halo exchange is a one-and-done operation. So if you need multiple rounds you need to build a new object each time.

rapidsai/cudf#22046 is only a rough sketch at this point, but thinking through that high-level design is what gave me the impression that we wanted to allow multiple rounds. The multi-hop exchange logic in that PR is still a mess, but it seems like we really don't know how many rounds we need ahead of time (when the collective IDs are assigned).
Thanks for the review @wence-, but sorry you jumped into this so early. Most of this PR was generated by Claude, with only two high-level rounds of review from me. My only goal was to have something available to sketch out rapidsai/cudf#22046 (which is also just an initial sketch, and not really ready for review). I should probably have used "draft" PRs, but I get annoyed that CI doesn't run automatically (I know - I should just deal with it). With that said, I think your comments are very valuable.
Closing in favor of #959 (will re-open if necessary)
Adds `rapidsmpf::streaming::HaloExchange`, a new streaming collective that exchanges 1-dimensional boundary data between adjacent ranks in a linear rank topology. Each call to `exchange(send_left, send_right)` performs one bidirectional hop and returns `(from_left, from_right)`.

The intended use is rolling window aggregations: after a global sort, ranks loop on `exchange()` until every rank has accumulated enough lookback context, propagating halos one hop further each round.
Condensed overview of changes (according to Claude)...
Design

Wire protocol: Each direction uses two tags per `op_id` (metadata then GPU data, stages 0–3). Every rank always sends exactly one metadata message per direction per round; a null sentinel (an 8-byte message with `gpu_data_size == 0`) signals "nothing to send", keeping the polling logic simple. Successive rounds reuse the same tags; ordering is guaranteed by the communicator's no-overtaking property.
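The null-sentinel metadata message could be sketched like this (encoding details are assumptions, not the actual wire format): the 8-byte header carries `gpu_data_size`, and a value of zero means "nothing to send".

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Encode the per-direction metadata header: 8 bytes holding the payload
// size. A size of zero acts as the "nothing to send" sentinel, so every
// rank can unconditionally post one metadata recv per direction per round.
std::array<std::byte, 8> encode_metadata(std::uint64_t gpu_data_size) {
    std::array<std::byte, 8> msg{};
    std::memcpy(msg.data(), &gpu_data_size, sizeof(gpu_data_size));
    return msg;
}

bool is_null_sentinel(std::array<std::byte, 8> const& msg) {
    std::uint64_t size = 0;
    std::memcpy(&size, msg.data(), sizeof(size));
    return size == 0;
}
```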
Progress thread: `event_loop()` is registered with the communicator's progress thread. It polls for metadata, posts async GPU receives, and fires `event_.set(executor)` once both directions complete. The destructor asserts `event_.is_set()` (matching `AllGather`/`AllReduce`/`ShufflerAsync`).

Coroutine interface: `exchange()` resets per-round state, issues all sends, and `co_await`s a `coro::event`. On `nranks == 1` the event is set immediately (fast path, no progress thread involved). The Python binding uses the same `asyncio.Future` + `cpp_set_py_future` + `spawn_detached` pattern as `allgather.pyx`.

Files
- `cpp/include/rapidsmpf/streaming/coll/halo_exchange.hpp`
- `cpp/src/streaming/coll/halo_exchange.cpp`
- `cpp/CMakeLists.txt` (`RAPIDSMPF_HAVE_STREAMING`)
- `cpp/tests/streaming/test_halo_exchange.cpp`
- `cpp/tests/CMakeLists.txt`
- `python/.../streaming/coll/halo_exchange.pxd`
- `python/.../streaming/coll/halo_exchange.pyx`
- `python/.../streaming/coll/CMakeLists.txt` (`halo_exchange.pyx`)
- `python/.../streaming/coll/__init__.py` (`HaloExchange`)
- `python/.../streaming/coll/halo_exchange.py`
- `python/.../tests/streaming/test_halo_exchange.py`

Testing

Python (`pytest tests/streaming/test_halo_exchange.py`): single-rank smoke tests using `single_process_comm`: no-data round, round with data, multi-round loop.

C++ (`mpirun -np N gtests/ucxx_tests --gtest_filter="StreamingHaloExchange*"` for N ∈ {1, 2, 3, 4, 5, 8}): `two_rank_exchange` (basic GPU buffer round-trip) and `boundary_ranks` (verifies `nullopt` for absent directions and correct neighbor values for interior ranks). Multi-rank coverage is in C++ rather than Python because the streaming Python conftest always provides `single_process_comm`, matching the pattern for `AllGather` and `AllReduce`.