Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions examples/rdma_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) {

void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer,
std::atomic<bool>& stop, RdmaWorkerStats& stats) {
// Matches the per-MR num_bufs in the YAML configs. Higher values deadlock
// the bench: post_req blocks in get_tx_packet_burst when the pool is empty,
// but free_tx_burst (which refills it) only runs later in the same loop
// iteration via get_rx_burst. Until the loop is refactored to interleave
// drain with post, this constant must stay <= num_bufs.
static constexpr int kMaxOutstanding = 20;
// Application credit limit. Buffer exhaustion is handled as backpressure, so
// this can be larger than a memory region's num_bufs: when get_tx_packet_burst
// fails, post_req returns false instead of blocking, and the outer loop drains
// completions and releases buffers before retrying.
static constexpr int kMaxOutstanding = 64;
int outstanding_send = 0;
int outstanding_recv = 0;
uint64_t send_wr_id = 0x1234;
Expand All @@ -97,46 +95,45 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa
}

auto post_req = [&](int& outstanding, uint64_t& wr_id, daqiri::RDMAOpCode op,
const std::string& mr_name) {
if (outstanding >= kMaxOutstanding) { return; }
const std::string& mr_name) -> bool {
if (outstanding >= kMaxOutstanding) { return false; }

auto* msg = daqiri::create_burst_params();
if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) !=
daqiri::Status::SUCCESS) {
daqiri::free_tx_burst(msg);
return;
daqiri::free_tx_metadata(msg);
return false;
}

while (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS && !stop.load()) {
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
if (stop.load()) {
daqiri::free_tx_burst(msg);
return;
if (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS) {
daqiri::free_tx_metadata(msg);
return false;
}

if (daqiri::set_packet_lengths(msg, 0, {cfg.message_size}) != daqiri::Status::SUCCESS) {
daqiri::free_tx_burst(msg);
return;
return false;
}
if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) {
daqiri::free_tx_burst(msg);
return;
return false;
}
outstanding++;
wr_id++;
// Only meter actual byte transmissions (SENDs), not RECEIVE-side posts.
if (op == daqiri::RDMAOpCode::SEND) {
pacer.wait_for_bytes(static_cast<size_t>(cfg.message_size), stop);
}
return true;
};

if (cfg.send) { post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr); }
if (cfg.receive) { post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr); }
bool got_completion = false;
while (true) {
daqiri::BurstParams* completion = nullptr;
const auto completion_status = daqiri::get_rx_burst(&completion, conn_id, cfg.server);
if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { break; }

daqiri::BurstParams* completion = nullptr;
if (daqiri::get_rx_burst(&completion, conn_id, cfg.server) == daqiri::Status::SUCCESS &&
completion != nullptr) {
got_completion = true;
if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) {
outstanding_send--;
stats.send_completions++;
Expand All @@ -148,7 +145,23 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa
stats.recv_bytes += static_cast<uint64_t>(cfg.message_size);
}
daqiri::free_tx_burst(completion);
} else {
}

bool posted_work = false;
if (cfg.receive) {
while (outstanding_recv < kMaxOutstanding &&
post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr)) {
posted_work = true;
}
}
if (cfg.send) {
while (outstanding_send < kMaxOutstanding &&
post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr)) {
posted_work = true;
}
}

if (!got_completion && !posted_work) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
Expand Down
Loading
Loading