Enable cudf-vector exchange on top of UCXX. #3

dan13bauer wants to merge 17 commits into exchange_hooks
Conversation
```cpp
bytes_ = dataPtr_->gpu_data->size();
VLOG(3) << "Sending rmm::buffer: " << std::hex << dataPtr_->gpu_data.get()
        << " pointing to device memory: " << std::hex
        << dataPtr_->gpu_data->data() << std::dec << " to task "
        << partitionKey_.toString() << ":" << this->sequenceNumber_
        << std::dec << " of size " << bytes_;

setState(ServerState::WaitingForSendComplete);
uint64_t dataTag =
    getDataTag(this->partitionKeyHash_, this->sequenceNumber_);
dataRequest_ = endpointRef_->endpoint_->tagSend(
```
This dataPtr_ was created by a call to cudf::pack on some stream. However, the underlying comms library in UCX is not stream-ordered.
Consequently, before calling tagSend we need to ensure that the work that was queued up to populate dataPtr_->gpu_data is done.
A few ways to do this:

- After calling cudf::pack, sync the stream.
- After calling cudf::pack, record an event in the stream; before calling tagSend here, synchronize that event with cudaEventSynchronize.

Option 1 is easier, but potentially syncs earlier than necessary if this tagSend request is processed lazily.

Option 2 requires that one track the event with the packed data. It would look something like this (CUDA runtime API error checking elided):

```cpp
cudaEvent_t event;
// we only need this event for stream-ordering/synchronization
cudaEventCreateWithFlags(&event, cudaEventDisableTiming);
auto packed = cudf::pack(table, ..., stream, mr);
cudaEventRecord(event, stream);
enqueue({packed, event});

// later
auto [packed, event] = dequeue(...);
cudaEventSynchronize(event);
cudaEventDestroy(event);
tagSend(packed, ...);
```
@wence- Great, many thanks for pointing this out and the perfect explanation!
No worries! Stream-ordered programming interacting with stream-agnostic libraries is tricksy. I saw that you'd correctly done the stream sync before tagRecv.
@majetideepak here is the exchange PR.
pentschev
left a comment
Thanks @dan13bauer, this is great progress. I left a review now; there are some critical issues, the largest being the one Lawrence previously mentioned. Sorry for the delay on the review, but this is quite a long PR. 🙂
```cpp
    ucp_ep_h ep);

/// @brief Adds the endpoint reference to the handleToEndpointRef_ map such
/// that endpoint handls can be resoved
```
Suggested change:

```diff
-/// that endpoint handls can be resoved
+/// that endpoint handles can be resolved
```
```cpp
VELOX_CHECK(
    buffer->getSize() == sizeof(HandshakeMsg),
    "AMCallback: unexpected size of handshake.");
HandshakeMsg* handshakePtr = reinterpret_cast<HandshakeMsg*>(buffer->data());
```
Suggested change:

```diff
-VELOX_CHECK(
-    buffer->getSize() == sizeof(HandshakeMsg),
-    "AMCallback: unexpected size of handshake.");
-HandshakeMsg* handshakePtr = reinterpret_cast<HandshakeMsg*>(buffer->data());
+HandshakeMsg* handshakePtr = reinterpret_cast<HandshakeMsg*>(buffer->data());
+VELOX_CHECK(
+    handshakePtr != nullptr,
+    "AMCallback: could not cast to HandshakeMsg.");
```
I think the size check is probably unnecessary, but checking that the result of reinterpret_cast succeeded isn't.
```cmake
target_link_libraries(
  velox_cudf_exchange
  PUBLIC ucxx::ucxx ucx::ucp ucx::ucs
```
Did you need to add ucx::ucs explicitly? This should be resolved by ucx::ucp instead.
Actually, I did, since otherwise:

```
presto-native-execution/velox/velox/experimental/cudf-exchange/Communicator.h:18:10: fatal error: ucxx/api.h: No such file or directory
   18 | #include <ucxx/api.h>
      |          ^~~~~~~~~~~~
```
If you remove ucx::ucs you have an issue including ucxx/api.h? That seems very odd. To be completely clear, can you confirm you see the issue you're referring to if you modify the line to PUBLIC ucxx::ucxx ucx::ucp?
The velox cudf exchange implements all the necessary components to efficiently transfer cudf-vectors between tasks. At the core is a UCXX-based transfer that directly copies the raw vector data from GPU memory to GPU memory.

