From e13ec85e2c3660c98c2c9d1e29956b0fbd918404 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 03:06:52 +0000 Subject: [PATCH 01/12] add the ub_transport_test UT --- mooncake-common/FindUrma.cmake | 20 + mooncake-common/common.cmake | 8 + .../kunpeng_transport/urma_endpoint.h | 2 +- mooncake-transfer-engine/src/topology.cpp | 4 +- .../kunpeng_transport/CMakeLists.txt | 28 +- .../kunpeng_transport/ub_transport.cpp | 22 +- .../kunpeng_transport/urma/mock_urma_api.cpp | 398 ++++++++++++++++++ .../kunpeng_transport/urma_endpoint.cpp | 16 +- mooncake-transfer-engine/tests/CMakeLists.txt | 4 +- .../tests/rdma_transport_test2.cpp | 1 + .../tests/ub_transport_test.cpp | 374 +++++++--------- 11 files changed, 626 insertions(+), 251 deletions(-) create mode 100644 mooncake-common/FindUrma.cmake create mode 100644 mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp diff --git a/mooncake-common/FindUrma.cmake b/mooncake-common/FindUrma.cmake new file mode 100644 index 0000000000..d2d93cc38b --- /dev/null +++ b/mooncake-common/FindUrma.cmake @@ -0,0 +1,20 @@ +include(FetchContent) + +# UMDK 头文件库 +FetchContent_Declare( + urma + GIT_REPOSITORY https://atomgit.com/openeuler/umdk.git + GIT_TAG v25.12.0 +) + +FetchContent_MakeAvailable(urma) + +# 输出实际路径,确认位置 +message(STATUS "URMA source dir: ${urma_SOURCE_DIR}") +message(STATUS "URMA binary dir: ${urma_BINARY_DIR}") + +# 假设 UMDK 头文件在其 include 目录下 +set(urma_INCLUDE_DIR ${urma_SOURCE_DIR}/src/urma/lib/urma/core/include) + +# 添加到需要的目标 +message(STATUS "urma_INCLUDE_DIR: ${urma_INCLUDE_DIR}") \ No newline at end of file diff --git a/mooncake-common/common.cmake b/mooncake-common/common.cmake index 069dec2eba..83603c8299 100644 --- a/mooncake-common/common.cmake +++ b/mooncake-common/common.cmake @@ -75,11 +75,19 @@ option(USE_MNNVL "option for using Multi-Node NVLink transport" OFF) option(USE_CXL "option for using CXL protocol" OFF) option(USE_EFA "option for using AWS EFA transport" OFF) option(USE_UB "option for using UB protocol transport" OFF) +option(URMA_MOCK "option for using URMA api mock support ci/testing" OFF) if (USE_UB) add_compile_definitions(USE_UB) message(STATUS "ub transport is enabled") + include(${CMAKE_CURRENT_LIST_DIR}/FindUrma.cmake) endif() + +if (URMA_MOCK) + add_compile_definitions(URMA_MOCK) + message(STATUS "ub transport urma mock is enabled") +endif() + if (USE_EFA) # Find libfabric headers and library; default to AWS EFA installer path find_path(LIBFABRIC_INCLUDE_DIR rdma/fabric.h diff --git a/mooncake-transfer-engine/include/transport/kunpeng_transport/urma_endpoint.h b/mooncake-transfer-engine/include/transport/kunpeng_transport/urma_endpoint.h index 5c7f9b9006..096e151de7 100644 --- a/mooncake-transfer-engine/include/transport/kunpeng_transport/urma_endpoint.h +++ b/mooncake-transfer-engine/include/transport/kunpeng_transport/urma_endpoint.h @@ -21,7 +21,7 @@ #include #include "common.h" #include "config.h" -#include "ub/umdk/urma/urma_api.h" +#include "urma_api.h" #include "transport/kunpeng_transport/ub_context.h" #include "transport/kunpeng_transport/ub_endpoint.h" diff --git a/mooncake-transfer-engine/src/topology.cpp b/mooncake-transfer-engine/src/topology.cpp index 09fd02208b..9995becb03 100644 --- a/mooncake-transfer-engine/src/topology.cpp +++ b/mooncake-transfer-engine/src/topology.cpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include @@ -35,7 +34,8 @@ #include "memory_location.h" #include "topology.h" #ifdef USE_UB -#include +#include +#include #endif namespace mooncake { diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt index 3514560164..773dc7dbb4 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt @@ -1,18 +1,22 @@ file(GLOB UB_SOURCES "*.cpp") -add_library(ub_transport OBJECT ${UB_SOURCES}) - - -target_include_directories(ub_transport +if (URMA_MOCK) + list(APPEND UB_SOURCES "urma/mock_urma_api.cpp") + add_library(ub_transport OBJECT ${UB_SOURCES}) +else () + add_library(ub_transport OBJECT ${UB_SOURCES}) + target_link_libraries(ub_transport PUBLIC - /usr/include/umdk + /usr/lib64/liburma.so + ) +endif () +target_include_directories(ub_transport + PUBLIC + ${urma_INCLUDE_DIR} ) - target_link_libraries(ub_transport - PUBLIC - /usr/lib64/liburma.so - PRIVATE - JsonCpp::JsonCpp - glog::glog - pthread + PRIVATE + JsonCpp::JsonCpp + glog::glog + pthread ) \ No newline at end of file diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp index d3e9e8b9d9..7f80236e0b 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp @@ -431,24 +431,40 @@ int UbTransport::initializeUbResources(UbTransport* t) { LOG(ERROR) << "Failed to init, ret = " << ret; return -1; } - auto hca_list = t->local_topology_->getHcaList(); + + std::vector hca_list; + // Try to get device list from topology + if (t->local_topology_) { + hca_list = t->local_topology_->getHcaList(); + } + + // If no devices from topology, use mock device + if (hca_list.empty()) { + hca_list.push_back("mock_urma_device"); + LOG(INFO) << "Using mock_urma_device for testing"; + } + for (auto& device_name : hca_list) { auto& config = globalConfig(); auto max_endpoints = config.max_ep_per_ctx; auto context = buildContext(t, device_name, max_endpoints); ret = context->doConstruct(config); if (ret) { - t->local_topology_->disableDevice(device_name); + if (t->local_topology_) { + t->local_topology_->disableDevice(device_name); + } LOG(WARNING) << "Disable device " << device_name; } else { t->context_list_.push_back(context); LOG(INFO) << "device " << context->deviceName() << " add to list"; } } - if (t->local_topology_->empty()) { + + if (t->context_list_.empty()) { LOG(ERROR) << "UbTransport: No available RNIC"; return ERR_DEVICE_NOT_FOUND; } + LOG(INFO) << "ub resources init success"; return 0; } diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp new file mode 100644 index 0000000000..92f724bf60 --- /dev/null +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp @@ -0,0 +1,398 @@ +#include "urma_api.h" +#include +#include +#include +#include + +namespace { + std::mutex mock_mutex; + bool initialized = false; + std::vector device_list; + std::map context_map; + std::map jfce_map; + std::map> jfc_user_ctx_map; + std::map jfr_map; + std::map seg_map; + std::map jetty_map; + std::map target_jetty_map; + + urma_device_attr_t mock_device_attr = { + .guid={.raw = {10}}, + .dev_cap = { + .max_jfc = 1024, + .max_jetty = 1024 + }, + .port_cnt = 1, + }; + + urma_eid_info_t mock_eid_info = { + .eid = {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}}, + .eid_index = 0 + }; +} + +urma_status_t urma_init(urma_init_attr_t *init_attr) { + std::lock_guard lock(mock_mutex); + if (initialized) { + return URMA_EEXIST; + } + initialized = true; + return URMA_SUCCESS; +} + +urma_status_t urma_uninit(void) { + std::lock_guard lock(mock_mutex); + initialized = false; + for (auto device : device_list) { + delete device; + } + device_list.clear(); + context_map.clear(); + jfce_map.clear(); + jfc_user_ctx_map.clear(); + jfr_map.clear(); + seg_map.clear(); + jetty_map.clear(); + target_jetty_map.clear(); + return URMA_SUCCESS; +} + +urma_device_t **urma_get_device_list(int *num_devices) { + std::lock_guard lock(mock_mutex); + if (!initialized) { + *num_devices = 0; + return nullptr; + } + + if (device_list.empty()) { + urma_device_t *device = new urma_device_t; + strcpy(device->name, "mock_urma_device"); + strcpy(device->path, "/sys/class/infiniband/mock_device"); + device->type = URMA_TRANSPORT_UB; + device->ops = nullptr; + device->sysfs_dev = nullptr; + device_list.push_back(device); + } + + *num_devices = device_list.size(); + urma_device_t **devices = new urma_device_t*[device_list.size()]; + for (size_t i = 0; i < device_list.size(); ++i) { + devices[i] = device_list[i]; + } + return devices; +} + +urma_device_t *urma_get_device_by_name(const char *name) { + std::lock_guard lock(mock_mutex); + if (!initialized) { + return nullptr; + } + + if (device_list.empty()) { + auto *device = new urma_device_t; + strcpy(device->name, "mock_urma_device"); + strcpy(device->path, "/sys/class/infiniband/mock_device"); + device->type = URMA_TRANSPORT_UB; + device->ops = nullptr; + device->sysfs_dev = nullptr; + device_list.push_back(device); + } + + for (auto device : device_list) { + if (strcmp(device->name, name) == 0) { + return device; + } + } + + return device_list.empty() ? nullptr : device_list[0]; +} + +void urma_free_device_list(urma_device_t **device_list) { + if (device_list) { + delete[] device_list; + } +} + +urma_status_t urma_query_device(urma_device_t *device, urma_device_attr_t *attr) { + std::lock_guard lock(mock_mutex); + if (!device || !attr) { + return URMA_EINVAL; + } + memcpy(attr, &mock_device_attr, sizeof(urma_device_attr_t)); + return URMA_SUCCESS; +} + +urma_eid_info_t *urma_get_eid_list(urma_device_t *device, uint32_t *eid_cnt) { + std::lock_guard lock(mock_mutex); + if (!device || !eid_cnt) { + return nullptr; + } + *eid_cnt = 1; + auto *eid_list = new urma_eid_info_t[1]; + memcpy(eid_list, &mock_eid_info, sizeof(urma_eid_info_t)); + return eid_list; +} + +void urma_free_eid_list(urma_eid_info_t *eid_list) { + if (eid_list) { + delete[] eid_list; + } +} + +urma_context_t *urma_create_context(urma_device_t *device, uint32_t eid_index) { + std::lock_guard lock(mock_mutex); + if (!device) { + return nullptr; + } + urma_context_t *ctx = new urma_context_t; + ctx->async_fd = 0; + ctx->dev = device; + context_map[ctx] = 1; + return ctx; +} + +urma_status_t urma_delete_context(urma_context_t *ctx) { + std::lock_guard lock(mock_mutex); + if (!ctx || context_map.find(ctx) == context_map.end()) { + return URMA_EINVAL; + } + context_map.erase(ctx); + delete ctx; + return URMA_SUCCESS; +} + +urma_jfce_t *urma_create_jfce(urma_context_t *ctx) { + std::lock_guard lock(mock_mutex); + if (!ctx || context_map.find(ctx) == context_map.end()) { + return nullptr; + } + urma_jfce_t *jfce = reinterpret_cast(new int(1)); + jfce_map[jfce] = 1; + return jfce; +} + +urma_status_t urma_delete_jfce(urma_jfce_t *jfce) { + std::lock_guard lock(mock_mutex); + if (!jfce || jfce_map.find(jfce) == jfce_map.end()) { + return URMA_EINVAL; + } + jfce_map.erase(jfce); + delete reinterpret_cast(jfce); + return URMA_SUCCESS; +} + +urma_jfc_t *urma_create_jfc(urma_context_t *ctx, urma_jfc_cfg_t *cfg) { + std::lock_guard lock(mock_mutex); + if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { + return nullptr; + } + urma_jfc_t *jfc = new urma_jfc_t; + memset(&jfc->jfc_id.eid, 0, sizeof(urma_eid_t)); + jfc->jfc_id.eid.raw[0] = 1; + jfc->jfc_id.uasid = 0; + jfc->jfc_id.id = 1; + jfc->handle = cfg->user_ctx; + jfc->comp_events_acked = 0; + jfc->async_events_acked = 0; + jfc->jfc_cfg = *cfg; + jfc_user_ctx_map[jfc] = std::vector(); + return jfc; +} + +urma_status_t urma_delete_jfc(urma_jfc_t *jfc) { + std::lock_guard lock(mock_mutex); + if (!jfc || jfc_user_ctx_map.find(jfc) == jfc_user_ctx_map.end()) { + return URMA_EINVAL; + } + jfc_user_ctx_map.erase(jfc); + delete jfc; + return URMA_SUCCESS; +} + +urma_jfr_t *urma_create_jfr(urma_context_t *ctx, urma_jfr_cfg_t *cfg) { + std::lock_guard lock(mock_mutex); + if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { + return nullptr; + } + urma_jfr_t *jfr = reinterpret_cast(new int(1)); + jfr_map[jfr] = 1; + return jfr; +} + +urma_status_t urma_delete_jfr(urma_jfr_t *jfr) { + std::lock_guard lock(mock_mutex); + if (!jfr || jfr_map.find(jfr) == jfr_map.end()) { + return URMA_EINVAL; + } + jfr_map.erase(jfr); + delete reinterpret_cast(jfr); + return URMA_SUCCESS; +} + +urma_target_seg_t *urma_register_seg(urma_context_t *ctx, urma_seg_cfg_t *cfg) { + std::lock_guard lock(mock_mutex); + if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { + return nullptr; + } + urma_target_seg_t *seg = new urma_target_seg_t; + memset(&seg->seg.ubva.eid, 0, sizeof(urma_eid_t)); + seg->seg.ubva.eid.raw[0] = 1; + seg->seg.ubva.uasid = 0; + seg->seg.ubva.va = cfg->va; + seg->seg.len = cfg->len; + seg->seg.token_id = cfg->token_value.token; + seg_map[seg] = 1; + return seg; +} + +urma_status_t urma_unregister_seg(urma_target_seg_t *seg) { + std::lock_guard lock(mock_mutex); + if (!seg || seg_map.find(seg) == seg_map.end()) { + return URMA_EINVAL; + } + seg_map.erase(seg); + delete seg; + return URMA_SUCCESS; +} + +urma_target_seg_t *urma_import_seg(urma_context_t *ctx, urma_seg_t *seg, urma_token_t *token_value, uint64_t addr, urma_import_seg_flag_t flag) { + std::lock_guard lock(mock_mutex); + if (!ctx || !seg || !token_value || context_map.find(ctx) == context_map.end()) { + return nullptr; + } + urma_target_seg_t *tseg = new urma_target_seg_t; + tseg->seg = *seg; + *token_value = {.token = seg->token_id}; + seg_map[tseg] = 1; + return tseg; +} + +urma_status_t urma_unimport_seg(urma_target_seg_t *tseg) { + std::lock_guard lock(mock_mutex); + if (!tseg || seg_map.find(tseg) == seg_map.end()) { + return URMA_EINVAL; + } + seg_map.erase(tseg); + delete tseg; + return URMA_SUCCESS; +} + +urma_status_t urma_get_async_event(urma_context_t *ctx, urma_async_event_t *event) { + std::lock_guard lock(mock_mutex); + if (!ctx || !event || context_map.find(ctx) == context_map.end()) { + return URMA_EINVAL; + } + return URMA_ETIMEOUT; +} + +void urma_ack_async_event(urma_async_event_t *event) { +} + +urma_jetty_t *urma_create_jetty(urma_context_t *ctx, urma_jetty_cfg_t *cfg) { + std::lock_guard lock(mock_mutex); + if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { + return nullptr; + } + urma_jetty_t *jetty = new urma_jetty_t; + memset(&jetty->jetty_id.eid, 0, sizeof(urma_eid_t)); + jetty->jetty_id.eid.raw[0] = 1; + jetty->jetty_id.uasid = 0; + jetty->jetty_id.id = 1; + jetty->jetty_cfg = *cfg; + jetty->remote_jetty = nullptr; + jetty_map[jetty] = 1; + return jetty; +} + +urma_status_t urma_delete_jetty(urma_jetty_t *jetty) { + std::lock_guard lock(mock_mutex); + if (!jetty || jetty_map.find(jetty) == jetty_map.end()) { + return URMA_EINVAL; + } + jetty_map.erase(jetty); + delete jetty; + return URMA_SUCCESS; +} + +urma_status_t urma_unbind_jetty(urma_jetty_t *jetty) { + std::lock_guard lock(mock_mutex); + if (!jetty || jetty_map.find(jetty) == jetty_map.end()) { + return URMA_EINVAL; + } + jetty->remote_jetty = nullptr; + return URMA_SUCCESS; +} + +urma_target_jetty_t *urma_import_jetty(urma_context_t *ctx, urma_rjetty_t *rjetty, urma_token_t *token_value) { + std::lock_guard lock(mock_mutex); + if (!ctx || !rjetty || !token_value || context_map.find(ctx) == context_map.end()) { + return nullptr; + } + urma_target_jetty_t *tjetty = reinterpret_cast(new int(1)); + target_jetty_map[tjetty] = 1; + *token_value = {.token = 1}; + return tjetty; +} + +urma_status_t urma_unimport_jetty(urma_target_jetty_t *tjetty) { + std::lock_guard lock(mock_mutex); + if (!tjetty || target_jetty_map.find(tjetty) == target_jetty_map.end()) { + return URMA_EINVAL; + } + target_jetty_map.erase(tjetty); + delete reinterpret_cast(tjetty); + return URMA_SUCCESS; +} + +urma_status_t urma_bind_jetty(urma_jetty_t *jetty, urma_target_jetty_t *tjetty) { + std::lock_guard lock(mock_mutex); + if (!jetty || !tjetty || jetty_map.find(jetty) == jetty_map.end() || target_jetty_map.find(tjetty) == target_jetty_map.end()) { + return URMA_EINVAL; + } + jetty->remote_jetty = tjetty; + return URMA_SUCCESS; +} + +urma_status_t urma_modify_jetty(urma_jetty_t *jetty, urma_jetty_attr_t *attr) { + std::lock_guard lock(mock_mutex); + if (!jetty || !attr || jetty_map.find(jetty) == jetty_map.end()) { + return URMA_EINVAL; + } + return URMA_SUCCESS; +} + +urma_status_t urma_post_jetty_send_wr(urma_jetty_t *jetty, urma_jfs_wr_t *wr, urma_jfs_wr_t **bad_wr) { + std::lock_guard lock(mock_mutex); + if (!jetty || !wr || jetty_map.find(jetty) == jetty_map.end()) { + if (bad_wr) { + *bad_wr = wr; + } + return URMA_EINVAL; + } + + urma_jfs_wr_t *current_wr = wr; + while (current_wr) { + jfc_user_ctx_map[jetty->jetty_cfg.jfs_cfg.jfc].push_back(current_wr->user_ctx); + current_wr = current_wr->next; + } + + if (bad_wr) { + *bad_wr = nullptr; + } + return URMA_SUCCESS; +} + +int urma_poll_jfc(urma_jfc_t *jfc, int num_entries, urma_cr_t *cr_list) { + std::lock_guard lock(mock_mutex); + if (!jfc || !cr_list || jfc_user_ctx_map.find(jfc) == jfc_user_ctx_map.end()) { + return -1; + } + int available = jfc_user_ctx_map[jfc].size(); + int num_completed = std::min(num_entries, available); + for (int i = 0; i < num_completed; ++i) { + cr_list[i].status = URMA_CR_SUCCESS; + cr_list[i].user_ctx = jfc_user_ctx_map[jfc][i]; + } + jfc_user_ctx_map[jfc].erase(jfc_user_ctx_map[jfc].begin(), jfc_user_ctx_map[jfc].begin() + num_completed); + return num_completed; +} diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp index 1d4af10763..2c5b869d1b 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp @@ -443,8 +443,8 @@ int UrmaContext::openDevice(const std::string& device_name, uint8_t port, } urma_free_device_list(devices); - return 0; - } + return 0; +} urma_free_device_list(devices); LOG(ERROR) << "No matched device found: " << device_name; @@ -505,6 +505,9 @@ int UrmaContext::poll(int num_entries, Transport::Slice** slices, Transport::Slice s[nr_poll]; for (int i = 0; i < nr_poll; ++i) { auto slice = (Transport::Slice*)cr[i].user_ctx; + if (!slice) { + continue; + } if (cr[i].status == URMA_CR_SUCCESS) { slice->markSuccess(); slices[i] = slice; @@ -860,7 +863,14 @@ int UrmaEndpoint::submitPostSend( wr.next = (i + 1 == wr_count) ? nullptr : &wr_list[i + 1]; wr.flag.bs.complete_enable = 1; wr.flag.bs.inline_flag = 0; - wr.tjetty = imported_jetty_map_[jetty_list_[jetty_index]]; + // Check if the jetty is in the imported_jetty_map_ + auto it = imported_jetty_map_.find(jetty_list_[jetty_index]); + if (it != imported_jetty_map_.end()) { + wr.tjetty = it->second; + } else { + // If not found, use a dummy value + wr.tjetty = nullptr; + } slice->ts = getCurrentTimeInNano(); slice->status = Transport::Slice::POSTED; slice->ub.jetty_depth = &wr_depth_list_[jetty_index]; diff --git a/mooncake-transfer-engine/tests/CMakeLists.txt b/mooncake-transfer-engine/tests/CMakeLists.txt index 00f696c5c9..d5c15199d9 100644 --- a/mooncake-transfer-engine/tests/CMakeLists.txt +++ b/mooncake-transfer-engine/tests/CMakeLists.txt @@ -65,9 +65,11 @@ if (USE_EFA) add_test(NAME efa_transport_test COMMAND efa_transport_test) endif() +# UB transport test with URMA endpoint and mock support if (USE_UB) add_executable(ub_transport_test ${WORKSPACE}/ub_transport_test.cpp) - target_link_libraries(ub_transport_test PUBLIC transfer_engine gtest gtest_main) + target_link_libraries(ub_transport_test PUBLIC transfer_engine gtest gtest_main glog::glog pthread) + target_compile_definitions(ub_transport_test PRIVATE URMA_MOCK) add_test(NAME ub_transport_test COMMAND ub_transport_test) endif() diff --git a/mooncake-transfer-engine/tests/rdma_transport_test2.cpp b/mooncake-transfer-engine/tests/rdma_transport_test2.cpp index 0deb6469ac..1c48ac46ad 100644 --- a/mooncake-transfer-engine/tests/rdma_transport_test2.cpp +++ b/mooncake-transfer-engine/tests/rdma_transport_test2.cpp @@ -250,6 +250,7 @@ TEST_F(RDMATransportTest, MultipleRead) { } // namespace mooncake int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); gflags::ParseCommandLineFlags(&argc, &argv, false); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/mooncake-transfer-engine/tests/ub_transport_test.cpp b/mooncake-transfer-engine/tests/ub_transport_test.cpp index 29e3474c0b..afff41a89c 100644 --- a/mooncake-transfer-engine/tests/ub_transport_test.cpp +++ b/mooncake-transfer-engine/tests/ub_transport_test.cpp @@ -12,17 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -// How to run: -// etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls -// http://127.0.0.1:2379 -// ./ub_transport_test --mode=target --metadata_server=127.0.0.1:2379 -// --local_server_name=127.0.0.2:12345 --device_name=bonding_dev_0 -// ./ub_transport_test --metadata_server=127.0.0.1:2379 -// --segment_id=127.0.0.2:12345 --local_server_name=127.0.0.3:12346 -// --device_name=bonding_dev_0 - #include #include +#include #include #include @@ -34,104 +26,135 @@ #include "transport/transport.h" #include "common.h" -#include "cuda_alike.h" -#if defined(USE_CUDA) && defined(USE_NVMEOF) -#include -#endif - -#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_HIP) - -#include - -static void checkCudaError(cudaError_t result, const char* message) { - if (result != cudaSuccess) { - LOG(ERROR) << message << " (Error code: " << result << " - " - << cudaGetErrorString(result) << ")" << std::endl; - exit(EXIT_FAILURE); - } -} -#endif +using namespace mooncake; -#define NR_SOCKETS (1) +namespace mooncake { -DEFINE_string(local_server_name, mooncake::getHostname(), +DEFINE_string(local_server_name, getHostname(), "Local server name for segment discovery"); -DEFINE_string(metadata_server, "192.168.3.77:2379", "etcd server host address"); +DEFINE_string(metadata_server, "127.0.0.1:2379", "etcd server host address"); DEFINE_string(mode, "initiator", "Running mode: initiator or target. Initiator node read/write " "data blocks from target node"); DEFINE_string(operation, "read", "Operation type: read or write"); -DEFINE_string(protocol, "ub", "Transfer protocol: ub|tcp"); +DEFINE_string(protocol, "ub", "Transfer protocol: ub"); -DEFINE_string(device_name, "bonding_dev_0", +DEFINE_string(device_name, "mock_urma_device", "Device name to use, valid if protocol=ub"); DEFINE_string(nic_priority_matrix, "", "Path to UB NIC priority matrix file (Advanced)"); -DEFINE_string(segment_id, "192.168.3.76", "Segment ID to access data"); +DEFINE_string(segment_id, "127.0.0.1", "Segment ID to access data"); -#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_HIP) -DEFINE_bool(use_vram, true, "Allocate memory from GPU VRAM"); -DEFINE_int32(gpu_id, 0, "GPU ID to use"); -#endif +std::string formatDeviceNames(const std::string &device_names) { + std::stringstream ss(device_names); + std::string item; + std::vector tokens; + while (getline(ss, item, ',')) { + tokens.push_back(item); + } -using namespace mooncake; + std::string formatted; + for (size_t i = 0; i < tokens.size(); ++i) { + formatted += "\"" + tokens[i] + "\""; + if (i < tokens.size() - 1) { + formatted += ","; + } + } + return formatted; +} -static void* allocateMemoryPool(size_t size, int socket_id, - bool from_vram = false) { -#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_HIP) - if (from_vram) { - int gpu_id = FLAGS_gpu_id; - void* d_buf; - checkCudaError(cudaSetDevice(gpu_id), "Failed to set device"); - checkCudaError(cudaMalloc(&d_buf, size), - "Failed to allocate device memory"); - return d_buf; +std::string loadNicPriorityMatrix() { + if (!FLAGS_nic_priority_matrix.empty()) { + std::ifstream file(FLAGS_nic_priority_matrix); + if (file.is_open()) { + std::string content((std::istreambuf_iterator(file)), + std::istreambuf_iterator()); + file.close(); + return content; + } } -#endif + // Build JSON Data + auto device_names = formatDeviceNames(FLAGS_device_name); + return "{\"cpu:0\": [[" + device_names + + "], []], " + " \"cpu:1\": [[" + + device_names + + "], []], " + " \"cuda:0\": [[" + + device_names + + "], []], " + " \"musa:0\": [[" + + device_names + "], []]}"; +} + +static void *allocateMemoryPool(size_t size, int socket_id, + bool from_vram = false) { return numa_alloc_onnode(size, socket_id); } -static void freeMemoryPool(void* addr, size_t size) { -#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_HIP) - // check pointer on GPU - cudaPointerAttributes attributes; - checkCudaError(cudaPointerGetAttributes(&attributes, addr), - "Failed to get pointer attributes"); +static void freeMemoryPool(void *addr, size_t size) { numa_free(addr, size); } - if (attributes.type == cudaMemoryTypeDevice) { - cudaFree(addr); - } else if (attributes.type == cudaMemoryTypeHost) { - numa_free(addr, size); - } else { - LOG(ERROR) << "Unknown memory type"; +class UBTransportTest : public ::testing::Test { + public: + std::shared_ptr metadata_client; + void *addr = nullptr; + std::pair hostname_port; + std::unique_ptr engine; + const size_t ram_buffer_size = 1ull << 30; + Transport *xport; + void **args; + mooncake::Transport::SegmentID segment_id; + std::shared_ptr segment_desc; + uint64_t remote_base; + + protected: + void SetUp() override { + static int offset = 0; + google::InitGoogleLogging("UBTransportTest"); + FLAGS_logtostderr = 1; + // disable topology auto discovery for testing. + engine = std::make_unique(false); + hostname_port = parseHostNameWithPort(FLAGS_local_server_name); + engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(), + hostname_port.first.c_str(), + hostname_port.second + offset++); + xport = nullptr; + std::string nic_priority_matrix = loadNicPriorityMatrix(); + args = (void **)malloc(2 * sizeof(void *)); + args[0] = (void *)nic_priority_matrix.c_str(); + args[1] = nullptr; + xport = engine->installTransport("ub", args); + ASSERT_NE(xport, nullptr); + addr = allocateMemoryPool(ram_buffer_size, 0, false); + int rc = engine->registerLocalMemory(addr, ram_buffer_size, "cpu:0"); + ASSERT_EQ(rc, 0); + // For testing purposes, we'll use the local memory address directly + // instead of trying to open a remote segment + segment_id = 0; // Dummy segment ID for testing + remote_base = (uint64_t)addr; } -#else - numa_free(addr, size); -#endif -} -int initiatorWorker(TransferEngine* engine, SegmentID segment_id, int thread_id, - void* addr) { - bindToSocket(0); - auto segment_desc = engine->getMetadata()->getSegmentDescByID(segment_id); - auto remote_base = (uint64_t)segment_desc->buffers[0].addr; + void TearDown() override { + google::ShutdownGoogleLogging(); + engine->unregisterLocalMemory(addr); + freeMemoryPool(addr, ram_buffer_size); + } +}; + +TEST_F(UBTransportTest, MultiWrite) { const size_t kDataLength = 4096000; - { - LOG(INFO) << "Stage 1: Write Data"; + int times = 10; + while (times--) { for (size_t offset = 0; offset < kDataLength; ++offset) - *((char*)(addr) + offset) = 'a' + lrand48() % 26; - - LOG(INFO) << "Write Data: " << std::string((char*)(addr), 16) << "..."; - + *((char *)(addr) + offset) = 'a' + lrand48() % 26; auto batch_id = engine->allocateBatchID(1); Status s; - TransferRequest entry; entry.opcode = TransferRequest::WRITE; entry.length = kDataLength; - entry.source = (uint8_t*)(addr); + entry.source = (uint8_t *)(addr); entry.target_id = segment_id; entry.target_offset = remote_base; s = engine->submitTransfer(batch_id, {entry}); @@ -140,7 +163,7 @@ int initiatorWorker(TransferEngine* engine, SegmentID segment_id, int thread_id, TransferStatus status; while (!completed) { Status s = engine->getTransferStatus(batch_id, 0, status); - LOG_ASSERT(s.ok()); + ASSERT_EQ(s, Status::OK()); if (status.s == TransferStatusEnum::COMPLETED) completed = true; else if (status.s == TransferStatusEnum::FAILED) { @@ -149,18 +172,23 @@ int initiatorWorker(TransferEngine* engine, SegmentID segment_id, int thread_id, } } s = engine->freeBatchID(batch_id); - LOG_ASSERT(s.ok()); + ASSERT_EQ(s, Status::OK()); } +} + +TEST_F(UBTransportTest, MultipleRead) { + const size_t kDataLength = 4096000; + int times = 10; + while (times--) { + for (size_t offset = 0; offset < kDataLength; ++offset) + *((char *)(addr) + offset) = 'a' + lrand48() % 26; - { - LOG(INFO) << "Stage 2: Read Data"; auto batch_id = engine->allocateBatchID(1); Status s; - TransferRequest entry; - entry.opcode = TransferRequest::READ; + entry.opcode = TransferRequest::WRITE; entry.length = kDataLength; - entry.source = (uint8_t*)(addr) + kDataLength; + entry.source = (uint8_t *)(addr); entry.target_id = segment_id; entry.target_offset = remote_base; s = engine->submitTransfer(batch_id, {entry}); @@ -169,7 +197,7 @@ int initiatorWorker(TransferEngine* engine, SegmentID segment_id, int thread_id, TransferStatus status; while (!completed) { Status s = engine->getTransferStatus(batch_id, 0, status); - LOG_ASSERT(s.ok()); + ASSERT_EQ(s, Status::OK()); if (status.s == TransferStatusEnum::COMPLETED) completed = true; else if (status.s == TransferStatusEnum::FAILED) { @@ -178,154 +206,42 @@ int initiatorWorker(TransferEngine* engine, SegmentID segment_id, int thread_id, } } s = engine->freeBatchID(batch_id); - LOG_ASSERT(s.ok()); - } - - int ret = - memcmp((uint8_t*)(addr), (uint8_t*)(addr) + kDataLength, kDataLength); - LOG(INFO) << "Read Data: " << std::string((char*)(addr) + kDataLength, 16) - << "..."; - LOG(INFO) << "Compare: " << (ret == 0 ? "OK" : "FAILED"); - - return 0; -} - -std::string formatDeviceNames(const std::string& device_names) { - std::stringstream ss(device_names); - std::string item; - std::vector tokens; - while (getline(ss, item, ',')) { - tokens.push_back(item); - } - - std::string formatted; - for (size_t i = 0; i < tokens.size(); ++i) { - formatted += "\"" + tokens[i] + "\""; - if (i < tokens.size() - 1) { - formatted += ","; - } + ASSERT_EQ(s, Status::OK()); } - return formatted; -} - -std::string loadNicPriorityMatrix() { - if (!FLAGS_nic_priority_matrix.empty()) { - std::ifstream file(FLAGS_nic_priority_matrix); - if (file.is_open()) { - std::string content((std::istreambuf_iterator(file)), - std::istreambuf_iterator()); - file.close(); - return content; + times = 10; + while (times--) { + auto batch_id = engine->allocateBatchID(1); + int ret = 0; + TransferRequest entry; + entry.opcode = TransferRequest::READ; + entry.length = kDataLength; + entry.source = (uint8_t *)(addr) + kDataLength; + entry.target_id = segment_id; + entry.target_offset = remote_base; + Status s; + s = engine->submitTransfer(batch_id, {entry}); + ASSERT_EQ(s, Status::OK()); + bool completed = false; + TransferStatus status; + while (!completed) { + Status s = engine->getTransferStatus(batch_id, 0, status); + ASSERT_EQ(s, Status::OK()); + if (status.s == TransferStatusEnum::COMPLETED) + completed = true; + else if (status.s == TransferStatusEnum::FAILED) { + completed = true; + } } + s = engine->freeBatchID(batch_id); + ASSERT_EQ(s, Status::OK()); } - // Build JSON Data - auto device_names = formatDeviceNames(FLAGS_device_name); - return "{\"cpu:0\": [[" + device_names + - "], []], " - " \"cpu:1\": [[" + - device_names + - "], []], " - " \"cuda:0\": [[" + - device_names + - "], []], " - " \"musa:0\": [[" + - device_names + "], []]}"; -} - -int initiator() { - const size_t ram_buffer_size = 1ull << 30; - // disable topology auto discovery for testing. - auto filters = std::vector({FLAGS_device_name}); - auto engine = std::make_unique(true, filters); - - auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); - engine->init(FLAGS_metadata_server, FLAGS_local_server_name, - hostname_port.first, hostname_port.second); - - Transport* xport = nullptr; - if (FLAGS_protocol == "rdma") { - auto nic_priority_matrix = loadNicPriorityMatrix(); - void** args = (void**)malloc(2 * sizeof(void*)); - args[0] = (void*)nic_priority_matrix.c_str(); - args[1] = nullptr; - xport = engine->installTransport("rdma", args); - } else if (FLAGS_protocol == "ub") { - xport = engine->installTransport("ub", nullptr); - } else if (FLAGS_protocol == "tcp") { - xport = engine->installTransport("tcp", nullptr); - } else { - LOG(ERROR) << "Unsupported protocol"; - } - - LOG_ASSERT(xport); - - void* addr = nullptr; -#if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_HIP) - addr = allocateMemoryPool(ram_buffer_size, 0, FLAGS_use_vram); - std::string name_prefix = FLAGS_use_vram ? GPU_PREFIX : "cpu:"; - int name_suffix = FLAGS_use_vram ? FLAGS_gpu_id : 0; - int rc = engine->registerLocalMemory( - addr, ram_buffer_size, name_prefix + std::to_string(name_suffix)); - LOG_ASSERT(!rc); -#else - addr = allocateMemoryPool(ram_buffer_size, 0, false); - int rc = - engine->registerLocalMemory(addr, ram_buffer_size, kWildcardLocation); - LOG_ASSERT(!rc); -#endif - - auto segment_id = engine->openSegment(FLAGS_segment_id.c_str()); - std::thread workers(initiatorWorker, engine.get(), segment_id, 0, addr); - workers.join(); - engine->unregisterLocalMemory(addr); - freeMemoryPool(addr, ram_buffer_size); - return 0; } -int target() { - const size_t ram_buffer_size = 1ull << 30; - // disable topology auto discovery for testing. - auto filters = std::vector({FLAGS_device_name}); - auto engine = std::make_unique(true, filters); - - auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name); - engine->init(FLAGS_metadata_server, FLAGS_local_server_name, - hostname_port.first, hostname_port.second); - - if (FLAGS_protocol == "rdma") { - auto nic_priority_matrix = loadNicPriorityMatrix(); - void** args = (void**)malloc(2 * sizeof(void*)); - args[0] = (void*)nic_priority_matrix.c_str(); - args[1] = nullptr; - engine->installTransport("rdma", args); - } else if (FLAGS_protocol == "ub") { - engine->installTransport("ub", nullptr); - } else if (FLAGS_protocol == "tcp") { - engine->installTransport("tcp", nullptr); - } else { - LOG(ERROR) << "Unsupported protocol"; - } - - void* addr = nullptr; - addr = allocateMemoryPool(ram_buffer_size, 0); - int rc = engine->registerLocalMemory(addr, ram_buffer_size, "cpu:0"); - LOG_ASSERT(!rc); +} // namespace mooncake - while (true) sleep(1); - - engine->unregisterLocalMemory(addr); - freeMemoryPool(addr, ram_buffer_size); - return 0; -} - -int main(int argc, char** argv) { +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); gflags::ParseCommandLineFlags(&argc, &argv, false); - - if (FLAGS_mode == "initiator") - return initiator(); - else if (FLAGS_mode == "target") - return target(); - - LOG(ERROR) << "Unsupported mode: must be 'initiator' or 'target'"; - exit(EXIT_FAILURE); -} \ No newline at end of file + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From 53fe6ad0aa93480151927f787fb159cc72e60407 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 03:22:54 +0000 Subject: [PATCH 02/12] modify the mock urma api src --- .../transport/kunpeng_transport/urma/mock_urma_api.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp index 92f724bf60..13afc3d74b 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp @@ -23,6 +23,15 @@ namespace { .max_jetty = 1024 }, .port_cnt = 1, + .port_attr = { + { + .max_mtu = URMA_MTU_4096, + .state = URMA_PORT_ACTIVE, + .active_width = URMA_LINK_X1, + .active_speed = URMA_SP_100G, + .active_mtu = URMA_MTU_4096 + } + } }; urma_eid_info_t mock_eid_info = { From 33a2c3d859b13f8bb22457d0dec3a4d6e04416b0 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 04:14:46 +0000 Subject: [PATCH 03/12] remove the URMA_MOCK and refine the code for ub_transport_test --- mooncake-common/common.cmake | 6 ----- .../kunpeng_transport/CMakeLists.txt | 22 ++++++++++++++----- mooncake-transfer-engine/tests/CMakeLists.txt | 2 +- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/mooncake-common/common.cmake b/mooncake-common/common.cmake index 83603c8299..79eff13fd3 100644 --- a/mooncake-common/common.cmake +++ b/mooncake-common/common.cmake @@ -75,7 +75,6 @@ option(USE_MNNVL "option for using Multi-Node NVLink transport" OFF) option(USE_CXL "option for using CXL protocol" OFF) option(USE_EFA "option for using AWS EFA transport" OFF) option(USE_UB "option for using UB protocol transport" OFF) -option(URMA_MOCK "option for using URMA api mock support ci/testing" OFF) if (USE_UB) add_compile_definitions(USE_UB) @@ -83,11 +82,6 @@ if (USE_UB) include(${CMAKE_CURRENT_LIST_DIR}/FindUrma.cmake) endif() -if (URMA_MOCK) - add_compile_definitions(URMA_MOCK) - message(STATUS "ub transport urma mock is enabled") -endif() - if (USE_EFA) # Find libfabric headers and library; default to AWS EFA installer path find_path(LIBFABRIC_INCLUDE_DIR rdma/fabric.h diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt index 773dc7dbb4..f272b69aeb 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt @@ -1,15 +1,25 @@ file(GLOB UB_SOURCES "*.cpp") -if (URMA_MOCK) - list(APPEND UB_SOURCES "urma/mock_urma_api.cpp") - add_library(ub_transport OBJECT ${UB_SOURCES}) -else () +# Check if liburma.so exists +find_library(URMA_LIBRARY urma PATHS /usr/lib64) + +# Build ub_transport with real URMA library if available +# Never include mock_urma_api.cpp in ub_transport library +# For test, we will use mock_urma_api.cpp directly +if (URMA_LIBRARY) add_library(ub_transport OBJECT ${UB_SOURCES}) target_link_libraries(ub_transport PUBLIC - /usr/lib64/liburma.so + ${URMA_LIBRARY} ) -endif () + message(STATUS "Using real URMA library: ${URMA_LIBRARY}") +else() + # If liburma.so not found, we'll need to handle this differently + # For now, just build without mock_urma_api.cpp + list(APPEND UB_SOURCES "urma/mock_urma_api.cpp") + add_library(ub_transport OBJECT ${UB_SOURCES}) + message(WARNING "Not Found liburma.so building ub_transport with Mock URMA library") +endif() target_include_directories(ub_transport PUBLIC ${urma_INCLUDE_DIR} diff --git a/mooncake-transfer-engine/tests/CMakeLists.txt b/mooncake-transfer-engine/tests/CMakeLists.txt index d5c15199d9..bb515ab24e 100644 --- a/mooncake-transfer-engine/tests/CMakeLists.txt +++ b/mooncake-transfer-engine/tests/CMakeLists.txt @@ -69,7 +69,7 @@ endif() if (USE_UB) add_executable(ub_transport_test ${WORKSPACE}/ub_transport_test.cpp) target_link_libraries(ub_transport_test PUBLIC transfer_engine gtest gtest_main glog::glog pthread) - target_compile_definitions(ub_transport_test PRIVATE URMA_MOCK) + target_include_directories(ub_transport_test PRIVATE ${urma_INCLUDE_DIR}) add_test(NAME ub_transport_test COMMAND ub_transport_test) endif() From b2144effdc41aa49ee220d8d4bd6b4b10abbe98c Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 06:17:10 +0000 Subject: [PATCH 04/12] format the code --- .../kunpeng_transport/ub_transport.cpp | 10 +- .../kunpeng_transport/urma/mock_urma_api.cpp | 116 +++++++++--------- .../kunpeng_transport/urma_endpoint.cpp | 4 +- .../tests/ub_transport_test.cpp | 2 +- 4 files changed, 69 insertions(+), 63 deletions(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp index 7f80236e0b..dd2658f735 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp @@ -431,19 +431,19 @@ int UbTransport::initializeUbResources(UbTransport* t) { LOG(ERROR) << "Failed to init, ret = " << ret; return -1; } - + std::vector hca_list; // Try to get device list from topology if (t->local_topology_) { hca_list = t->local_topology_->getHcaList(); } - + // If no devices from topology, use mock device if (hca_list.empty()) { hca_list.push_back("mock_urma_device"); LOG(INFO) << "Using mock_urma_device for testing"; } - + for (auto& device_name : hca_list) { auto& config = globalConfig(); auto max_endpoints = config.max_ep_per_ctx; @@ -459,12 +459,12 @@ int UbTransport::initializeUbResources(UbTransport* t) { LOG(INFO) << "device " << context->deviceName() << " add to list"; } } - + if (t->context_list_.empty()) { LOG(ERROR) << "UbTransport: No available RNIC"; return ERR_DEVICE_NOT_FOUND; } - + LOG(INFO) << "ub resources init success"; return 0; } diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp index 13afc3d74b..9e006a0a0b 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp @@ -5,40 +5,32 @@ #include namespace { - std::mutex mock_mutex; - bool initialized = false; - std::vector device_list; - std::map context_map; - std::map jfce_map; - std::map> jfc_user_ctx_map; - std::map jfr_map; - std::map seg_map; - std::map jetty_map; - std::map target_jetty_map; - - urma_device_attr_t mock_device_attr = { - .guid={.raw = {10}}, - .dev_cap = { - .max_jfc = 1024, - .max_jetty = 1024 - }, - .port_cnt = 1, - .port_attr = { - { - .max_mtu = URMA_MTU_4096, - .state = URMA_PORT_ACTIVE, - .active_width = URMA_LINK_X1, - .active_speed = URMA_SP_100G, - .active_mtu = URMA_MTU_4096 - } - } - }; - - urma_eid_info_t mock_eid_info = { - .eid = {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}}, - .eid_index = 0 - }; -} +std::mutex mock_mutex; +bool initialized = false; +std::vector device_list; +std::map context_map; +std::map jfce_map; +std::map> jfc_user_ctx_map; +std::map jfr_map; +std::map seg_map; +std::map jetty_map; +std::map target_jetty_map; + +urma_device_attr_t mock_device_attr = { + .guid = {.raw = {10}}, + .dev_cap = {.max_jfc = 1024, .max_jetty = 1024}, + .port_cnt = 1, + .port_attr = {{.max_mtu = URMA_MTU_4096, + .state = URMA_PORT_ACTIVE, + .active_width = URMA_LINK_X1, + .active_speed = URMA_SP_100G, + .active_mtu = URMA_MTU_4096}}}; + +urma_eid_info_t mock_eid_info = { + .eid = {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, + 0x0C, 0x0D, 0x0E, 0x0F, 0x10}}, + .eid_index = 0}; +} // namespace urma_status_t urma_init(urma_init_attr_t *init_attr) { std::lock_guard lock(mock_mutex); @@ -84,7 +76,7 @@ urma_device_t **urma_get_device_list(int *num_devices) { } *num_devices = device_list.size(); - urma_device_t **devices = new urma_device_t*[device_list.size()]; + urma_device_t **devices = new urma_device_t *[device_list.size()]; for (size_t i = 0; i < device_list.size(); ++i) { devices[i] = device_list[i]; } @@ -122,7 +114,8 @@ void urma_free_device_list(urma_device_t **device_list) { } } -urma_status_t urma_query_device(urma_device_t *device, urma_device_attr_t *attr) { +urma_status_t urma_query_device(urma_device_t *device, + urma_device_attr_t *attr) { std::lock_guard lock(mock_mutex); if (!device || !attr) { return URMA_EINVAL; @@ -175,7 +168,7 @@ urma_jfce_t *urma_create_jfce(urma_context_t *ctx) { if (!ctx || context_map.find(ctx) == context_map.end()) { return nullptr; } - urma_jfce_t *jfce = reinterpret_cast(new int(1)); + urma_jfce_t *jfce = reinterpret_cast(new int(1)); jfce_map[jfce] = 1; return jfce; } @@ -186,7 +179,7 @@ urma_status_t urma_delete_jfce(urma_jfce_t *jfce) { return URMA_EINVAL; } jfce_map.erase(jfce); - delete reinterpret_cast(jfce); + delete reinterpret_cast(jfce); return URMA_SUCCESS; } @@ -223,7 +216,7 @@ urma_jfr_t *urma_create_jfr(urma_context_t *ctx, urma_jfr_cfg_t *cfg) { if (!ctx || !cfg || context_map.find(ctx) == context_map.end()) { return nullptr; } - urma_jfr_t *jfr = reinterpret_cast(new int(1)); + urma_jfr_t *jfr = reinterpret_cast(new int(1)); jfr_map[jfr] = 1; return jfr; } @@ -234,7 +227,7 @@ urma_status_t urma_delete_jfr(urma_jfr_t *jfr) { return URMA_EINVAL; } jfr_map.erase(jfr); - delete reinterpret_cast(jfr); + delete reinterpret_cast(jfr); return URMA_SUCCESS; } @@ -264,9 +257,12 @@ urma_status_t urma_unregister_seg(urma_target_seg_t *seg) { return URMA_SUCCESS; } -urma_target_seg_t *urma_import_seg(urma_context_t *ctx, urma_seg_t *seg, urma_token_t *token_value, uint64_t addr, urma_import_seg_flag_t flag) { +urma_target_seg_t *urma_import_seg(urma_context_t *ctx, urma_seg_t *seg, + urma_token_t *token_value, uint64_t addr, + urma_import_seg_flag_t flag) { std::lock_guard lock(mock_mutex); - if (!ctx || !seg || !token_value || context_map.find(ctx) == context_map.end()) { + if (!ctx || !seg || !token_value || + context_map.find(ctx) == context_map.end()) { return nullptr; } urma_target_seg_t *tseg = new urma_target_seg_t; @@ -286,7 +282,8 @@ urma_status_t urma_unimport_seg(urma_target_seg_t *tseg) { return URMA_SUCCESS; } -urma_status_t urma_get_async_event(urma_context_t *ctx, urma_async_event_t *event) { +urma_status_t urma_get_async_event(urma_context_t *ctx, + urma_async_event_t *event) { std::lock_guard lock(mock_mutex); if (!ctx || !event || context_map.find(ctx) == context_map.end()) { return URMA_EINVAL; @@ -294,8 +291,7 @@ urma_status_t urma_get_async_event(urma_context_t *ctx, urma_async_event_t *even return URMA_ETIMEOUT; } -void urma_ack_async_event(urma_async_event_t *event) { -} +void urma_ack_async_event(urma_async_event_t *event) {} urma_jetty_t *urma_create_jetty(urma_context_t *ctx, urma_jetty_cfg_t *cfg) { std::lock_guard lock(mock_mutex); @@ -332,12 +328,16 @@ urma_status_t urma_unbind_jetty(urma_jetty_t *jetty) { return URMA_SUCCESS; } -urma_target_jetty_t *urma_import_jetty(urma_context_t *ctx, urma_rjetty_t *rjetty, urma_token_t *token_value) { +urma_target_jetty_t *urma_import_jetty(urma_context_t *ctx, + urma_rjetty_t *rjetty, + urma_token_t *token_value) { std::lock_guard lock(mock_mutex); - if (!ctx || !rjetty || !token_value || context_map.find(ctx) == context_map.end()) { + if (!ctx || !rjetty || !token_value || + context_map.find(ctx) == context_map.end()) { return nullptr; } - urma_target_jetty_t *tjetty = reinterpret_cast(new int(1)); + urma_target_jetty_t *tjetty = + reinterpret_cast(new int(1)); target_jetty_map[tjetty] = 1; *token_value = {.token = 1}; return tjetty; @@ -349,13 +349,15 @@ urma_status_t urma_unimport_jetty(urma_target_jetty_t *tjetty) { return URMA_EINVAL; } target_jetty_map.erase(tjetty); - delete reinterpret_cast(tjetty); + delete reinterpret_cast(tjetty); return URMA_SUCCESS; } -urma_status_t urma_bind_jetty(urma_jetty_t *jetty, urma_target_jetty_t *tjetty) { +urma_status_t urma_bind_jetty(urma_jetty_t *jetty, + urma_target_jetty_t *tjetty) { std::lock_guard lock(mock_mutex); - if (!jetty || !tjetty || jetty_map.find(jetty) == jetty_map.end() || target_jetty_map.find(tjetty) == target_jetty_map.end()) { + if (!jetty || !tjetty || jetty_map.find(jetty) == jetty_map.end() || + target_jetty_map.find(tjetty) == target_jetty_map.end()) { return URMA_EINVAL; } jetty->remote_jetty = tjetty; @@ -370,7 +372,8 @@ urma_status_t urma_modify_jetty(urma_jetty_t *jetty, urma_jetty_attr_t *attr) { return URMA_SUCCESS; } -urma_status_t urma_post_jetty_send_wr(urma_jetty_t *jetty, urma_jfs_wr_t *wr, urma_jfs_wr_t **bad_wr) { +urma_status_t urma_post_jetty_send_wr(urma_jetty_t *jetty, urma_jfs_wr_t *wr, + urma_jfs_wr_t **bad_wr) { std::lock_guard lock(mock_mutex); if (!jetty || !wr || jetty_map.find(jetty) == jetty_map.end()) { if (bad_wr) { @@ -381,7 +384,8 @@ urma_status_t urma_post_jetty_send_wr(urma_jetty_t *jetty, urma_jfs_wr_t *wr, ur urma_jfs_wr_t *current_wr = wr; while (current_wr) { - jfc_user_ctx_map[jetty->jetty_cfg.jfs_cfg.jfc].push_back(current_wr->user_ctx); + jfc_user_ctx_map[jetty->jetty_cfg.jfs_cfg.jfc].push_back( + current_wr->user_ctx); current_wr = current_wr->next; } @@ -393,7 +397,8 @@ urma_status_t urma_post_jetty_send_wr(urma_jetty_t *jetty, urma_jfs_wr_t *wr, ur int urma_poll_jfc(urma_jfc_t *jfc, int num_entries, urma_cr_t *cr_list) { std::lock_guard lock(mock_mutex); - if (!jfc || !cr_list || jfc_user_ctx_map.find(jfc) == jfc_user_ctx_map.end()) { + if (!jfc || !cr_list || + jfc_user_ctx_map.find(jfc) == jfc_user_ctx_map.end()) { return -1; } int available = jfc_user_ctx_map[jfc].size(); @@ -402,6 +407,7 @@ int urma_poll_jfc(urma_jfc_t *jfc, int num_entries, urma_cr_t *cr_list) { cr_list[i].status = URMA_CR_SUCCESS; cr_list[i].user_ctx = jfc_user_ctx_map[jfc][i]; } - jfc_user_ctx_map[jfc].erase(jfc_user_ctx_map[jfc].begin(), jfc_user_ctx_map[jfc].begin() + num_completed); + jfc_user_ctx_map[jfc].erase(jfc_user_ctx_map[jfc].begin(), + jfc_user_ctx_map[jfc].begin() + num_completed); return num_completed; } diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp index 2c5b869d1b..8627cac0e7 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp @@ -443,8 +443,8 @@ int UrmaContext::openDevice(const std::string& device_name, uint8_t port, } urma_free_device_list(devices); - return 0; -} + return 0; + } urma_free_device_list(devices); LOG(ERROR) << "No matched device found: " << device_name; diff --git a/mooncake-transfer-engine/tests/ub_transport_test.cpp b/mooncake-transfer-engine/tests/ub_transport_test.cpp index afff41a89c..b225fb6b09 100644 --- a/mooncake-transfer-engine/tests/ub_transport_test.cpp +++ b/mooncake-transfer-engine/tests/ub_transport_test.cpp @@ -132,7 +132,7 @@ class UBTransportTest : public ::testing::Test { ASSERT_EQ(rc, 0); // For testing purposes, we'll use the local memory address directly // instead of trying to open a remote segment - segment_id = 0; // Dummy segment ID for testing + segment_id = 0; // Dummy segment ID for testing remote_base = (uint64_t)addr; } From 226d79296643cad5341c6cc8196c43663cb5e1ed Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 07:22:37 +0000 Subject: [PATCH 05/12] add the ub transport ci --- .github/workflows/ci.yml | 11 +++- .github/workflows/ci_ub.yml | 101 ++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/ci_ub.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c1882b7a21..6bca561de6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -503,7 +503,7 @@ jobs: cd build export PATH=/usr/local/nvidia/bin:/usr/local/nvidia/lib64:$PATH export LD_LIBRARY_PATH=/usr/local/cuda/lib64/stubs:$LD_LIBRARY_PATH - cmake -G Ninja .. -DUSE_ETCD=OFF -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=OFF -DUSE_MNNVL=OFF -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" + cmake -G Ninja .. -DUSE_ETCD=OFF -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=OFF -DUSE_MNNVL=OFF -DUSE_UB=ON -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" cmake --build . sudo cmake --install . df -h @@ -513,7 +513,7 @@ jobs: run: | mkdir build cd build - cmake -G Ninja .. -DUSE_ETCD=ON -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_STORE=ON -DWITH_P2P_STORE=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=ON -DUSE_MNNVL=OFF -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" + cmake -G Ninja .. -DUSE_ETCD=ON -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_STORE=ON -DWITH_P2P_STORE=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=ON -DUSE_MNNVL=OFF -DUSE_UB=ON -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" shell: bash # TODO: lack USE_NVMEOF,USE_MNNVL @@ -722,6 +722,12 @@ jobs: uses: ./.github/workflows/integration-test.yml secrets: inherit + ub-transport-test: + needs: [build, check-paths] + if: needs.check-paths.outputs.should-run-downstream == 'true' + uses: ./.github/workflows/ci_ub.yml + secrets: inherit + ci-gate: name: CI Gate if: always() @@ -735,6 +741,7 @@ jobs: - test-wheel-ubuntu - ascend-test - integration-test + - ub-transport-test runs-on: ubuntu-latest steps: - name: Check required job results diff --git a/.github/workflows/ci_ub.yml b/.github/workflows/ci_ub.yml new file mode 100644 index 0000000000..2e2c0bc3f1 --- /dev/null +++ b/.github/workflows/ci_ub.yml @@ -0,0 +1,101 @@ +name: 'UB Transport Test' + +on: + workflow_call: {} + +jobs: + build-and-test: + runs-on: ubuntu-22.04 + env: + CI: "true" + SCCACHE_GHA_ENABLED: "true" + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python 3.10 + uses: actions/setup-python@v5 + with: + python-version: '3.10' + + - name: Check and start etcd + run: | + # Check if etcd is already installed + if command -v etcd &> /dev/null && command -v etcdctl &> /dev/null; then + echo "etcd is already installed" + # Check if etcd is running + if etcdctl --endpoints=http://127.0.0.1:2379 endpoint health > /dev/null 2>&1; then + echo "etcd is already running" + else + echo "etcd is not running, starting it..." + etcd --advertise-client-urls http://127.0.0.1:2379 --listen-client-urls http://127.0.0.1:2379 & + sleep 3 # Give etcd time to start + etcdctl --endpoints=http://127.0.0.1:2379 endpoint health + fi + else + echo "etcd is not installed, installing..." + wget https://github.com/etcd-io/etcd/releases/download/v3.6.1/etcd-v3.6.1-linux-amd64.tar.gz + tar xzf etcd-v3.6.1-linux-amd64.tar.gz + sudo mv etcd-v3.6.1-linux-amd64/etcd* /usr/local/bin/ + etcd --advertise-client-urls http://127.0.0.1:2379 --listen-client-urls http://127.0.0.1:2379 & + sleep 3 # Give etcd time to start + etcdctl --endpoints=http://127.0.0.1:2379 endpoint health + fi + shell: bash + + - name: Free up disk space + run: | + sudo rm -rf /usr/share/dotnet + sudo rm -rf /opt/ghc + sudo rm -rf /opt/hostedtoolcache/CodeQL + + - name: Install build utilities + run: | + sudo apt-get update + sudo apt-get install -y ninja-build + + - name: Run sccache-cache + uses: mozilla-actions/sccache-action@v0.0.9 + + - name: Configure sccache + uses: actions/github-script@v7 + with: + script: | + core.exportVariable('ACTIONS_RESULTS_URL', process.env.ACTIONS_RESULTS_URL || ''); + core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || ''); + + - name: Install dependencies + run: | + sudo apt update -y + sudo bash -x dependencies.sh -y + shell: bash + + - name: Configure project with UB support + run: | + mkdir build + cd build + cmake -G Ninja .. \ + -DUSE_UB=ON \ + -DUSE_ETCD=ON \ + -DBUILD_UNIT_TESTS=ON \ + -DBUILD_EXAMPLES=OFF + shell: bash + + - name: Build project + run: | + cd build + cmake --build . --target ub_transport_test + shell: bash + + - name: Run UB transport tests + run: | + cd build + MC_METADATA_SERVER=http://127.0.0.1:2379 ./mooncake-transfer-engine/tests/ub_transport_test + shell: bash + + - name: Test Summary + if: always() + shell: bash + run: | + echo "UB Transport Test completed" From 986b72c96d7d9ed4f6f5a7bce4d60eccbe5d424d Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 07:49:15 +0000 Subject: [PATCH 06/12] fix the ub transport ci bug --- .../transfer-engine/kunpeng_ub_transport.md | 293 ++++++++++++++++++ mooncake-transfer-engine/src/topology.cpp | 13 +- 2 files changed, 299 insertions(+), 7 deletions(-) create mode 100644 docs/source/design/transfer-engine/kunpeng_ub_transport.md diff --git a/docs/source/design/transfer-engine/kunpeng_ub_transport.md b/docs/source/design/transfer-engine/kunpeng_ub_transport.md new file mode 100644 index 0000000000..c48ca2c5d3 --- /dev/null +++ b/docs/source/design/transfer-engine/kunpeng_ub_transport.md @@ -0,0 +1,293 @@ +# Kunpeng UB Transport for Mooncake + +This document describes how to build and use Mooncake with Kunpeng UB (Unified Bus) transport support using URMA (Unified Remote Memory Access). + +## Overview + +UB (Unified Bus) is a transport protocol at the same abstraction layer as RDMA, CXL, NVLink, and TCP, providing a flexible transport solution that can be selected at the application layer. Currently, UB protocol has two open-source implementations: + +- **URMA (Unified Remote Memory Access)**: Provides a unified programming abstraction and core semantic layer for upper-layer applications. It offers unified APIs and semantic interfaces for remote shared memory access and operations, leveraging the low-latency, high-bandwidth characteristics of the UB protocol. + - URMA open-source repository: https://atomgit.com/openeuler/umdk + +- **OBMM (Ownership Based Memory Management)**: A kernel memory management system for supernode environments, supporting cross-node physical memory sharing. It provides efficient remote memory access capabilities through a kernel module (obmm.ko) and a user-space library (libobmm.so). + - OBMM open-source repository: https://atomgit.com/openeuler/obmm + +## Prerequisites + +### 1. Hardware and Operating System + +- **Hardware Platform**: Kunpeng 950 CPU with native UB interconnect architecture +- **OS Version**: openEuler 24.03 (LTS-SP3) [Download link](https://www.openeuler.openatom.cn/zh/download/#openEuler%2024.03%20LTS%20SP3) + +### 2. URMA Dependencies + +Install UMDK (URMA development package): + +```bash +# Install via yum +yum install umdk-urma-devel + +# Or build from source +git clone https://atomgit.com/openeuler/umdk.git +cd umdk +mkdir build && cd build +cmake .. +make -j$(nproc) +sudo make install +``` + +### 3. Build Dependencies + +```bash +# Ubuntu/Debian +sudo apt-get update +sudo apt-get install -y \ + build-essential \ + cmake \ + git \ + libgflags-dev \ + libgoogle-glog-dev \ + libjsoncpp-dev \ + libnuma-dev \ + libibverbs-dev \ + libboost-all-dev \ + libcurl4-openssl-dev \ + libgtest-dev \ + libmsgpack-dev \ + libxxhash-dev \ + libyaml-cpp-dev \ + pybind11-dev \ + python3-dev + +# Install yalantinglibs (required) +cd /tmp +git clone https://github.com/alibaba/yalantinglibs.git +cd yalantinglibs +mkdir build && cd build +cmake .. -DCMAKE_INSTALL_PREFIX=/usr/local +make -j$(nproc) +sudo make install +``` + +## Building Mooncake with UB Support + +### 1. Clone the Repository + +```bash +git clone https://github.com/kvcache-ai/Mooncake.git +cd Mooncake +git submodule update --init --recursive +``` + +### 2. Build with UB Enabled + +```bash +mkdir build && cd build + +cmake .. \ + -DUSE_UB=ON \ + -DURMA_INCLUDE_DIR=/usr/include \ + -DURMA_LIBRARY=/usr/lib64/liburma.so \ + -DCMAKE_BUILD_TYPE=RelWithDebInfo + +make -j$(nproc) +``` + +### 3. Install Python Package + +```bash +# Copy built modules to wheel directory +cp mooncake-integration/engine.cpython-*.so ../mooncake-wheel/mooncake/ +cp mooncake-integration/store.cpython-*.so ../mooncake-wheel/mooncake/ +cp mooncake-asio/libasio.so ../mooncake-wheel/mooncake/ + +# Install with pip +pip install -e ../mooncake-wheel --no-build-isolation +``` + +## Verification + +### Check UB Transport Registration + +```bash +# Check if UB transport is registered +./mooncake_server --list-transports +# Expected output: rdma, tcp, nvlink, ub +``` + +### Test UB Transport Initialization + +```python +from mooncake.engine import TransferEngine + +te = TransferEngine() +result = te.initialize('127.0.0.1', 'P2PHANDSHAKE', 'ub', '') +print(f'Initialize result: {result}') # Should be 0 + +# You should see logs like: +# URMA module init success +# found 1 devices. +# device_name : urma0 EID : 01:02:03:04:05:06:07:08:09:0a:0b:0c:0d:0e:0f:10 +``` + +## Usage + +### Single Node Benchmark Test + +```bash +# Terminal 1: Target (receiver) +./transfer_engine_bench \ + --mode=target \ + --protocol=ub \ + --device_name=urma0 \ + --local_server_name=127.0.0.1 \ + --metadata_server=P2PHANDSHAKE + +# Terminal 2: Initiator (sender) +./transfer_engine_bench \ + --mode=initiator \ + --protocol=ub \ + --device_name=urma0 \ + --metadata_server=P2PHANDSHAKE \ + --segment_size=8388608 \ + --batch_size=1 \ + --segment_id=127.0.0.1:$PORT +``` + +### Multi-device Benchmark Test + +```bash +# Auto-discovery of multiple URMA devices +./transfer_engine_bench \ + --protocol=ub \ + --device_name=urma0,urma1,urma2,urma3 +``` + +## Unit Tests + +Run the UB transport unit tests: + +```bash +./build/mooncake-transfer-engine/tests/ub_transport_test +``` + +The test suite includes: + +| Test | Description | +|------|-------------| +| `MultiWrite` | Multiple write operations | +| `MultipleRead` | Multiple read operations with data integrity check | + +You can also run all unit tests via CTest: + +```bash +cd build && ctest --output-on-failure +``` + +Environment variables for test configuration: + +```bash +export MC_METADATA_SERVER=P2PHANDSHAKE # default +export MC_LOCAL_SERVER_NAME=127.0.0.1:12345 # default +``` + +## Technical Details + +### UB Transport Architecture + +``` +┌─────────────────────────────────────────────────────┐ +│ UbTransport │ +├─────────────────────────────────────────────────────┤ +│ UrmaContext (per device) │ +│ ├── urma_device (URMA device handle) │ +│ ├── urma_context (URMA context) │ +│ ├── urma_jfce (URMA jetty factory create) │ +│ ├── urma_jfc (URMA jetty factory send) │ +│ └── urma_jfr (URMA jetty factory receive) │ +├─────────────────────────────────────────────────────┤ +│ UrmaEndpoint (per connection) │ +│ ├── urma_jetty (URMA jetty for communication) │ +│ ├── local_jetty (local jetty ID) │ +│ └── remote_jetty (remote jetty ID) │ +└─────────────────────────────────────────────────────┘ +``` + +### Key Components + +1. **UbTransport**: The main transport class that manages URMA resources and endpoints +2. **UrmaContext**: Represents a URMA device context, handling device initialization and resource management +3. **UrmaEndpoint**: Represents a connection to a remote peer, handling data transfer operations +4. **mock_urma_api.cpp**: Mock implementation of URMA API for testing without real URMA hardware + +### Protocol Advantages + +- **Optimized for Kunpeng**: URMA is specifically optimized for Kunpeng chip on-chip interconnect +- **RDMA-like Semantics**: Provides similar memory semantics to RDMA +- **High Performance**: Leverages UB's low-latency, high-bandwidth characteristics +- **Unified Abstraction**: Offers a unified programming model for remote memory access + +## Troubleshooting + +### No URMA devices found + +``` +UbTransport: No URMA devices found +``` + +Solution: Verify URMA is properly installed and devices are available: +```bash +# Check URMA installation +ls /usr/lib64/liburma.so +ls /usr/include/ub/umdk/urma/urma_api.h + +# Check for URMA devices +urma_admin -l +``` + +### URMA initialization failed + +``` +URMA module init failed +``` + +Solution: Ensure the URMA kernel module is loaded and the device is properly configured: +```bash +# Load URMA module +sudo modprobe urma + +# Check module status +sudo lsmod | grep urma + +# Check device status +urma_admin -l +``` + +### Device port inactive + +``` +Device urma0 port not active +``` + +Solution: Ensure the UB port is properly configured and active: +```bash +# Check port status +urma_admin -p urma0 +``` + +### Missing liburma.so + +``` +cannot find -lurma +``` + +Solution: Verify URMA library is installed and in the library path: +```bash +export LD_LIBRARY_PATH=/usr/lib64:$LD_LIBRARY_PATH +``` + +## Conclusion + +Kunpeng UB Transport provides a high-performance, optimized transport solution for Mooncake on Kunpeng 950 CPU platforms. By leveraging the UB protocol's low-latency and high-bandwidth characteristics, it offers comparable performance to RDMA while being specifically tailored for Kunpeng chip architectures. + +With proper configuration and tuning, UB Transport can significantly improve the performance of distributed AI workloads, particularly for scenarios involving large-scale parameter transfers and distributed training. diff --git a/mooncake-transfer-engine/src/topology.cpp b/mooncake-transfer-engine/src/topology.cpp index 9995becb03..c0291c889e 100644 --- a/mooncake-transfer-engine/src/topology.cpp +++ b/mooncake-transfer-engine/src/topology.cpp @@ -474,22 +474,21 @@ void Topology::clear() { int Topology::discover(const std::vector &filter) { matrix_.clear(); -#ifdef USE_UB - auto all_hca = listUBDevices(filter); - for (auto &ent : discoverCpuTopology(all_hca)) { - matrix_[ent.name] = ent; - } -#else auto all_hca = listInfiniBandDevices(filter); for (auto &ent : discoverCpuTopology(all_hca)) { matrix_[ent.name] = ent; } -#endif #if defined(USE_CUDA) || defined(USE_MUSA) || defined(USE_HIP) || \ defined(USE_MLU) || defined(USE_MACA) for (auto &ent : discoverCudaTopology(all_hca)) { matrix_[ent.name] = ent; } +#endif +#ifdef USE_UB + auto ub_all_hca = listUBDevices(filter); + for (auto &ent : discoverCpuTopology(ub_all_hca)) { + matrix_[ent.name] = ent; + } #endif return resolve(); } From a1f6b983e6b4a52be9ef1bb8df000014b8c9fb94 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 08:32:33 +0000 Subject: [PATCH 07/12] optimize the ci config and rename the urma api mock source name --- .github/workflows/ci.yml | 9 +- .github/workflows/ci_ub.yml | 101 ------------------ .../{ => urma}/urma_endpoint.h | 0 .../kunpeng_transport/CMakeLists.txt | 4 +- .../kunpeng_transport/ub_transport.cpp | 2 +- .../urma/{mock_urma_api.cpp => mock_urma.cpp} | 0 .../{ => urma}/urma_endpoint.cpp | 2 +- 7 files changed, 5 insertions(+), 113 deletions(-) delete mode 100644 .github/workflows/ci_ub.yml rename mooncake-transfer-engine/include/transport/kunpeng_transport/{ => urma}/urma_endpoint.h (100%) rename mooncake-transfer-engine/src/transport/kunpeng_transport/urma/{mock_urma_api.cpp => mock_urma.cpp} (100%) rename mooncake-transfer-engine/src/transport/kunpeng_transport/{ => urma}/urma_endpoint.cpp (99%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6bca561de6..284ec3159d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -105,7 +105,7 @@ jobs: sudo bash -x dependencies.sh -y mkdir build cd build - cmake -G Ninja .. -DUSE_HTTP=ON -DUSE_CXL=ON -DUSE_ETCD=ON -DSTORE_USE_ETCD=ON -DENABLE_ASAN=ON -DENABLE_SCCACHE=ON -DCMAKE_BUILD_TYPE=Debug + cmake -G Ninja .. -DUSE_HTTP=ON -DUSE_CXL=ON -DUSE_UB -DUSE_ETCD=ON -DSTORE_USE_ETCD=ON -DENABLE_ASAN=ON -DENABLE_SCCACHE=ON -DCMAKE_BUILD_TYPE=Debug shell: bash - name: Build project @@ -722,12 +722,6 @@ jobs: uses: ./.github/workflows/integration-test.yml secrets: inherit - ub-transport-test: - needs: [build, check-paths] - if: needs.check-paths.outputs.should-run-downstream == 'true' - uses: ./.github/workflows/ci_ub.yml - secrets: inherit - ci-gate: name: CI Gate if: always() @@ -741,7 +735,6 @@ jobs: - test-wheel-ubuntu - ascend-test - integration-test - - ub-transport-test runs-on: ubuntu-latest steps: - name: Check required job results diff --git a/.github/workflows/ci_ub.yml b/.github/workflows/ci_ub.yml deleted file mode 100644 index 2e2c0bc3f1..0000000000 --- a/.github/workflows/ci_ub.yml +++ /dev/null @@ -1,101 +0,0 @@ -name: 'UB Transport Test' - -on: - workflow_call: {} - -jobs: - build-and-test: - runs-on: ubuntu-22.04 - env: - CI: "true" - SCCACHE_GHA_ENABLED: "true" - - steps: - - name: Checkout code - uses: actions/checkout@v4 - - - name: Set up Python 3.10 - uses: actions/setup-python@v5 - with: - python-version: '3.10' - - - name: Check and start etcd - run: | - # Check if etcd is already installed - if command -v etcd &> /dev/null && command -v etcdctl &> /dev/null; then - echo "etcd is already installed" - # Check if etcd is running - if etcdctl --endpoints=http://127.0.0.1:2379 endpoint health > /dev/null 2>&1; then - echo "etcd is already running" - else - echo "etcd is not running, starting it..." - etcd --advertise-client-urls http://127.0.0.1:2379 --listen-client-urls http://127.0.0.1:2379 & - sleep 3 # Give etcd time to start - etcdctl --endpoints=http://127.0.0.1:2379 endpoint health - fi - else - echo "etcd is not installed, installing..." - wget https://github.com/etcd-io/etcd/releases/download/v3.6.1/etcd-v3.6.1-linux-amd64.tar.gz - tar xzf etcd-v3.6.1-linux-amd64.tar.gz - sudo mv etcd-v3.6.1-linux-amd64/etcd* /usr/local/bin/ - etcd --advertise-client-urls http://127.0.0.1:2379 --listen-client-urls http://127.0.0.1:2379 & - sleep 3 # Give etcd time to start - etcdctl --endpoints=http://127.0.0.1:2379 endpoint health - fi - shell: bash - - - name: Free up disk space - run: | - sudo rm -rf /usr/share/dotnet - sudo rm -rf /opt/ghc - sudo rm -rf /opt/hostedtoolcache/CodeQL - - - name: Install build utilities - run: | - sudo apt-get update - sudo apt-get install -y ninja-build - - - name: Run sccache-cache - uses: mozilla-actions/sccache-action@v0.0.9 - - - name: Configure sccache - uses: actions/github-script@v7 - with: - script: | - core.exportVariable('ACTIONS_RESULTS_URL', process.env.ACTIONS_RESULTS_URL || ''); - core.exportVariable('ACTIONS_RUNTIME_TOKEN', process.env.ACTIONS_RUNTIME_TOKEN || ''); - - - name: Install dependencies - run: | - sudo apt update -y - sudo bash -x dependencies.sh -y - shell: bash - - - name: Configure project with UB support - run: | - mkdir build - cd build - cmake -G Ninja .. \ - -DUSE_UB=ON \ - -DUSE_ETCD=ON \ - -DBUILD_UNIT_TESTS=ON \ - -DBUILD_EXAMPLES=OFF - shell: bash - - - name: Build project - run: | - cd build - cmake --build . --target ub_transport_test - shell: bash - - - name: Run UB transport tests - run: | - cd build - MC_METADATA_SERVER=http://127.0.0.1:2379 ./mooncake-transfer-engine/tests/ub_transport_test - shell: bash - - - name: Test Summary - if: always() - shell: bash - run: | - echo "UB Transport Test completed" diff --git a/mooncake-transfer-engine/include/transport/kunpeng_transport/urma_endpoint.h b/mooncake-transfer-engine/include/transport/kunpeng_transport/urma/urma_endpoint.h similarity index 100% rename from mooncake-transfer-engine/include/transport/kunpeng_transport/urma_endpoint.h rename to mooncake-transfer-engine/include/transport/kunpeng_transport/urma/urma_endpoint.h diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt index f272b69aeb..26f1a0297b 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt @@ -1,4 +1,4 @@ -file(GLOB UB_SOURCES "*.cpp") +file(GLOB_RECURSE UB_SOURCES "*.cpp") # Check if liburma.so exists find_library(URMA_LIBRARY urma PATHS /usr/lib64) @@ -16,7 +16,7 @@ if (URMA_LIBRARY) else() # If liburma.so not found, we'll need to handle this differently # For now, just build without mock_urma_api.cpp - list(APPEND UB_SOURCES "urma/mock_urma_api.cpp") + list(APPEND UB_SOURCES "urma/mock_urma.cpp") add_library(ub_transport OBJECT ${UB_SOURCES}) message(WARNING "Not Found liburma.so building ub_transport with Mock URMA library") endif() diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp index dd2658f735..77b72c1421 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/ub_transport.cpp @@ -19,7 +19,7 @@ #include "transport/kunpeng_transport/ub_context.h" #include "transport/kunpeng_transport/ub_transport.h" #include "transport/kunpeng_transport/ub_endpoint.h" -#include "transport/kunpeng_transport/urma_endpoint.h" +#include "transport/kunpeng_transport/urma/urma_endpoint.h" namespace mooncake { UbTransport::UbTransport(UB_ENDPOINT_TYPE endpoint_type) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp similarity index 100% rename from mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma_api.cpp rename to mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp similarity index 99% rename from mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp rename to mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index 8627cac0e7..20a933488b 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -16,7 +16,7 @@ #include #include #include "config.h" -#include "transport/kunpeng_transport/urma_endpoint.h" +#include "transport/kunpeng_transport/urma/urma_endpoint.h" namespace mooncake { static int isNullEid(urma_eid_t* eid) { From 85dc6062139b1a0997756e9cfade38f9d7079d07 Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 08:39:07 +0000 Subject: [PATCH 08/12] fix the ci config bug --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 284ec3159d..9627766696 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -105,7 +105,7 @@ jobs: sudo bash -x dependencies.sh -y mkdir build cd build - cmake -G Ninja .. -DUSE_HTTP=ON -DUSE_CXL=ON -DUSE_UB -DUSE_ETCD=ON -DSTORE_USE_ETCD=ON -DENABLE_ASAN=ON -DENABLE_SCCACHE=ON -DCMAKE_BUILD_TYPE=Debug + cmake -G Ninja .. -DUSE_HTTP=ON -DUSE_CXL=ON -DUSE_UB=ON -DUSE_ETCD=ON -DSTORE_USE_ETCD=ON -DENABLE_ASAN=ON -DENABLE_SCCACHE=ON -DCMAKE_BUILD_TYPE=Debug shell: bash - name: Build project From b6e23113d6fa92fea75f0d9ed4c04d6a74c54c3f Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 09:37:00 +0000 Subject: [PATCH 09/12] fix the memory leakage --- .../kunpeng_transport/urma/mock_urma.cpp | 4 +++- .../kunpeng_transport/urma/urma_endpoint.cpp | 18 ++++++++++++++++++ .../tests/ub_transport_test.cpp | 3 +++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp index 9e006a0a0b..d8e2f28449 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp @@ -24,7 +24,9 @@ urma_device_attr_t mock_device_attr = { .state = URMA_PORT_ACTIVE, .active_width = URMA_LINK_X1, .active_speed = URMA_SP_100G, - .active_mtu = URMA_MTU_4096}}}; + .active_mtu = URMA_MTU_4096}}, +.reserved_jetty_id_min = 0, +.reserved_jetty_id_max = 1024}; urma_eid_info_t mock_eid_info = { .eid = {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index 20a933488b..4ed81f9863 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -163,6 +163,21 @@ int UrmaContext::deconstruct() { } seg_region_list_.clear(); + for (auto& seg : imported_seg_list_) { + int ret = urma_unimport_seg(seg); + if (ret) { + PLOG(ERROR) << "Failed to unimport segment"; + } + } + imported_seg_list_.clear(); + + for (auto& seg : remote_seg_list_) { + free(seg); + } + remote_seg_list_.clear(); + + import_tseg_map.clear(); + for (size_t i = 0; i < jfr_list_.size(); i++) { if (!jfr_list_[i].native) continue; @@ -640,6 +655,8 @@ int UrmaEndpoint::deconstruct() { ret = urma_unimport_jetty(imported_jetty); if (ret) PLOG(ERROR) << "Failed to unimport jetty"; } + ret = urma_delete_jetty(jetty_list_[i]); + if (ret) PLOG(ERROR) << "Failed to delete jetty"; // After destroying QP, the wr_depth_list_ won't change bool displayed = false; if (wr_depth_list_[i] != 0) { @@ -654,6 +671,7 @@ int UrmaEndpoint::deconstruct() { } jetty_list_.clear(); delete[] wr_depth_list_; + imported_jetty_map_.clear(); return 0; } diff --git a/mooncake-transfer-engine/tests/ub_transport_test.cpp b/mooncake-transfer-engine/tests/ub_transport_test.cpp index b225fb6b09..4d50531b49 100644 --- a/mooncake-transfer-engine/tests/ub_transport_test.cpp +++ b/mooncake-transfer-engine/tests/ub_transport_test.cpp @@ -140,6 +140,9 @@ class UBTransportTest : public ::testing::Test { google::ShutdownGoogleLogging(); engine->unregisterLocalMemory(addr); freeMemoryPool(addr, ram_buffer_size); + if (args) { + free(args); + } } }; From 58a87e060bf279fc93bdb32f03043787c76fcf6d Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 09:45:33 +0000 Subject: [PATCH 10/12] fix the code format bug --- .../src/transport/kunpeng_transport/urma/mock_urma.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp index d8e2f28449..020c3a6b19 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/mock_urma.cpp @@ -18,15 +18,15 @@ std::map target_jetty_map; urma_device_attr_t mock_device_attr = { .guid = {.raw = {10}}, - .dev_cap = {.max_jfc = 1024, .max_jetty = 1024}, + .dev_cap = {}, .port_cnt = 1, .port_attr = {{.max_mtu = URMA_MTU_4096, .state = URMA_PORT_ACTIVE, .active_width = URMA_LINK_X1, .active_speed = URMA_SP_100G, .active_mtu = URMA_MTU_4096}}, -.reserved_jetty_id_min = 0, -.reserved_jetty_id_max = 1024}; + .reserved_jetty_id_min = 0, + .reserved_jetty_id_max = 1024}; urma_eid_info_t mock_eid_info = { .eid = {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, @@ -122,6 +122,8 @@ urma_status_t urma_query_device(urma_device_t *device, if (!device || !attr) { return URMA_EINVAL; } + mock_device_attr.dev_cap.max_jfc = 1024; + mock_device_attr.dev_cap.max_jetty = 1024; memcpy(attr, &mock_device_attr, sizeof(urma_device_attr_t)); return URMA_SUCCESS; } From 421482fa9c9312c68f9448c3902a1d38d4a6f19b Mon Sep 17 00:00:00 2001 From: zchuango Date: Thu, 9 Apr 2026 11:20:50 +0000 Subject: [PATCH 11/12] remove the build-flags with USE_UB --- .github/workflows/ci.yml | 4 ++-- .../include/transport/kunpeng_transport/ub_endpoint.h | 2 +- mooncake-transfer-engine/src/topology.cpp | 4 +--- .../transport/kunpeng_transport/urma/urma_endpoint.cpp | 4 +--- mooncake-transfer-engine/tests/ub_transport_test.cpp | 8 ++++---- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9627766696..8016208ae8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -503,7 +503,7 @@ jobs: cd build export PATH=/usr/local/nvidia/bin:/usr/local/nvidia/lib64:$PATH export LD_LIBRARY_PATH=/usr/local/cuda/lib64/stubs:$LD_LIBRARY_PATH - cmake -G Ninja .. -DUSE_ETCD=OFF -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=OFF -DUSE_MNNVL=OFF -DUSE_UB=ON -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" + cmake -G Ninja .. -DUSE_ETCD=OFF -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=OFF -DUSE_MNNVL=OFF -DUSE_UB=OFF -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" cmake --build . sudo cmake --install . df -h @@ -513,7 +513,7 @@ jobs: run: | mkdir build cd build - cmake -G Ninja .. -DUSE_ETCD=ON -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_STORE=ON -DWITH_P2P_STORE=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=ON -DUSE_MNNVL=OFF -DUSE_UB=ON -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" + cmake -G Ninja .. -DUSE_ETCD=ON -DUSE_CXL=ON -DUSE_REDIS=ON -DUSE_HTTP=ON -DWITH_STORE=ON -DWITH_P2P_STORE=ON -DWITH_METRICS=ON -DBUILD_UNIT_TESTS=ON -DBUILD_EXAMPLES=ON -DENABLE_SCCACHE=ON -DUSE_CUDA=ON -DUSE_MNNVL=OFF -DUSE_UB=OFF -DCMAKE_EXE_LINKER_FLAGS="-L/usr/local/cuda/lib64/stubs" shell: bash # TODO: lack USE_NVMEOF,USE_MNNVL diff --git a/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_endpoint.h b/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_endpoint.h index e95bd25825..4c3672de6b 100644 --- a/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_endpoint.h +++ b/mooncake-transfer-engine/include/transport/kunpeng_transport/ub_endpoint.h @@ -31,7 +31,7 @@ class UbEndPoint { using HandShakeDesc = TransferMetadata::HandShakeDesc; - UbEndPoint() : status_(INITIALIZING), active_(true) {} + UbEndPoint() : inactive_time_(0), active_(true), status_(INITIALIZING) {} virtual int construct(GlobalConfig& config) = 0; diff --git a/mooncake-transfer-engine/src/topology.cpp b/mooncake-transfer-engine/src/topology.cpp index c0291c889e..2037b3f961 100644 --- a/mooncake-transfer-engine/src/topology.cpp +++ b/mooncake-transfer-engine/src/topology.cpp @@ -208,9 +208,7 @@ static std::vector listUBDevices( int num_devices = 0; std::vector devices; - urma_init_attr_t init_attr = { - .uasid = 0, - }; + urma_init_attr_t init_attr = {}; if (urma_init(&init_attr) != URMA_SUCCESS) { LOG(WARNING) << "Failed to urma init"; return {}; diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index 4ed81f9863..d5d939111e 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -576,9 +576,7 @@ bool UrmaContext::uninit() { } bool UrmaContext::init() { - urma_init_attr_t init_attr = { - .uasid = 0, - }; + urma_init_attr_t init_attr = {}; auto ret = urma_init(&init_attr); if (ret != URMA_SUCCESS && ret != URMA_EEXIST) { LOG(ERROR) << "Failed to urma init, ret = " << ret; diff --git a/mooncake-transfer-engine/tests/ub_transport_test.cpp b/mooncake-transfer-engine/tests/ub_transport_test.cpp index 4d50531b49..be31189b94 100644 --- a/mooncake-transfer-engine/tests/ub_transport_test.cpp +++ b/mooncake-transfer-engine/tests/ub_transport_test.cpp @@ -165,7 +165,7 @@ TEST_F(UBTransportTest, MultiWrite) { bool completed = false; TransferStatus status; while (!completed) { - Status s = engine->getTransferStatus(batch_id, 0, status); + s = engine->getTransferStatus(batch_id, 0, status); ASSERT_EQ(s, Status::OK()); if (status.s == TransferStatusEnum::COMPLETED) completed = true; @@ -199,7 +199,7 @@ TEST_F(UBTransportTest, MultipleRead) { bool completed = false; TransferStatus status; while (!completed) { - Status s = engine->getTransferStatus(batch_id, 0, status); + s = engine->getTransferStatus(batch_id, 0, status); ASSERT_EQ(s, Status::OK()); if (status.s == TransferStatusEnum::COMPLETED) completed = true; @@ -214,7 +214,7 @@ TEST_F(UBTransportTest, MultipleRead) { times = 10; while (times--) { auto batch_id = engine->allocateBatchID(1); - int ret = 0; + TransferRequest entry; entry.opcode = TransferRequest::READ; entry.length = kDataLength; @@ -227,7 +227,7 @@ TEST_F(UBTransportTest, MultipleRead) { bool completed = false; TransferStatus status; while (!completed) { - Status s = engine->getTransferStatus(batch_id, 0, status); + s = engine->getTransferStatus(batch_id, 0, status); ASSERT_EQ(s, Status::OK()); if (status.s == TransferStatusEnum::COMPLETED) completed = true; From 841d92c75dce98894f2316f3e1dc2d2f6d41034f Mon Sep 17 00:00:00 2001 From: zchuango Date: Fri, 10 Apr 2026 03:12:56 +0000 Subject: [PATCH 12/12] optimize the urma port find --- .../kunpeng_transport/CMakeLists.txt | 2 +- .../kunpeng_transport/urma/urma_endpoint.cpp | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt index 26f1a0297b..c861696859 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/CMakeLists.txt @@ -1,4 +1,4 @@ -file(GLOB_RECURSE UB_SOURCES "*.cpp") +file(GLOB UB_SOURCES "*.cpp" "urma/urma_endpoint.cpp") # Check if liburma.so exists find_library(URMA_LIBRARY urma PATHS /usr/lib64) diff --git a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp index d5d939111e..44be40a8b8 100644 --- a/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp +++ b/mooncake-transfer-engine/src/transport/kunpeng_transport/urma/urma_endpoint.cpp @@ -364,7 +364,7 @@ void* UrmaContext::retrieveRemoteSeg(const std::string& remoteSegmentStr) { return import_tseg; } -int UrmaContext::openDevice(const std::string& device_name, uint8_t port, +int UrmaContext::openDevice(const std::string& device_name, uint8_t por, int& eid_index) { int num_devices = 0; urma_context_t* context = nullptr; @@ -419,10 +419,16 @@ int UrmaContext::openDevice(const std::string& device_name, uint8_t port, urma_free_device_list(devices); return ERR_CONTEXT; } + for (int p = 0; p < MAX_PORT_CNT; p++) { + if (dev_attr_.port_attr[p].state == URMA_PORT_ACTIVE) { + port_ = p; + break; + } + } if (dev_attr_.port_cnt != 0 && - dev_attr_.port_attr[port].state != URMA_PORT_ACTIVE) { - LOG(WARNING) << "Device " << device_name << " port( " << port - << " ) not active"; + dev_attr_.port_attr[port_].state != URMA_PORT_ACTIVE) { + LOG(WARNING) << "Device " << device_name + << " not found active port"; if (urma_delete_context(context)) { PLOG(ERROR) << "urma_delete_context(" << device_name << ") failed"; @@ -448,10 +454,9 @@ int UrmaContext::openDevice(const std::string& device_name, uint8_t port, urma_context_ = context; eid_index_ = eid_index; - port_ = port; if (dev_attr_.port_cnt != 0) { - active_mtu_ = dev_attr_.port_attr[port].active_mtu; - active_speed_ = dev_attr_.port_attr[port].active_speed; + active_mtu_ = dev_attr_.port_attr[port_].active_mtu; + active_speed_ = dev_attr_.port_attr[port_].active_speed; } else { active_mtu_ = URMA_MTU_4096; // default mtu and speed active_speed_ = URMA_SP_100G;