diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index 41c7b3f..929b033 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -67,12 +67,10 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, std::atomic& stop, RdmaWorkerStats& stats) { - // Matches the per-MR num_bufs in the YAML configs. Higher values deadlock - // the bench: post_req blocks in get_tx_packet_burst when the pool is empty, - // but free_tx_burst (which refills it) only runs later in the same loop - // iteration via get_rx_burst. Until the loop is refactored to interleave - // drain with post, this constant must stay <= num_bufs. - static constexpr int kMaxOutstanding = 20; + // Application credit limit. Buffer exhaustion is handled as backpressure so + // this can be larger than a memory region's num_bufs: post_req returns to the + // outer loop, which drains completions and releases buffers before retrying. + static constexpr int kMaxOutstanding = 64; int outstanding_send = 0; int outstanding_recv = 0; uint64_t send_wr_id = 0x1234; @@ -97,31 +95,28 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa } auto post_req = [&](int& outstanding, uint64_t& wr_id, daqiri::RDMAOpCode op, - const std::string& mr_name) { - if (outstanding >= kMaxOutstanding) { return; } + const std::string& mr_name) -> bool { + if (outstanding >= kMaxOutstanding) { return false; } auto* msg = daqiri::create_burst_params(); if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) != daqiri::Status::SUCCESS) { - daqiri::free_tx_burst(msg); - return; + daqiri::free_tx_metadata(msg); + return false; } - while (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS && !stop.load()) { - std::this_thread::sleep_for(std::chrono::microseconds(50)); - } - if (stop.load()) { - daqiri::free_tx_burst(msg); - return; + if (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS) { + daqiri::free_tx_metadata(msg); + return false; } if (daqiri::set_packet_lengths(msg, 0, {cfg.message_size}) != daqiri::Status::SUCCESS) { daqiri::free_tx_burst(msg); - return; + return false; } if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { daqiri::free_tx_burst(msg); - return; + return false; } outstanding++; wr_id++; @@ -129,14 +124,16 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa if (op == daqiri::RDMAOpCode::SEND) { pacer.wait_for_bytes(static_cast(cfg.message_size), stop); } + return true; }; - if (cfg.send) { post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr); } - if (cfg.receive) { post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr); } + bool got_completion = false; + while (true) { + daqiri::BurstParams* completion = nullptr; + const auto completion_status = daqiri::get_rx_burst(&completion, conn_id, cfg.server); + if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { break; } - daqiri::BurstParams* completion = nullptr; - if (daqiri::get_rx_burst(&completion, conn_id, cfg.server) == daqiri::Status::SUCCESS && - completion != nullptr) { + got_completion = true; if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) { outstanding_send--; stats.send_completions++; @@ -148,7 +145,23 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa stats.recv_bytes += static_cast(cfg.message_size); } daqiri::free_tx_burst(completion); - } else { + } + + bool posted_work = false; + if (cfg.receive) { + while (outstanding_recv < kMaxOutstanding && + post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr)) { + posted_work = true; + } + } + if (cfg.send) { + while (outstanding_send < kMaxOutstanding && + post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr)) { + posted_work = true; + } + } + + if (!got_completion && !posted_work) { std::this_thread::sleep_for(std::chrono::microseconds(100)); } } diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index d5bb676..7b5160f 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -20,6 +20,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -154,6 +157,7 @@ int RdmaMgr::mr_access_to_ibv(uint32_t access) { if (access & MEM_ACCESS_LOCAL) { ibv_access |= IBV_ACCESS_LOCAL_WRITE; } if (access & MEM_ACCESS_RDMA_READ) { ibv_access |= IBV_ACCESS_REMOTE_READ; } if (access & MEM_ACCESS_RDMA_WRITE) { ibv_access |= IBV_ACCESS_REMOTE_WRITE; } + ibv_access |= IBV_ACCESS_RELAXED_ORDERING; return ibv_access; } @@ -169,6 +173,17 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& mr, void* ptr) { int access = mr_access_to_ibv(mr.access_); params.ctx_mr_map_[pd.second] = ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, access); + if (params.ctx_mr_map_[pd.second] == nullptr && + (access & IBV_ACCESS_RELAXED_ORDERING) != 0) { + const int fallback_access = access & ~IBV_ACCESS_RELAXED_ORDERING; + DAQIRI_LOG_WARN( + "Failed to register MR {} with relaxed ordering on PD {}; retrying without it", + mr.name_, + (void*)pd.second); + params.ctx_mr_map_[pd.second] = + ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, fallback_access); + access = fallback_access; + } if (params.ctx_mr_map_[pd.second] == nullptr) { DAQIRI_LOG_CRITICAL("Failed to register MR {} on PD {}", mr.name_, (void*)pd.second); return -1; @@ -181,7 +196,7 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& mr, void* ptr) { (void*)ptr, (void*)((uint8_t*)ptr + mr.adj_size_ * mr.num_bufs_), params.ctx_mr_map_[pd.second]->lkey, - mr.access_); + access); } } } @@ -383,11 +398,17 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { struct ibv_wc wc; int num_comp; BurstParams* msg; + int send_signal_every = 1; + if (const char* env = std::getenv("DAQIRI_RDMA_SEND_SIGNAL_EVERY")) { + const long parsed = strtol(env, nullptr, 10); + if (parsed > 0) { send_signal_every = static_cast(parsed); } + } + uint64_t sends_since_signal = 0; const auto& qref = cfg_.ifs_[tparams->if_idx].tx_.queues_[tparams->queue_idx]; const long cpu_core = strtol(qref.common_.cpu_core_.c_str(), NULL, 10); struct rte_ring* tx_ring = tparams->qp_params.tx_ring; struct rte_ring* rx_ring = tparams->qp_params.rx_ring; - std::unordered_map outstanding_send_wr_ids; + std::map outstanding_send_wr_ids; std::unordered_map outstanding_receive_wr_ids; if (set_affinity(cpu_core) != 0) { @@ -401,7 +422,9 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // periodically poll the CQ. while (!rdma_force_quit.load()) { // Check RQ first to reduce latency - while ((num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc)) != 0) { + while (true) { + num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc); + if (num_comp == 0) { break; } DAQIRI_LOG_DEBUG("GOT RX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -436,6 +459,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } else { msg = create_burst_params(); } + const bool msg_owns_packets = wc.opcode == IBV_WC_RECV; // Only populate a header to indicate which burst needs to be freed // msg->transport_hdr.opcode = ibv_opcode_to_daqiri_opcode(wc.opcode); @@ -448,14 +472,57 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); - free_tx_burst(msg); - free_tx_metadata(msg); + if (msg_owns_packets) { + free_tx_burst(msg); + } else { + free_tx_metadata(msg); + } return; } } + auto complete_send_wrs = [&](uint64_t wr_id, Status status) -> bool { + auto completed_end = outstanding_send_wr_ids.upper_bound(wr_id); + if (completed_end == outstanding_send_wr_ids.begin()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + return true; + } + if (outstanding_send_wr_ids.find(wr_id) == outstanding_send_wr_ids.end()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + } + + for (auto completed_it = outstanding_send_wr_ids.begin(); completed_it != completed_end;) { + msg = completed_it->second; + + const auto conn_id = get_connection_id(msg); + const auto expected_conn_id = reinterpret_cast(tparams->client_id); + if (conn_id != expected_conn_id) { + DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", + completed_it->first, + conn_id, + expected_conn_id); + } + + completed_it = outstanding_send_wr_ids.erase(completed_it); + + msg->transport_hdr.tx = true; + msg->transport_hdr.status = status; + msg->transport_hdr.server = is_server; + + if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { + DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); + free_tx_burst(msg); + return false; + } + } + + return true; + }; + // Check TX CQ for completion - while ((num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc)) != 0) { + while (true) { + num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc); + if (num_comp == 0) { break; } DAQIRI_LOG_DEBUG("GOT TX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -467,28 +534,16 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { (int)wc.status, (int64_t)wc.wr_id, (int)wc.opcode); + if (wc.opcode == IBV_WC_SEND && + !complete_send_wrs(wc.wr_id, Status::GENERIC_FAILURE)) { + return; + } continue; } if (wc.opcode == IBV_WC_SEND) { - auto it = outstanding_send_wr_ids.find(wc.wr_id); - if (it == outstanding_send_wr_ids.end()) { - DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wc.wr_id); - continue; - } - - msg = it->second; - - const auto conn_id = get_connection_id(msg); - const auto expected_conn_id = reinterpret_cast(tparams->client_id); - if (conn_id != expected_conn_id) { - DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", - wc.wr_id, - conn_id, - expected_conn_id); - } - - outstanding_send_wr_ids.erase(it); + if (!complete_send_wrs(wc.wr_id, Status::SUCCESS)) { return; } + continue; } else { msg = create_burst_params(); } @@ -504,7 +559,6 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); - free_tx_burst(msg); free_tx_metadata(msg); return; } @@ -515,9 +569,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // ssize_t bytes = mq_receive(tparams.tx_mq, reinterpret_cast(&burst), sizeof(burst), // nullptr); - if (rte_ring_dequeue(tparams->qp_params.tx_ring, reinterpret_cast(&burst)) != 0) { - continue; - } + if (rte_ring_dequeue(tx_ring, reinterpret_cast(&burst)) != 0) { continue; } const auto local_mr = mrs_.find(std::string(burst->transport_hdr.local_mr_name)); if (local_mr == mrs_.end()) { @@ -559,7 +611,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { wr.sg_list = &sge; wr.num_sge = 1; wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; + // Keep signaled completions frequent enough for callers that use SEND + // completions as their credit to refill the outstanding window. + const bool signal_send = send_signal_every == 1 || + sends_since_signal + 1 >= send_signal_every || + outstanding_send_wr_ids.size() >= 15; + wr.send_flags = signal_send ? IBV_SEND_SIGNALED : 0; int ret = ibv_post_send(tparams->client_id->qp, &wr, &bad_wr); if (ret != 0) { @@ -567,6 +624,11 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { free_tx_burst(burst); continue; } + if (signal_send) { + sends_since_signal = 0; + } else { + sends_since_signal++; + } outstanding_send_wr_ids[burst->transport_hdr.wr_id + p] = burst; } @@ -681,12 +743,20 @@ Status RdmaMgr::rdma_get_server_conn_id(const std::string& server_addr, uint16_t // Find the next queue ID that's not already in use for (size_t i = 0; i < server_params->second.size(); i++) { if (server_params->second[i].client_id != nullptr && !server_params->second[i].active) { - *conn_id = reinterpret_cast(server_params->second[i].client_id); + auto* client_id = server_params->second[i].client_id; + std::lock_guard lock(threads_mutex_); + if (worker_threads_.find(client_id) == worker_threads_.end() || + tx_rings_map_.find(client_id) == tx_rings_map_.end() || + rx_rings_map_.find(client_id) == rx_rings_map_.end()) { + continue; + } + + *conn_id = reinterpret_cast(client_id); server_params->second[i].active = true; DAQIRI_LOG_INFO("Found available queue ID for server {}:{} with cm_id {}", server_addr, server_port, - (void*)server_params->second[i].client_id); + (void*)client_id); return Status::SUCCESS; } } @@ -935,23 +1005,23 @@ Status RdmaMgr::get_tx_packet_burst(BurstParams* burst) { return Status::NO_FREE_BURST_BUFFERS; } - int rx = rte_ring_dequeue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); - if (rx != burst->transport_hdr.num_pkts) { - DAQIRI_LOG_ERROR("Asked for {} packets, got {}", burst->transport_hdr.num_pkts, rx); - rte_ring_enqueue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); + const unsigned num_pkts = static_cast(burst->transport_hdr.num_pkts); + const unsigned rx = + rte_ring_dequeue_bulk(burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); + if (rx != num_pkts) { + DAQIRI_LOG_ERROR("Asked for {} packets, got {}", num_pkts, rx); + rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); + burst->pkts[0] = nullptr; return Status::NO_FREE_BURST_BUFFERS; } // Allocate packet length buffer if (rte_mempool_get(pkt_len_pool_, reinterpret_cast(&burst->pkt_lens[0])) != 0) { DAQIRI_LOG_ERROR("Failed to get packet length buffer"); + rte_ring_enqueue_bulk( + burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); + burst->pkts[0] = nullptr; return Status::NO_FREE_PACKET_BUFFERS; } @@ -1245,11 +1315,15 @@ int RdmaMgr::setup_pools_and_rings() { // RX rings DAQIRI_LOG_INFO("Setting up TX/RX per-queue rings"); + // Each connection ring is single-producer/single-consumer by design: one + // rdma_thread owns the manager side and one application thread owns the API + // side for a given conn_id. If multi-threaded app access per conn_id is ever + // allowed, these rings must go back to MP/MC-safe flags. for (int i = 0; i < MAX_RDMA_CONNECTIONS; i++) { std::string ring_name = "RX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up RX ring {}", ring_name); - struct rte_ring* ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + struct rte_ring* ring = + rte_ring_create(ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate RX ring {}!", ring_name); return -1; @@ -1259,7 +1333,7 @@ int RdmaMgr::setup_pools_and_rings() { ring_name = "TX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up TX ring {}", ring_name); ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate TX ring {}!", ring_name); return -1; @@ -1270,7 +1344,7 @@ int RdmaMgr::setup_pools_and_rings() { // Packet length buffers DAQIRI_LOG_DEBUG("Setting up RX meta pool"); pkt_len_pool_ = rte_mempool_create("PKT_LEN_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(uint32_t) * MAX_RDMA_BATCH, 0, 0, @@ -1287,7 +1361,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up RX meta pool"); rx_meta = rte_mempool_create("RX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1304,7 +1378,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up TX meta pool"); tx_meta = rte_mempool_create("TX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1320,7 +1394,7 @@ int RdmaMgr::setup_pools_and_rings() { } tx_burst_pool_ = rte_mempool_create("TX_BURST_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(void*) * MAX_RDMA_BATCH, 0, 0, @@ -1484,7 +1558,8 @@ void RdmaMgr::initialize() { // Set up memory region sizes for (auto& mr : cfg_.mrs_) { - mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, get_alignment(mr.second.kind_)); + const size_t rdma_alignment = std::max(get_alignment(mr.second.kind_), GPU_PAGE_SIZE); + mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, rdma_alignment); } for (const auto& intf : cfg_.ifs_) { diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index c3499e1..cd12dfd 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -162,11 +162,11 @@ class RdmaMgr : public Manager { private: static constexpr int MAX_RDMA_CONNECTIONS = 128; - static constexpr int MAX_CQ = 16; + static constexpr int MAX_CQ = 1024; static constexpr int NUM_SGE_ELS = 1024; static constexpr int NUM_SGE_BUFS = 256; static constexpr int MAX_NUM_MR = 16; // Maximum number of memory registers to exchange - static constexpr int MAX_OUSTANDING_WR = 64; + static constexpr int MAX_OUSTANDING_WR = 1024; static constexpr int MAX_NUM_PORTS = 4; static constexpr int MAX_RDMA_BATCH = 1024;