From 11bbd75e50b5782f3d89d1816883bafb31772584 Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Wed, 27 May 2026 21:10:49 +0000 Subject: [PATCH 1/5] #15 - Add RDMA benchmark profiling knobs Instrument the RDMA benchmark and manager hot paths so SEND/RECV posting, CQ polling, ring traffic, and app refill behavior can be measured while debugging the perftest gap. Add RDMA tuning hooks for relaxed ordering, SEND signaling cadence, queue depth, SPSC rings, and RDMA buffer alignment, plus a server connection readiness guard. Signed-off-by: Cliff Burdick --- examples/rdma_bench.cpp | 208 +++++++++++++++++-- src/managers/rdma/daqiri_rdma_mgr.cpp | 286 +++++++++++++++++++++++--- src/managers/rdma/daqiri_rdma_mgr.h | 4 +- 3 files changed, 455 insertions(+), 43 deletions(-) diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index 41c7b3f..dbac755 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -17,9 +17,15 @@ #include +#include +#include + +#include #include #include +#include #include +#include #include #include #include @@ -44,6 +50,7 @@ struct RdmaBenchConfig { std::string server_address = "10.100.1.1"; std::string client_address = "10.100.4.1"; uint16_t server_port = 4096; + int cpu_core = -1; }; struct RdmaWorkerStats { @@ -51,8 +58,33 @@ struct RdmaWorkerStats { uint64_t recv_completions = 0; uint64_t send_bytes = 0; uint64_t recv_bytes = 0; + uint64_t loop_iters = 0; + uint64_t send_post_attempts = 0; + uint64_t recv_post_attempts = 0; + uint64_t send_post_skips = 0; + uint64_t recv_post_skips = 0; + uint64_t create_burst_ns = 0; + uint64_t set_header_ns = 0; + uint64_t get_tx_burst_ns = 0; + uint64_t get_tx_burst_waits = 0; + uint64_t set_lengths_ns = 0; + uint64_t send_tx_burst_ns = 0; + uint64_t completion_poll_ns = 0; + uint64_t completion_polls = 0; + uint64_t completion_hits = 0; + uint64_t completion_misses = 0; + uint64_t empty_completion_sleeps = 0; + uint64_t max_outstanding_send = 0; + uint64_t max_outstanding_recv = 0; }; +uint64_t elapsed_ns(std::chrono::steady_clock::time_point start) { + return static_cast( + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count()); +} + RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { RdmaBenchConfig cfg; cfg.server = node["server"].as(cfg.server); @@ -62,9 +94,23 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { cfg.server_address = node["server_address"].as(cfg.server_address); cfg.client_address = node["client_address"].as(cfg.client_address); cfg.server_port = node["server_port"].as(cfg.server_port); + cfg.cpu_core = node["cpu_core"].as(cfg.cpu_core); return cfg; } +void set_current_thread_affinity(int cpu_core, const char* name) { + if (cpu_core < 0) { return; } + + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(cpu_core, &cpuset); + const int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + if (rc != 0) { + std::cerr << name << " failed to set affinity to core " << cpu_core << ": " + << std::strerror(rc) << '\n'; + } +} + void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, std::atomic& stop, RdmaWorkerStats& stats) { // Matches the per-MR num_bufs in the YAML configs. Higher values deadlock @@ -72,16 +118,26 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa // but free_tx_burst (which refills it) only runs later in the same loop // iteration via get_rx_burst. Until the loop is refactored to interleave // drain with post, this constant must stay <= num_bufs. - static constexpr int kMaxOutstanding = 20; + int max_outstanding = 20; + if (const char* env = std::getenv("DAQIRI_RDMA_MAX_OUTSTANDING")) { + max_outstanding = std::max(1, std::stoi(env)); + } int outstanding_send = 0; int outstanding_recv = 0; uint64_t send_wr_id = 0x1234; uint64_t recv_wr_id = 0x2345; uintptr_t conn_id = 0; + const bool profile = std::getenv("DAQIRI_RDMA_BENCH_PROFILE") != nullptr; + int empty_poll_sleep_us = 100; + if (const char* env = std::getenv("DAQIRI_RDMA_EMPTY_POLL_SLEEP_US")) { + empty_poll_sleep_us = std::max(0, std::stoi(env)); + } + set_current_thread_affinity(cfg.cpu_core, cfg.server ? "Server app thread" : "Client app thread"); std::string send_mr = cfg.server ? "DATA_TX_GPU_SERVER" : "DATA_TX_GPU_CLIENT"; std::string recv_mr = cfg.server ? "DATA_RX_GPU_SERVER" : "DATA_RX_GPU_CLIENT"; while (!stop.load()) { + if (profile) { stats.loop_iters++; } if (conn_id == 0) { daqiri::Status s = daqiri::Status::GENERIC_FAILURE; if (cfg.server) { @@ -97,46 +153,102 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa } auto post_req = [&](int& outstanding, uint64_t& wr_id, daqiri::RDMAOpCode op, - const std::string& mr_name) { - if (outstanding >= kMaxOutstanding) { return; } + const std::string& mr_name) -> bool { + if (outstanding >= max_outstanding) { + if (profile) { + if (op == daqiri::RDMAOpCode::SEND) { + stats.send_post_skips++; + } else { + stats.recv_post_skips++; + } + } + return false; + } + + if (profile) { + if (op == daqiri::RDMAOpCode::SEND) { + stats.send_post_attempts++; + } else { + stats.recv_post_attempts++; + } + } + auto start = std::chrono::steady_clock::now(); auto* msg = daqiri::create_burst_params(); + if (profile) { stats.create_burst_ns += elapsed_ns(start); } + + start = std::chrono::steady_clock::now(); if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) != daqiri::Status::SUCCESS) { + if (profile) { stats.set_header_ns += elapsed_ns(start); } daqiri::free_tx_burst(msg); - return; + return false; } + if (profile) { stats.set_header_ns += elapsed_ns(start); } - while (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS && !stop.load()) { + while (!stop.load()) { + start = std::chrono::steady_clock::now(); + const auto status = daqiri::get_tx_packet_burst(msg); + if (profile) { stats.get_tx_burst_ns += elapsed_ns(start); } + if (status == daqiri::Status::SUCCESS) { break; } + if (profile) { stats.get_tx_burst_waits++; } std::this_thread::sleep_for(std::chrono::microseconds(50)); } if (stop.load()) { daqiri::free_tx_burst(msg); - return; + return false; } + start = std::chrono::steady_clock::now(); if (daqiri::set_packet_lengths(msg, 0, {cfg.message_size}) != daqiri::Status::SUCCESS) { + if (profile) { stats.set_lengths_ns += elapsed_ns(start); } daqiri::free_tx_burst(msg); - return; + return false; } + if (profile) { stats.set_lengths_ns += elapsed_ns(start); } + + start = std::chrono::steady_clock::now(); if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { + if (profile) { stats.send_tx_burst_ns += elapsed_ns(start); } daqiri::free_tx_burst(msg); - return; + return false; } + if (profile) { stats.send_tx_burst_ns += elapsed_ns(start); } + outstanding++; wr_id++; + if (profile) { + if (op == daqiri::RDMAOpCode::SEND) { + stats.max_outstanding_send = std::max( + stats.max_outstanding_send, static_cast(outstanding)); + } else { + stats.max_outstanding_recv = std::max( + stats.max_outstanding_recv, static_cast(outstanding)); + } + } // Only meter actual byte transmissions (SENDs), not RECEIVE-side posts. if (op == daqiri::RDMAOpCode::SEND) { pacer.wait_for_bytes(static_cast(cfg.message_size), stop); } + return true; }; - if (cfg.send) { post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr); } - if (cfg.receive) { post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr); } + bool got_completion = false; + while (true) { + daqiri::BurstParams* completion = nullptr; + auto poll_start = std::chrono::steady_clock::now(); + const auto completion_status = daqiri::get_rx_burst(&completion, conn_id, cfg.server); + if (profile) { + stats.completion_poll_ns += elapsed_ns(poll_start); + stats.completion_polls++; + } + if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { + if (profile) { stats.completion_misses++; } + break; + } - daqiri::BurstParams* completion = nullptr; - if (daqiri::get_rx_burst(&completion, conn_id, cfg.server) == daqiri::Status::SUCCESS && - completion != nullptr) { + got_completion = true; + if (profile) { stats.completion_hits++; } if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) { outstanding_send--; stats.send_completions++; @@ -148,12 +260,72 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa stats.recv_bytes += static_cast(cfg.message_size); } daqiri::free_tx_burst(completion); - } else { - std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + bool posted_work = false; + if (cfg.receive) { + while (outstanding_recv < max_outstanding && + post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr)) { + posted_work = true; + } + if (profile && outstanding_recv >= max_outstanding) { stats.recv_post_skips++; } + } + if (cfg.send) { + while (outstanding_send < max_outstanding && + post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr)) { + posted_work = true; + } + if (profile && outstanding_send >= max_outstanding) { stats.send_post_skips++; } + } + + if (!got_completion && !posted_work) { + if (profile) { stats.empty_completion_sleeps++; } + if (empty_poll_sleep_us > 0) { + std::this_thread::sleep_for(std::chrono::microseconds(empty_poll_sleep_us)); + } else { + std::this_thread::yield(); + } } } } +void print_worker_profile(const char* name, const RdmaWorkerStats& stats) { + const auto post_attempts = stats.send_post_attempts + stats.recv_post_attempts; + std::cout << name << " profile:" + << " loop_iters=" << stats.loop_iters + << " send_post_attempts=" << stats.send_post_attempts + << " recv_post_attempts=" << stats.recv_post_attempts + << " send_post_skips=" << stats.send_post_skips + << " recv_post_skips=" << stats.recv_post_skips + << " empty_poll_sleep_us=" + << (std::getenv("DAQIRI_RDMA_EMPTY_POLL_SLEEP_US") == nullptr + ? 100 + : std::max(0, std::stoi(std::getenv("DAQIRI_RDMA_EMPTY_POLL_SLEEP_US")))) + << " max_outstanding_limit=" + << (std::getenv("DAQIRI_RDMA_MAX_OUTSTANDING") == nullptr + ? 20 + : std::max(1, std::stoi(std::getenv("DAQIRI_RDMA_MAX_OUTSTANDING")))) + << " max_outstanding_send=" << stats.max_outstanding_send + << " max_outstanding_recv=" << stats.max_outstanding_recv + << " completion_polls=" << stats.completion_polls + << " completion_hits=" << stats.completion_hits + << " completion_misses=" << stats.completion_misses + << " get_tx_burst_waits=" << stats.get_tx_burst_waits + << " create_burst_avg_ns=" + << (post_attempts == 0 ? 0 : stats.create_burst_ns / post_attempts) + << " set_header_avg_ns=" + << (post_attempts == 0 ? 0 : stats.set_header_ns / post_attempts) + << " get_tx_burst_avg_ns=" + << (post_attempts == 0 ? 0 : stats.get_tx_burst_ns / post_attempts) + << " set_lengths_avg_ns=" + << (post_attempts == 0 ? 0 : stats.set_lengths_ns / post_attempts) + << " send_tx_burst_avg_ns=" + << (post_attempts == 0 ? 0 : stats.send_tx_burst_ns / post_attempts) + << " completion_poll_avg_ns=" + << (stats.completion_polls == 0 ? 0 : stats.completion_poll_ns / stats.completion_polls) + << '\n'; +} + } // namespace int main(int argc, char** argv) { @@ -236,6 +408,9 @@ int main(int argc, char** argv) { << " send_bytes=" << server_stats.send_bytes << " recv_bytes=" << server_stats.recv_bytes << " seconds=" << secs << '\n'; + if (std::getenv("DAQIRI_RDMA_BENCH_PROFILE") != nullptr) { + print_worker_profile("Server", server_stats); + } } if (run_client) { std::cout << "Client complete: send_completions=" << client_stats.send_completions @@ -243,6 +418,9 @@ int main(int argc, char** argv) { << " send_bytes=" << client_stats.send_bytes << " recv_bytes=" << client_stats.recv_bytes << " seconds=" << secs << '\n'; + if (std::getenv("DAQIRI_RDMA_BENCH_PROFILE") != nullptr) { + print_worker_profile("Client", client_stats); + } } daqiri::print_stats(); diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index d5bb676..1ebbb7e 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -20,6 +20,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -154,6 +158,7 @@ int RdmaMgr::mr_access_to_ibv(uint32_t access) { if (access & MEM_ACCESS_LOCAL) { ibv_access |= IBV_ACCESS_LOCAL_WRITE; } if (access & MEM_ACCESS_RDMA_READ) { ibv_access |= IBV_ACCESS_REMOTE_READ; } if (access & MEM_ACCESS_RDMA_WRITE) { ibv_access |= IBV_ACCESS_REMOTE_WRITE; } + ibv_access |= IBV_ACCESS_RELAXED_ORDERING; return ibv_access; } @@ -169,6 +174,17 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& mr, void* ptr) { int access = mr_access_to_ibv(mr.access_); params.ctx_mr_map_[pd.second] = ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, access); + if (params.ctx_mr_map_[pd.second] == nullptr && + (access & IBV_ACCESS_RELAXED_ORDERING) != 0) { + const int fallback_access = access & ~IBV_ACCESS_RELAXED_ORDERING; + DAQIRI_LOG_WARN( + "Failed to register MR {} with relaxed ordering on PD {}; retrying without it", + mr.name_, + (void*)pd.second); + params.ctx_mr_map_[pd.second] = + ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, fallback_access); + access = fallback_access; + } if (params.ctx_mr_map_[pd.second] == nullptr) { DAQIRI_LOG_CRITICAL("Failed to register MR {} on PD {}", mr.name_, (void*)pd.second); return -1; @@ -181,7 +197,7 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& mr, void* ptr) { (void*)ptr, (void*)((uint8_t*)ptr + mr.adj_size_ * mr.num_bufs_), params.ctx_mr_map_[pd.second]->lkey, - mr.access_); + access); } } } @@ -383,11 +399,51 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { struct ibv_wc wc; int num_comp; BurstParams* msg; + struct RdmaThreadProfile { + uint64_t loop_iters = 0; + uint64_t rx_cq_poll_calls = 0; + uint64_t rx_cq_completions = 0; + uint64_t rx_cq_poll_ns = 0; + uint64_t tx_cq_poll_calls = 0; + uint64_t tx_cq_completions = 0; + uint64_t tx_cq_poll_ns = 0; + uint64_t rx_ring_enqueue_ns = 0; + uint64_t rx_ring_enqueues = 0; + uint64_t tx_ring_dequeue_ns = 0; + uint64_t tx_ring_dequeues = 0; + uint64_t tx_ring_empty = 0; + uint64_t send_requests = 0; + uint64_t recv_requests = 0; + uint64_t send_request_ns = 0; + uint64_t recv_request_ns = 0; + uint64_t post_send_calls = 0; + uint64_t post_send_signaled = 0; + uint64_t post_send_unsignaled = 0; + uint64_t post_send_ns = 0; + uint64_t post_recv_calls = 0; + uint64_t post_recv_ns = 0; + uint64_t max_outstanding_send = 0; + uint64_t max_outstanding_recv = 0; + } profile_stats; + const bool profile_enabled = std::getenv("DAQIRI_RDMA_PROFILE") != nullptr; + int send_signal_every = 1; + if (const char* env = std::getenv("DAQIRI_RDMA_SEND_SIGNAL_EVERY")) { + const long parsed = strtol(env, nullptr, 10); + if (parsed > 0) { send_signal_every = static_cast(parsed); } + } + uint64_t sends_since_signal = 0; + const auto thread_start = std::chrono::steady_clock::now(); + auto elapsed_ns = [](std::chrono::steady_clock::time_point start) { + return static_cast( + std::chrono::duration_cast( + std::chrono::steady_clock::now() - start) + .count()); + }; const auto& qref = cfg_.ifs_[tparams->if_idx].tx_.queues_[tparams->queue_idx]; const long cpu_core = strtol(qref.common_.cpu_core_.c_str(), NULL, 10); struct rte_ring* tx_ring = tparams->qp_params.tx_ring; struct rte_ring* rx_ring = tparams->qp_params.rx_ring; - std::unordered_map outstanding_send_wr_ids; + std::map outstanding_send_wr_ids; std::unordered_map outstanding_receive_wr_ids; if (set_affinity(cpu_core) != 0) { @@ -396,12 +452,42 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } DAQIRI_LOG_INFO("Affined {} RDMA thread to core {}", is_server ? "Server" : "Client", cpu_core); + if (profile_enabled) { + ibv_qp_attr qp_attr = {}; + ibv_qp_init_attr qp_init_attr = {}; + if (ibv_query_qp(tparams->client_id->qp, + &qp_attr, + IBV_QP_STATE | IBV_QP_PATH_MTU, + &qp_init_attr) == 0) { + DAQIRI_LOG_INFO("RDMA_QP_PROFILE role={} core={} qp_state={} path_mtu={}", + is_server ? "server" : "client", + cpu_core, + static_cast(qp_attr.qp_state), + static_cast(qp_attr.path_mtu)); + } else { + DAQIRI_LOG_WARN("Failed to query QP profile for role={} core={}: {}", + is_server ? "server" : "client", + cpu_core, + strerror(errno)); + } + } // Main TX loop. Wait for send requests from the transmitters to arrive for sending. Also // periodically poll the CQ. while (!rdma_force_quit.load()) { + if (profile_enabled) { profile_stats.loop_iters++; } // Check RQ first to reduce latency - while ((num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc)) != 0) { + while (true) { + auto poll_start = std::chrono::steady_clock::now(); + num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc); + if (profile_enabled) { + profile_stats.rx_cq_poll_calls++; + profile_stats.rx_cq_poll_ns += elapsed_ns(poll_start); + } + if (num_comp == 0) { break; } + if (profile_enabled) { + profile_stats.rx_cq_completions += static_cast(num_comp); + } DAQIRI_LOG_DEBUG("GOT RX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -446,16 +532,31 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { msg->transport_hdr.tx = false; // msg->transport_hdr.wr_id = wc.wr_id; + auto enqueue_start = std::chrono::steady_clock::now(); if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); free_tx_burst(msg); free_tx_metadata(msg); return; } + if (profile_enabled) { + profile_stats.rx_ring_enqueue_ns += elapsed_ns(enqueue_start); + profile_stats.rx_ring_enqueues++; + } } // Check TX CQ for completion - while ((num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc)) != 0) { + while (true) { + auto poll_start = std::chrono::steady_clock::now(); + num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc); + if (profile_enabled) { + profile_stats.tx_cq_poll_calls++; + profile_stats.tx_cq_poll_ns += elapsed_ns(poll_start); + } + if (num_comp == 0) { break; } + if (profile_enabled) { + profile_stats.tx_cq_completions += static_cast(num_comp); + } DAQIRI_LOG_DEBUG("GOT TX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -477,18 +578,39 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { continue; } - msg = it->second; + auto completed_end = outstanding_send_wr_ids.upper_bound(wc.wr_id); + for (auto completed_it = outstanding_send_wr_ids.begin(); completed_it != completed_end;) { + msg = completed_it->second; + + const auto conn_id = get_connection_id(msg); + const auto expected_conn_id = reinterpret_cast(tparams->client_id); + if (conn_id != expected_conn_id) { + DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", + completed_it->first, + conn_id, + expected_conn_id); + } - const auto conn_id = get_connection_id(msg); - const auto expected_conn_id = reinterpret_cast(tparams->client_id); - if (conn_id != expected_conn_id) { - DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", - wc.wr_id, - conn_id, - expected_conn_id); - } + completed_it = outstanding_send_wr_ids.erase(completed_it); + + msg->transport_hdr.tx = true; + msg->transport_hdr.status = + wc.status == IBV_WC_SUCCESS ? Status::SUCCESS : Status::GENERIC_FAILURE; + msg->transport_hdr.server = is_server; - outstanding_send_wr_ids.erase(it); + auto enqueue_start = std::chrono::steady_clock::now(); + if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { + DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); + free_tx_burst(msg); + free_tx_metadata(msg); + return; + } + if (profile_enabled) { + profile_stats.rx_ring_enqueue_ns += elapsed_ns(enqueue_start); + profile_stats.rx_ring_enqueues++; + } + } + continue; } else { msg = create_burst_params(); } @@ -502,12 +624,17 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { msg->transport_hdr.server = is_server; // msg->transport_hdr.wr_id = wc.wr_id; + auto enqueue_start = std::chrono::steady_clock::now(); if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); free_tx_burst(msg); free_tx_metadata(msg); return; } + if (profile_enabled) { + profile_stats.rx_ring_enqueue_ns += elapsed_ns(enqueue_start); + profile_stats.rx_ring_enqueues++; + } } // Now handle any incoming messages @@ -515,9 +642,18 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // ssize_t bytes = mq_receive(tparams.tx_mq, reinterpret_cast(&burst), sizeof(burst), // nullptr); + auto dequeue_start = std::chrono::steady_clock::now(); if (rte_ring_dequeue(tparams->qp_params.tx_ring, reinterpret_cast(&burst)) != 0) { + if (profile_enabled) { + profile_stats.tx_ring_dequeue_ns += elapsed_ns(dequeue_start); + profile_stats.tx_ring_empty++; + } continue; } + if (profile_enabled) { + profile_stats.tx_ring_dequeue_ns += elapsed_ns(dequeue_start); + profile_stats.tx_ring_dequeues++; + } const auto local_mr = mrs_.find(std::string(burst->transport_hdr.local_mr_name)); if (local_mr == mrs_.end()) { @@ -529,6 +665,8 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { switch (burst->transport_hdr.opcode) { case RDMAOpCode::SEND: { + const auto request_start = std::chrono::steady_clock::now(); + if (profile_enabled) { profile_stats.send_requests++; } // Get lkey for this PD auto pd = pd_map_.find(tparams->client_id->verbs); if (pd == pd_map_.end()) { @@ -559,21 +697,46 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { wr.sg_list = &sge; wr.num_sge = 1; wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; - + // Keep signaled completions frequent enough for callers that use SEND + // completions as their credit to refill the outstanding window. + const bool signal_send = send_signal_every == 1 || + sends_since_signal + 1 >= send_signal_every || + outstanding_send_wr_ids.size() >= 15; + wr.send_flags = signal_send ? IBV_SEND_SIGNALED : 0; + + auto post_start = std::chrono::steady_clock::now(); int ret = ibv_post_send(tparams->client_id->qp, &wr, &bad_wr); + if (profile_enabled) { + profile_stats.post_send_calls++; + profile_stats.post_send_ns += elapsed_ns(post_start); + } if (ret != 0) { DAQIRI_LOG_CRITICAL("Failed to post SEND request, errno: {}", strerror(errno)); free_tx_burst(burst); continue; } + if (signal_send) { + sends_since_signal = 0; + if (profile_enabled) { profile_stats.post_send_signaled++; } + } else { + sends_since_signal++; + if (profile_enabled) { profile_stats.post_send_unsignaled++; } + } outstanding_send_wr_ids[burst->transport_hdr.wr_id + p] = burst; } + if (profile_enabled) { + profile_stats.max_outstanding_send = std::max( + profile_stats.max_outstanding_send, + static_cast(outstanding_send_wr_ids.size())); + profile_stats.send_request_ns += elapsed_ns(request_start); + } break; } case RDMAOpCode::RECEIVE: { + const auto request_start = std::chrono::steady_clock::now(); + if (profile_enabled) { profile_stats.recv_requests++; } // Get lkey for this PD auto pd = pd_map_.find(tparams->client_id->verbs); if (pd == pd_map_.end()) { @@ -611,7 +774,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { recv_wr.num_sge = 1; // Post the receive request + auto post_start = std::chrono::steady_clock::now(); ret = ibv_post_recv(tparams->client_id->qp, &recv_wr, &bad_wr); + if (profile_enabled) { + profile_stats.post_recv_calls++; + profile_stats.post_recv_ns += elapsed_ns(post_start); + } if (ret) { DAQIRI_LOG_CRITICAL("ibv_post_recv failed: {}", strerror(errno)); free_tx_burst(burst); @@ -620,6 +788,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { outstanding_receive_wr_ids[burst->transport_hdr.wr_id + p] = burst; } + if (profile_enabled) { + profile_stats.max_outstanding_recv = std::max( + profile_stats.max_outstanding_recv, + static_cast(outstanding_receive_wr_ids.size())); + profile_stats.recv_request_ns += elapsed_ns(request_start); + } break; } case RDMAOpCode::RDMA_WRITE: @@ -633,6 +807,57 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } } + if (profile_enabled) { + const double elapsed_s = + std::chrono::duration(std::chrono::steady_clock::now() - thread_start).count(); + DAQIRI_LOG_INFO( + "RDMA_THREAD_PROFILE role={} core={} elapsed_s={:.6f} loop_iters={} tx_ring_empty={} " + "tx_ring_dequeues={} rx_cq_poll_calls={} rx_cq_completions={} tx_cq_poll_calls={} " + "tx_cq_completions={} rx_ring_enqueues={} send_requests={} recv_requests={} " + "post_send_calls={} post_send_signaled={} post_send_unsignaled={} post_recv_calls={} " + "max_outstanding_send={} max_outstanding_recv={} send_signal_every={} " + "tx_ring_dequeue_avg_ns={} rx_cq_poll_avg_ns={} tx_cq_poll_avg_ns={} " + "rx_ring_enqueue_avg_ns={} send_request_avg_ns={} recv_request_avg_ns={} " + "post_send_avg_ns={} post_recv_avg_ns={}", + is_server ? "server" : "client", + cpu_core, + elapsed_s, + profile_stats.loop_iters, + profile_stats.tx_ring_empty, + profile_stats.tx_ring_dequeues, + profile_stats.rx_cq_poll_calls, + profile_stats.rx_cq_completions, + profile_stats.tx_cq_poll_calls, + profile_stats.tx_cq_completions, + profile_stats.rx_ring_enqueues, + profile_stats.send_requests, + profile_stats.recv_requests, + profile_stats.post_send_calls, + profile_stats.post_send_signaled, + profile_stats.post_send_unsignaled, + profile_stats.post_recv_calls, + profile_stats.max_outstanding_send, + profile_stats.max_outstanding_recv, + send_signal_every, + (profile_stats.tx_ring_dequeues + profile_stats.tx_ring_empty) == 0 + ? 0 + : profile_stats.tx_ring_dequeue_ns / + (profile_stats.tx_ring_dequeues + profile_stats.tx_ring_empty), + profile_stats.rx_cq_poll_calls == 0 + ? 0 + : profile_stats.rx_cq_poll_ns / profile_stats.rx_cq_poll_calls, + profile_stats.tx_cq_poll_calls == 0 + ? 0 + : profile_stats.tx_cq_poll_ns / profile_stats.tx_cq_poll_calls, + profile_stats.rx_ring_enqueues == 0 + ? 0 + : profile_stats.rx_ring_enqueue_ns / profile_stats.rx_ring_enqueues, + profile_stats.send_requests == 0 ? 0 : profile_stats.send_request_ns / profile_stats.send_requests, + profile_stats.recv_requests == 0 ? 0 : profile_stats.recv_request_ns / profile_stats.recv_requests, + profile_stats.post_send_calls == 0 ? 0 : profile_stats.post_send_ns / profile_stats.post_send_calls, + profile_stats.post_recv_calls == 0 ? 0 : profile_stats.post_recv_ns / profile_stats.post_recv_calls); + } + DAQIRI_LOG_INFO("{} RDMA thread exiting on core {}", is_server ? "Server" : "Client", cpu_core); } @@ -681,12 +906,20 @@ Status RdmaMgr::rdma_get_server_conn_id(const std::string& server_addr, uint16_t // Find the next queue ID that's not already in use for (size_t i = 0; i < server_params->second.size(); i++) { if (server_params->second[i].client_id != nullptr && !server_params->second[i].active) { - *conn_id = reinterpret_cast(server_params->second[i].client_id); + auto* client_id = server_params->second[i].client_id; + std::lock_guard lock(threads_mutex_); + if (worker_threads_.find(client_id) == worker_threads_.end() || + tx_rings_map_.find(client_id) == tx_rings_map_.end() || + rx_rings_map_.find(client_id) == rx_rings_map_.end()) { + continue; + } + + *conn_id = reinterpret_cast(client_id); server_params->second[i].active = true; DAQIRI_LOG_INFO("Found available queue ID for server {}:{} with cm_id {}", server_addr, server_port, - (void*)server_params->second[i].client_id); + (void*)client_id); return Status::SUCCESS; } } @@ -1248,8 +1481,8 @@ int RdmaMgr::setup_pools_and_rings() { for (int i = 0; i < MAX_RDMA_CONNECTIONS; i++) { std::string ring_name = "RX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up RX ring {}", ring_name); - struct rte_ring* ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + struct rte_ring* ring = + rte_ring_create(ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate RX ring {}!", ring_name); return -1; @@ -1259,7 +1492,7 @@ int RdmaMgr::setup_pools_and_rings() { ring_name = "TX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up TX ring {}", ring_name); ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate TX ring {}!", ring_name); return -1; @@ -1270,7 +1503,7 @@ int RdmaMgr::setup_pools_and_rings() { // Packet length buffers DAQIRI_LOG_DEBUG("Setting up RX meta pool"); pkt_len_pool_ = rte_mempool_create("PKT_LEN_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(uint32_t) * MAX_RDMA_BATCH, 0, 0, @@ -1287,7 +1520,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up RX meta pool"); rx_meta = rte_mempool_create("RX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1304,7 +1537,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up TX meta pool"); tx_meta = rte_mempool_create("TX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1320,7 +1553,7 @@ int RdmaMgr::setup_pools_and_rings() { } tx_burst_pool_ = rte_mempool_create("TX_BURST_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(void*) * MAX_RDMA_BATCH, 0, 0, @@ -1484,7 +1717,8 @@ void RdmaMgr::initialize() { // Set up memory region sizes for (auto& mr : cfg_.mrs_) { - mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, get_alignment(mr.second.kind_)); + const size_t rdma_alignment = std::max(get_alignment(mr.second.kind_), GPU_PAGE_SIZE); + mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, rdma_alignment); } for (const auto& intf : cfg_.ifs_) { diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index c3499e1..cd12dfd 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -162,11 +162,11 @@ class RdmaMgr : public Manager { private: static constexpr int MAX_RDMA_CONNECTIONS = 128; - static constexpr int MAX_CQ = 16; + static constexpr int MAX_CQ = 1024; static constexpr int NUM_SGE_ELS = 1024; static constexpr int NUM_SGE_BUFS = 256; static constexpr int MAX_NUM_MR = 16; // Maximum number of memory registers to exchange - static constexpr int MAX_OUSTANDING_WR = 64; + static constexpr int MAX_OUSTANDING_WR = 1024; static constexpr int MAX_NUM_PORTS = 4; static constexpr int MAX_RDMA_BATCH = 1024; From 7766d27d5255d2df5193c16d21b7b65a7974225d Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Wed, 27 May 2026 21:18:40 +0000 Subject: [PATCH 2/5] #15 - Remove RDMA benchmark debug instrumentation Drop benchmark-side timing/profile output and remove RDMA manager profiling timers while keeping the non-debug RDMA transport tuning changes. Signed-off-by: Cliff Burdick --- examples/rdma_bench.cpp | 174 +------------------------ src/managers/rdma/daqiri_rdma_mgr.cpp | 178 +------------------------- 2 files changed, 8 insertions(+), 344 deletions(-) diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index dbac755..5002812 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -17,15 +17,9 @@ #include -#include -#include - -#include #include #include -#include #include -#include #include #include #include @@ -50,7 +44,6 @@ struct RdmaBenchConfig { std::string server_address = "10.100.1.1"; std::string client_address = "10.100.4.1"; uint16_t server_port = 4096; - int cpu_core = -1; }; struct RdmaWorkerStats { @@ -58,33 +51,8 @@ struct RdmaWorkerStats { uint64_t recv_completions = 0; uint64_t send_bytes = 0; uint64_t recv_bytes = 0; - uint64_t loop_iters = 0; - uint64_t send_post_attempts = 0; - uint64_t recv_post_attempts = 0; - uint64_t send_post_skips = 0; - uint64_t recv_post_skips = 0; - uint64_t create_burst_ns = 0; - uint64_t set_header_ns = 0; - uint64_t get_tx_burst_ns = 0; - uint64_t get_tx_burst_waits = 0; - uint64_t set_lengths_ns = 0; - uint64_t send_tx_burst_ns = 0; - uint64_t completion_poll_ns = 0; - uint64_t completion_polls = 0; - uint64_t completion_hits = 0; - uint64_t completion_misses = 0; - uint64_t empty_completion_sleeps = 0; - uint64_t max_outstanding_send = 0; - uint64_t max_outstanding_recv = 0; }; -uint64_t elapsed_ns(std::chrono::steady_clock::time_point start) { - return static_cast( - std::chrono::duration_cast( - std::chrono::steady_clock::now() - start) - .count()); -} - RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { RdmaBenchConfig cfg; cfg.server = node["server"].as(cfg.server); @@ -94,23 +62,9 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { cfg.server_address = node["server_address"].as(cfg.server_address); cfg.client_address = node["client_address"].as(cfg.client_address); cfg.server_port = node["server_port"].as(cfg.server_port); - cfg.cpu_core = node["cpu_core"].as(cfg.cpu_core); return cfg; } -void set_current_thread_affinity(int cpu_core, const char* name) { - if (cpu_core < 0) { return; } - - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - CPU_SET(cpu_core, &cpuset); - const int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); - if (rc != 0) { - std::cerr << name << " failed to set affinity to core " << cpu_core << ": " - << std::strerror(rc) << '\n'; - } -} - void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, std::atomic& stop, RdmaWorkerStats& stats) { // Matches the per-MR num_bufs in the YAML configs. Higher values deadlock @@ -118,26 +72,16 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa // but free_tx_burst (which refills it) only runs later in the same loop // iteration via get_rx_burst. Until the loop is refactored to interleave // drain with post, this constant must stay <= num_bufs. - int max_outstanding = 20; - if (const char* env = std::getenv("DAQIRI_RDMA_MAX_OUTSTANDING")) { - max_outstanding = std::max(1, std::stoi(env)); - } + static constexpr int kMaxOutstanding = 20; int outstanding_send = 0; int outstanding_recv = 0; uint64_t send_wr_id = 0x1234; uint64_t recv_wr_id = 0x2345; uintptr_t conn_id = 0; - const bool profile = std::getenv("DAQIRI_RDMA_BENCH_PROFILE") != nullptr; - int empty_poll_sleep_us = 100; - if (const char* env = std::getenv("DAQIRI_RDMA_EMPTY_POLL_SLEEP_US")) { - empty_poll_sleep_us = std::max(0, std::stoi(env)); - } - set_current_thread_affinity(cfg.cpu_core, cfg.server ? "Server app thread" : "Client app thread"); std::string send_mr = cfg.server ? "DATA_TX_GPU_SERVER" : "DATA_TX_GPU_CLIENT"; std::string recv_mr = cfg.server ? "DATA_RX_GPU_SERVER" : "DATA_RX_GPU_CLIENT"; while (!stop.load()) { - if (profile) { stats.loop_iters++; } if (conn_id == 0) { daqiri::Status s = daqiri::Status::GENERIC_FAILURE; if (cfg.server) { @@ -154,44 +98,16 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa auto post_req = [&](int& outstanding, uint64_t& wr_id, daqiri::RDMAOpCode op, const std::string& mr_name) -> bool { - if (outstanding >= max_outstanding) { - if (profile) { - if (op == daqiri::RDMAOpCode::SEND) { - stats.send_post_skips++; - } else { - stats.recv_post_skips++; - } - } - return false; - } - - if (profile) { - if (op == daqiri::RDMAOpCode::SEND) { - stats.send_post_attempts++; - } else { - stats.recv_post_attempts++; - } - } + if (outstanding >= kMaxOutstanding) { return false; } - auto start = std::chrono::steady_clock::now(); auto* msg = daqiri::create_burst_params(); - if (profile) { stats.create_burst_ns += elapsed_ns(start); } - - start = std::chrono::steady_clock::now(); if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) != daqiri::Status::SUCCESS) { - if (profile) { stats.set_header_ns += elapsed_ns(start); } daqiri::free_tx_burst(msg); return false; } - if (profile) { stats.set_header_ns += elapsed_ns(start); } - while (!stop.load()) { - start = std::chrono::steady_clock::now(); - const auto status = daqiri::get_tx_packet_burst(msg); - if (profile) { stats.get_tx_burst_ns += elapsed_ns(start); } - if (status == daqiri::Status::SUCCESS) { break; } - if (profile) { stats.get_tx_burst_waits++; } + while (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS && !stop.load()) { std::this_thread::sleep_for(std::chrono::microseconds(50)); } if (stop.load()) { @@ -199,33 +115,16 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa return false; } - start = std::chrono::steady_clock::now(); if (daqiri::set_packet_lengths(msg, 0, {cfg.message_size}) != daqiri::Status::SUCCESS) { - if (profile) { stats.set_lengths_ns += elapsed_ns(start); } daqiri::free_tx_burst(msg); return false; } - if (profile) { stats.set_lengths_ns += elapsed_ns(start); } - - start = std::chrono::steady_clock::now(); if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { - if (profile) { stats.send_tx_burst_ns += elapsed_ns(start); } daqiri::free_tx_burst(msg); return false; } - if (profile) { stats.send_tx_burst_ns += elapsed_ns(start); } - outstanding++; wr_id++; - if (profile) { - if (op == daqiri::RDMAOpCode::SEND) { - stats.max_outstanding_send = std::max( - stats.max_outstanding_send, static_cast(outstanding)); - } else { - stats.max_outstanding_recv = std::max( - stats.max_outstanding_recv, static_cast(outstanding)); - } - } // Only meter actual byte transmissions (SENDs), not RECEIVE-side posts. if (op == daqiri::RDMAOpCode::SEND) { pacer.wait_for_bytes(static_cast(cfg.message_size), stop); @@ -236,19 +135,10 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa bool got_completion = false; while (true) { daqiri::BurstParams* completion = nullptr; - auto poll_start = std::chrono::steady_clock::now(); const auto completion_status = daqiri::get_rx_burst(&completion, conn_id, cfg.server); - if (profile) { - stats.completion_poll_ns += elapsed_ns(poll_start); - stats.completion_polls++; - } - if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { - if (profile) { stats.completion_misses++; } - break; - } + if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { break; } got_completion = true; - if (profile) { stats.completion_hits++; } if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) { outstanding_send--; stats.send_completions++; @@ -264,68 +154,24 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa bool posted_work = false; if (cfg.receive) { - while (outstanding_recv < max_outstanding && + while (outstanding_recv < kMaxOutstanding && post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr)) { posted_work = true; } - if (profile && outstanding_recv >= max_outstanding) { stats.recv_post_skips++; } } if (cfg.send) { - while (outstanding_send < max_outstanding && + while (outstanding_send < kMaxOutstanding && post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr)) { posted_work = true; } - if (profile && outstanding_send >= max_outstanding) { stats.send_post_skips++; } } if (!got_completion && !posted_work) { - if (profile) { stats.empty_completion_sleeps++; } - if (empty_poll_sleep_us > 0) { - std::this_thread::sleep_for(std::chrono::microseconds(empty_poll_sleep_us)); - } else { - std::this_thread::yield(); - } + std::this_thread::sleep_for(std::chrono::microseconds(100)); } } } -void print_worker_profile(const char* name, const RdmaWorkerStats& stats) { - const auto post_attempts = stats.send_post_attempts + stats.recv_post_attempts; - std::cout << name << " profile:" - << " loop_iters=" << stats.loop_iters - << " send_post_attempts=" << stats.send_post_attempts - << " recv_post_attempts=" << stats.recv_post_attempts - << " send_post_skips=" << stats.send_post_skips - << " recv_post_skips=" << stats.recv_post_skips - << " empty_poll_sleep_us=" - << (std::getenv("DAQIRI_RDMA_EMPTY_POLL_SLEEP_US") == nullptr - ? 100 - : std::max(0, std::stoi(std::getenv("DAQIRI_RDMA_EMPTY_POLL_SLEEP_US")))) - << " max_outstanding_limit=" - << (std::getenv("DAQIRI_RDMA_MAX_OUTSTANDING") == nullptr - ? 20 - : std::max(1, std::stoi(std::getenv("DAQIRI_RDMA_MAX_OUTSTANDING")))) - << " max_outstanding_send=" << stats.max_outstanding_send - << " max_outstanding_recv=" << stats.max_outstanding_recv - << " completion_polls=" << stats.completion_polls - << " completion_hits=" << stats.completion_hits - << " completion_misses=" << stats.completion_misses - << " get_tx_burst_waits=" << stats.get_tx_burst_waits - << " create_burst_avg_ns=" - << (post_attempts == 0 ? 0 : stats.create_burst_ns / post_attempts) - << " set_header_avg_ns=" - << (post_attempts == 0 ? 0 : stats.set_header_ns / post_attempts) - << " get_tx_burst_avg_ns=" - << (post_attempts == 0 ? 0 : stats.get_tx_burst_ns / post_attempts) - << " set_lengths_avg_ns=" - << (post_attempts == 0 ? 0 : stats.set_lengths_ns / post_attempts) - << " send_tx_burst_avg_ns=" - << (post_attempts == 0 ? 0 : stats.send_tx_burst_ns / post_attempts) - << " completion_poll_avg_ns=" - << (stats.completion_polls == 0 ? 0 : stats.completion_poll_ns / stats.completion_polls) - << '\n'; -} - } // namespace int main(int argc, char** argv) { @@ -408,9 +254,6 @@ int main(int argc, char** argv) { << " send_bytes=" << server_stats.send_bytes << " recv_bytes=" << server_stats.recv_bytes << " seconds=" << secs << '\n'; - if (std::getenv("DAQIRI_RDMA_BENCH_PROFILE") != nullptr) { - print_worker_profile("Server", server_stats); - } } if (run_client) { std::cout << "Client complete: send_completions=" << client_stats.send_completions @@ -418,9 +261,6 @@ int main(int argc, char** argv) { << " send_bytes=" << client_stats.send_bytes << " recv_bytes=" << client_stats.recv_bytes << " seconds=" << secs << '\n'; - if (std::getenv("DAQIRI_RDMA_BENCH_PROFILE") != nullptr) { - print_worker_profile("Client", client_stats); - } } daqiri::print_stats(); diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index 1ebbb7e..81574d4 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include #include @@ -399,46 +398,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { struct ibv_wc wc; int num_comp; BurstParams* msg; - struct RdmaThreadProfile { - uint64_t loop_iters = 0; - uint64_t rx_cq_poll_calls = 0; - uint64_t rx_cq_completions = 0; - uint64_t rx_cq_poll_ns = 0; - uint64_t tx_cq_poll_calls = 0; - uint64_t tx_cq_completions = 0; - uint64_t tx_cq_poll_ns = 0; - uint64_t rx_ring_enqueue_ns = 0; - uint64_t rx_ring_enqueues = 0; - uint64_t tx_ring_dequeue_ns = 0; - uint64_t tx_ring_dequeues = 0; - uint64_t tx_ring_empty = 0; - uint64_t send_requests = 0; - uint64_t recv_requests = 0; - uint64_t send_request_ns = 0; - uint64_t recv_request_ns = 0; - uint64_t post_send_calls = 0; - uint64_t post_send_signaled = 0; - uint64_t post_send_unsignaled = 0; - uint64_t post_send_ns = 0; - uint64_t post_recv_calls = 0; - uint64_t post_recv_ns = 0; - uint64_t max_outstanding_send = 0; - uint64_t max_outstanding_recv = 0; - } profile_stats; - const bool profile_enabled = std::getenv("DAQIRI_RDMA_PROFILE") != nullptr; int send_signal_every = 1; if (const char* env = std::getenv("DAQIRI_RDMA_SEND_SIGNAL_EVERY")) { const long parsed = strtol(env, nullptr, 10); if (parsed > 0) { send_signal_every = static_cast(parsed); } } uint64_t sends_since_signal = 0; - const auto thread_start = std::chrono::steady_clock::now(); - auto elapsed_ns = [](std::chrono::steady_clock::time_point start) { - return static_cast( - std::chrono::duration_cast( - std::chrono::steady_clock::now() - start) - .count()); - }; const auto& qref = cfg_.ifs_[tparams->if_idx].tx_.queues_[tparams->queue_idx]; const long cpu_core = strtol(qref.common_.cpu_core_.c_str(), NULL, 10); struct rte_ring* tx_ring = tparams->qp_params.tx_ring; @@ -452,42 +417,14 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } DAQIRI_LOG_INFO("Affined {} RDMA thread to core {}", is_server ? "Server" : "Client", cpu_core); - if (profile_enabled) { - ibv_qp_attr qp_attr = {}; - ibv_qp_init_attr qp_init_attr = {}; - if (ibv_query_qp(tparams->client_id->qp, - &qp_attr, - IBV_QP_STATE | IBV_QP_PATH_MTU, - &qp_init_attr) == 0) { - DAQIRI_LOG_INFO("RDMA_QP_PROFILE role={} core={} qp_state={} path_mtu={}", - is_server ? "server" : "client", - cpu_core, - static_cast(qp_attr.qp_state), - static_cast(qp_attr.path_mtu)); - } else { - DAQIRI_LOG_WARN("Failed to query QP profile for role={} core={}: {}", - is_server ? "server" : "client", - cpu_core, - strerror(errno)); - } - } // Main TX loop. Wait for send requests from the transmitters to arrive for sending. Also // periodically poll the CQ. while (!rdma_force_quit.load()) { - if (profile_enabled) { profile_stats.loop_iters++; } // Check RQ first to reduce latency while (true) { - auto poll_start = std::chrono::steady_clock::now(); num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc); - if (profile_enabled) { - profile_stats.rx_cq_poll_calls++; - profile_stats.rx_cq_poll_ns += elapsed_ns(poll_start); - } if (num_comp == 0) { break; } - if (profile_enabled) { - profile_stats.rx_cq_completions += static_cast(num_comp); - } DAQIRI_LOG_DEBUG("GOT RX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -532,31 +469,18 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { msg->transport_hdr.tx = false; // msg->transport_hdr.wr_id = wc.wr_id; - auto enqueue_start = std::chrono::steady_clock::now(); if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); free_tx_burst(msg); free_tx_metadata(msg); return; } - if (profile_enabled) { - profile_stats.rx_ring_enqueue_ns += elapsed_ns(enqueue_start); - profile_stats.rx_ring_enqueues++; - } } // Check TX CQ for completion while (true) { - auto poll_start = std::chrono::steady_clock::now(); num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc); - if (profile_enabled) { - profile_stats.tx_cq_poll_calls++; - profile_stats.tx_cq_poll_ns += elapsed_ns(poll_start); - } if (num_comp == 0) { break; } - if (profile_enabled) { - profile_stats.tx_cq_completions += static_cast(num_comp); - } DAQIRI_LOG_DEBUG("GOT TX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -598,17 +522,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { wc.status == IBV_WC_SUCCESS ? Status::SUCCESS : Status::GENERIC_FAILURE; msg->transport_hdr.server = is_server; - auto enqueue_start = std::chrono::steady_clock::now(); if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); free_tx_burst(msg); free_tx_metadata(msg); return; } - if (profile_enabled) { - profile_stats.rx_ring_enqueue_ns += elapsed_ns(enqueue_start); - profile_stats.rx_ring_enqueues++; - } } continue; } else { @@ -624,17 +543,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { msg->transport_hdr.server = is_server; // msg->transport_hdr.wr_id = wc.wr_id; - auto enqueue_start = std::chrono::steady_clock::now(); if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); free_tx_burst(msg); free_tx_metadata(msg); return; } - if (profile_enabled) { - profile_stats.rx_ring_enqueue_ns += elapsed_ns(enqueue_start); - profile_stats.rx_ring_enqueues++; - } } // Now handle any incoming messages @@ -642,18 +556,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // ssize_t bytes = mq_receive(tparams.tx_mq, reinterpret_cast(&burst), sizeof(burst), // nullptr); - auto dequeue_start = std::chrono::steady_clock::now(); - if (rte_ring_dequeue(tparams->qp_params.tx_ring, reinterpret_cast(&burst)) != 0) { - if (profile_enabled) { - profile_stats.tx_ring_dequeue_ns += elapsed_ns(dequeue_start); - profile_stats.tx_ring_empty++; - } - continue; - } - if (profile_enabled) { - profile_stats.tx_ring_dequeue_ns += elapsed_ns(dequeue_start); - profile_stats.tx_ring_dequeues++; - } + if (rte_ring_dequeue(tx_ring, reinterpret_cast(&burst)) != 0) { continue; } const auto local_mr = mrs_.find(std::string(burst->transport_hdr.local_mr_name)); if (local_mr == mrs_.end()) { @@ -665,8 +568,6 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { switch (burst->transport_hdr.opcode) { case RDMAOpCode::SEND: { - const auto request_start = std::chrono::steady_clock::now(); - if (profile_enabled) { profile_stats.send_requests++; } // Get lkey for this PD auto pd = pd_map_.find(tparams->client_id->verbs); if (pd == pd_map_.end()) { @@ -704,12 +605,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { outstanding_send_wr_ids.size() >= 15; wr.send_flags = signal_send ? IBV_SEND_SIGNALED : 0; - auto post_start = std::chrono::steady_clock::now(); int ret = ibv_post_send(tparams->client_id->qp, &wr, &bad_wr); - if (profile_enabled) { - profile_stats.post_send_calls++; - profile_stats.post_send_ns += elapsed_ns(post_start); - } if (ret != 0) { DAQIRI_LOG_CRITICAL("Failed to post SEND request, errno: {}", strerror(errno)); free_tx_burst(burst); @@ -717,26 +613,16 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } if (signal_send) { sends_since_signal = 0; - if (profile_enabled) { profile_stats.post_send_signaled++; } } else { sends_since_signal++; - if (profile_enabled) { profile_stats.post_send_unsignaled++; } } outstanding_send_wr_ids[burst->transport_hdr.wr_id + p] = burst; } - if (profile_enabled) { - profile_stats.max_outstanding_send = std::max( - profile_stats.max_outstanding_send, - static_cast(outstanding_send_wr_ids.size())); - profile_stats.send_request_ns += elapsed_ns(request_start); - } break; } case RDMAOpCode::RECEIVE: { - const auto request_start = std::chrono::steady_clock::now(); - if (profile_enabled) { profile_stats.recv_requests++; } // Get lkey for this PD auto pd = pd_map_.find(tparams->client_id->verbs); if (pd == pd_map_.end()) { @@ -774,12 +660,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { recv_wr.num_sge = 1; // Post the receive request - auto post_start = std::chrono::steady_clock::now(); ret = ibv_post_recv(tparams->client_id->qp, &recv_wr, &bad_wr); - if (profile_enabled) { - profile_stats.post_recv_calls++; - profile_stats.post_recv_ns += elapsed_ns(post_start); - } if (ret) { DAQIRI_LOG_CRITICAL("ibv_post_recv failed: {}", strerror(errno)); free_tx_burst(burst); @@ -788,12 +669,6 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { outstanding_receive_wr_ids[burst->transport_hdr.wr_id + p] = burst; } - if (profile_enabled) { - profile_stats.max_outstanding_recv = std::max( - profile_stats.max_outstanding_recv, - static_cast(outstanding_receive_wr_ids.size())); - profile_stats.recv_request_ns += elapsed_ns(request_start); - } break; } case RDMAOpCode::RDMA_WRITE: @@ -807,57 +682,6 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } } - if (profile_enabled) { - const double elapsed_s = - std::chrono::duration(std::chrono::steady_clock::now() - thread_start).count(); - DAQIRI_LOG_INFO( - "RDMA_THREAD_PROFILE role={} core={} elapsed_s={:.6f} loop_iters={} tx_ring_empty={} " - "tx_ring_dequeues={} rx_cq_poll_calls={} rx_cq_completions={} tx_cq_poll_calls={} " - "tx_cq_completions={} rx_ring_enqueues={} send_requests={} recv_requests={} " - "post_send_calls={} post_send_signaled={} post_send_unsignaled={} post_recv_calls={} " - "max_outstanding_send={} max_outstanding_recv={} send_signal_every={} " - "tx_ring_dequeue_avg_ns={} rx_cq_poll_avg_ns={} tx_cq_poll_avg_ns={} " - "rx_ring_enqueue_avg_ns={} send_request_avg_ns={} recv_request_avg_ns={} " - "post_send_avg_ns={} post_recv_avg_ns={}", - is_server ? "server" : "client", - cpu_core, - elapsed_s, - profile_stats.loop_iters, - profile_stats.tx_ring_empty, - profile_stats.tx_ring_dequeues, - profile_stats.rx_cq_poll_calls, - profile_stats.rx_cq_completions, - profile_stats.tx_cq_poll_calls, - profile_stats.tx_cq_completions, - profile_stats.rx_ring_enqueues, - profile_stats.send_requests, - profile_stats.recv_requests, - profile_stats.post_send_calls, - profile_stats.post_send_signaled, - profile_stats.post_send_unsignaled, - profile_stats.post_recv_calls, - profile_stats.max_outstanding_send, - profile_stats.max_outstanding_recv, - send_signal_every, - (profile_stats.tx_ring_dequeues + profile_stats.tx_ring_empty) == 0 - ? 0 - : profile_stats.tx_ring_dequeue_ns / - (profile_stats.tx_ring_dequeues + profile_stats.tx_ring_empty), - profile_stats.rx_cq_poll_calls == 0 - ? 0 - : profile_stats.rx_cq_poll_ns / profile_stats.rx_cq_poll_calls, - profile_stats.tx_cq_poll_calls == 0 - ? 0 - : profile_stats.tx_cq_poll_ns / profile_stats.tx_cq_poll_calls, - profile_stats.rx_ring_enqueues == 0 - ? 0 - : profile_stats.rx_ring_enqueue_ns / profile_stats.rx_ring_enqueues, - profile_stats.send_requests == 0 ? 0 : profile_stats.send_request_ns / profile_stats.send_requests, - profile_stats.recv_requests == 0 ? 0 : profile_stats.recv_request_ns / profile_stats.recv_requests, - profile_stats.post_send_calls == 0 ? 0 : profile_stats.post_send_ns / profile_stats.post_send_calls, - profile_stats.post_recv_calls == 0 ? 0 : profile_stats.post_recv_ns / profile_stats.post_recv_calls); - } - DAQIRI_LOG_INFO("{} RDMA thread exiting on core {}", is_server ? "Server" : "Client", cpu_core); } From 6eca99084fdf58054dd1c637c33134159331b9c2 Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Wed, 27 May 2026 21:54:01 +0000 Subject: [PATCH 3/5] #15 - Avoid RDMA benchmark buffer starvation deadlock Treat TX buffer exhaustion as backpressure in the RDMA benchmark so the worker returns to completion draining before retrying. Also clean up RDMA TX packet burst allocations on partial failure. Signed-off-by: Cliff Burdick --- examples/rdma_bench.cpp | 19 +++++++------------ src/managers/rdma/daqiri_rdma_mgr.cpp | 20 ++++++++++---------- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index 5002812..929b033 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -67,12 +67,10 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, std::atomic& stop, RdmaWorkerStats& stats) { - // Matches the per-MR num_bufs in the YAML configs. Higher values deadlock - // the bench: post_req blocks in get_tx_packet_burst when the pool is empty, - // but free_tx_burst (which refills it) only runs later in the same loop - // iteration via get_rx_burst. Until the loop is refactored to interleave - // drain with post, this constant must stay <= num_bufs. - static constexpr int kMaxOutstanding = 20; + // Application credit limit. Buffer exhaustion is handled as backpressure so + // this can be larger than a memory region's num_bufs: post_req returns to the + // outer loop, which drains completions and releases buffers before retrying. + static constexpr int kMaxOutstanding = 64; int outstanding_send = 0; int outstanding_recv = 0; uint64_t send_wr_id = 0x1234; @@ -103,15 +101,12 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa auto* msg = daqiri::create_burst_params(); if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) != daqiri::Status::SUCCESS) { - daqiri::free_tx_burst(msg); + daqiri::free_tx_metadata(msg); return false; } - while (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS && !stop.load()) { - std::this_thread::sleep_for(std::chrono::microseconds(50)); - } - if (stop.load()) { - daqiri::free_tx_burst(msg); + if (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS) { + daqiri::free_tx_metadata(msg); return false; } diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index 81574d4..aed4386 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -992,23 +992,23 @@ Status RdmaMgr::get_tx_packet_burst(BurstParams* burst) { return Status::NO_FREE_BURST_BUFFERS; } - int rx = rte_ring_dequeue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); - if (rx != burst->transport_hdr.num_pkts) { - DAQIRI_LOG_ERROR("Asked for {} packets, got {}", burst->transport_hdr.num_pkts, rx); - rte_ring_enqueue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); + const unsigned num_pkts = static_cast(burst->transport_hdr.num_pkts); + const unsigned rx = + rte_ring_dequeue_bulk(burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); + if (rx != num_pkts) { + DAQIRI_LOG_ERROR("Asked for {} packets, got {}", num_pkts, rx); + rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); + burst->pkts[0] = nullptr; return Status::NO_FREE_BURST_BUFFERS; } // Allocate packet length buffer if (rte_mempool_get(pkt_len_pool_, reinterpret_cast(&burst->pkt_lens[0])) != 0) { DAQIRI_LOG_ERROR("Failed to get packet length buffer"); + rte_ring_enqueue_bulk( + burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); + burst->pkts[0] = nullptr; return Status::NO_FREE_PACKET_BUFFERS; } From 97c5a7c370b4bb8ffb4687f5fa16af1e350e1d2d Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Thu, 28 May 2026 00:31:49 +0000 Subject: [PATCH 4/5] #15 - Address Greptile RDMA review comments Fix RDMA completion enqueue failure cleanup to avoid returning metadata twice, and document the single-producer/single-consumer invariant for RDMA connection rings. Signed-off-by: Cliff Burdick --- src/managers/rdma/daqiri_rdma_mgr.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index aed4386..9da0b23 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -459,6 +459,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } else { msg = create_burst_params(); } + const bool msg_owns_packets = wc.opcode == IBV_WC_RECV; // Only populate a header to indicate which burst needs to be freed // msg->transport_hdr.opcode = ibv_opcode_to_daqiri_opcode(wc.opcode); @@ -471,8 +472,11 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); - free_tx_burst(msg); - free_tx_metadata(msg); + if (msg_owns_packets) { + free_tx_burst(msg); + } else { + free_tx_metadata(msg); + } return; } } @@ -525,7 +529,6 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); free_tx_burst(msg); - free_tx_metadata(msg); return; } } @@ -545,7 +548,6 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); - free_tx_burst(msg); free_tx_metadata(msg); return; } @@ -1302,6 +1304,10 @@ int RdmaMgr::setup_pools_and_rings() { // RX rings DAQIRI_LOG_INFO("Setting up TX/RX per-queue rings"); + // Each connection ring is single-producer/single-consumer by design: one + // rdma_thread owns the manager side and one application thread owns the API + // side for a given conn_id. If multi-threaded app access per conn_id is ever + // allowed, these rings must go back to MP/MC-safe flags. for (int i = 0; i < MAX_RDMA_CONNECTIONS; i++) { std::string ring_name = "RX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up RX ring {}", ring_name); From 6abd9b65c2b8cc5f09019112639ea504f71900cf Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Thu, 28 May 2026 16:56:12 +0000 Subject: [PATCH 5/5] #15 - Retire RDMA send completions on CQ errors Sweep outstanding SEND work requests through an errored CQE so selective signaling cannot orphan unsignaled bursts on TX errors. Signed-off-by: Cliff Burdick --- src/managers/rdma/daqiri_rdma_mgr.cpp | 75 +++++++++++++++------------ 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index 9da0b23..7b5160f 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -481,6 +481,44 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } } + auto complete_send_wrs = [&](uint64_t wr_id, Status status) -> bool { + auto completed_end = outstanding_send_wr_ids.upper_bound(wr_id); + if (completed_end == outstanding_send_wr_ids.begin()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + return true; + } + if (outstanding_send_wr_ids.find(wr_id) == outstanding_send_wr_ids.end()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + } + + for (auto completed_it = outstanding_send_wr_ids.begin(); completed_it != completed_end;) { + msg = completed_it->second; + + const auto conn_id = get_connection_id(msg); + const auto expected_conn_id = reinterpret_cast(tparams->client_id); + if (conn_id != expected_conn_id) { + DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", + completed_it->first, + conn_id, + expected_conn_id); + } + + completed_it = outstanding_send_wr_ids.erase(completed_it); + + msg->transport_hdr.tx = true; + msg->transport_hdr.status = status; + msg->transport_hdr.server = is_server; + + if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { + DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); + free_tx_burst(msg); + return false; + } + } + + return true; + }; + // Check TX CQ for completion while (true) { num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc); @@ -496,42 +534,15 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { (int)wc.status, (int64_t)wc.wr_id, (int)wc.opcode); + if (wc.opcode == IBV_WC_SEND && + !complete_send_wrs(wc.wr_id, Status::GENERIC_FAILURE)) { + return; + } continue; } if (wc.opcode == IBV_WC_SEND) { - auto it = outstanding_send_wr_ids.find(wc.wr_id); - if (it == outstanding_send_wr_ids.end()) { - DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wc.wr_id); - continue; - } - - auto completed_end = outstanding_send_wr_ids.upper_bound(wc.wr_id); - for (auto completed_it = outstanding_send_wr_ids.begin(); completed_it != completed_end;) { - msg = completed_it->second; - - const auto conn_id = get_connection_id(msg); - const auto expected_conn_id = reinterpret_cast(tparams->client_id); - if (conn_id != expected_conn_id) { - DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", - completed_it->first, - conn_id, - expected_conn_id); - } - - completed_it = outstanding_send_wr_ids.erase(completed_it); - - msg->transport_hdr.tx = true; - msg->transport_hdr.status = - wc.status == IBV_WC_SUCCESS ? Status::SUCCESS : Status::GENERIC_FAILURE; - msg->transport_hdr.server = is_server; - - if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { - DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); - free_tx_burst(msg); - return; - } - } + if (!complete_send_wrs(wc.wr_id, Status::SUCCESS)) { return; } continue; } else { msg = create_burst_params();