Implement a sparse alltoall exchange pattern #959

wence- wants to merge 5 commits into `rapidsai:main`.
Conversation
In this collective every rank advertises the destination ranks it will send to and the source ranks it will receive from (these have to match up collectively; no error is raised if they do not). The caller can then insert messages for particular ranks, followed by a final `insert_finished()` call. On the receive side, after waiting for completion, we can extract received messages by rank. The receive-side message order is defined by the insertion order on the send side. That is, if rank A inserts messages in order [A0, A1, A2] to rank B, then when rank B calls `extract(rank-A)` it will see the same order (even if the messages were sent in a different order).
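A minimal single-process sketch of the ordering guarantee described above. `SparseAlltoallModel` and its sequence-number bookkeeping are illustrative assumptions for this sketch, not the rapidsmpf implementation: the sender stamps each message with an insertion ordinal, and the receiver restores insertion order even when delivery is shuffled.

```python
import random

class SparseAlltoallModel:
    """Hypothetical model of the ordering contract, not rapidsmpf API."""

    def __init__(self):
        self._next_seq = {}  # (src, dst) -> next sequence number on the send side
        self._inbox = {}     # (src, dst) -> received (seq, payload) pairs

    def insert(self, src, dst, payload):
        seq = self._next_seq.setdefault((src, dst), 0)
        self._next_seq[(src, dst)] = seq + 1
        self._inbox.setdefault((src, dst), []).append((seq, payload))

    def extract(self, src, dst):
        msgs = self._inbox.pop((src, dst), [])
        random.shuffle(msgs)  # arrival order is arbitrary...
        # ...but extraction order follows the send-side sequence numbers.
        return [payload for _, payload in sorted(msgs)]

model = SparseAlltoallModel()
for payload in ["A0", "A1", "A2"]:
    model.insert(src=0, dst=1, payload=payload)
print(model.extract(src=0, dst=1))  # -> ['A0', 'A1', 'A2']
```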
I can split these into bits for review purposes if that is useful.
```cpp
 * @note Concurrent insertion by multiple threads is supported, the caller must ensure
 * that `insert_finished()` is called _after_ all `insert()` calls have completed.
```
They seem like two separate notes, or am I misreading? It seems that the multi-threaded insertion comment and the `insert_finished()` requirement are independent of each other.
This relates to the note in `insert_finished()`; they are indeed dependent. I would rephrase this slightly to make the dependence clearer:
```diff
- * @note Concurrent insertion by multiple threads is supported, the caller must ensure
- * that `insert_finished()` is called _after_ all `insert()` calls have completed.
+ * @note Concurrent insertion by multiple threads is supported, the caller must ensure
+ * all `insert()` calls (concurrent or not) have completed before calling
+ * `insert_finished()`.
```
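A hedged sketch of the calling pattern this note requires: `insert()` calls may run concurrently, but every one of them must have completed (threads joined) before `insert_finished()` is called. The `Exchange` class here is a stand-in for illustration, not rapidsmpf API.

```python
import threading

class Exchange:
    """Illustrative stand-in enforcing the insert/insert_finished contract."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = []
        self._finished = False

    def insert(self, msg):
        with self._lock:
            assert not self._finished, "insert() after insert_finished()"
            self._messages.append(msg)

    def insert_finished(self):
        with self._lock:
            self._finished = True

ex = Exchange()
threads = [threading.Thread(target=ex.insert, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()          # ensure every insert() has completed...
ex.insert_finished()  # ...before signalling that insertion is done
print(len(ex._messages))  # -> 8
```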
```cython
    packed_data_vector_to_list)

cdef class SparseAlltoall:
```
I know this isn't common, but especially with Python free threading do we want to also add notes about multithreading support here?
We should also add `SparseAlltoall` to https://github.com/rapidsai/rapidsmpf/blob/main/python/rapidsmpf/rapidsmpf/coll/__init__.py .
```python
    plc.Table(
        [plc.Column.from_array(np.array([29], dtype=np.int32), stream=stream)]
    ),
)
```
This test is OK, but it only exercises exactly 2 ranks, while all other ranks are no-ops. I think we also need a test that uses as many ranks as are available.
```cpp
    );
}

void SparseAlltoall::insert_finished() {
```
We should probably add a check to prevent calling `insert_finished()` more than once.
Maybe we can check `locally_finished_ == false`.
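A sketch of what the suggested guard could look like. In C++ this might be a test-and-set on `locally_finished_` (e.g. `!locally_finished_.exchange(true)`); the Python `FinishGuard` below is a hypothetical illustration of the same idea, not rapidsmpf code.

```python
import threading

class FinishGuard:
    """Illustrative double-call guard for an insert_finished()-style method."""

    def __init__(self):
        self._lock = threading.Lock()
        self._finished = False

    def insert_finished(self):
        with self._lock:
            # Equivalent in spirit to checking locally_finished_ == false
            # before flipping it, atomically.
            if self._finished:
                raise RuntimeError("insert_finished() called more than once")
            self._finished = True

g = FinishGuard()
g.insert_finished()       # first call succeeds
try:
    g.insert_finished()   # second call is rejected
except RuntimeError as e:
    print(e)              # -> insert_finished() called more than once
```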
A few tests I think we're missing include:

- `insert()` after `insert_finished()`
- multi-threaded `insert()`
- `extract()` with an invalid source rank
```cpp
RAPIDSMPF_EXPECTS_FATAL(
    event_.is_set(),
    "~SparseAlltoall: not all notification tasks complete, did you forget to await "
    "this->wait() or to call this->insert_finished()?"
);
```
Is `this->wait()` a mistake? I don't think we have a `wait()` method.
nirandaperera left a comment:
Had some comments for the normal impl. I will check the coroutine impl later today.
```cpp
std::uint64_t received_count{0};
std::vector<std::unique_ptr<detail::Chunk>> chunks;

[[nodiscard]] bool ready() const noexcept {
```
Nit.
```diff
- [[nodiscard]] bool ready() const noexcept {
+ [[nodiscard]] bool constexpr ready() const noexcept {
```
```cpp
void send_ready_messages();
void receive_metadata_messages();
void receive_data_messages();
void complete_data_messages();
```
It would be nice to add some `@brief` docstrings for these methods (for future reference), either here or in the .cpp file.
```cpp
RAPIDSMPF_EXPECTS(br_ != nullptr, "the buffer resource pointer cannot be null");
auto const size = comm_->nranks();
auto const self = comm_->rank();
for (auto src : srcs_) {
```
Nit
```diff
- for (auto src : srcs_) {
+ source_states_.reserve(srcs_.size());
+ for (auto src : srcs_) {
```
```cpp
    );
    source_states_.emplace(src, SourceState{});
}
for (auto dst : dsts_) {
```
Nit.
```diff
- for (auto dst : dsts_) {
+ next_ordinal_per_dst_.reserve(dsts_.size());
+ for (auto dst : dsts_) {
```
```cpp
SparseAlltoall::~SparseAlltoall() noexcept {
    RAPIDSMPF_EXPECTS_FATAL(
        locally_finished_.load(std::memory_order_acquire),
        "Destroying SparseAlltoall without `insert_finished()`"
    );
```
Marked as `noexcept` but throwing.
```cpp
Tag const metadata_tag{op_id_, 0};
for (auto src : srcs_) {
    auto& state = source_states_.at(src);
    while (!state.ready()) {
```
Wouldn't this while-not-ready loop hog the progress thread until all the messages are received from all sources? I feel like this would be unfair to other concurrent collectives, wouldn't it?
Did you mean to use an `if` instead?
```cpp
    );
    state.expected_count = chunk->sequence();
} else {
    incoming_by_src_.at(src).push_back(std::move(chunk));
```
Nit
```diff
- incoming_by_src_.at(src).push_back(std::move(chunk));
+ incoming_by_src_[src].push_back(std::move(chunk));
```
```cpp
    src >= 0 && src < size && src != self, "SparseAlltoall invalid source rank."
);
RAPIDSMPF_EXPECTS(
    incoming_by_src_.emplace(src, std::vector<std::unique_ptr<detail::Chunk>>{})
```
Why can't `incoming_by_src_` be a class member of `SourceState`? I feel like it is the queue of metadata received from a particular source, isn't it?
```cpp
        );
    }
}
queue.erase(queue.begin(), queue.begin() + processed);
```
Wouldn't a `std::deque`/`std::queue` be better here? Unprocessed chunks will always be shifted by `processed` elements in each progress iteration, won't they?
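A sketch of the suggested change, using Python's `collections.deque` as an analogue: popping `processed` items from the front of a deque is O(1) per item, whereas `vector::erase(begin, begin + processed)` shifts every remaining element each progress iteration. The stop condition here is a made-up stand-in for "chunk could not be processed yet".

```python
from collections import deque

queue = deque(range(10))  # pretend these are pending chunks
processed = 0
while queue and queue[0] < 6:  # hypothetical "can still process" condition
    queue.popleft()            # O(1) per consumed chunk, no shifting
    processed += 1
print(processed, list(queue))  # -> 6 [6, 7, 8, 9]
```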
```cpp
    }
    processed++;
    if (chunk->data_size() == 0) {
        auto& state = source_states_.at(chunk->origin());
```
```diff
- auto& state = source_states_.at(chunk->origin());
+ auto& state = source_states_[chunk->origin()];
```