diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 2a92047..9915ba0 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -98,14 +98,19 @@ add_daqiri_raw_bench(daqiri_bench_raw_reorder_quantize raw_reorder_quantize_benc add_daqiri_raw_bench(daqiri_example_gds_write gds_write_example.cpp) add_daqiri_raw_bench(daqiri_example_pcap_writer pcap_writer_example.cpp) -add_executable(daqiri_bench_rdma rdma_bench.cpp) +add_executable(daqiri_bench_rdma rdma_bench.cpp raw_bench_common.cpp) link_daqiri_bench(daqiri_bench_rdma) +target_link_libraries(daqiri_bench_rdma PRIVATE CUDA::cudart) set_target_properties(daqiri_bench_rdma PROPERTIES BUILD_RPATH "$ORIGIN/../src;$ORIGIN/../src/third_party/yaml-cpp" ) -add_executable(daqiri_bench_socket socket_bench.cpp) +add_executable(daqiri_bench_socket socket_bench.cpp raw_bench_common.cpp) link_daqiri_bench(daqiri_bench_socket) +target_link_libraries(daqiri_bench_socket PRIVATE CUDA::cudart) +set_target_properties(daqiri_bench_socket PROPERTIES + BUILD_RPATH "$ORIGIN/../src;$ORIGIN/../src/third_party/yaml-cpp" +) foreach(cfg IN LISTS DAQIRI_BENCH_CONFIGS) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/${cfg} ${CMAKE_CURRENT_BINARY_DIR}/${cfg} COPYONLY) diff --git a/examples/bench_capture_environment.sh b/examples/bench_capture_environment.sh new file mode 100755 index 0000000..3314b08 --- /dev/null +++ b/examples/bench_capture_environment.sh @@ -0,0 +1,116 @@ +#!/usr/bin/env bash +# +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Capture host/NIC/GPU/build state for a benchmark run, so numbers are +# reproducible across machines and over time. Writes one structured text file +# with named sections. +# +# Usage: ./bench_capture_environment.sh +# Default output dir: bench-results// + +set -u + +OUT_DIR="${1:-bench-results/$(date -u +%Y%m%dT%H%M%SZ)}" +mkdir -p "$OUT_DIR" +OUT="$OUT_DIR/environment.txt" + +# Run a command, capturing exit status. Always write a header so the section is +# present even when the command is missing or fails — silent absence is harder +# to debug than an explicit "command not found". +run_section() { + local label="$1"; shift + { + echo "==========================================================" + echo "[$label]" + echo " cmd: $*" + echo "==========================================================" + if command -v "$1" >/dev/null 2>&1 || [[ "$1" == /* || "$1" == ./* ]]; then + "$@" 2>&1 + echo " (exit: $?)" + else + echo " (command not found in PATH: $1)" + fi + echo + } >> "$OUT" +} + +# Cat a file/glob; write a header either way. +cat_section() { + local label="$1"; shift + { + echo "==========================================================" + echo "[$label]" + echo " paths: $*" + echo "==========================================================" + for p in "$@"; do + if compgen -G "$p" >/dev/null; then + for f in $p; do + echo "----- $f -----" + cat "$f" 2>&1 + done + else + echo " (no match: $p)" + fi + done + echo + } >> "$OUT" +} + +: > "$OUT" + +echo "DAQIRI benchmark environment capture" >> "$OUT" +echo "Generated: $(date -u +%Y-%m-%dT%H:%M:%SZ)" >> "$OUT" +echo "Host: $(hostname)" >> "$OUT" +echo "Output: $OUT" >> "$OUT" +echo >> "$OUT" + +# --- Kernel / OS --- +run_section "uname" uname -a +cat_section "kernel-cmdline" /proc/cmdline +cat_section "os-release" /etc/os-release +run_section "lsb-release" lsb_release -a +run_section "clocksource" cat /sys/devices/system/clocksource/clocksource0/current_clocksource + +# --- CPU / NUMA / IRQ --- +run_section "numactl" numactl --show +run_section "lscpu" lscpu +cat_section "cpu-isolated" /sys/devices/system/cpu/isolated +run_section "cpufreq-info" cpupower frequency-info +cat_section "cpu-governor" /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor +run_section "irq-mlx5" bash -c "grep mlx5 /proc/interrupts || true" + +# --- Hugepages --- +cat_section "hugepages" /sys/kernel/mm/hugepages/*/nr_hugepages +run_section "free-h" free -h + +# --- PCIe topology --- +run_section "lspci-mellanox" bash -c "lspci -vvv -d 15b3: 2>/dev/null" +run_section "lspci-nvidia" bash -c "lspci -vvv -d 10de: 2>/dev/null" + +# --- NIC: OFED / firmware / DPDK binding --- +run_section "ofed-info" ofed_info -s +run_section "mlxfwmanager" mlxfwmanager --query +run_section "dpdk-devbind" dpdk-devbind.py --status +# Per-iface ethtool — iterate over the daqiri-tx/rx names if present, else all mlx5. +for iface in daqiri-tx daqiri-rx $(ls /sys/class/net 2>/dev/null | grep -E '^(enP|enp|eth)' || true); do + [[ -d "/sys/class/net/$iface" ]] || continue + run_section "ethtool-i:$iface" ethtool -i "$iface" + run_section "ethtool-g:$iface" ethtool -g "$iface" + run_section "ethtool-l:$iface" ethtool -l "$iface" + cat_section "iface-mtu:$iface" "/sys/class/net/$iface/mtu" + cat_section "iface-mac:$iface" "/sys/class/net/$iface/address" +done + +# --- GPU --- +run_section "nvidia-smi-q" nvidia-smi -q +run_section "nvidia-smi-tempclk" nvidia-smi --query-gpu=name,driver_version,temperature.gpu,clocks.current.sm,clocks.current.memory --format=csv + +# --- Build state --- +DAQIRI_DIR="$(git -C "$(dirname "$0")/.." rev-parse --show-toplevel 2>/dev/null || pwd)" +run_section "git-rev-parse" git -C "$DAQIRI_DIR" rev-parse HEAD +run_section "git-status" git -C "$DAQIRI_DIR" status --short +run_section "git-describe" git -C "$DAQIRI_DIR" describe --always --dirty + +echo "Capture complete: $OUT" diff --git a/examples/raw_bench_common.cpp b/examples/raw_bench_common.cpp index b912778..9307542 100644 --- a/examples/raw_bench_common.cpp +++ b/examples/raw_bench_common.cpp @@ -247,6 +247,45 @@ int parse_run_seconds(int argc, char **argv) { return run_seconds; } +double parse_target_gbps(int argc, char **argv) { + double target_gbps = 0.0; + for (int i = 2; i + 1 < argc; i += 2) { + if (std::string(argv[i]) == "--target-gbps") { + target_gbps = std::stod(argv[i + 1]); + } + } + return target_gbps; +} + +TokenBucketPacer::TokenBucketPacer(double target_gbps) + : target_bps_(target_gbps > 0.0 ? target_gbps * 1e9 : 0.0), + t0_(std::chrono::steady_clock::now()) {} + +void TokenBucketPacer::wait_for_bytes(size_t bytes, std::atomic &stop) { + if (target_bps_ <= 0.0) { + return; + } + std::lock_guard lock(mutex_); + total_bytes_ += bytes; + const double scheduled_secs = (total_bytes_ * 8.0) / target_bps_; + const auto scheduled = t0_ + std::chrono::duration_cast< + std::chrono::steady_clock::duration>( + std::chrono::duration(scheduled_secs)); + // Slice the wait into 10 ms chunks so a stop flag (--seconds expiry or + // Ctrl-C) can break us out promptly. The total slept across the slices + // accumulates to the scheduled deadline, so pacing remains accurate. + constexpr auto kSlice = std::chrono::milliseconds(10); + while (!stop.load()) { + const auto now = std::chrono::steady_clock::now(); + if (scheduled <= now) { + return; + } + const auto remaining = scheduled - now; + std::this_thread::sleep_for( + std::min(remaining, kSlice)); + } +} + bool has_bench_rx(const YAML::Node &root) { return has_bench_config(root, "bench_rx"); } @@ -458,7 +497,8 @@ void wait_for_stop(int run_seconds, std::atomic &stop) { } void print_queue_stats(const char *direction, const std::string &interface_name, - int queue_id, const RawBenchQueueStats &stats) { + int queue_id, const RawBenchQueueStats &stats, + double seconds) { std::lock_guard lock(g_stats_print_mutex); std::cout << direction << " complete: interface=" << interface_name; if (queue_id >= 0) { @@ -467,7 +507,8 @@ void print_queue_stats(const char *direction, const std::string &interface_name, std::cout << " queues=all"; } std::cout << " packets=" << stats.packets << " bytes=" << stats.bytes - << " bursts=" << stats.bursts << std::endl; + << " bursts=" << stats.bursts << " seconds=" << seconds + << std::endl; } void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic &stop) { @@ -497,6 +538,7 @@ void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic &stop) { } std::vector queue_stats(num_rx_queues); + const auto t0 = std::chrono::steady_clock::now(); while (!stop.load()) { bool got_any = false; for (int q : queue_ids) { @@ -516,6 +558,9 @@ void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic &stop) { std::this_thread::sleep_for(std::chrono::microseconds(100)); } } + const double secs = + std::chrono::duration(std::chrono::steady_clock::now() - t0) + .count(); RawBenchQueueStats total; for (int q : queue_ids) { @@ -523,11 +568,11 @@ void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic &stop) { total.packets += stats.packets; total.bytes += stats.bytes; total.bursts += stats.bursts; - print_queue_stats("RX", cfg.interface_name, q, stats); + print_queue_stats("RX", cfg.interface_name, q, stats, secs); } if (queue_ids.size() > 1) { - print_queue_stats("RX", cfg.interface_name, -1, total); + print_queue_stats("RX", cfg.interface_name, -1, total, secs); } } diff --git a/examples/raw_bench_common.h b/examples/raw_bench_common.h index 24a0d1b..d1b2f7b 100644 --- a/examples/raw_bench_common.h +++ b/examples/raw_bench_common.h @@ -21,13 +21,44 @@ #include #include +#include #include #include +#include #include #include namespace daqiri::bench { +// Software token-bucket pacer used by the bench TX workers. When +// target_gbps == 0 the wait_for_bytes() call is a no-op early return, so the +// pacer adds no overhead when --target-gbps is unset. +// +// Accuracy: ~5% at high rates due to Linux nanosleep granularity and scheduler +// jitter. Acceptable for drop-curve sweeps; tighter pacing would require +// hardware TX timestamping (DAQIRI's accurate_send YAML flag), deferred. +class TokenBucketPacer { +public: + TokenBucketPacer() = default; + explicit TokenBucketPacer(double target_gbps); + + // Call after each TX burst. Sleeps in short slices until the pacer's notion + // of "time the configured target rate would have taken to send the + // accumulated bytes" catches up, OR `stop` flips true. Slicing keeps the + // bench responsive to --seconds expiry / Ctrl-C without truncating the total + // sleep (which would silently break pacing for low target rates). + void wait_for_bytes(size_t bytes, std::atomic &stop); + + bool enabled() const { return target_bps_ > 0.0; } + double target_gbps() const { return target_bps_ / 1e9; } + +private: + double target_bps_ = 0.0; // 0 means disabled + uint64_t total_bytes_ = 0; + std::chrono::steady_clock::time_point t0_; + std::mutex mutex_; +}; + struct RawBenchTxConfig { std::string interface_name = "tx_port"; int queue_id = 0; @@ -76,6 +107,7 @@ class PinnedHostBuffer { }; int parse_run_seconds(int argc, char **argv); +double parse_target_gbps(int argc, char **argv); bool has_bench_rx(const YAML::Node &root); bool has_bench_tx(const YAML::Node &root); RawBenchRxConfig parse_rx(const YAML::Node &root); @@ -100,7 +132,8 @@ cudaError_t memcpy_batch_async(const std::vector &dsts, void signal_handler(int signum); void wait_for_stop(int run_seconds, std::atomic &stop); void print_queue_stats(const char *direction, const std::string &interface_name, - int queue_id, const RawBenchQueueStats &stats); + int queue_id, const RawBenchQueueStats &stats, + double seconds); void rx_count_worker(const RawBenchRxConfig &cfg, std::atomic &stop); } // namespace daqiri::bench diff --git a/examples/raw_gpudirect_bench.cpp b/examples/raw_gpudirect_bench.cpp index 45e9011..c4cafe9 100644 --- a/examples/raw_gpudirect_bench.cpp +++ b/examples/raw_gpudirect_bench.cpp @@ -37,6 +37,7 @@ namespace { void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg, + daqiri::bench::TokenBucketPacer &pacer, std::atomic &stop) { const int port_id = daqiri::get_port_id(cfg.interface_name); if (port_id < 0) { @@ -71,6 +72,7 @@ void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg, daqiri::bench::RawBenchQueueStats stats; const auto packet_size = static_cast(cfg.header_size) + cfg.payload_size; + const auto t0 = std::chrono::steady_clock::now(); while (!stop.load()) { auto *msg = daqiri::create_tx_burst_params(); @@ -114,7 +116,7 @@ void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg, } if (daqiri::set_packet_lengths( - msg, i, {static_cast(cfg.header_size + cfg.payload_size)}) != + msg, i, {static_cast(packet_size)}) != daqiri::Status::SUCCESS) { failed = true; break; @@ -127,26 +129,33 @@ void tx_worker(const daqiri::bench::RawBenchTxConfig &cfg, } if (daqiri::send_tx_burst(msg) == daqiri::Status::SUCCESS) { stats.packets += static_cast(num_pkts); - stats.bytes += static_cast(num_pkts) * packet_size; + const uint64_t burst_bytes = static_cast(num_pkts) * packet_size; + stats.bytes += burst_bytes; ++stats.bursts; + pacer.wait_for_bytes(burst_bytes, stop); } } + const double secs = + std::chrono::duration(std::chrono::steady_clock::now() - t0) + .count(); daqiri::bench::print_queue_stats("TX", cfg.interface_name, cfg.queue_id, - stats); + stats, secs); } } // namespace int main(int argc, char **argv) { if (argc < 2) { - std::cerr << "Usage: " << argv[0] << " [--seconds N]\n"; + std::cerr << "Usage: " << argv[0] + << " [--seconds N] [--target-gbps G]\n"; return 1; } const auto prometheus_metrics = daqiri::bench::grafana::init_prometheus_metrics_from_env(); const int run_seconds = daqiri::bench::parse_run_seconds(argc, argv); + const double target_gbps = daqiri::bench::parse_target_gbps(argc, argv); const auto root = YAML::LoadFile(argv[1]); std::vector rx_configs; @@ -172,6 +181,7 @@ int main(int argc, char **argv) { std::atomic stop{false}; std::vector tx_threads; std::vector rx_threads; + daqiri::bench::TokenBucketPacer tx_pacer(target_gbps); rx_threads.reserve(rx_configs.size()); for (const auto &cfg : rx_configs) { @@ -180,7 +190,7 @@ int main(int argc, char **argv) { } tx_threads.reserve(tx_configs.size()); for (const auto &cfg : tx_configs) { - tx_threads.emplace_back(tx_worker, cfg, std::ref(stop)); + tx_threads.emplace_back(tx_worker, cfg, std::ref(tx_pacer), std::ref(stop)); } daqiri::bench::wait_for_stop(run_seconds, stop); diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index 760b487..0a1c58f 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -25,6 +25,7 @@ #include #include +#include "raw_bench_common.h" #include namespace { @@ -48,6 +49,8 @@ struct RdmaBenchConfig { struct RdmaWorkerStats { uint64_t send_completions = 0; uint64_t recv_completions = 0; + uint64_t send_bytes = 0; + uint64_t recv_bytes = 0; }; RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { @@ -62,7 +65,8 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { return cfg; } -void rdma_worker(const RdmaBenchConfig& cfg, std::atomic& stop, RdmaWorkerStats& stats) { +void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, + std::atomic& stop, RdmaWorkerStats& stats) { static constexpr int kMaxOutstanding = 5; int outstanding_send = 0; int outstanding_recv = 0; @@ -120,6 +124,10 @@ void rdma_worker(const RdmaBenchConfig& cfg, std::atomic& stop, RdmaWorker if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { return; } outstanding++; wr_id++; + // Only meter actual byte transmissions (SENDs), not RECEIVE-side posts. + if (op == daqiri::RDMAOpCode::SEND) { + pacer.wait_for_bytes(static_cast(cfg.message_size), stop); + } }; if (cfg.send) { post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr); } @@ -131,10 +139,12 @@ void rdma_worker(const RdmaBenchConfig& cfg, std::atomic& stop, RdmaWorker if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) { outstanding_send--; stats.send_completions++; + stats.send_bytes += static_cast(cfg.message_size); } else if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::RECEIVE && outstanding_recv > 0) { outstanding_recv--; stats.recv_completions++; + stats.recv_bytes += static_cast(cfg.message_size); } daqiri::free_tx_burst(completion); } else { @@ -147,17 +157,21 @@ void rdma_worker(const RdmaBenchConfig& cfg, std::atomic& stop, RdmaWorker int main(int argc, char** argv) { if (argc < 2) { - std::cerr << "Usage: " << argv[0] << " [--seconds N] [--mode server|client|both]\n"; + std::cerr << "Usage: " << argv[0] + << " [--seconds N] [--mode server|client|both] [--target-gbps G]\n"; return 1; } int run_seconds = 10; + double target_gbps = 0.0; std::string mode = "both"; for (int i = 2; i + 1 < argc; i += 2) { if (std::string(argv[i]) == "--seconds") { run_seconds = std::stoi(argv[i + 1]); } else if (std::string(argv[i]) == "--mode") { mode = argv[i + 1]; + } else if (std::string(argv[i]) == "--target-gbps") { + target_gbps = std::stod(argv[i + 1]); } } @@ -172,18 +186,22 @@ int main(int argc, char** argv) { std::thread client_thread; RdmaWorkerStats server_stats; RdmaWorkerStats client_stats; + daqiri::bench::TokenBucketPacer server_pacer(target_gbps); + daqiri::bench::TokenBucketPacer client_pacer(target_gbps); bool run_server = false; bool run_client = false; if ((mode == "server" || mode == "both") && root["rdma_bench_server"]) { run_server = true; server_thread = std::thread( - rdma_worker, parse_rdma_cfg(root["rdma_bench_server"]), std::ref(stop), std::ref(server_stats)); + rdma_worker, parse_rdma_cfg(root["rdma_bench_server"]), + std::ref(server_pacer), std::ref(stop), std::ref(server_stats)); } if ((mode == "client" || mode == "both") && root["rdma_bench_client"]) { run_client = true; client_thread = std::thread( - rdma_worker, parse_rdma_cfg(root["rdma_bench_client"]), std::ref(stop), std::ref(client_stats)); + rdma_worker, parse_rdma_cfg(root["rdma_bench_client"]), + std::ref(client_pacer), std::ref(stop), std::ref(client_stats)); } if (!server_thread.joinable() && !client_thread.joinable()) { @@ -207,11 +225,23 @@ int main(int argc, char** argv) { if (server_thread.joinable()) { server_thread.join(); } if (client_thread.joinable()) { client_thread.join(); } + const double secs = + std::chrono::duration(std::chrono::steady_clock::now() - start) + .count(); + if (run_server) { - std::cout << "Server received messages: " << server_stats.recv_completions << '\n'; + std::cout << "Server complete: send_completions=" << server_stats.send_completions + << " recv_completions=" << server_stats.recv_completions + << " send_bytes=" << server_stats.send_bytes + << " recv_bytes=" << server_stats.recv_bytes + << " seconds=" << secs << '\n'; } if (run_client) { - std::cout << "Client received messages: " << client_stats.recv_completions << '\n'; + std::cout << "Client complete: send_completions=" << client_stats.send_completions + << " recv_completions=" << client_stats.recv_completions + << " send_bytes=" << client_stats.send_bytes + << " recv_bytes=" << client_stats.recv_bytes + << " seconds=" << secs << '\n'; } daqiri::print_stats(); diff --git a/examples/socket_bench.cpp b/examples/socket_bench.cpp index 6c937d0..57a5e16 100644 --- a/examples/socket_bench.cpp +++ b/examples/socket_bench.cpp @@ -26,6 +26,7 @@ #include #include +#include "raw_bench_common.h" #include namespace { @@ -50,6 +51,8 @@ struct SocketBenchConfig { struct SocketWorkerStats { uint64_t sent_packets = 0; uint64_t received_packets = 0; + uint64_t sent_bytes = 0; + uint64_t received_bytes = 0; }; SocketBenchConfig parse_socket_cfg(const YAML::Node& node) { @@ -65,7 +68,8 @@ SocketBenchConfig parse_socket_cfg(const YAML::Node& node) { return cfg; } -void socket_worker(const SocketBenchConfig& cfg, std::atomic& stop, SocketWorkerStats& stats) { +void socket_worker(const SocketBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, + std::atomic& stop, SocketWorkerStats& stats) { uintptr_t conn_id = 0; uint16_t port = 0; uint16_t queue = 0; @@ -92,8 +96,14 @@ void socket_worker(const SocketBenchConfig& cfg, std::atomic& stop, Socket } } - const bool send_done = !cfg.send || stats.sent_packets >= static_cast(cfg.iterations); - const bool recv_done = !cfg.receive || stats.received_packets >= static_cast(cfg.iterations); + // When cfg.iterations <= 0, the loop is time-bounded (driven by stop.load() + // set by --seconds). Otherwise the iteration cap applies as before. + const bool send_done = !cfg.send || + (cfg.iterations > 0 && + stats.sent_packets >= static_cast(cfg.iterations)); + const bool recv_done = !cfg.receive || + (cfg.iterations > 0 && + stats.received_packets >= static_cast(cfg.iterations)); if (send_done && recv_done) { break; } if (cfg.send && !send_done) { @@ -109,6 +119,8 @@ void socket_worker(const SocketBenchConfig& cfg, std::atomic& stop, Socket if (daqiri::send_tx_burst(msg) == daqiri::Status::SUCCESS) { stats.sent_packets++; + stats.sent_bytes += static_cast(cfg.message_size); + pacer.wait_for_bytes(static_cast(cfg.message_size), stop); } } else { daqiri::free_tx_metadata(msg); @@ -119,7 +131,9 @@ void socket_worker(const SocketBenchConfig& cfg, std::atomic& stop, Socket daqiri::BurstParams* burst = nullptr; if (daqiri::get_rx_burst(&burst, conn_id, cfg.server) == daqiri::Status::SUCCESS && burst != nullptr) { - stats.received_packets += static_cast(daqiri::get_num_packets(burst)); + const uint64_t rx_pkts = static_cast(daqiri::get_num_packets(burst)); + stats.received_packets += rx_pkts; + stats.received_bytes += daqiri::get_burst_tot_byte(burst); daqiri::free_all_packets_and_burst_rx(burst); } else { std::this_thread::sleep_for(std::chrono::microseconds(100)); @@ -133,17 +147,20 @@ void socket_worker(const SocketBenchConfig& cfg, std::atomic& stop, Socket int main(int argc, char** argv) { if (argc < 2) { std::cerr << "Usage: " << argv[0] - << " [--seconds N] [--mode server|client|both]\n"; + << " [--seconds N] [--mode server|client|both] [--target-gbps G]\n"; return 1; } int run_seconds = 10; + double target_gbps = 0.0; std::string mode = "both"; for (int i = 2; i + 1 < argc; i += 2) { if (std::string(argv[i]) == "--seconds") { run_seconds = std::stoi(argv[i + 1]); } else if (std::string(argv[i]) == "--mode") { mode = argv[i + 1]; + } else if (std::string(argv[i]) == "--target-gbps") { + target_gbps = std::stod(argv[i + 1]); } } @@ -158,6 +175,8 @@ int main(int argc, char** argv) { std::thread client_thread; SocketWorkerStats server_stats; SocketWorkerStats client_stats; + daqiri::bench::TokenBucketPacer server_pacer(target_gbps); + daqiri::bench::TokenBucketPacer client_pacer(target_gbps); bool run_server = false; bool run_client = false; @@ -165,6 +184,7 @@ int main(int argc, char** argv) { run_server = true; server_thread = std::thread(socket_worker, parse_socket_cfg(root["socket_bench_server"]), + std::ref(server_pacer), std::ref(stop), std::ref(server_stats)); } @@ -172,6 +192,7 @@ int main(int argc, char** argv) { run_client = true; client_thread = std::thread(socket_worker, parse_socket_cfg(root["socket_bench_client"]), + std::ref(client_pacer), std::ref(stop), std::ref(client_stats)); } @@ -198,13 +219,23 @@ int main(int argc, char** argv) { if (server_thread.joinable()) { server_thread.join(); } if (client_thread.joinable()) { client_thread.join(); } + const double secs = + std::chrono::duration(std::chrono::steady_clock::now() - start) + .count(); + if (run_server) { - std::cout << "Server sent packets: " << server_stats.sent_packets - << ", received packets: " << server_stats.received_packets << '\n'; + std::cout << "Server complete: sent_packets=" << server_stats.sent_packets + << " recv_packets=" << server_stats.received_packets + << " sent_bytes=" << server_stats.sent_bytes + << " recv_bytes=" << server_stats.received_bytes + << " seconds=" << secs << '\n'; } if (run_client) { - std::cout << "Client sent packets: " << client_stats.sent_packets - << ", received packets: " << client_stats.received_packets << '\n'; + std::cout << "Client complete: sent_packets=" << client_stats.sent_packets + << " recv_packets=" << client_stats.received_packets + << " sent_bytes=" << client_stats.sent_bytes + << " recv_bytes=" << client_stats.received_bytes + << " seconds=" << secs << '\n'; } daqiri::print_stats();