Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,19 @@ add_daqiri_raw_bench(daqiri_bench_raw_reorder_quantize raw_reorder_quantize_benc
add_daqiri_raw_bench(daqiri_example_gds_write gds_write_example.cpp)
add_daqiri_raw_bench(daqiri_example_pcap_writer pcap_writer_example.cpp)

add_executable(daqiri_bench_rdma rdma_bench.cpp)
add_executable(daqiri_bench_rdma rdma_bench.cpp raw_bench_common.cpp)
link_daqiri_bench(daqiri_bench_rdma)
target_link_libraries(daqiri_bench_rdma PRIVATE CUDA::cudart)
set_target_properties(daqiri_bench_rdma PROPERTIES
BUILD_RPATH "$ORIGIN/../src;$ORIGIN/../src/third_party/yaml-cpp"
)

add_executable(daqiri_bench_socket socket_bench.cpp)
add_executable(daqiri_bench_socket socket_bench.cpp raw_bench_common.cpp)
link_daqiri_bench(daqiri_bench_socket)
target_link_libraries(daqiri_bench_socket PRIVATE CUDA::cudart)
set_target_properties(daqiri_bench_socket PROPERTIES
BUILD_RPATH "$ORIGIN/../src;$ORIGIN/../src/third_party/yaml-cpp"
)

foreach(cfg IN LISTS DAQIRI_BENCH_CONFIGS)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/${cfg} ${CMAKE_CURRENT_BINARY_DIR}/${cfg} COPYONLY)
Expand Down
116 changes: 116 additions & 0 deletions examples/bench_capture_environment.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#!/usr/bin/env bash
#
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Capture host/NIC/GPU/build state for a benchmark run, so numbers are
# reproducible across machines and over time. Writes one structured text file
# with named sections.
#
# Usage: ./bench_capture_environment.sh <output_dir>
# Default output dir: bench-results/<UTC timestamp>/

set -u

OUT_DIR="${1:-bench-results/$(date -u +%Y%m%dT%H%M%SZ)}"
mkdir -p "$OUT_DIR"
OUT="$OUT_DIR/environment.txt"

