Enable cudf-vector exchange on top of UCXX.#3

Open
dan13bauer wants to merge 17 commits into exchange_hooks from cudf_exchange

@dan13bauer
Owner

No description provided.

@dan13bauer force-pushed the cudf_exchange branch 3 times, most recently from 3df9f33 to 9e0ac62 (September 2, 2025 13:10)
Comment on lines +177 to +187
bytes_ = dataPtr_->gpu_data->size();
VLOG(3) << "Sending rmm::buffer: " << std::hex << dataPtr_->gpu_data.get()
        << " pointing to device memory: " << std::hex
        << dataPtr_->gpu_data->data() << std::dec << " to task "
        << partitionKey_.toString() << ":" << this->sequenceNumber_
        << std::dec << " of size " << bytes_;

setState(ServerState::WaitingForSendComplete);
uint64_t dataTag =
    getDataTag(this->partitionKeyHash_, this->sequenceNumber_);
dataRequest_ = endpointRef_->endpoint_->tagSend(

This dataPtr_ was created by a call to cudf::pack on some stream. However, the underlying comms library in UCX is not stream-ordered.

Consequently, before calling tagSend we need to ensure that the work that was queued up to populate dataPtr_->gpu_data is done.

A few ways to do this:

  1. After calling cudf::pack, synchronize the stream.
  2. After calling cudf::pack, record an event in the stream; before calling tagSend here, wait on that event with cudaEventSynchronize.

Option 1 is easier, but potentially syncs earlier than necessary if this tagSend request is processed lazily.

Option 2 requires tracking the event alongside the packed data. It would look something like this (CUDA runtime API error checking elided):

cudaEvent_t event;
// we only need this event for stream-ordering/synchronization
cudaEventCreateWithFlags(&event, cudaEventDisableTiming);

auto packed = cudf::pack(table, ..., stream, mr);
cudaEventRecord(event, stream);
enqueue({packed, event});

// later
auto [packed, event] = dequeue(...);
cudaEventSynchronize(event);
cudaEventDestroy(event);
tagSend(packed, ...);
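
The ordering requirement can also be modelled on the host alone, which may help when reasoning about it without a GPU. The following is only an analogy under stated assumptions: `DemoPacked`, `demoPackAsync` and `demoSend` are hypothetical names, a `std::shared_future` plays the role of the CUDA event, and a background task plays the role of the stream. It is not the PR's code and does not use cudf, CUDA or UCXX.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for packed data plus the "event" recorded after the
// asynchronous work that fills it.
struct DemoPacked {
  std::shared_ptr<std::vector<int>> data;
  std::shared_future<void> ready; // plays the role of the CUDA event
};

// Starts filling a buffer asynchronously (the "stream") and returns
// immediately, before the buffer contents are final.
inline DemoPacked demoPackAsync(int n) {
  auto data = std::make_shared<std::vector<int>>(n, 0);
  std::shared_future<void> ready =
      std::async(std::launch::async, [data, n] {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        for (int i = 0; i < n; ++i) {
          (*data)[i] = i + 1;
        }
      }).share();
  return DemoPacked{data, ready};
}

// A sender that is not ordered with respect to the producer: it reads the
// buffer as-is, so it must wait on the event before touching the data.
inline long demoSend(const DemoPacked& packed) {
  packed.ready.wait(); // analogous to cudaEventSynchronize(event)
  long sum = 0;
  for (int v : *packed.data) {
    sum += v;
  }
  return sum;
}
```

Without the `wait()` call, `demoSend` could read the buffer while it is still being written, which is the host-side equivalent of handing unfinished device memory to tagSend.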

Owner Author

@wence- Great, many thanks for pointing this out and the perfect explanation!

No worries! Stream-ordered programming interacting with stream-atheistic libraries is tricksy. I saw that you'd correctly done the stream sync before tagRecv.

@zoltan
Collaborator

zoltan commented Sep 9, 2025

@majetideepak here is the exchange PR.

@pentschev pentschev left a comment

Thanks @dan13bauer, this is great progress. I have now left a review; there are some critical issues, the largest being the one Lawrence mentioned earlier. Sorry for the delay on the review, but this is quite a long PR. 🙂

ucp_ep_h ep);

/// @brief Adds the endpoint reference to the handleToEndpointRef_ map such
/// that endpoint handls can be resoved

Suggested change
- /// that endpoint handls can be resoved
+ /// that endpoint handles can be resolved

Comment on lines +33 to +36
VELOX_CHECK(
buffer->getSize() == sizeof(HandshakeMsg),
"AMCallback: unexpected size of handshake.");
HandshakeMsg* handshakePtr = reinterpret_cast<HandshakeMsg*>(buffer->data());

Suggested change
- VELOX_CHECK(
-     buffer->getSize() == sizeof(HandshakeMsg),
-     "AMCallback: unexpected size of handshake.");
- HandshakeMsg* handshakePtr = reinterpret_cast<HandshakeMsg*>(buffer->data());
+ HandshakeMsg* handshakePtr = reinterpret_cast<HandshakeMsg*>(buffer->data());
+ VELOX_CHECK(
+     handshakePtr != nullptr,
+     "AMCallback: could not cast to HandshakeMsg.");

I think the size check is probably unnecessary, but checking that the result of reinterpret_cast succeeded isn't.
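
For context on what each check can catch: reinterpret_cast does not itself report failure, and its result is null only when the input pointer is null. A minimal sketch of validating a received byte buffer before decoding it is shown below. `DemoHandshake` and `demoParseHandshake` are hypothetical names, and the field layout is an assumption because the real HandshakeMsg is not shown in this thread. Keeping both the null check and the size check is a choice made by this sketch, not something the review prescribes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>

// Hypothetical message layout; the real HandshakeMsg fields are not shown.
struct DemoHandshake {
  std::uint32_t taskId;
  std::uint32_t partition;
};

// Returns the decoded message, or std::nullopt if the buffer is null or has
// the wrong length.
inline std::optional<DemoHandshake> demoParseHandshake(
    const void* data,
    std::size_t size) {
  if (data == nullptr || size != sizeof(DemoHandshake)) {
    return std::nullopt;
  }
  DemoHandshake msg;
  // Copying into a local object avoids alignment and aliasing concerns that
  // come with dereferencing a reinterpret_cast'ed pointer.
  std::memcpy(&msg, data, sizeof(msg));
  return msg;
}
```

A caller would pass the buffer's data pointer and size, then treat an empty result as a malformed handshake.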


target_link_libraries(
velox_cudf_exchange
PUBLIC ucxx::ucxx ucx::ucp ucx::ucs

Did you need to add ucx::ucs explicitly? This should be resolved by ucx::ucp instead.

Owner Author

Actually, I did, since otherwise:

presto-native-execution/velox/velox/experimental/cudf-exchange/Communicator.h:18:10: fatal error: ucxx/api.h: No such file or directory
   18 | #include <ucxx/api.h>
      |          ^~~~~~~~~~~~

If you remove ucx::ucs you have an issue including ucxx/api.h? That seems very odd. To be completely clear, can you confirm you see the issue you're referring to if you modify the line to PUBLIC ucxx::ucxx ucx::ucp?


The velox cudf exchange implements all the necessary components to efficiently transfer cudf-vectors between tasks. At the core is a UCXX based transfer that directly copies the raw vector data from GPU memory to GPU memory.

A high level description of the design is given here: https://github.ibm.com/perfleap/tt/wiki/Cudf-Exchange-Design-Sketch

Unfortunately, this link is inaccessible to me. I'm not sure whether the design sketch is supposed to be open source too, but I'd suggest either publishing it somewhere public or removing the link from this doc.

Comment on lines +41 to +42
UCX_TCP_KEEPINTVL=1ms UCX_KEEPALIVE_INTERVAL=1ms _build/release/velox/experimental/cudf-exchange/tests/exchange_srv_tst -logtostdout -v=3 -port <PORT>
UCX_TCP_KEEPINTVL=1ms UCX_KEEPALIVE_INTERVAL=1ms _build/release/velox/experimental/cudf-exchange/tests/exchange_client_tst -logtostdout -v=3 -port <PORT>

I see that you're not using the UCXX progress thread, and are instead progressing manually with progressWorkerEvent(0). Do you still see the 20s hangs if you don't set UCX_TCP_KEEPINTVL=1ms UCX_KEEPALIVE_INTERVAL=1ms? As far as I remember, I only observed this with the UCXX progress thread, as described in rapidsai/ucxx#15. Can you confirm whether these settings are still required to prevent the hangs when you progress manually?

Owner Author

No, we don't see the 20s hangs any more.

Comment on lines +219 to +226
request_->checkError();
auto s = request_->getStatus();
if (s != UCS_INPROGRESS && s != UCS_OK) {
  VLOG(0) << "Error in sendHandshake " << ucs_status_string(s)
          << " failed for task: " << partitionKey_.toString();
  setState(ReceiverState::Done);
  communicator_->addToWorkQueue(getSelfPtr());
}

Same issue here: you're not waiting for the request to complete. I'd suggest simply removing this block and letting onHandshake deal with the status.
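
To illustrate why an immediate status check is of limited use, here is a small model of an asynchronous request. `DemoRequest` and `DemoStatus` are hypothetical names that only mimic the shape of a request with an in-progress state and a completion callback; this is not the UCXX API. Right after submission the status is still in progress, and the final outcome is only visible once the request completes, at which point the callback receives it.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical status codes mirroring the shape of UCS_OK / UCS_INPROGRESS.
enum class DemoStatus { Ok, InProgress, Error };

// Hypothetical asynchronous request: it stays InProgress until complete() is
// called (for example from a progress loop), then fires its callback once.
class DemoRequest {
 public:
  explicit DemoRequest(std::function<void(DemoStatus)> onDone)
      : onDone_(std::move(onDone)) {}

  DemoStatus status() const {
    return status_;
  }

  void complete(DemoStatus finalStatus) {
    if (status_ != DemoStatus::InProgress) {
      return; // already completed; ignore repeated completion
    }
    status_ = finalStatus;
    if (onDone_) {
      onDone_(status_);
    }
  }

 private:
  DemoStatus status_{DemoStatus::InProgress};
  std::function<void(DemoStatus)> onDone_;
};
```

In this model, error handling placed in the callback sees every outcome exactly once, whereas a check placed directly after submission typically observes only the in-progress state.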

Comment on lines +271 to +276
request_->checkError();
auto s = request_->getStatus();
if (s != UCS_INPROGRESS && s != UCS_OK) {
  VLOG(3) << "Error in getMetadata, receive metadata " << ucs_status_string(s)
          << " failed for task: " << partitionKey_.toString();
}

Remove in favor of onMetadata.

}

// sync after allocating.
stream.synchronize();

Note that callbacks run as part of the UCX progress operation, so this will cause the thread calling worker_->progressWorkerEvent(0) to block until the stream synchronization completes. If this is unacceptable, you must ensure the synchronization is done outside of UCX callbacks.
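
One way to keep blocking work out of a callback is to have the callback only enqueue a task and return, then run the task from the loop that drives progress. The sketch below assumes a single-threaded loop; `DemoWorkQueue` and `demoOnMetadata` are hypothetical names, and the queued lambda merely marks where a call such as stream.synchronize() would go. It is not the PR's Communicator work queue.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded queue of work deferred out of callbacks.
class DemoWorkQueue {
 public:
  void add(std::function<void()> task) {
    tasks_.push_back(std::move(task));
  }

  // Runs everything queued so far and returns how many tasks ran.
  int drain() {
    int ran = 0;
    while (!tasks_.empty()) {
      auto task = std::move(tasks_.front());
      tasks_.pop_front();
      task();
      ++ran;
    }
    return ran;
  }

 private:
  std::deque<std::function<void()>> tasks_;
};

// Stand-in for a completion callback invoked from inside the progress call:
// it records that blocking work is needed and returns immediately.
inline void demoOnMetadata(DemoWorkQueue& queue, bool& synced) {
  queue.add([&synced] {
    synced = true; // placeholder for the blocking step
  });
}
```

The driving loop would call the progress function first and `drain()` afterwards, so the blocking step never runs while the progress call is on the stack.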

Comment on lines +366 to +373
request_->checkError();
auto s = request_->getStatus();
if (s != UCS_INPROGRESS && s != UCS_OK) {
  VLOG(0) << "Error in onMetadata, receive data " << ucs_status_string(s)
          << " failed for task: " << partitionKey_.toString();
  setState(ReceiverState::Done);
  communicator_->addToWorkQueue(getSelfPtr());
}

Can also be removed in favor of onData.

friend Acceptor;

public:
const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "communicator";

Suggested change
- const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "communicator";
+ const ucxx::AmReceiverCallbackOwnerType kAmCallbackOwner = "velox";

Not a big immediate concern, but the owner string is expected to be reasonably unique, so that a different application is unlikely to use the same one and cause a collision. "communicator" may be too generic; the name of the application is not.

lga-zurich pushed a commit to lga-zurich/velox-exchange that referenced this pull request Sep 19, 2025
@GregoryKimball GregoryKimball removed this from libcudf Oct 17, 2025

Communicator::~Communicator() {
listener_.reset();
auto req = worker_->flush();

Flush is only useful for AMO/RMA operations. This code only uses TAG/AM, so the flush is unnecessary. See https://github.com/rapidsai/ucxx/blob/7736ead94447ab673f1a7c7d7f9d473983934e5c/cpp/include/ucxx/worker.h#L981-L985.

comms->process();
worker_->progressWorkerEvent(0);
}
} catch (ucxx::IOError& e) {

Is catching this specific IOError exception intentional? Note that UCX can fail in many different ways, so if you want to catch UCX errors broadly, I'd suggest catching ucxx::Error (the base exception class) instead.
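
The difference between the two handlers can be shown with a small stand-alone hierarchy. `DemoError`, `DemoIOError` and `DemoTimeoutError` are hypothetical classes that only mirror the idea of a base error class with specific subclasses; they are not the UCXX exception types. The helper reports whether a handler for one type catches an exception of another.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical hierarchy: one base error class with specific subclasses.
struct DemoError : std::runtime_error {
  explicit DemoError(const std::string& msg) : std::runtime_error(msg) {}
};
struct DemoIOError : DemoError {
  explicit DemoIOError(const std::string& msg) : DemoError(msg) {}
};
struct DemoTimeoutError : DemoError {
  explicit DemoTimeoutError(const std::string& msg) : DemoError(msg) {}
};

// Returns true if an exception of type Thrown is caught by a handler
// declared for type Handler.
template <typename Handler, typename Thrown>
bool demoIsCaught() {
  try {
    try {
      throw Thrown("demo");
    } catch (const Handler&) {
      return true;
    }
  } catch (const DemoError&) {
    // The narrower handler did not match; the error escaped it.
  }
  return false;
}
```

A handler for the subclass misses sibling error types, while a handler for the base class catches all of them.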

Owner Author

Thanks, so far we've only seen IOErrors. Will change to the broader exception.
FYI: @SeanRooooney

dan13bauer pushed a commit that referenced this pull request Feb 2, 2026
Summary:
Fixes OSS Asan segV due to calling 'as->' on a nullptr.

```
=================================================================
==4058438==ERROR: AddressSanitizer: SEGV on unknown address 0x000000000000 (pc 0x000000a563a4 bp 0x7ffd54ee5bc0 sp 0x7ffd54ee5aa0 T0)
==4058438==The signal is caused by a READ memory access.
==4058438==Hint: address points to the zero page.
    #0 0x000000a563a4 in facebook::velox::FlatVector<int>* facebook::velox::BaseVector::as<facebook::velox::FlatVector<int>>() /velox/./velox/vector/BaseVector.h:116:12
    #1 0x000000a563a4 in facebook::velox::test::(anonymous namespace)::FlatMapVectorTest_encodedKeys_Test::TestBody() /velox/velox/vector/tests/FlatMapVectorTest.cpp:156:5
    #2 0x70874f90ce0b  (/lib64/libgtest.so.1.11.0+0x4fe0b) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #3 0x70874f8ed825 in testing::Test::Run() (/lib64/libgtest.so.1.11.0+0x30825) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #4 0x70874f8ed9ef in testing::TestInfo::Run() (/lib64/libgtest.so.1.11.0+0x309ef) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #5 0x70874f8edaf8 in testing::TestSuite::Run() (/lib64/libgtest.so.1.11.0+0x30af8) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #6 0x70874f8fcfc4 in testing::internal::UnitTestImpl::RunAllTests() (/lib64/libgtest.so.1.11.0+0x3ffc4) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #7 0x70874f8fa7c7 in testing::UnitTest::Run() (/lib64/libgtest.so.1.11.0+0x3d7c7) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #8 0x70877c073153 in main (/lib64/libgtest_main.so.1.11.0+0x1153) (BuildId: c3a576d37d6cfc6875afdc98684c143107a226a0)
    #9 0x70874f48460f in __libc_start_call_main (/lib64/libc.so.6+0x2a60f) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
    #10 0x70874f4846bf in __libc_start_main@GLIBC_2.2.5 (/lib64/libc.so.6+0x2a6bf) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
    #11 0x00000044c1b4 in _start (/velox/_build/debug/velox/vector/tests/velox_vector_test+0x44c1b4) (BuildId: 6da0b0d1074134be8f4d4534e5dbac9eeb9d482b)
```

Reviewed By: peterenescu

Differential Revision: D91275269

fbshipit-source-id: 0806aa7562dc8cf4ad708fc6a8e4b29409507745
dan13bauer pushed a commit that referenced this pull request Feb 2, 2026
Summary:
Pull Request resolved: facebookincubator#16102

Fixes Asan error in S3Util.cpp, See stack trace below:

```
==4125762==ERROR: AddressSanitizer: global-buffer-overflow on address 0x0000006114ff at pc 0x70aa17bc0120 bp 0x7ffe905f3030 sp 0x7ffe905f3028
READ of size 1 at 0x0000006114ff thread T0
    #0 0x70aa17bc011f in facebook::velox::filesystems::parseAWSStandardRegionName[abi:cxx11](std::basic_string_view<char, std::char_traits<char>>) /velox/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp:160:16
    #1 0x00000055790b in facebook::velox::filesystems::S3UtilTest_parseAWSRegion_Test::TestBody() /velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:147:3
    #2 0x70aa2e89be0b  (/lib64/libgtest.so.1.11.0+0x4fe0b) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #3 0x70aa2e87c825 in testing::Test::Run() (/lib64/libgtest.so.1.11.0+0x30825) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #4 0x70aa2e87c9ef in testing::TestInfo::Run() (/lib64/libgtest.so.1.11.0+0x309ef) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #5 0x70aa2e87caf8 in testing::TestSuite::Run() (/lib64/libgtest.so.1.11.0+0x30af8) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #6 0x70aa2e88bfc4 in testing::internal::UnitTestImpl::RunAllTests() (/lib64/libgtest.so.1.11.0+0x3ffc4) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #7 0x70aa2e8897c7 in testing::UnitTest::Run() (/lib64/libgtest.so.1.11.0+0x3d7c7) (BuildId: 506b2df0fc901091ff83631fd797a325cae6b679)
    #8 0x70aa2e8ba153 in main (/lib64/libgtest_main.so.1.11.0+0x1153) (BuildId: c3a576d37d6cfc6875afdc98684c143107a226a0)
    #9 0x70aa01ceb60f in __libc_start_call_main (/lib64/libc.so.6+0x2a60f) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
    #10 0x70aa01ceb6bf in __libc_start_main@GLIBC_2.2.5 (/lib64/libc.so.6+0x2a6bf) (BuildId: 4dbf824d0f6afd9b2faee4787d89a39921c0a65e)
    #11 0x000000408684 in _start (/velox/_build/debug/velox/connectors/hive/storage_adapters/s3fs/tests/velox_s3file_test+0x408684) (BuildId: bbf3099c9a66a548c6da234b17ad1b631e9ed649)

0x0000006114ff is located 33 bytes before global variable '.str.135' defined in '/velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:126' (0x000000611520) of size 46
  '.str.135' is ascii string 'isHostExcludedFromProxy(hostname, pair.first)'
0x0000006114ff is located 1 bytes before global variable '.str.133' defined in '/velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:122' (0x000000611500) of size 1
  '.str.133' is ascii string ''
0x0000006114ff is located 42 bytes after global variable '.str.132' defined in '/velox/velox/connectors/hive/storage_adapters/s3fs/tests/S3UtilTest.cpp:121' (0x0000006114c0) of size 21
  '.str.132' is ascii string 'localhost,foobar.com'
AddressSanitizer: global-buffer-overflow /velox/velox/connectors/hive/storage_adapters/s3fs/S3Util.cpp:160:16 in facebook::velox::filesystems::parseAWSStandardRegionName[abi:cxx11](std::basic_string_view<char, std::char_traits<char>>)
Shadow bytes around the buggy address:
```

Reviewed By: pedroerp

Differential Revision: D91278230

fbshipit-source-id: 05283bc8408069fa3f5ab8a7840b2bd0835fa7d6