From f7fe3554074e5ba03143a5848e781407291df1a4 Mon Sep 17 00:00:00 2001 From: Cliff Burdick <30670611+cliffburdick@users.noreply.github.com> Date: Wed, 3 Jun 2026 08:19:53 -0700 Subject: [PATCH] #15 - Reapply RDMA benchmark profiling updates Replay the RDMA benchmark profiling and tuning changes from PR #103 on top of main after the stacked branch was merged out of order. Keep the newer main fixes while resolving conflicts: use create_tx_burst_params and null guards in the benchmark, keep IBV_ACCESS_RELAXED_ORDERING guarded for older verbs headers, preserve RDMA metrics accounting, and avoid duplicate TX burst cleanup after send_tx_burst failure. Original PR #103 merge commit: 94a7d2efe294c8e8ccfa91ee5afd1d59841f0719. Signed-off-by: Cliff Burdick Signed-off-by: Denis Leshchev --- docs/tutorials/benchmarking_examples.md | 15 ++ examples/rdma_bench.cpp | 64 ++++---- src/managers/rdma/daqiri_rdma_mgr.cpp | 191 +++++++++++++++++------- src/managers/rdma/daqiri_rdma_mgr.h | 4 +- 4 files changed, 190 insertions(+), 84 deletions(-) diff --git a/docs/tutorials/benchmarking_examples.md b/docs/tutorials/benchmarking_examples.md index 5f62380..1cb98fe 100644 --- a/docs/tutorials/benchmarking_examples.md +++ b/docs/tutorials/benchmarking_examples.md @@ -174,6 +174,21 @@ After having modified the configuration file, ensure you have connected an SFP c By default the application runs for 10 seconds and then exits. You can change the duration by passing `--seconds ` after the YAML path, or stop it gracefully at any time with `Ctrl-C`. +## Tune RDMA SEND completion signaling + +The RDMA manager signals every SEND work request by default. 
For `daqiri_bench_rdma` +runs where CQ polling overhead is part of the bottleneck investigation, set +`DAQIRI_RDMA_SEND_SIGNAL_EVERY=N` to request a signaled SEND at least once every `N` posts (the manager may signal sooner to bound outstanding unsignaled SENDs): + +```bash +DAQIRI_RDMA_SEND_SIGNAL_EVERY=16 \ + /opt/daqiri/bin/daqiri_bench_rdma /opt/daqiri/bin/daqiri_bench_rdma_tx_rx.yaml +``` + +Larger values reduce SEND completion traffic, but completions also drive +application credits and buffer recycling in the benchmark. Keep the variable +unset, or set it to `1`, for baseline measurements. + ## Watch live OpenTelemetry metrics in Grafana DAQIRI can expose the raw benchmark counters through OpenTelemetry when metrics diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index 7ff2446..6270ea7 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -67,12 +67,10 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, std::atomic& stop, RdmaWorkerStats& stats) { - // Matches the per-MR num_bufs in the YAML configs. Higher values deadlock - // the bench: post_req blocks in get_tx_packet_burst when the pool is empty, - // but free_tx_burst (which refills it) only runs later in the same loop - // iteration via get_rx_burst. Until the loop is refactored to interleave - // drain with post, this constant must stay <= num_bufs. - static constexpr int kMaxOutstanding = 20; + // Application credit limit. Buffer exhaustion is handled as backpressure so + // this can be larger than a memory region's num_bufs: post_req returns to the + // outer loop, which drains completions and releases buffers before retrying. 
+ static constexpr int kMaxOutstanding = 64; int outstanding_send = 0; int outstanding_recv = 0; uint64_t send_wr_id = 0x1234; @@ -97,50 +95,44 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa } auto post_req = [&](int& outstanding, uint64_t& wr_id, daqiri::RDMAOpCode op, - const std::string& mr_name) { - if (outstanding >= kMaxOutstanding) { return; } + const std::string& mr_name) -> bool { + if (outstanding >= kMaxOutstanding) { return false; } auto* msg = daqiri::create_tx_burst_params(); - if (msg == nullptr) { return; } + if (msg == nullptr) { return false; } if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) != daqiri::Status::SUCCESS) { daqiri::free_tx_metadata(msg); - return; + return false; } - bool has_packets = false; - while (!stop.load()) { - if (daqiri::get_tx_packet_burst(msg) == daqiri::Status::SUCCESS) { - has_packets = true; - break; - } - std::this_thread::sleep_for(std::chrono::microseconds(50)); - } - if (!has_packets) { + if (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS) { daqiri::free_tx_metadata(msg); - return; + return false; } if (daqiri::set_packet_lengths(msg, 0, {cfg.message_size}) != daqiri::Status::SUCCESS) { daqiri::free_tx_burst(msg); - return; + return false; } - if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { return; } + if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { return false; } outstanding++; wr_id++; // Only meter actual byte transmissions (SENDs), not RECEIVE-side posts. 
if (op == daqiri::RDMAOpCode::SEND) { pacer.wait_for_bytes(static_cast(cfg.message_size), stop); } + return true; }; - if (cfg.send) { post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr); } - if (cfg.receive) { post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr); } + bool got_completion = false; + while (true) { + daqiri::BurstParams* completion = nullptr; + const auto completion_status = daqiri::get_rx_burst(&completion, conn_id, cfg.server); + if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { break; } - daqiri::BurstParams* completion = nullptr; - if (daqiri::get_rx_burst(&completion, conn_id, cfg.server) == daqiri::Status::SUCCESS && - completion != nullptr) { + got_completion = true; if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) { outstanding_send--; stats.send_completions++; @@ -152,7 +144,23 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa stats.recv_bytes += static_cast(cfg.message_size); } daqiri::free_tx_burst(completion); - } else { + } + + bool posted_work = false; + if (cfg.receive) { + while (outstanding_recv < kMaxOutstanding && + post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr)) { + posted_work = true; + } + } + if (cfg.send) { + while (outstanding_send < kMaxOutstanding && + post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr)) { + posted_work = true; + } + } + + if (!got_completion && !posted_work) { std::this_thread::sleep_for(std::chrono::microseconds(100)); } } diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index 0d781fb..36f5c44 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -20,6 +20,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -231,6 +234,19 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& 
mr, void* ptr) { int access = mr_access_to_ibv(mr.access_); params.ctx_mr_map_[pd.second] = ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, access); +#ifdef IBV_ACCESS_RELAXED_ORDERING + if (params.ctx_mr_map_[pd.second] == nullptr && + (access & IBV_ACCESS_RELAXED_ORDERING) != 0) { + const int fallback_access = access & ~IBV_ACCESS_RELAXED_ORDERING; + DAQIRI_LOG_WARN( + "Failed to register MR {} with relaxed ordering on PD {}; retrying without it", + mr.name_, + (void*)pd.second); + params.ctx_mr_map_[pd.second] = + ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, fallback_access); + access = fallback_access; + } +#endif if (params.ctx_mr_map_[pd.second] == nullptr) { DAQIRI_LOG_CRITICAL("Failed to register MR {} on PD {}", mr.name_, (void*)pd.second); return -1; @@ -243,7 +259,7 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& mr, void* ptr) { (void*)ptr, (void*)((uint8_t*)ptr + mr.adj_size_ * mr.num_bufs_), params.ctx_mr_map_[pd.second]->lkey, - mr.access_); + access); } } } @@ -454,11 +470,17 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { struct ibv_wc wc; int num_comp; BurstParams* msg; + int send_signal_every = 1; + if (const char* env = std::getenv("DAQIRI_RDMA_SEND_SIGNAL_EVERY")) { + const long parsed = strtol(env, nullptr, 10); + if (parsed > 0) { send_signal_every = static_cast(parsed); } + } + uint64_t sends_since_signal = 0; const auto& qref = cfg_.ifs_[tparams->if_idx].tx_.queues_[tparams->queue_idx]; const long cpu_core = strtol(qref.common_.cpu_core_.c_str(), NULL, 10); struct rte_ring* tx_ring = tparams->qp_params.tx_ring; struct rte_ring* rx_ring = tparams->qp_params.rx_ring; - std::unordered_map outstanding_send_wr_ids; + std::map outstanding_send_wr_ids; std::unordered_map outstanding_receive_wr_ids; auto counters = get_rdma_metrics(cfg_, *tparams); auto packet_length_for_wr = [](BurstParams* burst, uint64_t wr_id) -> uint64_t { @@ -480,7 +502,9 @@ void RdmaMgr::rdma_thread(bool is_server, 
rdma_thread_params* tparams) { // periodically poll the CQ. while (!rdma_force_quit.load()) { // Check RQ first to reduce latency - while ((num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc)) != 0) { + while (true) { + num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc); + if (num_comp == 0) { break; } DAQIRI_LOG_DEBUG("GOT RX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -518,6 +542,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } else { msg = create_burst_params(); } + const bool msg_owns_packets = wc.opcode == IBV_WC_RECV; // Only populate a header to indicate which burst needs to be freed // msg->transport_hdr.opcode = ibv_opcode_to_daqiri_opcode(wc.opcode); @@ -531,13 +556,63 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); metrics::add_dropped(counters, "rx_completion_enqueue_failure", 1); - free_tx_burst(msg); + if (msg_owns_packets) { + free_tx_burst(msg); + } else { + free_tx_metadata(msg); + } return; } } + auto complete_send_wrs = [&](uint64_t wr_id, Status status) -> bool { + auto completed_end = outstanding_send_wr_ids.upper_bound(wr_id); + if (completed_end == outstanding_send_wr_ids.begin()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + metrics::add_dropped(counters, "tx_unknown_wr_id", 1); + return true; + } + if (outstanding_send_wr_ids.find(wr_id) == outstanding_send_wr_ids.end()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + metrics::add_dropped(counters, "tx_unknown_wr_id", 1); + } + + for (auto completed_it = outstanding_send_wr_ids.begin(); completed_it != completed_end;) { + msg = completed_it->second; + if (status == Status::SUCCESS) { + metrics::add_tx(counters, 1, packet_length_for_wr(msg, completed_it->first)); + } + + const auto conn_id = 
get_connection_id(msg); + const auto expected_conn_id = reinterpret_cast(tparams->client_id); + if (conn_id != expected_conn_id) { + DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", + completed_it->first, + conn_id, + expected_conn_id); + } + + completed_it = outstanding_send_wr_ids.erase(completed_it); + + msg->transport_hdr.tx = true; + msg->transport_hdr.status = status; + msg->transport_hdr.server = is_server; + + if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { + DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); + metrics::add_dropped(counters, "tx_completion_enqueue_failure", 1); + free_tx_burst(msg); + return false; + } + } + + return true; + }; + // Check TX CQ for completion - while ((num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc)) != 0) { + while (true) { + num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc); + if (num_comp == 0) { break; } DAQIRI_LOG_DEBUG("GOT TX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -550,30 +625,16 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { (int64_t)wc.wr_id, (int)wc.opcode); metrics::add_dropped(counters, "tx_completion_error", 1); + if (wc.opcode == IBV_WC_SEND && + !complete_send_wrs(wc.wr_id, Status::GENERIC_FAILURE)) { + return; + } continue; } if (wc.opcode == IBV_WC_SEND) { - auto it = outstanding_send_wr_ids.find(wc.wr_id); - if (it == outstanding_send_wr_ids.end()) { - DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wc.wr_id); - metrics::add_dropped(counters, "tx_unknown_wr_id", 1); - continue; - } - - msg = it->second; - metrics::add_tx(counters, 1, packet_length_for_wr(msg, wc.wr_id)); - - const auto conn_id = get_connection_id(msg); - const auto expected_conn_id = reinterpret_cast(tparams->client_id); - if (conn_id != expected_conn_id) { - DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", - wc.wr_id, - conn_id, - expected_conn_id); - } - 
- outstanding_send_wr_ids.erase(it); + if (!complete_send_wrs(wc.wr_id, Status::SUCCESS)) { return; } + continue; } else { msg = create_burst_params(); } @@ -590,7 +651,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); metrics::add_dropped(counters, "tx_completion_enqueue_failure", 1); - free_tx_burst(msg); + free_tx_metadata(msg); return; } } @@ -600,9 +661,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // ssize_t bytes = mq_receive(tparams.tx_mq, reinterpret_cast(&burst), sizeof(burst), // nullptr); - if (rte_ring_dequeue(tparams->qp_params.tx_ring, reinterpret_cast(&burst)) != 0) { - continue; - } + if (rte_ring_dequeue(tx_ring, reinterpret_cast(&burst)) != 0) { continue; } const auto local_mr = mrs_.find(std::string(burst->transport_hdr.local_mr_name)); if (local_mr == mrs_.end()) { @@ -634,6 +693,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { continue; } + bool posted_any = false; for (int p = 0; p < burst->transport_hdr.num_pkts; p++) { ibv_send_wr wr; ibv_send_wr* bad_wr; @@ -647,14 +707,25 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { wr.sg_list = &sge; wr.num_sge = 1; wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; + // Keep signaled completions frequent enough for callers that use SEND + // completions as their credit to refill the outstanding window. + const bool signal_send = send_signal_every == 1 || + sends_since_signal + 1 >= send_signal_every || + outstanding_send_wr_ids.size() >= 15; + wr.send_flags = signal_send ? 
IBV_SEND_SIGNALED : 0; int ret = ibv_post_send(tparams->client_id->qp, &wr, &bad_wr); if (ret != 0) { DAQIRI_LOG_CRITICAL("Failed to post SEND request, errno: {}", strerror(errno)); metrics::add_dropped(counters, "tx_post_failure", 1); - free_tx_burst(burst); - continue; + if (!posted_any) { free_tx_burst(burst); } + break; + } + posted_any = true; + if (signal_send) { + sends_since_signal = 0; + } else { + sends_since_signal++; } outstanding_send_wr_ids[burst->transport_hdr.wr_id + p] = burst; @@ -682,6 +753,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { continue; } + bool posted_any = false; for (int p = 0; p < burst->transport_hdr.num_pkts; p++) { struct ibv_recv_wr recv_wr; struct ibv_sge sge; @@ -706,9 +778,10 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (ret) { DAQIRI_LOG_CRITICAL("ibv_post_recv failed: {}", strerror(errno)); metrics::add_dropped(counters, "rx_post_failure", 1); - free_tx_burst(burst); - continue; + if (!posted_any) { free_tx_burst(burst); } + break; } + posted_any = true; outstanding_receive_wr_ids[burst->transport_hdr.wr_id + p] = burst; } @@ -773,12 +846,20 @@ Status RdmaMgr::rdma_get_server_conn_id(const std::string& server_addr, uint16_t // Find the next queue ID that's not already in use for (size_t i = 0; i < server_params->second.size(); i++) { if (server_params->second[i].client_id != nullptr && !server_params->second[i].active) { - *conn_id = reinterpret_cast(server_params->second[i].client_id); + auto* client_id = server_params->second[i].client_id; + std::lock_guard lock(threads_mutex_); + if (worker_threads_.find(client_id) == worker_threads_.end() || + tx_rings_map_.find(client_id) == tx_rings_map_.end() || + rx_rings_map_.find(client_id) == rx_rings_map_.end()) { + continue; + } + + *conn_id = reinterpret_cast(client_id); server_params->second[i].active = true; DAQIRI_LOG_INFO("Found available queue ID for server {}:{} with cm_id {}", server_addr, server_port, - 
(void*)server_params->second[i].client_id); + (void*)client_id); return Status::SUCCESS; } } @@ -1029,12 +1110,11 @@ Status RdmaMgr::get_tx_packet_burst(BurstParams* burst) { return Status::NO_FREE_BURST_BUFFERS; } - int rx = rte_ring_dequeue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); - if (rx != burst->transport_hdr.num_pkts) { - DAQIRI_LOG_ERROR("Asked for {} packets, got {}", burst->transport_hdr.num_pkts, rx); + const unsigned num_pkts = static_cast(burst->transport_hdr.num_pkts); + const unsigned rx = + rte_ring_dequeue_bulk(burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); + if (rx != num_pkts) { + DAQIRI_LOG_ERROR("Asked for {} packets, got {}", num_pkts, rx); rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); burst->pkts[0] = nullptr; return Status::NO_FREE_BURST_BUFFERS; @@ -1043,10 +1123,8 @@ Status RdmaMgr::get_tx_packet_burst(BurstParams* burst) { // Allocate packet length buffer if (rte_mempool_get(pkt_len_pool_, reinterpret_cast(&burst->pkt_lens[0])) != 0) { DAQIRI_LOG_ERROR("Failed to get packet length buffer"); - rte_ring_enqueue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); + rte_ring_enqueue_bulk( + burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); burst->pkts[0] = nullptr; return Status::NO_FREE_PACKET_BUFFERS; @@ -1342,11 +1420,15 @@ int RdmaMgr::setup_pools_and_rings() { // RX rings DAQIRI_LOG_INFO("Setting up TX/RX per-queue rings"); + // Each connection ring is single-producer/single-consumer by design: one + // rdma_thread owns the manager side and one application thread owns the API + // side for a given conn_id. If multi-threaded app access per conn_id is ever + // allowed, these rings must go back to MP/MC-safe flags. 
for (int i = 0; i < MAX_RDMA_CONNECTIONS; i++) { std::string ring_name = "RX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up RX ring {}", ring_name); - struct rte_ring* ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + struct rte_ring* ring = + rte_ring_create(ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate RX ring {}!", ring_name); return -1; @@ -1356,7 +1438,7 @@ int RdmaMgr::setup_pools_and_rings() { ring_name = "TX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up TX ring {}", ring_name); ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate TX ring {}!", ring_name); return -1; @@ -1367,7 +1449,7 @@ int RdmaMgr::setup_pools_and_rings() { // Packet length buffers DAQIRI_LOG_DEBUG("Setting up RX meta pool"); pkt_len_pool_ = rte_mempool_create("PKT_LEN_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(uint32_t) * MAX_RDMA_BATCH, 0, 0, @@ -1384,7 +1466,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up RX meta pool"); rx_meta = rte_mempool_create("RX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1401,7 +1483,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up TX meta pool"); tx_meta = rte_mempool_create("TX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1417,7 +1499,7 @@ int RdmaMgr::setup_pools_and_rings() { } tx_burst_pool_ = rte_mempool_create("TX_BURST_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(void*) * MAX_RDMA_BATCH, 0, 0, @@ -1589,7 +1671,8 @@ void RdmaMgr::initialize() { // Set up memory region sizes for (auto& mr : cfg_.mrs_) { - mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, 
get_alignment(mr.second.kind_)); + const size_t rdma_alignment = std::max(get_alignment(mr.second.kind_), GPU_PAGE_SIZE); + mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, rdma_alignment); } for (const auto& intf : cfg_.ifs_) { diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index c3499e1..cd12dfd 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -162,11 +162,11 @@ class RdmaMgr : public Manager { private: static constexpr int MAX_RDMA_CONNECTIONS = 128; - static constexpr int MAX_CQ = 16; + static constexpr int MAX_CQ = 1024; static constexpr int NUM_SGE_ELS = 1024; static constexpr int NUM_SGE_BUFS = 256; static constexpr int MAX_NUM_MR = 16; // Maximum number of memory registers to exchange - static constexpr int MAX_OUSTANDING_WR = 64; + static constexpr int MAX_OUSTANDING_WR = 1024; static constexpr int MAX_NUM_PORTS = 4; static constexpr int MAX_RDMA_BATCH = 1024;