# Run a command, capturing exit status. Always write a header so the section is
# present even when the command is missing or fails — silent absence is harder
# to debug than an explicit "command not found".
run_section() {
local label="$1"; shift
{
echo "=========================================================="
echo "[$label]"
echo " cmd: $*"
echo "=========================================================="
if command -v "$1" >/dev/null 2>&1 || [[ "$1" == /* || "$1" == ./* ]]; then
"$@" 2>&1
echo " (exit: $?)"
else
echo " (command not found in PATH: $1)"
fi
echo
} >> "$OUT"
}

# Cat a file/glob; write a header either way.
cat_section() {
local label="$1"; shift
{
echo "=========================================================="
echo "[$label]"
echo " paths: $*"
echo "=========================================================="
for p in "$@"; do
if compgen -G "$p" >/dev/null; then
for f in $p; do
echo "----- $f -----"
cat "$f" 2>&1
done
else
echo " (no match: $p)"
fi
done
echo
} >> "$OUT"
}

: > "$OUT"

echo "DAQIRI benchmark environment capture" >> "$OUT"
echo "Generated: $(date -u +%Y-%m-%dT%H:%M:%SZ)" >> "$OUT"
echo "Host: $(hostname)" >> "$OUT"
echo "Output: $OUT" >> "$OUT"
echo >> "$OUT"

# --- Kernel / OS ---
run_section "uname" uname -a
cat_section "kernel-cmdline" /proc/cmdline
cat_section "os-release" /etc/os-release
run_section "lsb-release" lsb_release -a
run_section "clocksource" cat /sys/devices/system/clocksource/clocksource0/current_clocksource

# --- CPU / NUMA / IRQ ---
run_section "numactl" numactl --show
run_section "lscpu" lscpu
cat_section "cpu-isolated" /sys/devices/system/cpu/isolated
run_section "cpufreq-info" cpupower frequency-info
cat_section "cpu-governor" /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor
run_section "irq-mlx5" bash -c "grep mlx5 /proc/interrupts || true"

# --- Hugepages ---
cat_section "hugepages" /sys/kernel/mm/hugepages/*/nr_hugepages
run_section "free-h" free -h

# --- PCIe topology ---
run_section "lspci-mellanox" bash -c "lspci -vvv -d 15b3: 2>/dev/null"
run_section "lspci-nvidia" bash -c "lspci -vvv -d 10de: 2>/dev/null"

# --- NIC: OFED / firmware / DPDK binding ---
run_section "ofed-info" ofed_info -s
run_section "mlxfwmanager" mlxfwmanager --query
run_section "dpdk-devbind" dpdk-devbind.py --status
# Per-iface ethtool — iterate over the daqiri-tx/rx names if present, else all mlx5.
for iface in daqiri-tx daqiri-rx $(ls /sys/class/net 2>/dev/null | grep -E '^(enP|enp|eth)' || true); do
[[ -d "/sys/class/net/$iface" ]] || continue
run_section "ethtool-i:$iface" ethtool -i "$iface"
run_section "ethtool-g:$iface" ethtool -g "$iface"
run_section "ethtool-l:$iface" ethtool -l "$iface"
cat_section "iface-mtu:$iface" "/sys/class/net/$iface/mtu"
cat_section "iface-mac:$iface" "/sys/class/net/$iface/address"
done

# --- GPU ---
run_section "nvidia-smi-q" nvidia-smi -q
run_section "nvidia-smi-tempclk" nvidia-smi --query-gpu=name,driver_version,temperature.gpu,clocks.current.sm,clocks.current.memory --format=csv

# --- Build state ---
DAQIRI_DIR="$(git -C "$(dirname "$0")/.." rev-parse --show-toplevel 2>/dev/null || pwd)"
run_section "git-rev-parse" git -C "$DAQIRI_DIR" rev-parse HEAD
run_section "git-status" git -C "$DAQIRI_DIR" status --short
run_section "git-describe" git -C "$DAQIRI_DIR" describe --always --dirty

echo "Capture complete: $OUT"
53 changes: 49 additions & 4 deletions examples/raw_bench_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,45 @@ int parse_run_seconds(int argc, char **argv) {
return run_seconds;
}

double parse_target_gbps(int argc, char **argv) {
double target_gbps = 0.0;
for (int i = 2; i + 1 < argc; i += 2) {
if (std::string(argv[i]) == "--target-gbps") {
target_gbps = std::stod(argv[i + 1]);
}
}
return target_gbps;
}

TokenBucketPacer::TokenBucketPacer(double target_gbps)
: target_bps_(target_gbps > 0.0 ? target_gbps * 1e9 : 0.0),
t0_(std::chrono::steady_clock::now()) {}

void TokenBucketPacer::wait_for_bytes(size_t bytes, std::atomic<bool> &stop) {
if (target_bps_ <= 0.0) {
return;
}
std::lock_guard<std::mutex> lock(mutex_);
total_bytes_ += bytes;
const double scheduled_secs = (total_bytes_ * 8.0) / target_bps_;
const auto scheduled = t0_ + std::chrono::duration_cast<
std::chrono::steady_clock::duration>(
std::chrono::duration<double>(scheduled_secs));
// Slice the wait into 10 ms chunks so a stop flag (--seconds expiry or
// Ctrl-C) can break us out promptly. The total slept across the slices
// accumulates to the scheduled deadline, so pacing remains accurate.
constexpr auto kSlice = std::chrono::milliseconds(10);
while (!stop.load()) {
const auto now = std::chrono::steady_clock::now();
if (scheduled <= now) {
return;
}
const auto remaining = scheduled - now;
std::this_thread::sleep_for(
std::min<std::chrono::steady_clock::duration>(remaining, kSlice));
}
}

bool has_bench_rx(const YAML::Node &root) {
return has_bench_config(root, "bench_rx");
}
Expand Down Expand Up @@ -458,7 +497,8 @@ void wait_for_stop(int run_seconds, std::atomic<bool> &stop) {
}

void print_queue_stats(const char *direction, const std::string &interface_name,
int queue_id, const RawBenchQueueStats &stats) {
int queue_id, const RawBenchQueueStats &stats,
double seconds) {
std::lock_guard<std::mutex> lock(g_stats_print_mutex);
std::cout << direction << " complete: interface=" << interface_name;
if (queue_id >= 0) {
Expand All @@ -467,7 +507,8 @@ void print_queue_stats(const char *direction, const std::string &interface_name,
std::cout << " queues=all";
}
std::cout << " packets=" << stats.packets << " bytes=" << stats.bytes
<< " bursts=" << stats.bursts << std::endl;
<< " bursts=" << stats.bursts << " seconds=" << seconds
<< std::endl;
}

void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic<bool> &stop) {
Expand Down Expand Up @@ -497,6 +538,7 @@ void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic<bool> &stop) {
}

std::vector<RawBenchQueueStats> queue_stats(num_rx_queues);
const auto t0 = std::chrono::steady_clock::now();
while (!stop.load()) {
bool got_any = false;
for (int q : queue_ids) {
Expand All @@ -516,18 +558,21 @@ void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic<bool> &stop) {
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
}
const double secs =
std::chrono::duration<double>(std::chrono::steady_clock::now() - t0)
.count();

RawBenchQueueStats total;
for (int q : queue_ids) {
const auto &stats = queue_stats[static_cast<size_t>(q)];
total.packets += stats.packets;
total.bytes += stats.bytes;
total.bursts += stats.bursts;
print_queue_stats("RX", cfg.interface_name, q, stats);
print_queue_stats("RX", cfg.interface_name, q, stats, secs);
}

if (queue_ids.size() > 1) {
print_queue_stats("RX", cfg.interface_name, -1, total);
print_queue_stats("RX", cfg.interface_name, -1, total, secs);
}
}

Expand Down
35 changes: 34 additions & 1 deletion examples/raw_bench_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,44 @@
#include <yaml-cpp/yaml.h>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <string>
#include <vector>

namespace daqiri::bench {

// Software token-bucket pacer used by the bench TX workers. When
// target_gbps == 0 the wait_for_bytes() call is a no-op early return, so the
// pacer adds no overhead when --target-gbps is unset.
//
// Accuracy: ~5% at high rates due to Linux nanosleep granularity and scheduler
// jitter. Acceptable for drop-curve sweeps; tighter pacing would require
// hardware TX timestamping (DAQIRI's accurate_send YAML flag), deferred.
class TokenBucketPacer {
public:
TokenBucketPacer() = default;
explicit TokenBucketPacer(double target_gbps);

// Call after each TX burst. Sleeps in short slices until the pacer's notion
// of "time the configured target rate would have taken to send the
// accumulated bytes" catches up, OR `stop` flips true. Slicing keeps the
// bench responsive to --seconds expiry / Ctrl-C without truncating the total
// sleep (which would silently break pacing for low target rates).
void wait_for_bytes(size_t bytes, std::atomic<bool> &stop);

bool enabled() const { return target_bps_ > 0.0; }
double target_gbps() const { return target_bps_ / 1e9; }

private:
double target_bps_ = 0.0; // 0 means disabled
uint64_t total_bytes_ = 0;
std::chrono::steady_clock::time_point t0_;
std::mutex mutex_;
};

struct RawBenchTxConfig {
Comment on lines 31 to 62
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Doc-sync required for new CLI flags and output format

This PR introduces the TokenBucketPacer class, the --target-gbps flag, and rewrites every bench's final summary line to use name=value fields (adding seconds=, send_bytes=, recv_bytes=, etc.). It also adds examples/bench_capture_environment.sh. Per the repo's doc-sync rule, changes under examples/*.cpp require updating docs/tutorials/benchmarking_examples.md, docs/tutorials/configuration-walkthrough.md, and the benchmark table in AGENTS.md. None of those appear to be updated in this PR.

Rule Used: DAQIRI has no automated doc-sync gate beyond mkdoc... (source)

std::string interface_name = "tx_port";
int queue_id = 0;
Expand Down Expand Up @@ -76,6 +107,7 @@ class PinnedHostBuffer {
};

int parse_run_seconds(int argc, char **argv);
double parse_target_gbps(int argc, char **argv);
bool has_bench_rx(const YAML::Node &root);
bool has_bench_tx(const YAML::Node &root);
RawBenchRxConfig parse_rx(const YAML::Node &root);
Expand All @@ -100,7 +132,8 @@ cudaError_t memcpy_batch_async(const std::vector<void *> &dsts,
void signal_handler(int signum);
void wait_for_stop(int run_seconds, std::atomic<bool> &stop);
void print_queue_stats(const char *direction, const std::string &interface_name,
int queue_id, const RawBenchQueueStats &stats);
int queue_id, const RawBenchQueueStats &stats,
double seconds);
void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic<bool> &stop);

} // namespace daqiri::bench
20 changes: 15 additions & 5 deletions examples/raw_gpudirect_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
namespace {

void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg,
daqiri::bench::TokenBucketPacer &pacer,
std::atomic<bool> &stop) {
const int port_id = daqiri::get_port_id(cfg.interface_name);
if (port_id < 0) {
Expand Down Expand Up @@ -71,6 +72,7 @@ void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg,
daqiri::bench::RawBenchQueueStats stats;
const auto packet_size =
static_cast<uint64_t>(cfg.header_size) + cfg.payload_size;
const auto t0 = std::chrono::steady_clock::now();

while (!stop.load()) {
auto *msg = daqiri::create_tx_burst_params();
Expand Down Expand Up @@ -114,7 +116,7 @@ void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg,
}

if (daqiri::set_packet_lengths(
msg, i, {static_cast<int>(cfg.header_size + cfg.payload_size)}) !=
msg, i, {static_cast<int>(packet_size)}) !=
daqiri::Status::SUCCESS) {
failed = true;
break;
Expand All @@ -127,26 +129,33 @@ void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg,
}
if (daqiri::send_tx_burst(msg) == daqiri::Status::SUCCESS) {
stats.packets += static_cast<uint64_t>(num_pkts);
stats.bytes += static_cast<uint64_t>(num_pkts) * packet_size;
const uint64_t burst_bytes = static_cast<uint64_t>(num_pkts) * packet_size;
stats.bytes += burst_bytes;
++stats.bursts;
pacer.wait_for_bytes(burst_bytes, stop);
}
}

const double secs =
std::chrono::duration<double>(std::chrono::steady_clock::now() - t0)
.count();
daqiri::bench::print_queue_stats("TX", cfg.interface_name, cfg.queue_id,
stats);
stats, secs);
}

} // namespace

int main(int argc, char **argv) {
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " <config.yaml> [--seconds N]\n";
std::cerr << "Usage: " << argv[0]
<< " <config.yaml> [--seconds N] [--target-gbps G]\n";
return 1;
}

const auto prometheus_metrics =
daqiri::bench::grafana::init_prometheus_metrics_from_env();
const int run_seconds = daqiri::bench::parse_run_seconds(argc, argv);
const double target_gbps = daqiri::bench::parse_target_gbps(argc, argv);
const auto root = YAML::LoadFile(argv[1]);

std::vector<daqiri::bench::RawBenchRxConfig> rx_configs;
Expand All @@ -172,6 +181,7 @@ int main(int argc, char **argv) {
std::atomic<bool> stop{false};
std::vector<std::thread> tx_threads;
std::vector<std::thread> rx_threads;
daqiri::bench::TokenBucketPacer tx_pacer(target_gbps);

rx_threads.reserve(rx_configs.size());
for (const auto &cfg : rx_configs) {
Expand All @@ -180,7 +190,7 @@ int main(int argc, char **argv) {
}
tx_threads.reserve(tx_configs.size());
for (const auto &cfg : tx_configs) {
tx_threads.emplace_back(tx_worker, cfg, std::ref(stop));
tx_threads.emplace_back(tx_worker, cfg, std::ref(tx_pacer), std::ref(stop));
}

daqiri::bench::wait_for_stop(run_seconds, stop);
Expand Down
Loading
Loading