A high-level description of the design is given here: https://github.ibm.com/perfleap/tt/wiki/Cudf-Exchange-Design-Sketch
Unfortunately this link is inaccessible for me. I'm not sure if it is supposed to be open source too, but I'd suggest either adding it somewhere public or removing the link from this doc.
```shell
UCX_TCP_KEEPINTVL=1ms UCX_KEEPALIVE_INTERVAL=1ms _build/release/velox/experimental/cudf-exchange/tests/exchange_srv_tst -logtostdout -v=3 -port <PORT>
UCX_TCP_KEEPINTVL=1ms UCX_KEEPALIVE_INTERVAL=1ms _build/release/velox/experimental/cudf-exchange/tests/exchange_client_tst -logtostdout -v=3 -port <PORT>
```
I see that you're not using the UCXX progress thread, and instead are progressing manually with progressWorkerEvent(0). Do you still see the 20s hangs if you don't have UCX_TCP_KEEPINTVL=1ms UCX_KEEPALIVE_INTERVAL=1ms? As far as I remember, this was something I only observed previously with UCXX progress thread, as described in rapidsai/ucxx#15. Can you confirm this is still required to prevent that when you progress manually?
No, we don't see the 20s hangs any more.
```cpp
request_->checkError();
auto s = request_->getStatus();
if (s != UCS_INPROGRESS && s != UCS_OK) {
  VLOG(0) << "Error in sendHandshake " << ucs_status_string(s)
          << " failed for task: " << partitionKey_.toString();
  setState(ReceiverState::Done);
  communicator_->addToWorkQueue(getSelfPtr());
}
```
Same issue here: you're not waiting for the request to complete. I'd suggest just removing this block and letting onHandshake deal with the status.
```cpp
request_->checkError();
auto s = request_->getStatus();
if (s != UCS_INPROGRESS && s != UCS_OK) {
  VLOG(3) << "Error in getMetadata, receive metadata " << ucs_status_string(s)
          << " failed for task: " << partitionKey_.toString();
}
```
```cpp
}

// sync after allocating.
stream.synchronize();
```
Note that callbacks run as part of the UCX progress operation, so this will cause the thread calling worker_->progressWorkerEvent(0) to block until the stream synchronization completes. If this is unacceptable, you must ensure it's done outside of UCX callbacks.
```cpp
request_->checkError();
auto s = request_->getStatus();
if (s != UCS_INPROGRESS && s != UCS_OK) {
  VLOG(0) << "Error in onMetadata, receive data " << ucs_status_string(s)
          << " failed for task: " << partitionKey_.toString();
  setState(ReceiverState::Done);
  communicator_->addToWorkQueue(getSelfPtr());
}
```
Can also be removed in favor of onData.
```cpp
friend Acceptor;

public:
  const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "communicator";
```
Suggested change:

```diff
-const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "communicator";
+const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "velox";
```
Not of big immediate concern, but the owner string is expected to be relatively unique, something a different application is unlikely to also use and cause a collision. "communicator" may be too generic, whereas the name of the application is not.
Fea project more ops
Update from Velox main (2025-04-17)
…ome stability fixes
```cpp
Communicator::~Communicator() {
  listener_.reset();
  auto req = worker_->flush();
```
Flush is only useful for AMO/RMA operations; this code only uses TAG/AM, so it is unnecessary. See https://github.com/rapidsai/ucxx/blob/7736ead94447ab673f1a7c7d7f9d473983934e5c/cpp/include/ucxx/worker.h#L981-L985.
```cpp
    comms->process();
    worker_->progressWorkerEvent(0);
  }
} catch (ucxx::IOError& e) {
```
Is catching this specific IOError exception intentional? Note that UCX can fail in many different ways, so if you want to broadly catch UCX errors I'd suggest catching ucxx::Error (the base exception class) instead.
Thanks, so far we've only seen IOErrors. Will change to the broader exception.
FYI: @SeanRooooney
Summary:
Fixes OSS ASan SEGV due to calling 'as->' on a nullptr.
```
=================================================================
==4058438==ERROR: AddressSanitizer: SEGV on unknown address 0x000000000000 (pc 0x000000a563a4 bp 0x7ffd54ee5bc0 sp 0x7ffd54ee5aa0 T0)
==4058438==The signal is caused by a READ memory access.
==4058438==Hint: address points to the zero page.
#0 0x000000a563a4 in facebook::velox::FlatVector<int>* facebook::velox::BaseVector::as<facebook::velox::FlatVector<int>>() /velox/./velox/vector/BaseVector.h:116:12
#1 0x000000a563a4 in facebook::velox::test::(anonymous namespace)::FlatMapVectorTest_encodedKeys_Test::TestBody() /velox/velox/vector/tests/FlatMapVectorTest.cpp:156:5
#2 0x70874f90ce0b (/lib64/libgtest.so.1.11.0+0x4fe0b) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
#3 0x70874f8ed825 in testing::Test::Run() (/lib64/libgtest.so.1.11.0+0x30825) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
#4 0x70874f8ed9ef in testing::TestInfo::Run() (/lib64/libgtest.so.1.11.0+0x309ef) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
#5 0x70874f8edaf8 in testing::TestSuite::Run() (/lib64/libgtest.so.1.11.0+0x30af8) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
#6 0x70874f8fcfc4 in testing::internal::UnitTestImpl::RunAllTests() (/lib64/libgtest.so.1.11.0+0x3ffc4) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
#7 0x70874f8fa7c7 in testing::UnitTest::Run() (/lib64/libgtest.so.1.11.0+0x3d7c7) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
#8 0x70877c073153 in main (/lib64/libgtest_main.so.1.11.0+0x1153) (BuildId: c3a576d37d6cfc6875afdc98684c143107a226a0)
#9 0x70874f48460f in __libc_start_call_main (/lib64/libc.so.6+0x2a60f) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
#10 0x70874f4846bf in __libc_start_main@GLIBC_2.2.5 (/lib64/libc.so.6+0x2a6bf) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
#11 0x00000044c1b4 in _start (/velox/_build/debug/velox/vector/tests/velox_vector_test+0x44c1b4) (BuildId: 6da0b0d1074134be8f4d4534e5dbac9eeb9d482b)
```
Reviewed By: peterenescu
Differential Revision: D91275269
fbshipit-source-id: 0806aa7562dc8cf4ad708fc6a8e4b29409507745
Summary: Pull Request resolved: facebookincubator#16102

