From d67d532576be9b2c86a7c6ee507d703e795cea04 Mon Sep 17 00:00:00 2001 From: rgurunathan Date: Tue, 26 May 2026 15:25:33 -0400 Subject: [PATCH 1/2] #15 - Add DGX Spark sweep tooling and RDMA loopback prereqs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stacks on the bench-infra PR (#α) which provides --seconds, TokenBucketPacer, and the bench-output format these scripts parse. - examples/run_spark_bench.sh: Spark-tuned sweep driver with per-backend payload/batch matrices, CPU pins, drop-source dispatch (DPDK imissed/ ierrors/nombuf, RDMA CQ errors, socket /proc/net/udp drops + nstat retrans), and one CSV row per cell into bench-results/. - scripts/spark_data_fill.sh: one-shot driver that runs the full bench matrix across DPDK / socket-UDP / socket-TCP, with hugepage pre-flight and orphan-hugepage cleanup between runs. - scripts/setup_spark_rdma_loopback.sh: idempotent host prereq pinning static ARP entries and source-based policy routing for the Spark single-NIC cross-cable RoCE loopback. Hardcodes the Spark CX-7 netdev names, MAC addresses, and 1.1.1.1 / 2.2.2.2 IPs from daqiri_bench_rdma_tx_rx_spark.yaml — the technique generalizes to any single-NIC RoCE loopback, the script does not. Renamed from the earlier setup_rdma_loopback.sh draft to make the platform scope obvious in the directory listing. - examples/rdma_bench.cpp: raise kMaxOutstanding 5→20 to match num_bufs in the YAML configs. Lifts small-payload pps 8–22× on Spark (4 KB: 4→39 msg/s, 8 KB: 4→88, 64 KB: 32→255) without affecting the 8 MB / 1 MB cells already saturated at depth 5. The new comment documents why the constant cannot exceed num_bufs (post_req / free_tx_burst ordering in the same loop iteration would deadlock instead of throttling). A follow-up tracks the deeper architectural fix (interleave drain with post, bulk tx_ring dequeue, configurable depth). - .gitignore: pcie_schematic.png (generated by tune_system.py). Includes fixes for two parsing bugs Greptile flagged on the original draft of run_spark_bench.sh: - /proc/net/udp drops column is decimal (%lu in net/ipv4/udp.c), not hex. Drop the strtonum("0x" ...) treatment that was silently multiplying drop counts whenever the column value contained any digit > 9. - Socket bench emits sent_packets / sent_bytes (not RDMA's send_completions / send_bytes), so the RDMA-keyed fallback was always returning empty for socket backends and producing zero-filled CSV rows. Dispatch the fallback on $BACKEND. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: rgurunathan --- .gitignore | 4 + examples/rdma_bench.cpp | 7 +- examples/run_spark_bench.sh | 348 +++++++++++++++++++++++++++ scripts/setup_spark_rdma_loopback.sh | 53 ++++ scripts/spark_data_fill.sh | 143 +++++++++++ 5 files changed, 554 insertions(+), 1 deletion(-) create mode 100755 examples/run_spark_bench.sh create mode 100755 scripts/setup_spark_rdma_loopback.sh create mode 100755 scripts/spark_data_fill.sh diff --git a/.gitignore b/.gitignore index e221700..472185d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,9 @@ build*/ site/ +bench-results/ + +# tune_system.py default output +pcie_schematic.png # macOS .DS_Store diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index a066509..41c7b3f 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -67,7 +67,12 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, std::atomic& stop, RdmaWorkerStats& stats) { - static constexpr int kMaxOutstanding = 5; + // Matches the per-MR num_bufs in the YAML configs. Higher values deadlock + // the bench: post_req blocks in get_tx_packet_burst when the pool is empty, + // but free_tx_burst (which refills it) only runs later in the same loop + // iteration via get_rx_burst. Until the loop is refactored to interleave + // drain with post, this constant must stay <= num_bufs. + static constexpr int kMaxOutstanding = 20; int outstanding_send = 0; int outstanding_recv = 0; uint64_t send_wr_id = 0x1234; diff --git a/examples/run_spark_bench.sh b/examples/run_spark_bench.sh new file mode 100755 index 0000000..4c4cfcf --- /dev/null +++ b/examples/run_spark_bench.sh @@ -0,0 +1,348 @@ +#!/usr/bin/env bash +# +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Sweep wrapper for DAQIRI benchmarks on DGX Spark. Runs the bench across a +# matrix of (payload/message size, batch size, target-gbps), captures per-run +# CPU/GPU/NIC counters, and emits one CSV row per cell into bench-results/. +# +# Drop sources per backend (per the report methodology): +# DPDK : grep imissed/ierrors/rx_nombuf from bench log (DAQIRI_LOG_INFO). +# RDMA : grep "CQ error" lines from bench log (DAQIRI_LOG_ERROR). +# socket : diff /proc/net/udp drops column (UDP); nstat -a (TCP retransmits). +# +# Usage: +# ./run_spark_bench.sh [mode] +# backend ∈ {dpdk, rdma, socket-udp, socket-tcp} +# mode ∈ {smoke, sweep, drop-curve, drop-curve-matrix} (default: smoke) +# +# Required environment in current shell: +# DAQIRI_BUILD_DIR — path to the cmake build dir (defaults to ../build). +# ETH_DST_ADDR — required for dpdk backend (the RX iface MAC). +# RX_IFACE — kernel name of the RX interface for /proc/net/udp diff +# (e.g. enP2p1s0f0np0); required for socket-udp. +# +# Run inside the project container as root (per AGENTS.md). + +set -u +set -o pipefail + +# -------------------------------------------------------------------------- +# Configuration +# -------------------------------------------------------------------------- + +BACKEND="${1:-}" +MODE="${2:-smoke}" +if [[ -z "$BACKEND" ]]; then + echo "Usage: $0 [smoke|sweep|drop-curve|drop-curve-matrix]" >&2 + exit 1 +fi + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +BUILD_DIR="${DAQIRI_BUILD_DIR:-$SCRIPT_DIR/../build}" +TS="$(date -u +%Y%m%dT%H%M%SZ)" +OUT_DIR="$SCRIPT_DIR/../bench-results/$TS-$BACKEND-$MODE" +mkdir -p "$OUT_DIR" + +CSV="$OUT_DIR/runs.csv" +echo "lang,backend,post_process,payload,batch,target_gbps,seconds,packets,bytes,pps,gbps,drops,drops_kind,cpu_master_pct,cpu_tx_pct,cpu_rx_pct,gpu_sm_pct,gpu_mem_pct" > "$CSV" + +# Capture slow-moving environment state once per result set. +"$SCRIPT_DIR/bench_capture_environment.sh" "$OUT_DIR" + +RUN_SECONDS=30 +DRIVER_LOG="$OUT_DIR/last_run.stderr" + +# Per-backend sweep matrices (see docs/performance-dgx-spark.md methodology). +# Native-shape sizes are the leftmost entry; "matched 8K" cell is also included. +case "$BACKEND" in + dpdk) + PAYLOADS_SWEEP=(8000 4096 1024 256 64) + BATCHES_SWEEP=(10240 4096 1024 256) + PAYLOADS_HEADLINE=(8000) + BATCHES_HEADLINE=(10240) + BASE_YAML="$SCRIPT_DIR/daqiri_bench_raw_tx_rx_spark.yaml" + BENCH_BIN="$BUILD_DIR/examples/daqiri_bench_raw_gpudirect" + CPU_MASTER=8; CPU_TX=17; CPU_RX=18 + : "${ETH_DST_ADDR:?ETH_DST_ADDR must be set for dpdk backend (cat /sys/class/net//address)}" + ;; + rdma) + PAYLOADS_SWEEP=(8000000 1048576 65536 8192 4096) + BATCHES_SWEEP=(1) + PAYLOADS_HEADLINE=(8000000) + BATCHES_HEADLINE=(1) + BASE_YAML="$SCRIPT_DIR/daqiri_bench_rdma_tx_rx_spark.yaml" + BENCH_BIN="$BUILD_DIR/examples/daqiri_bench_rdma" + CPU_MASTER=8; CPU_TX=17; CPU_RX=18 + ;; + socket-udp) + PAYLOADS_SWEEP=(1472 1024 256 64) + BATCHES_SWEEP=(256 32 1) + PAYLOADS_HEADLINE=(1472) + BATCHES_HEADLINE=(256) + BASE_YAML="$SCRIPT_DIR/daqiri_bench_socket_udp_tx_rx.yaml" + BENCH_BIN="$BUILD_DIR/examples/daqiri_bench_socket" + CPU_MASTER=8; CPU_TX=17; CPU_RX=18 + ;; + socket-tcp) + PAYLOADS_SWEEP=(1048576 65536 1024) + BATCHES_SWEEP=(1) + PAYLOADS_HEADLINE=(65536) + BATCHES_HEADLINE=(1) + BASE_YAML="$SCRIPT_DIR/daqiri_bench_socket_tcp_tx_rx.yaml" + BENCH_BIN="$BUILD_DIR/examples/daqiri_bench_socket" + CPU_MASTER=8; CPU_TX=17; CPU_RX=18 + ;; + *) echo "Unknown backend: $BACKEND" >&2; exit 1 ;; +esac + +DROP_CURVE_TARGETS=(1 5 10 25 50 75 100 0) # 0 means unpaced (line rate) + +# -------------------------------------------------------------------------- +# Helpers +# -------------------------------------------------------------------------- + +# Read a scalar field from a `key=value` style stdout line. +# usage: extract_field +extract_field() { + local prefix="$1" field="$2" file="$3" + grep -E "^$prefix" "$file" | tail -n1 | grep -oE " $field=[^ ]+" | head -n1 | sed -E "s/.*$field=//" +} + +# Sum DPDK drop counters from the manager log emitted via DAQIRI_LOG_INFO. +parse_dpdk_drops() { + local log="$1" + local sum=0 v + for key in imissed ierrors rx_nombuf; do + v="$(grep -oE "$key=[0-9]+" "$log" 2>/dev/null | tail -n1 | sed -E "s/.*=//" || true)" + [[ -n "${v:-}" ]] && sum=$((sum + v)) + done + echo "$sum" +} + +# Count RDMA CQ errors in the manager log. +parse_rdma_drops() { + local log="$1" + grep -c 'CQ error' "$log" 2>/dev/null || echo 0 +} + +# Snapshot socket drops on the kernel side. +# /proc/net/udp column 13 ("drops") is printed in decimal (%lu in +# net/ipv4/udp.c). The local_address / rem_address columns are hex. +snapshot_proc_net_udp() { + awk 'NR>1 { sum += $13 } END { print sum+0 }' /proc/net/udp 2>/dev/null || echo 0 +} +snapshot_nstat() { + nstat -a 2>/dev/null | awk '/TcpExtTCPLostRetransmit|TcpRetransSegs|TcpInErrs/ { s += $2 } END { print s+0 }' || echo 0 +} + +# Snapshot /proc/stat per-cpu counters to a file. Mpstat is often not installed +# in the bench container; /proc/stat is always available. +snapshot_cpu_stat() { + awk '/^cpu[0-9]+/ { + total = $2+$3+$4+$5+$6+$7+$8 + busy = total - $5 - $6 + print $1, total, busy + }' /proc/stat > "$1" +} + +# Compute busy% for a single cpu index between two /proc/stat snapshots. +cpu_busy_pct() { + local before="$1" after="$2" cpu_idx="$3" + awk -v cpu="cpu$cpu_idx" ' + NR == FNR { b_total[$1] = $2; b_busy[$1] = $3; next } + { a_total[$1] = $2; a_busy[$1] = $3 } + END { + dt = a_total[cpu] - b_total[cpu] + db = a_busy[cpu] - b_busy[cpu] + if (dt > 0) printf "%.1f", (db * 100.0) / dt + else printf "0.0" + } + ' "$before" "$after" +} + +# Substitute payload / batch into the base YAML and write a temp config. +generate_yaml() { + local out="$1" payload="$2" batch="$3" + case "$BACKEND" in + dpdk) + sed -E \ + -e "s|^( *payload_size: ).*|\1$payload|" \ + -e "s|^( *batch_size: ).*|\1$batch|" \ + -e "s|<00:00:00:00:00:00>|$ETH_DST_ADDR|g" \ + "$BASE_YAML" > "$out" + ;; + rdma) + sed -E "s|^( *message_size: ).*|\1$payload|g" "$BASE_YAML" > "$out" + ;; + socket-udp|socket-tcp) + sed -E "s|^( *message_size: ).*|\1$payload|g" "$BASE_YAML" > "$out" + ;; + esac +} + +# Run one cell. Echoes the CSV row to stdout. +run_cell() { + local lang="$1" payload="$2" batch="$3" target_gbps="$4" + local cell="$lang-$BACKEND-p$payload-b$batch-g$target_gbps" + local cell_dir="$OUT_DIR/$cell" + mkdir -p "$cell_dir" + + local yaml="$cell_dir/config.yaml" + generate_yaml "$yaml" "$payload" "$batch" + + # Snapshot kernel-side drop counters. + local udp_before tcp_before + udp_before="$(snapshot_proc_net_udp)" + tcp_before="$(snapshot_nstat)" + + # Snapshot per-cpu stats just before the bench starts. + snapshot_cpu_stat "$cell_dir/cpu_stat.before" + + # Background GPU dmon (1-sec sample, RUN_SECONDS samples). + ( nvidia-smi dmon -s pucvmet -c "$RUN_SECONDS" > "$cell_dir/nvidia_smi_dmon.txt" 2>&1 ) & + local dmon_pid=$! + + # Run the bench. Stderr captures DAQIRI_LOG_* output (DPDK/RDMA drop sources). + local stdout="$cell_dir/stdout.txt" + local stderr="$cell_dir/stderr.txt" + local args=("$yaml" --seconds "$RUN_SECONDS") + [[ "$target_gbps" != "0" ]] && args+=(--target-gbps "$target_gbps") + [[ "$BACKEND" == "rdma" || "$BACKEND" =~ ^socket- ]] && args+=(--mode both) + + "$BENCH_BIN" "${args[@]}" > "$stdout" 2> "$stderr" || true + cp "$stderr" "$DRIVER_LOG" + + # Snapshot per-cpu stats right after the bench exits (before background + # captures finish reaping, to bound the window). + snapshot_cpu_stat "$cell_dir/cpu_stat.after" + + # Stop background captures (they self-terminate at -c , but reap if needed). + wait "$dmon_pid" 2>/dev/null || true + + # Parse bench stdout. For RX-bearing benches "RX complete" is authoritative; + # for TX-only configs fall back to "TX complete". + local pkts bytes secs + pkts="$(extract_field 'RX complete' packets "$stdout")" + bytes="$(extract_field 'RX complete' bytes "$stdout")" + secs="$(extract_field 'RX complete' seconds "$stdout")" + if [[ -z "$pkts" ]]; then + pkts="$(extract_field 'TX complete' packets "$stdout")" + bytes="$(extract_field 'TX complete' bytes "$stdout")" + secs="$(extract_field 'TX complete' seconds "$stdout")" + fi + if [[ -z "$pkts" ]]; then + case "$BACKEND" in + rdma) + # RDMA prints "Client/Server complete: ... send_completions=N send_bytes=N seconds=S" + pkts="$(extract_field 'Client complete' send_completions "$stdout")" + bytes="$(extract_field 'Client complete' send_bytes "$stdout")" + secs="$(extract_field 'Client complete' seconds "$stdout")" + ;; + socket-udp|socket-tcp) + # socket_bench prints "Client/Server complete: ... sent_packets=N sent_bytes=N + # recv_packets=N recv_bytes=N seconds=S". Use the TX-side counters for parity + # with how the RDMA fallback reports throughput. + pkts="$(extract_field 'Client complete' sent_packets "$stdout")" + bytes="$(extract_field 'Client complete' sent_bytes "$stdout")" + secs="$(extract_field 'Client complete' seconds "$stdout")" + ;; + esac + fi + pkts="${pkts:-0}"; bytes="${bytes:-0}"; secs="${secs:-0}" + + local pps gbps + pps="$(awk -v p="$pkts" -v s="$secs" 'BEGIN { if (s+0>0) printf "%.0f", p/s; else print 0 }')" + gbps="$(awk -v b="$bytes" -v s="$secs" 'BEGIN { if (s+0>0) printf "%.3f", (b*8.0)/s/1e9; else print 0 }')" + + # Drops per backend. + local drops drops_kind + case "$BACKEND" in + dpdk) + drops="$(parse_dpdk_drops "$stderr")" + drops_kind="dpdk-imissed+ierrors+nombuf" + ;; + rdma) + drops="$(parse_rdma_drops "$stderr")" + drops_kind="rdma-cqe-error" + ;; + socket-udp) + local udp_after; udp_after="$(snapshot_proc_net_udp)" + drops="$((udp_after - udp_before))" + drops_kind="udp-proc-net-udp-drops" + ;; + socket-tcp) + local tcp_after; tcp_after="$(snapshot_nstat)" + drops="$((tcp_after - tcp_before))" + drops_kind="tcp-nstat-retrans+inerrs" + ;; + esac + + # Per-core CPU busy% over the bench window. Cores defined per-backend + # (master/TX/RX) match the YAML so we measure the threads we actually pin. + local cpu_master_pct cpu_tx_pct cpu_rx_pct + cpu_master_pct="$(cpu_busy_pct "$cell_dir/cpu_stat.before" "$cell_dir/cpu_stat.after" "$CPU_MASTER")" + cpu_tx_pct="$(cpu_busy_pct "$cell_dir/cpu_stat.before" "$cell_dir/cpu_stat.after" "$CPU_TX")" + cpu_rx_pct="$(cpu_busy_pct "$cell_dir/cpu_stat.before" "$cell_dir/cpu_stat.after" "$CPU_RX")" + + # GPU SM% (column 5) and memory-controller % (column 6) from nvidia-smi + # dmon -s pucvmet. These are near zero for GPUDirect workloads (GPU is a + # DMA target, not a compute engine). + local gpu_sm gpu_mem + gpu_sm="$(awk '/^ *[0-9]/ { count++; sum += $5 } END { if (count) printf "%.1f", sum/count; else print 0 }' \ + "$cell_dir/nvidia_smi_dmon.txt" 2>/dev/null || echo 0)" + gpu_mem="$(awk '/^ *[0-9]/ { count++; sum += $6 } END { if (count) printf "%.1f", sum/count; else print 0 }' \ + "$cell_dir/nvidia_smi_dmon.txt" 2>/dev/null || echo 0)" + + echo "$lang,$BACKEND,none,$payload,$batch,$target_gbps,$secs,$pkts,$bytes,$pps,$gbps,$drops,$drops_kind,$cpu_master_pct,$cpu_tx_pct,$cpu_rx_pct,$gpu_sm,$gpu_mem" \ + | tee -a "$CSV" +} + +# -------------------------------------------------------------------------- +# Driver +# -------------------------------------------------------------------------- + +case "$MODE" in + smoke) + # One cell, native-shape, unpaced. + for p in "${PAYLOADS_HEADLINE[@]}"; do + for b in "${BATCHES_HEADLINE[@]}"; do + run_cell cpp "$p" "$b" 0 + done + done + ;; + sweep) + # Full payload × batch matrix at line rate. + for p in "${PAYLOADS_SWEEP[@]}"; do + for b in "${BATCHES_SWEEP[@]}"; do + run_cell cpp "$p" "$b" 0 + done + done + ;; + drop-curve) + # Hold native-shape constant, sweep target_gbps. + for p in "${PAYLOADS_HEADLINE[@]}"; do + for b in "${BATCHES_HEADLINE[@]}"; do + for g in "${DROP_CURVE_TARGETS[@]}"; do + run_cell cpp "$p" "$b" "$g" + done + done + done + ;; + drop-curve-matrix) + # 2D drop curve: sweep payload × target_gbps at the headline batch. + for p in "${PAYLOADS_SWEEP[@]}"; do + for b in "${BATCHES_HEADLINE[@]}"; do + for g in "${DROP_CURVE_TARGETS[@]}"; do + run_cell cpp "$p" "$b" "$g" + done + done + done + ;; + *) echo "Unknown mode: $MODE" >&2; exit 1 ;; +esac + +echo +echo "Results in: $OUT_DIR" +echo "CSV: $CSV" diff --git a/scripts/setup_spark_rdma_loopback.sh b/scripts/setup_spark_rdma_loopback.sh new file mode 100755 index 0000000..1512332 --- /dev/null +++ b/scripts/setup_spark_rdma_loopback.sh @@ -0,0 +1,53 @@ +#!/bin/bash +# Host network config for the DGX Spark RDMA loopback bench. +# Adapted from a colleague's single-adapter script for this host's +# inter-adapter loopback: Adapter1:port0 (1.1.1.1) <-> Adapter2:port0 (2.2.2.2). +# +# Matches examples/daqiri_bench_rdma_tx_rx_spark.yaml (1.1.1.1 / 2.2.2.2). +# Re-running is safe: replaces addresses, flushes per-port tables, and +# deletes any matching rules before re-adding. + +set -euo pipefail + +p0="enp1s0f0np0" # Adapter1 port 0, PCI 0000:01:00.0, MAC 4c:bb:47:7c:f2:d8 +p1="enP2p1s0f0np0" # Adapter2 port 0, PCI 0002:01:00.0, MAC 4c:bb:47:7c:f2:dc + +p0_mac="4c:bb:47:7c:f2:d8" +p1_mac="4c:bb:47:7c:f2:dc" + +p0_ip="1.1.1.1" +p1_ip="2.2.2.2" + +# Numeric table IDs avoid having to edit /etc/iproute2/rt_tables. +table_p0=100 +table_p1=101 + +ip link set "${p0}" up +ip link set "${p1}" up + +ip addr replace "${p0_ip}/24" dev "${p0}" +ip addr replace "${p1_ip}/24" dev "${p1}" + +ip route flush table "${table_p0}" 2>/dev/null || true +ip route flush table "${table_p1}" 2>/dev/null || true +ip route add table "${table_p0}" default dev "${p0}" +ip route add table "${table_p1}" default dev "${p1}" + +ip rule del from "${p0_ip}/32" table "${table_p0}" 2>/dev/null || true +ip rule del to "${p0_ip}/32" table "${table_p0}" 2>/dev/null || true +ip rule del from "${p1_ip}/32" table "${table_p1}" 2>/dev/null || true +ip rule del to "${p1_ip}/32" table "${table_p1}" 2>/dev/null || true + +ip rule add from "${p0_ip}/32" table "${table_p0}" +ip rule add to "${p0_ip}/32" table "${table_p0}" +ip rule add from "${p1_ip}/32" table "${table_p1}" +ip rule add to "${p1_ip}/32" table "${table_p1}" + +arp -i "${p0}" -s "${p0_ip}" "${p0_mac}" +arp -i "${p0}" -s "${p1_ip}" "${p1_mac}" +arp -i "${p1}" -s "${p0_ip}" "${p0_mac}" +arp -i "${p1}" -s "${p1_ip}" "${p1_mac}" + +echo "RDMA loopback config applied." +echo " ${p0} (${p0_mac}) -> ${p0_ip}/24, table ${table_p0}" +echo " ${p1} (${p1_mac}) -> ${p1_ip}/24, table ${table_p1}" diff --git a/scripts/spark_data_fill.sh b/scripts/spark_data_fill.sh new file mode 100755 index 0000000..0e4f81b --- /dev/null +++ b/scripts/spark_data_fill.sh @@ -0,0 +1,143 @@ +#!/usr/bin/env bash +# Drives the PR 1 data-fill bench runs for the DGX Spark performance report. +# +# Runs DPDK GPUDirect, socket-UDP, and socket-TCP through their sweep and +# drop-curve modes via examples/run_spark_bench.sh, with pre-flight checks +# and orphan-hugepage cleanup. RDMA is deferred from PR 1 (single-host +# loopback over the cable needs a netns+two-process refactor; tracked +# separately). +# +# Run inside the project container (privileged, --gpus all, /dev/hugepages +# and /mnt/huge mounted, repo at /workspace). +# +# Usage: +# ./scripts/spark_data_fill.sh # all three backends +# ./scripts/spark_data_fill.sh dpdk # just DPDK +# ./scripts/spark_data_fill.sh socket-udp socket-tcp +# +# Env overrides: +# ETH_DST_ADDR — RX-side MAC. Auto-detected from +# /sys/class/net/enP2p1s0f0np0/address if unset. +# RX_IFACE — RX netdev name (default enP2p1s0f0np0). +# DAQIRI_BUILD_DIR — defaults to ./build. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +WRAPPER="$REPO_ROOT/examples/run_spark_bench.sh" +BUILD_DIR="${DAQIRI_BUILD_DIR:-$REPO_ROOT/build}" +RX_IFACE="${RX_IFACE:-enP2p1s0f0np0}" + +BACKENDS=("$@") +[[ ${#BACKENDS[@]} -eq 0 ]] && BACKENDS=(dpdk socket-udp socket-tcp) + +# --- pre-flight ------------------------------------------------------------ + +preflight_fail() { echo "PREFLIGHT FAIL: $*" >&2; exit 1; } +note() { echo "[$(date -u +%H:%M:%SZ)] $*"; } + +[[ -x "$WRAPPER" ]] || preflight_fail "wrapper missing or not executable: $WRAPPER" + +for be in "${BACKENDS[@]}"; do + case "$be" in + dpdk) bin="$BUILD_DIR/examples/daqiri_bench_raw_gpudirect" ;; + socket-udp|socket-tcp) bin="$BUILD_DIR/examples/daqiri_bench_socket" ;; + rdma) preflight_fail "RDMA is deferred from PR 1; see follow-up issue" ;; + *) preflight_fail "unknown backend: $be" ;; + esac + [[ -x "$bin" ]] || preflight_fail "missing bench binary: $bin (run cmake --build first)" +done + +# DPDK-only checks. +if [[ " ${BACKENDS[*]} " == *" dpdk "* ]]; then + free_hp="$(awk '/^HugePages_Free:/ { print $2 }' /proc/meminfo)" + [[ "${free_hp:-0}" -ge 4 ]] || preflight_fail "HugePages_Free=$free_hp (need >=4); clean /mnt/huge and /dev/hugepages from prior runs" + + if [[ -z "${ETH_DST_ADDR:-}" ]]; then + mac_path="/sys/class/net/$RX_IFACE/address" + [[ -r "$mac_path" ]] || preflight_fail "cannot read $mac_path; set ETH_DST_ADDR explicitly" + ETH_DST_ADDR="$(cat "$mac_path")" + export ETH_DST_ADDR + note "ETH_DST_ADDR auto-detected from $RX_IFACE: $ETH_DST_ADDR" + fi + + carrier="$(cat "/sys/class/net/$RX_IFACE/carrier" 2>/dev/null || echo 0)" + [[ "$carrier" == "1" ]] || preflight_fail "RX iface $RX_IFACE has no carrier (cable unplugged or link down)" +fi + +note "Pre-flight OK. Backends: ${BACKENDS[*]}" +note "Build dir: $BUILD_DIR" +note "Repo root: $REPO_ROOT" + +# --- hugepage cleanup helper ---------------------------------------------- + +# DPDK leaves orphan rtemap_* files when a bench aborts. Clean between runs so +# we don't run out of hugepages mid-sweep. +clean_orphan_hugepages() { + local pre post freed + pre="$(awk '/^HugePages_Free:/ { print $2 }' /proc/meminfo)" + : "${pre:=0}" + shopt -s nullglob + # DPDK uses a random per-process file prefix (override with --file-prefix); + # match anything ending in `map_` to catch the common shape without + # nuking unrelated files. Skip any that are still held by a live process. + for f in /dev/hugepages/*map_[0-9]* /mnt/huge/*map_[0-9]*; do + if ! fuser -- "$f" >/dev/null 2>&1; then + rm -f -- "$f" 2>/dev/null || true + fi + done + shopt -u nullglob + post="$(awk '/^HugePages_Free:/ { print $2 }' /proc/meminfo)" + : "${post:=0}" + freed=$((post - pre)) + if [[ "$freed" -gt 0 ]]; then + note "Freed $freed orphan hugepages (now ${post} free)" + fi + return 0 +} + +# --- driver loop ----------------------------------------------------------- + +declare -a RESULT_DIRS + +run_backend_mode() { + local backend="$1" mode="$2" + note "=== Running: $backend $mode ===" + clean_orphan_hugepages + + # Stream wrapper output live (per-cell CSV rows appear as they finish) while + # also keeping a log for post-run parsing of the "Results in:" line. + local log="/tmp/spark_data_fill.$backend.$mode.log" + local rc=0 + "$WRAPPER" "$backend" "$mode" 2>&1 | tee "$log" || rc=$? + rc="${PIPESTATUS[0]:-$rc}" + + if [[ "$rc" -eq 0 ]]; then + local result_dir + result_dir="$(awk '/^Results in:/ { print $3 }' "$log" | tail -n1)" + [[ -n "$result_dir" ]] && RESULT_DIRS+=("$backend/$mode -> $result_dir") + note "$backend $mode complete" + else + note "$backend $mode FAILED (exit $rc); continuing" + tail -n 40 "$log" >&2 + fi + clean_orphan_hugepages +} + +for be in "${BACKENDS[@]}"; do + run_backend_mode "$be" sweep + run_backend_mode "$be" drop-curve +done + +# --- summary --------------------------------------------------------------- + +echo +echo "==========================================" +echo "Data-fill complete. Result directories:" +echo "==========================================" +for r in "${RESULT_DIRS[@]}"; do + echo " $r" +done +echo +echo "Next: aggregate CSVs and fill docs/performance-dgx-spark.md." From 94a7d2efe294c8e8ccfa91ee5afd1d59841f0719 Mon Sep 17 00:00:00 2001 From: Cliff Burdick <30670611+cliffburdick@users.noreply.github.com> Date: Wed, 3 Jun 2026 08:19:53 -0700 Subject: [PATCH 2/2] Updates to RDMA bench profiling (#103) * #15 - Add RDMA benchmark profiling knobs Instrument the RDMA benchmark and manager hot paths so SEND/RECV posting, CQ polling, ring traffic, and app refill behavior can be measured while debugging the perftest gap. Add RDMA tuning hooks for relaxed ordering, SEND signaling cadence, queue depth, SPSC rings, and RDMA buffer alignment, plus a server connection readiness guard. Signed-off-by: Cliff Burdick * #15 - Remove RDMA benchmark debug instrumentation Drop benchmark-side timing/profile output and remove RDMA manager profiling timers while keeping the non-debug RDMA transport tuning changes. Signed-off-by: Cliff Burdick * #15 - Avoid RDMA benchmark buffer starvation deadlock Treat TX buffer exhaustion as backpressure in the RDMA benchmark so the worker returns to completion draining before retrying. Also clean up RDMA TX packet burst allocations on partial failure. Signed-off-by: Cliff Burdick * #15 - Address Greptile RDMA review comments Fix RDMA completion enqueue failure cleanup to avoid returning metadata twice, and document the single-producer/single-consumer invariant for RDMA connection rings. Signed-off-by: Cliff Burdick * #15 - Retire RDMA send completions on CQ errors Sweep outstanding SEND work requests through an errored CQE so selective signaling cannot orphan unsignaled bursts on TX errors. Signed-off-by: Cliff Burdick --------- Signed-off-by: Cliff Burdick --- examples/rdma_bench.cpp | 61 +++++---- src/managers/rdma/daqiri_rdma_mgr.cpp | 173 ++++++++++++++++++-------- src/managers/rdma/daqiri_rdma_mgr.h | 4 +- 3 files changed, 163 insertions(+), 75 deletions(-) diff --git a/examples/rdma_bench.cpp b/examples/rdma_bench.cpp index 41c7b3f..929b033 100644 --- a/examples/rdma_bench.cpp +++ b/examples/rdma_bench.cpp @@ -67,12 +67,10 @@ RdmaBenchConfig parse_rdma_cfg(const YAML::Node& node) { void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pacer, std::atomic& stop, RdmaWorkerStats& stats) { - // Matches the per-MR num_bufs in the YAML configs. Higher values deadlock - // the bench: post_req blocks in get_tx_packet_burst when the pool is empty, - // but free_tx_burst (which refills it) only runs later in the same loop - // iteration via get_rx_burst. Until the loop is refactored to interleave - // drain with post, this constant must stay <= num_bufs. - static constexpr int kMaxOutstanding = 20; + // Application credit limit. Buffer exhaustion is handled as backpressure so + // this can be larger than a memory region's num_bufs: post_req returns to the + // outer loop, which drains completions and releases buffers before retrying. + static constexpr int kMaxOutstanding = 64; int outstanding_send = 0; int outstanding_recv = 0; uint64_t send_wr_id = 0x1234; @@ -97,31 +95,28 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa } auto post_req = [&](int& outstanding, uint64_t& wr_id, daqiri::RDMAOpCode op, - const std::string& mr_name) { - if (outstanding >= kMaxOutstanding) { return; } + const std::string& mr_name) -> bool { + if (outstanding >= kMaxOutstanding) { return false; } auto* msg = daqiri::create_burst_params(); if (daqiri::rdma_set_header(msg, op, conn_id, cfg.server, 1, wr_id, mr_name) != daqiri::Status::SUCCESS) { - daqiri::free_tx_burst(msg); - return; + daqiri::free_tx_metadata(msg); + return false; } - while (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS && !stop.load()) { - std::this_thread::sleep_for(std::chrono::microseconds(50)); - } - if (stop.load()) { - daqiri::free_tx_burst(msg); - return; + if (daqiri::get_tx_packet_burst(msg) != daqiri::Status::SUCCESS) { + daqiri::free_tx_metadata(msg); + return false; } if (daqiri::set_packet_lengths(msg, 0, {cfg.message_size}) != daqiri::Status::SUCCESS) { daqiri::free_tx_burst(msg); - return; + return false; } if (daqiri::send_tx_burst(msg) != daqiri::Status::SUCCESS) { daqiri::free_tx_burst(msg); - return; + return false; } outstanding++; wr_id++; @@ -129,14 +124,16 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa if (op == daqiri::RDMAOpCode::SEND) { pacer.wait_for_bytes(static_cast(cfg.message_size), stop); } + return true; }; - if (cfg.send) { post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr); } - if (cfg.receive) { post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr); } + bool got_completion = false; + while (true) { + daqiri::BurstParams* completion = nullptr; + const auto completion_status = daqiri::get_rx_burst(&completion, conn_id, cfg.server); + if (completion_status != daqiri::Status::SUCCESS || completion == nullptr) { break; } - daqiri::BurstParams* completion = nullptr; - if (daqiri::get_rx_burst(&completion, conn_id, cfg.server) == daqiri::Status::SUCCESS && - completion != nullptr) { + got_completion = true; if (daqiri::rdma_get_opcode(completion) == daqiri::RDMAOpCode::SEND && outstanding_send > 0) { outstanding_send--; stats.send_completions++; @@ -148,7 +145,23 @@ void rdma_worker(const RdmaBenchConfig& cfg, daqiri::bench::TokenBucketPacer& pa stats.recv_bytes += static_cast(cfg.message_size); } daqiri::free_tx_burst(completion); - } else { + } + + bool posted_work = false; + if (cfg.receive) { + while (outstanding_recv < kMaxOutstanding && + post_req(outstanding_recv, recv_wr_id, daqiri::RDMAOpCode::RECEIVE, recv_mr)) { + posted_work = true; + } + } + if (cfg.send) { + while (outstanding_send < kMaxOutstanding && + post_req(outstanding_send, send_wr_id, daqiri::RDMAOpCode::SEND, send_mr)) { + posted_work = true; + } + } + + if (!got_completion && !posted_work) { std::this_thread::sleep_for(std::chrono::microseconds(100)); } } diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index d5bb676..7b5160f 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -20,6 +20,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -154,6 +157,7 @@ int RdmaMgr::mr_access_to_ibv(uint32_t access) { if (access & MEM_ACCESS_LOCAL) { ibv_access |= IBV_ACCESS_LOCAL_WRITE; } if (access & MEM_ACCESS_RDMA_READ) { ibv_access |= IBV_ACCESS_REMOTE_READ; } if (access & MEM_ACCESS_RDMA_WRITE) { ibv_access |= IBV_ACCESS_REMOTE_WRITE; } + ibv_access |= IBV_ACCESS_RELAXED_ORDERING; return ibv_access; } @@ -169,6 +173,17 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& mr, void* ptr) { int access = mr_access_to_ibv(mr.access_); params.ctx_mr_map_[pd.second] = ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, access); + if (params.ctx_mr_map_[pd.second] == nullptr && + (access & IBV_ACCESS_RELAXED_ORDERING) != 0) { + const int fallback_access = access & ~IBV_ACCESS_RELAXED_ORDERING; + DAQIRI_LOG_WARN( + "Failed to register MR {} with relaxed ordering on PD {}; retrying without it", + mr.name_, + (void*)pd.second); + params.ctx_mr_map_[pd.second] = + ibv_reg_mr(pd.second, ptr, mr.adj_size_ * mr.num_bufs_, fallback_access); + access = fallback_access; + } if (params.ctx_mr_map_[pd.second] == nullptr) { DAQIRI_LOG_CRITICAL("Failed to register MR {} on PD {}", mr.name_, (void*)pd.second); return -1; @@ -181,7 +196,7 @@ int RdmaMgr::rdma_register_mr(const MemoryRegionConfig& mr, void* ptr) { (void*)ptr, (void*)((uint8_t*)ptr + mr.adj_size_ * mr.num_bufs_), params.ctx_mr_map_[pd.second]->lkey, - mr.access_); + access); } } } @@ -383,11 +398,17 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { struct ibv_wc wc; int num_comp; BurstParams* msg; + int send_signal_every = 1; + if (const char* env = std::getenv("DAQIRI_RDMA_SEND_SIGNAL_EVERY")) { + const long parsed = strtol(env, nullptr, 10); + if (parsed > 0) { send_signal_every = static_cast(parsed); } + } + uint64_t sends_since_signal = 0; const auto& qref = cfg_.ifs_[tparams->if_idx].tx_.queues_[tparams->queue_idx]; const long cpu_core = strtol(qref.common_.cpu_core_.c_str(), NULL, 10); struct rte_ring* tx_ring = tparams->qp_params.tx_ring; struct rte_ring* rx_ring = tparams->qp_params.rx_ring; - std::unordered_map outstanding_send_wr_ids; + std::map outstanding_send_wr_ids; std::unordered_map outstanding_receive_wr_ids; if (set_affinity(cpu_core) != 0) { @@ -401,7 +422,9 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // periodically poll the CQ. while (!rdma_force_quit.load()) { // Check RQ first to reduce latency - while ((num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc)) != 0) { + while (true) { + num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc); + if (num_comp == 0) { break; } DAQIRI_LOG_DEBUG("GOT RX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -436,6 +459,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { } else { msg = create_burst_params(); } + const bool msg_owns_packets = wc.opcode == IBV_WC_RECV; // Only populate a header to indicate which burst needs to be freed // msg->transport_hdr.opcode = ibv_opcode_to_daqiri_opcode(wc.opcode); @@ -448,14 +472,57 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); - free_tx_burst(msg); - free_tx_metadata(msg); + if (msg_owns_packets) { + free_tx_burst(msg); + } else { + free_tx_metadata(msg); + } return; } } + auto complete_send_wrs = [&](uint64_t wr_id, Status status) -> bool { + auto completed_end = outstanding_send_wr_ids.upper_bound(wr_id); + if (completed_end == outstanding_send_wr_ids.begin()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + return true; + } + if (outstanding_send_wr_ids.find(wr_id) == outstanding_send_wr_ids.end()) { + DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wr_id); + } + + for (auto completed_it = outstanding_send_wr_ids.begin(); completed_it != completed_end;) { + msg = completed_it->second; + + const auto conn_id = get_connection_id(msg); + const auto expected_conn_id = reinterpret_cast(tparams->client_id); + if (conn_id != expected_conn_id) { + DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", + completed_it->first, + conn_id, + expected_conn_id); + } + + completed_it = outstanding_send_wr_ids.erase(completed_it); + + msg->transport_hdr.tx = true; + msg->transport_hdr.status = status; + msg->transport_hdr.server = is_server; + + if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { + DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); + free_tx_burst(msg); + return false; + } + } + + return true; + }; + // Check TX CQ for completion - while ((num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc)) != 0) { + while (true) { + num_comp = ibv_poll_cq(tparams->qp_params.tx_cq, 1, &wc); + if (num_comp == 0) { break; } DAQIRI_LOG_DEBUG("GOT TX COMPLETION in thread {} core {} wrid {}", (void*)tparams->client_id, cpu_core, @@ -467,28 +534,16 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { (int)wc.status, (int64_t)wc.wr_id, (int)wc.opcode); + if (wc.opcode == IBV_WC_SEND && + !complete_send_wrs(wc.wr_id, Status::GENERIC_FAILURE)) { + return; + } continue; } if (wc.opcode == IBV_WC_SEND) { - auto it = outstanding_send_wr_ids.find(wc.wr_id); - if (it == outstanding_send_wr_ids.end()) { - DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wc.wr_id); - continue; - } - - msg = it->second; - - const auto conn_id = get_connection_id(msg); - const auto expected_conn_id = reinterpret_cast(tparams->client_id); - if (conn_id != expected_conn_id) { - DAQIRI_LOG_CRITICAL("Wrong connection ID in send completion {}: {} != {}", - wc.wr_id, - conn_id, - expected_conn_id); - } - - outstanding_send_wr_ids.erase(it); + if (!complete_send_wrs(wc.wr_id, Status::SUCCESS)) { return; } + continue; } else { msg = create_burst_params(); } @@ -504,7 +559,6 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); - free_tx_burst(msg); free_tx_metadata(msg); return; } @@ -515,9 +569,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // ssize_t bytes = mq_receive(tparams.tx_mq, reinterpret_cast(&burst), sizeof(burst), // nullptr); - if (rte_ring_dequeue(tparams->qp_params.tx_ring, reinterpret_cast(&burst)) != 0) { - continue; - } + if (rte_ring_dequeue(tx_ring, reinterpret_cast(&burst)) != 0) { continue; } const auto local_mr = mrs_.find(std::string(burst->transport_hdr.local_mr_name)); if (local_mr == mrs_.end()) { @@ -559,7 +611,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { wr.sg_list = &sge; wr.num_sge = 1; wr.opcode = IBV_WR_SEND; - wr.send_flags = IBV_SEND_SIGNALED; + // Keep signaled completions frequent enough for callers that use SEND + // completions as their credit to refill the outstanding window. + const bool signal_send = send_signal_every == 1 || + sends_since_signal + 1 >= send_signal_every || + outstanding_send_wr_ids.size() >= 15; + wr.send_flags = signal_send ? IBV_SEND_SIGNALED : 0; int ret = ibv_post_send(tparams->client_id->qp, &wr, &bad_wr); if (ret != 0) { @@ -567,6 +624,11 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { free_tx_burst(burst); continue; } + if (signal_send) { + sends_since_signal = 0; + } else { + sends_since_signal++; + } outstanding_send_wr_ids[burst->transport_hdr.wr_id + p] = burst; } @@ -681,12 +743,20 @@ Status RdmaMgr::rdma_get_server_conn_id(const std::string& server_addr, uint16_t // Find the next queue ID that's not already in use for (size_t i = 0; i < server_params->second.size(); i++) { if (server_params->second[i].client_id != nullptr && !server_params->second[i].active) { - *conn_id = reinterpret_cast(server_params->second[i].client_id); + auto* client_id = server_params->second[i].client_id; + std::lock_guard lock(threads_mutex_); + if (worker_threads_.find(client_id) == worker_threads_.end() || + tx_rings_map_.find(client_id) == tx_rings_map_.end() || + rx_rings_map_.find(client_id) == rx_rings_map_.end()) { + continue; + } + + *conn_id = reinterpret_cast(client_id); server_params->second[i].active = true; DAQIRI_LOG_INFO("Found available queue ID for server {}:{} with cm_id {}", server_addr, server_port, - (void*)server_params->second[i].client_id); + (void*)client_id); return Status::SUCCESS; } } @@ -935,23 +1005,23 @@ Status RdmaMgr::get_tx_packet_burst(BurstParams* burst) { return Status::NO_FREE_BURST_BUFFERS; } - int rx = rte_ring_dequeue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); - if (rx != burst->transport_hdr.num_pkts) { - DAQIRI_LOG_ERROR("Asked for {} packets, got {}", burst->transport_hdr.num_pkts, rx); - rte_ring_enqueue_bulk(burst_pool->second, - reinterpret_cast(burst->pkts[0]), - burst->transport_hdr.num_pkts, - nullptr); + const unsigned num_pkts = static_cast(burst->transport_hdr.num_pkts); + const unsigned rx = + rte_ring_dequeue_bulk(burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); + if (rx != num_pkts) { + DAQIRI_LOG_ERROR("Asked for {} packets, got {}", num_pkts, rx); + rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); + burst->pkts[0] = nullptr; return Status::NO_FREE_BURST_BUFFERS; } // Allocate packet length buffer if (rte_mempool_get(pkt_len_pool_, reinterpret_cast(&burst->pkt_lens[0])) != 0) { DAQIRI_LOG_ERROR("Failed to get packet length buffer"); + rte_ring_enqueue_bulk( + burst_pool->second, reinterpret_cast(burst->pkts[0]), num_pkts, nullptr); rte_mempool_put(tx_burst_pool_, reinterpret_cast(burst->pkts[0])); + burst->pkts[0] = nullptr; return Status::NO_FREE_PACKET_BUFFERS; } @@ -1245,11 +1315,15 @@ int RdmaMgr::setup_pools_and_rings() { // RX rings DAQIRI_LOG_INFO("Setting up TX/RX per-queue rings"); + // Each connection ring is single-producer/single-consumer by design: one + // rdma_thread owns the manager side and one application thread owns the API + // side for a given conn_id. If multi-threaded app access per conn_id is ever + // allowed, these rings must go back to MP/MC-safe flags. for (int i = 0; i < MAX_RDMA_CONNECTIONS; i++) { std::string ring_name = "RX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up RX ring {}", ring_name); - struct rte_ring* ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + struct rte_ring* ring = + rte_ring_create(ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate RX ring {}!", ring_name); return -1; @@ -1259,7 +1333,7 @@ int RdmaMgr::setup_pools_and_rings() { ring_name = "TX_RING_" + std::to_string(i); DAQIRI_LOG_DEBUG("Setting up TX ring {}", ring_name); ring = rte_ring_create( - ring_name.c_str(), 2048, rte_socket_id(), RING_F_MC_RTS_DEQ | RING_F_MP_RTS_ENQ); + ring_name.c_str(), 2048, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ); if (ring == nullptr) { DAQIRI_LOG_CRITICAL("Failed to allocate TX ring {}!", ring_name); return -1; @@ -1270,7 +1344,7 @@ int RdmaMgr::setup_pools_and_rings() { // Packet length buffers DAQIRI_LOG_DEBUG("Setting up RX meta pool"); pkt_len_pool_ = rte_mempool_create("PKT_LEN_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(uint32_t) * MAX_RDMA_BATCH, 0, 0, @@ -1287,7 +1361,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up RX meta pool"); rx_meta = rte_mempool_create("RX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1304,7 +1378,7 @@ int RdmaMgr::setup_pools_and_rings() { DAQIRI_LOG_DEBUG("Setting up TX meta pool"); tx_meta = rte_mempool_create("TX_META_POOL", - (1U << 6) - 1U, + (1U << 11) - 1U, sizeof(BurstParams), 0, 0, @@ -1320,7 +1394,7 @@ int RdmaMgr::setup_pools_and_rings() { } tx_burst_pool_ = rte_mempool_create("TX_BURST_POOL", - (1U << 7) - 1U, + (1U << 11) - 1U, sizeof(void*) * MAX_RDMA_BATCH, 0, 0, @@ -1484,7 +1558,8 @@ void RdmaMgr::initialize() { // Set up memory region sizes for (auto& mr : cfg_.mrs_) { - mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, get_alignment(mr.second.kind_)); + const size_t rdma_alignment = std::max(get_alignment(mr.second.kind_), GPU_PAGE_SIZE); + mr.second.adj_size_ = RTE_ALIGN_CEIL(mr.second.buf_size_, rdma_alignment); } for (const auto& intf : cfg_.ifs_) { diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index c3499e1..cd12dfd 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -162,11 +162,11 @@ class RdmaMgr : public Manager { private: static constexpr int MAX_RDMA_CONNECTIONS = 128; - static constexpr int MAX_CQ = 16; + static constexpr int MAX_CQ = 1024; static constexpr int NUM_SGE_ELS = 1024; static constexpr int NUM_SGE_BUFS = 256; static constexpr int MAX_NUM_MR = 16; // Maximum number of memory registers to exchange - static constexpr int MAX_OUSTANDING_WR = 64; + static constexpr int MAX_OUSTANDING_WR = 1024; static constexpr int MAX_NUM_PORTS = 4; static constexpr int MAX_RDMA_BATCH = 1024;