Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/tutorials/benchmarking_examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ After having modified the configuration file, ensure you have connected an SFP c

By default the application runs for 10 seconds and then exits. You can change the duration by passing `--seconds <N>` after the YAML path, or stop it gracefully at any time with `Ctrl-C`.

## Tune RDMA SEND completion signaling

The RDMA manager signals every SEND work request by default. For `daqiri_bench_rdma`
runs where CQ polling overhead is part of the bottleneck investigation, set
`DAQIRI_RDMA_SEND_SIGNAL_EVERY=N` to request one signaled SEND every `N` posts:

```bash
DAQIRI_RDMA_SEND_SIGNAL_EVERY=16 \
/opt/daqiri/bin/daqiri_bench_rdma /opt/daqiri/bin/daqiri_bench_rdma_tx_rx.yaml
```

Larger values reduce SEND completion traffic. However, completions also drive
application credits and buffer recycling in the benchmark, so a large `N` can
itself affect the results. Keep the variable unset, or set it to `1`, for
baseline measurements.

## Watch live OpenTelemetry metrics in Grafana

DAQIRI can expose the raw benchmark counters through OpenTelemetry when metrics
Expand Down
64 changes: 36 additions & 28 deletions examples/rdma_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) {

void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer,
std::atomic<bool>& stop, RdmaWorkerStats& stats) {
// Matches the per-MR num_bufs in the YAML configs. Higher values deadlock
// the bench: post_req blocks in get_tx_packet_burst when the pool is empty,
// but free_tx_burst (which refills it) only runs later in the same loop
// iteration via get_rx_burst. Until the loop is refactored to interleave
// drain with post, this constant must stay <= num_bufs.
static constexpr int kMaxOutstanding = 20;
// Application credit limit. Buffer exhaustion is handled as backpressure so
// this can be larger than a memory region's num_bufs: post_req returns to the
// outer loop, which drains completions and releases buffers before retrying.
static constexpr int kMaxOutstanding = 64;
int outstanding_send = 0;
int outstanding_recv = 0;
uint64_t send_wr_id = 0x1234;
Expand All @@ -97,50 +95,44 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa
}

auto post_req = [&](int& outstanding, uint64_t& wr_id, daqiri::RDMAOpCode op,
const std::string& mr_name) {
if (outstanding >= kMaxOutstanding) { return; }
const std::string& mr_name) -> bool {
if (outstanding >= kMaxOutstanding) { return false; }

auto* msg = daqiri::create_tx_burst_params();
if (msg == nullptr) { return; }
if (msg == nullptr) { return false; }

if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) !=
daqiri::Status::SUCCESS) {
daqiri::free_tx_metadata(msg);
return;
return false;
}

bool has_packets = false;
while (!stop.load()) {
if (daqiri::get_tx_packet_burst(msg) == daqiri::Status::SUCCESS) {
has_packets = true;
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(50));
}
if (!has_packets) {
if (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS) {
daqiri::free_tx_metadata(msg);
return;
return false;
}

if (daqiri::set_packet_lengths(msg, 0, {cfg.message_size}) != daqiri::Status::SUCCESS) {
daqiri::free_tx_burst(msg);
return;
return false;
}
if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { return; }
if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { return false; }
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 After get_tx_packet_burst succeeds, msg owns allocated packet buffers. If send_tx_burst fails, returning false without freeing msg silently drains the TX buffer pool — subsequent calls to get_tx_packet_burst will eventually return NO_FREE_BURST_BUFFERS, making the benchmark appear to stall without a clear error.

Suggested change
if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { return false; }
if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) {
daqiri::free_tx_burst(msg);
return false;
}

Rule Used: BurstParams is a zero-copy batch — the caller MUST... (source)

outstanding++;
wr_id++;
// Only meter actual byte transmissions (SENDs), not RECEIVE-side posts.
if (op == daqiri::RDMAOpCode::SEND) {
pacer.wait_for_bytes(static_cast<size_t>(cfg.message_size), stop);
}
return true;
};

if (cfg.send) { post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr); }
if (cfg.receive) { post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr); }
bool got_completion = false;
while (true) {
daqiri::BurstParams* completion = nullptr;
const auto completion_status = daqiri::get_rx_burst(&completion, conn_id, cfg.server);
if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { break; }

daqiri::BurstParams* completion = nullptr;
if (daqiri::get_rx_burst(&completion, conn_id, cfg.server) == daqiri::Status::SUCCESS &&
completion != nullptr) {
got_completion = true;
if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) {
outstanding_send--;
stats.send_completions++;
Expand All @@ -152,7 +144,23 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa
stats.recv_bytes += static_cast<uint64_t>(cfg.message_size);
}
daqiri::free_tx_burst(completion);
} else {
}

bool posted_work = false;
if (cfg.receive) {
while (outstanding_recv < kMaxOutstanding &&
post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr)) {
posted_work = true;
}
}
if (cfg.send) {
while (outstanding_send < kMaxOutstanding &&
post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr)) {
posted_work = true;
}
}

if (!got_completion && !posted_work) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
Expand Down
Loading
Loading