Fixes Asan error in S3Util.cpp. See stack trace below:

```
==4125762==ERROR: AddressSanitizer: global-buffer-overflow on address 0x0000006114ff at pc 0x70aa17bc0120 bp 0x7ffe905f3030 sp 0x7ffe905f3028
READ of size 1 at 0x0000006114ff thread T0
    #0 0x70aa17bc011f in facebook::velox::filesystems::parseAWSStandardRegionName[abi:cxx11](std::basic_string_view<char, std::char_traits<char>>) /velox/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp:160:16
    #1 0x00000055790b in facebook::velox::filesystems::S3UtilTest_parseAWSRegion_Test::TestBody() /velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:147:3
    #2 0x70aa2e89be0b (/lib64/libgtest.so.1.11.0+0x4fe0b) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #3 0x70aa2e87c825 in testing::Test::Run() (/lib64/libgtest.so.1.11.0+0x30825) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #4 0x70aa2e87c9ef in testing::TestInfo::Run() (/lib64/libgtest.so.1.11.0+0x309ef) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #5 0x70aa2e87caf8 in testing::TestSuite::Run() (/lib64/libgtest.so.1.11.0+0x30af8) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #6 0x70aa2e88bfc4 in testing::internal::UnitTestImpl::RunAllTests() (/lib64/libgtest.so.1.11.0+0x3ffc4) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #7 0x70aa2e8897c7 in testing::UnitTest::Run() (/lib64/libgtest.so.1.11.0+0x3d7c7) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #8 0x70aa2e8ba153 in main (/lib64/libgtest_main.so.1.11.0+0x1153) (BuildId: c3a576d37d6cfc6875afdc98684c143107a226a0)
    #9 0x70aa01ceb60f in __libc_start_call_main (/lib64/libc.so.6+0x2a60f) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
    #10 0x70aa01ceb6bf in __libc_start_main@GLIBC_2.2.5 (/lib64/libc.so.6+0x2a6bf) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
    #11 0x000000408684 in _start (/velox/_build/debug/velox/connectors/hive/storage_adapters/s3fs/tests/velox_s3file_test+0x408684) (BuildId: bbf3099c9a66a548c6da234b17ad1b631e9ed649)

0x0000006114ff is located 33 bytes before global variable '.str.135' defined in '/velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:126' (0x000000611520) of size 46
  '.str.135' is ascii string 'isHostExcludedFromProxy(hostname, pair.first)'
0x0000006114ff is located 1 bytes before global variable '.str.133' defined in '/velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:122' (0x000000611500) of size 1
  '.str.133' is ascii string ''
0x0000006114ff is located 42 bytes after global variable '.str.132' defined in '/velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:121' (0x0000006114c0) of size 21
  '.str.132' is ascii string 'localhost,foobar.com'
AddressSanitizer: global-buffer-overflow /velox/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp:160:16 in facebook::velox::filesystems::parseAWSStandardRegionName[abi:cxx11](std::basic_string_view<char, std::char_traits<char>>)
Shadow bytes around the buggy address:
```

Reviewed By: pedroerp

Differential Revision: D91278230

fbshipit-source-id: 05283bc8408069fa3f5ab8a7840b2bd0835fa7d6