From 3854d9d62a52f453c5635250f07277092f83f6ed Mon Sep 17 00:00:00 2001 From: Chloe Crozier Date: Tue, 2 Jun 2026 16:12:11 -0700 Subject: [PATCH 1/2] #113 - Fix RDMA cross-host SEND timeout and shutdown deadlock Signed-off-by: Chloe Crozier --- docs/tutorials/benchmarking_examples.md | 28 +++++ docs/tutorials/configuration-walkthrough.md | 2 + examples/daqiri_bench_raw_rx_spark_xhost.yaml | 54 +++++++++ examples/daqiri_bench_raw_tx_spark_xhost.yaml | 58 +++++++++ .../daqiri_bench_rdma_tx_rx_spark_xhost.yaml | 112 ++++++++++++++++++ src/managers/rdma/daqiri_rdma_mgr.cpp | 37 ++++-- src/managers/rdma/daqiri_rdma_mgr.h | 4 +- 7 files changed, 285 insertions(+), 10 deletions(-) create mode 100644 examples/daqiri_bench_raw_rx_spark_xhost.yaml create mode 100644 examples/daqiri_bench_raw_tx_spark_xhost.yaml create mode 100644 examples/daqiri_bench_rdma_tx_rx_spark_xhost.yaml diff --git a/docs/tutorials/benchmarking_examples.md b/docs/tutorials/benchmarking_examples.md index 8e861c8..f042b40 100644 --- a/docs/tutorials/benchmarking_examples.md +++ b/docs/tutorials/benchmarking_examples.md @@ -50,6 +50,34 @@ docker run --rm -it --privileged \ - [`daqiri_bench_raw_tx_rx_spark.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_tx_rx_spark.yaml) for `daqiri_bench_raw_gpudirect` — still set `eth_dst_addr` to the RX MAC. The rx_port is `0002:01:00.1` (physical port p1), so read its MAC: `cat /sys/class/net/enP2p1s0f1np1/address`. This p0-to-p1 pairing is intentional for an over-the-wire single-machine loopback; using two PFs that map to the same physical port exercises the on-chip eswitch path instead. - [`daqiri_bench_rdma_tx_rx_spark.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_rdma_tx_rx_spark.yaml) for `daqiri_bench_rdma` — no further edits needed. +#### Cross-host two-DGX-Spark loopback + +If you have two DGX Sparks cross-cabled p0↔p0 instead of a chassis QSFP loop on one machine, use the `_xhost` configs. 
Each host runs only its own role, so the YAML on each side configures one port instead of two. Both hosts must already be set up per the [DGX Spark profile](system_configuration.md#dgx-spark-profile), with one adjustment: the `daqiri-tx` (`1.1.1.1/24`) and `daqiri-rx` (`2.2.2.2/24`) nmcli profiles are *split across* the two hosts — bring up `daqiri-tx` on the TX host's p0 and `daqiri-rx` on the RX host's p0, instead of both on one box. + +**Raw GPUDirect.** Start the RX side first so the flow rule is installed before any traffic arrives: + +```bash +# RX host +sudo ./daqiri_bench_raw_gpudirect daqiri_bench_raw_rx_spark_xhost.yaml --seconds 30 + +# TX host (set eth_dst_addr to the RX host p0's MAC first: cat /sys/class/net/enp1s0f0np0/address on the RX host) +sudo ./daqiri_bench_raw_gpudirect daqiri_bench_raw_tx_spark_xhost.yaml --seconds 30 +``` + +Verify both sides report non-zero packet counts and no `NO_FREE_BURST_BUFFERS` / `NO_FREE_PACKET_BUFFERS` errors. + +**RDMA.** Start the server side first: + +```bash +# RX (server) host +sudo ./daqiri_bench_rdma daqiri_bench_rdma_tx_rx_spark_xhost.yaml --mode server --seconds 30 + +# TX (client) host +sudo ./daqiri_bench_rdma daqiri_bench_rdma_tx_rx_spark_xhost.yaml --mode client --seconds 30 +``` + +Verify both sides report non-zero send/receive completions. The server-side `Couldn't find server params for address …` log line that may appear once between the listener-create log and the "RDMA server successfully started" log is a benign startup race (the application thread polls for the listener before the CM thread finishes inserting it); subsequent lookups succeed. 
+ The benchmark executables and example YAML configurations are located at: | | Binaries | YAML configs | diff --git a/docs/tutorials/configuration-walkthrough.md b/docs/tutorials/configuration-walkthrough.md index 5357691..6002b79 100644 --- a/docs/tutorials/configuration-walkthrough.md +++ b/docs/tutorials/configuration-walkthrough.md @@ -22,12 +22,14 @@ With a backend in mind, read down the questions below and stop at the first one - **Generic discrete GPU** (template — replace ``) — [`daqiri_bench_raw_tx_rx.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_tx_rx.yaml). This is the file annotated line-by-line in the [walkthrough below](#annotated-walkthrough). - **Four queue closed-loop TX+RX** (template — replace ``) — [`daqiri_bench_raw_tx_rx_4q.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_tx_rx_4q.yaml). Uses one application worker per TX/RX queue, with each `bench_tx` entry sending a different UDP flow. - **DGX Spark / GB10** (prefilled) — [`daqiri_bench_raw_tx_rx_spark.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_tx_rx_spark.yaml). `kind: host_pinned` for the integrated GPU; cores, PCIe addresses, and IPs are prefilled. See the [Spark profile callout](benchmarking_examples.md#update-the-loopback-configuration) for run details. + - **DGX Spark cross-host** (prefilled, runs on two Sparks) — [`daqiri_bench_raw_tx_spark_xhost.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_tx_spark_xhost.yaml) on the TX host and [`daqiri_bench_raw_rx_spark_xhost.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_rx_spark_xhost.yaml) on the RX host. Each host runs `daqiri_bench_raw_gpudirect` against its own half; cables connect p0↔p0 between the two boxes. See the [Cross-host two-DGX-Spark loopback](benchmarking_examples.md#cross-host-two-dgx-spark-loopback) section for run details. 
- **No physical NIC available** — [`daqiri_bench_raw_sw_loopback.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_sw_loopback.yaml). `loopback: "sw"`, no NIC required. Useful for first-time build verification, not representative of production performance. **RDMA / RoCE** — runs on `daqiri_bench_rdma` (use `--mode {tx,rx,both}`). Configs use `kind: host_pinned` regardless of platform. - **Generic** (template — replace IPs) — [`daqiri_bench_rdma_tx_rx.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_rdma_tx_rx.yaml). - **DGX Spark** (prefilled) — [`daqiri_bench_rdma_tx_rx_spark.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_rdma_tx_rx_spark.yaml). See the [Spark profile callout](benchmarking_examples.md#update-the-loopback-configuration) for run details. + - **DGX Spark cross-host** (prefilled, runs on two Sparks) — [`daqiri_bench_rdma_tx_rx_spark_xhost.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_rdma_tx_rx_spark_xhost.yaml). Run with `--mode server` on the RX host and `--mode client` on the TX host. See the [Cross-host two-DGX-Spark loopback](benchmarking_examples.md#cross-host-two-dgx-spark-loopback) section for run details. **Kernel TCP/UDP sockets** — runs on `daqiri_bench_socket`. Both bind to `127.0.0.1`. diff --git a/examples/daqiri_bench_raw_rx_spark_xhost.yaml b/examples/daqiri_bench_raw_rx_spark_xhost.yaml new file mode 100644 index 0000000..f18294d --- /dev/null +++ b/examples/daqiri_bench_raw_rx_spark_xhost.yaml @@ -0,0 +1,54 @@ +# DGX Spark (GB10) cross-host RX-side config for daqiri_bench_raw_gpudirect. +# Companion to daqiri_bench_raw_tx_spark_xhost.yaml on the peer TX host. Both +# hosts must be configured per the DGX Spark profile in +# docs/tutorials/system_configuration.md, with the chosen p0 port cross-cabled +# host-to-host (no chassis QSFP loop). 
+# +# Spark substitutions baked in here: +# - rx_port BDF = 0000:01:00.0 (p0); change if your p0 sits elsewhere +# - kind: host_pinned (GB10 dma-buf path; nvidia-peermem N/A on Spark) +# - master_core: 8; cpu_core: 18 (isolated big-cluster X925 16-19) +# - flow match: udp_src/dst = 4096 -- same UDP tuple the TX side sends to +# +%YAML 1.2 +--- +daqiri: + cfg: + version: 1 + stream_type: "raw" + master_core: 8 + debug: false + log_level: "info" + loopback: "" + + memory_regions: + - name: "Data_RX_GPU" + kind: "host_pinned" + affinity: 0 + num_bufs: 51200 + buf_size: 8064 + + interfaces: + - name: "rx_port" + address: 0000:01:00.0 + rx: + flow_isolation: true + queues: + - name: "rq_q_0" + id: 0 + cpu_core: 18 + batch_size: 10240 + memory_regions: + - "Data_RX_GPU" + flows: + - name: "flow_0" + id: 0 + action: + type: queue + id: 0 + match: + udp_src: 4096 + udp_dst: 4096 + +bench_rx: + interface_name: "rx_port" diff --git a/examples/daqiri_bench_raw_tx_spark_xhost.yaml b/examples/daqiri_bench_raw_tx_spark_xhost.yaml new file mode 100644 index 0000000..879cc10 --- /dev/null +++ b/examples/daqiri_bench_raw_tx_spark_xhost.yaml @@ -0,0 +1,58 @@ +# DGX Spark (GB10) cross-host TX-side config for daqiri_bench_raw_gpudirect. +# Companion to daqiri_bench_raw_rx_spark_xhost.yaml on the peer RX host. Both +# hosts must be configured per the DGX Spark profile in +# docs/tutorials/system_configuration.md, with the chosen p0 port cross-cabled +# host-to-host (no chassis QSFP loop). 
+# +# Spark substitutions baked in here: +# - tx_port BDF = 0000:01:00.0 (p0); change if your p0 sits elsewhere +# - kind: host_pinned (GB10 dma-buf path; nvidia-peermem N/A on Spark) +# - master_core: 8; cpu_core: 17 (isolated big-cluster X925 16-19) +# - eth_dst_addr is the *peer* RX port's MAC -- replace with your own: +# cat /sys/class/net/<iface>/address # on the RX host +# - ip_src/ip_dst: arbitrary 1.1.1.1 -> 2.2.2.2 (kernel stack bypassed by +# the DPDK PMD; only the UDP src/dst ports below are matched by the RX +# flow rule in daqiri_bench_raw_rx_spark_xhost.yaml) +# +%YAML 1.2 +--- +daqiri: + cfg: + version: 1 + stream_type: "raw" + master_core: 8 + debug: false + log_level: "info" + loopback: "" + + memory_regions: + - name: "Data_TX_GPU" + kind: "host_pinned" + affinity: 0 + num_bufs: 51200 + buf_size: 8064 + + interfaces: + - name: "tx_port" + address: 0000:01:00.0 + tx: + queues: + - name: "tx_q_0" + id: 0 + batch_size: 10240 + cpu_core: 17 + memory_regions: + - "Data_TX_GPU" + offloads: + - "tx_eth_src" + +bench_tx: + interface_name: "tx_port" + batch_size: 10240 + payload_size: 8000 + header_size: 64 + eth_dst_addr: <00:00:00:00:00:00> + ip_src_addr: 1.1.1.1 + ip_dst_addr: 2.2.2.2 + udp_src_port: 4096 + udp_dst_port: 4096 diff --git a/examples/daqiri_bench_rdma_tx_rx_spark_xhost.yaml b/examples/daqiri_bench_rdma_tx_rx_spark_xhost.yaml new file mode 100644 index 0000000..039c8b4 --- /dev/null +++ b/examples/daqiri_bench_rdma_tx_rx_spark_xhost.yaml @@ -0,0 +1,112 @@ +# DGX Spark (GB10) cross-host config for daqiri_bench_rdma. +# Adapts the single-host daqiri_bench_rdma_tx_rx_spark.yaml to a two-host +# setup with each side's p0 cross-cabled to the peer's p0. Both hosts must +# be configured per the DGX Spark profile in +# docs/tutorials/system_configuration.md, except the IP assignment is +# split across hosts: put 1.1.1.1/24 on the client host's p0 and 2.2.2.2/24 +# on the server host's p0 (instead of both addresses on one machine). 
+# +# Run with --mode client on the TX host and --mode server on the RX host: +# server (RX): sudo ./daqiri_bench_rdma --mode server --seconds 30 +# client (TX): sudo ./daqiri_bench_rdma --mode client --seconds 30 +# +# This config is the regression test for the RDMA cross-host receive- +# provisioning bug fixed in issue #113. Before the fix, the server-side +# worker thread crashed on launch and no receives were ever posted. +# +# Spark substitutions baked in here: +# - IPs: 1.1.1.1 (client/TX p0) and 2.2.2.2 (server/RX p0) +# - cpu_core values from isolated big-cluster X925 16-19; master_core: 8 +# - kind: host_pinned (required upstream on GB10; peermem N/A, dma-buf used) +# +%YAML 1.2 +--- +daqiri: + cfg: + version: 1 + stream_type: "socket" + protocol: "roce" + master_core: 8 + debug: false + log_level: "info" + + memory_regions: + - name: "DATA_RX_GPU_SERVER" + kind: "host_pinned" + affinity: 0 + num_bufs: 20 + buf_size: 9000000 + - name: "DATA_TX_GPU_SERVER" + kind: "host_pinned" + affinity: 0 + num_bufs: 20 + buf_size: 9000000 + - name: "DATA_TX_GPU_CLIENT" + kind: "host_pinned" + affinity: 0 + num_bufs: 20 + buf_size: 90000000 + - name: "DATA_RX_GPU_CLIENT" + kind: "host_pinned" + affinity: 0 + num_bufs: 20 + buf_size: 90000000 + + interfaces: + - name: my_client + address: 1.1.1.1 + socket_config: + mode: client + remote_ip: 2.2.2.2 + remote_port: 4096 + roce_config: + transport_mode: RC + tx: + queues: + - name: "Client_TX_Queue" + id: 0 + batch_size: 1 + cpu_core: 17 + rx: + queues: + - name: "Client_RX_Queue" + id: 0 + cpu_core: 18 + batch_size: 1 + - name: my_server + address: 2.2.2.2 + socket_config: + mode: server + local_ip: 2.2.2.2 + local_port: 4096 + roce_config: + transport_mode: RC + rx: + queues: + - name: "Server_RX_Queue" + id: 0 + cpu_core: 19 + batch_size: 1 + tx: + queues: + - name: "Server_TX_Queue" + id: 0 + cpu_core: 16 + batch_size: 1 + +rdma_bench_server: + server_address: 2.2.2.2 + server_port: 4096 + message_size: 8000000 + send: 
true + receive: true + server: true + +rdma_bench_client: + message_size: 8000000 + client_address: 1.1.1.1 + server_address: 2.2.2.2 + server_port: 4096 + receive: true + send: true + server: false diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index 5b78675..4e796a5 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -424,8 +424,9 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { DAQIRI_LOG_INFO("Affined {} RDMA thread to core {}", is_server ? "Server" : "Client", cpu_core); // Main TX loop. Wait for send requests from the transmitters to arrive for sending. Also - // periodically poll the CQ. - while (!rdma_force_quit.load()) { + // periodically poll the CQ. Exit on either the global force-quit or the per-connection + // ready_to_exit signal (set by the DISCONNECTED CM handler). + while (!rdma_force_quit.load() && !tparams->ready_to_exit) { // Check RQ first to reduce latency while ((num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc)) != 0) { DAQIRI_LOG_DEBUG("GOT RX COMPLETION in thread {} core {} wrid {}", @@ -1152,9 +1153,12 @@ void RdmaMgr::run() { auto& params = server_iter->second[queue_idx]; params.active = false; + params.ready_to_exit = false; // Reset in case this slot was used by a previous client. 
params.client_id = cm_event->id; params.pd = pd_map_[cm_event->id->verbs]; - params.if_idx = cm_event->id->port_num; + // Use the listener's cfg-interface index, not cm_event->id->port_num + // (IB hardware port, different domain than cfg_.ifs_ indices) + params.if_idx = listen_iter->second.if_idx; params.queue_idx = queue_idx; setup_thread_params(&params, true); @@ -1218,14 +1222,26 @@ void RdmaMgr::run() { case RDMA_CM_EVENT_DISCONNECTED: { DAQIRI_LOG_INFO("Received disconnected event for client ID {}", (void*)cm_event->id); + // Three-step shutdown to avoid the deadlock that print_stats() exposes: + // (1) signal the worker via ready_to_exit; (2) move its thread handle + // out of worker_threads_ under threads_mutex_; (3) join *after* + // releasing the mutex, so a concurrent print_stats() (which also + // takes threads_mutex_) doesn't sit behind a thread-join that itself + // can only finish once we've signalled exit. bool found = false; + std::thread worker_to_join; for (auto& sp : server_q_params_) { for (auto& thread_params : sp.second) { if (thread_params.client_id == cm_event->id) { - threads_mutex_.lock(); - worker_threads_[cm_event->id].join(); - worker_threads_.erase(cm_event->id); - threads_mutex_.unlock(); + thread_params.ready_to_exit = true; + { + std::lock_guard lock(threads_mutex_); + auto it = worker_threads_.find(cm_event->id); + if (it != worker_threads_.end()) { + worker_to_join = std::move(it->second); + worker_threads_.erase(it); + } + } // Return the TX and RX rings to the pool if (thread_params.qp_params.tx_ring != nullptr) { @@ -1240,14 +1256,17 @@ void RdmaMgr::run() { thread_params.client_id = nullptr; thread_params.active = false; - DAQIRI_LOG_INFO("Joined and removed client thread for ID {}", (void*)cm_event->id); found = true; break; } } + if (found) { break; } } - if (!found) { + if (worker_to_join.joinable()) { worker_to_join.join(); } + if (found) { + DAQIRI_LOG_INFO("Joined and removed client thread for ID {}", (void*)cm_event->id); 
+ } else { DAQIRI_LOG_CRITICAL("Received disconnected event for unknown client ID {}", (void*)cm_event->id); } diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index c3499e1..a6f3344 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -51,7 +51,9 @@ struct rdma_thread_params { rdma_qp_params qp_params; int if_idx; int queue_idx; - bool ready_to_exit; + // Per-connection exit flag (set by DISCONNECTED handler, polled by + // rdma_thread). Default-initialised so a fresh vector element is safe. + bool ready_to_exit = false; }; // Used to spawn a new server thread for a particular client From 27347bba1a5fc918edfe3344aa2d2dec09ea8dde Mon Sep 17 00:00:00 2001 From: Chloe Crozier Date: Tue, 2 Jun 2026 16:28:06 -0700 Subject: [PATCH 2/2] #113 - Fix RDMA cross-host SEND timeout and shutdown deadlock Signed-off-by: Chloe Crozier --- AGENTS.md | 4 +- src/managers/rdma/daqiri_rdma_mgr.cpp | 59 +++++++++++++++------------ src/managers/rdma/daqiri_rdma_mgr.h | 12 ++++-- 3 files changed, 44 insertions(+), 31 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 7c61183..add6e35 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -30,11 +30,11 @@ There is no unit test suite. 
Verification is done via the benchmark executables | Executable | Source | Typical config | |---|---|---| -| `daqiri_bench_raw_gpudirect` | `raw_gpudirect_bench.cpp` | `daqiri_bench_raw_tx_rx.yaml`, `daqiri_bench_raw_tx_rx_4q.yaml`, `daqiri_bench_raw_tx_rx_spark.yaml`, `daqiri_bench_raw_sw_loopback.yaml`, `daqiri_bench_raw_rx_multi_q.yaml` | +| `daqiri_bench_raw_gpudirect` | `raw_gpudirect_bench.cpp` | `daqiri_bench_raw_tx_rx.yaml`, `daqiri_bench_raw_tx_rx_4q.yaml`, `daqiri_bench_raw_tx_rx_spark.yaml`, `daqiri_bench_raw_{tx,rx}_spark_xhost.yaml`, `daqiri_bench_raw_sw_loopback.yaml`, `daqiri_bench_raw_rx_multi_q.yaml` | | `daqiri_bench_raw_hds` | `raw_hds_bench.cpp` | `daqiri_bench_raw_tx_rx_hds.yaml` | | `daqiri_bench_raw_reorder_seq` | `raw_reorder_seq_bench.cpp` | `daqiri_bench_raw_tx_rx_reorder_seq_1024*.yaml`, `daqiri_bench_raw_rx_reorder_seq_*.yaml` | | `daqiri_bench_raw_reorder_quantize` | `raw_reorder_quantize_bench.cpp` | `daqiri_bench_raw_tx_rx_reorder_quantize_seq_batch.yaml` | -| `daqiri_bench_rdma` | `rdma_bench.cpp` | `daqiri_bench_rdma_tx_rx.yaml`, `daqiri_bench_rdma_tx_rx_spark.yaml` | +| `daqiri_bench_rdma` | `rdma_bench.cpp` | `daqiri_bench_rdma_tx_rx.yaml`, `daqiri_bench_rdma_tx_rx_spark.yaml`, `daqiri_bench_rdma_tx_rx_spark_xhost.yaml` | | `daqiri_bench_socket` | `socket_bench.cpp` | `daqiri_bench_socket_{udp,tcp}_tx_rx.yaml` | The four `raw_*` benches share `raw_bench_common.{cpp,h}` and accept `--seconds N`. `daqiri_bench_rdma` and `daqiri_bench_socket` also take `--mode {tx,rx,both}`. diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index 4e796a5..4cd7b57 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -426,7 +426,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { // Main TX loop. Wait for send requests from the transmitters to arrive for sending. Also // periodically poll the CQ. 
Exit on either the global force-quit or the per-connection // ready_to_exit signal (set by the DISCONNECTED CM handler). - while (!rdma_force_quit.load() && !tparams->ready_to_exit) { + while (!rdma_force_quit.load() && !tparams->ready_to_exit.load()) { // Check RQ first to reduce latency while ((num_comp = ibv_poll_cq(tparams->qp_params.rx_cq, 1, &wc)) != 0) { DAQIRI_LOG_DEBUG("GOT RX COMPLETION in thread {} core {} wrid {}", @@ -1051,8 +1051,12 @@ void RdmaMgr::run() { pd_params_[s_id].server_id = s_id; pd_params_[s_id].if_idx = intf.port_id_; - auto& vec = server_q_params_[s_id]; - vec.resize(intf.rx_.queues_.size()); + // try_emplace constructs the vector in place via vector(size_t n), which + // value-initialises n elements without ever moving them. We can't use the + // operator[] + resize() pattern any more: rdma_thread_params now holds an + // std::atomic<bool> and is therefore non-movable, and libstdc++'s resize() + // instantiates the move-if-noexcept path even when starting from empty. + server_q_params_.try_emplace(s_id, intf.rx_.queues_.size()); DAQIRI_LOG_INFO("Created RDMA server on {}:{} successfully with listener_id {}", intf.address_, @@ -1153,7 +1157,7 @@ void RdmaMgr::run() { auto& params = server_iter->second[queue_idx]; params.active = false; - params.ready_to_exit = false; // Reset in case this slot was used by a previous client. + params.ready_to_exit.store(false); // Reset in case this slot was used by a previous client. 
params.client_id = cm_event->id; params.pd = pd_map_[cm_event->id->verbs]; // Use the listener's cfg-interface index, not cm_event->id->port_num @@ -1222,18 +1226,23 @@ void RdmaMgr::run() { case RDMA_CM_EVENT_DISCONNECTED: { DAQIRI_LOG_INFO("Received disconnected event for client ID {}", (void*)cm_event->id); - // Three-step shutdown to avoid the deadlock that print_stats() exposes: - // (1) signal the worker via ready_to_exit; (2) move its thread handle - // out of worker_threads_ under threads_mutex_; (3) join *after* - // releasing the mutex, so a concurrent print_stats() (which also - // takes threads_mutex_) doesn't sit behind a thread-join that itself - // can only finish once we've signalled exit. + // Avoid the deadlock that print_stats() exposes: + // (1) signal the worker via ready_to_exit; + // (2) move its thread handle out of worker_threads_ under threads_mutex_; + // (3) release the mutex, then join (so a concurrent print_stats(), + // which also takes threads_mutex_, doesn't sit behind a join that + // can only finish once we've signalled exit); + // (4) only after the worker has actually stopped, push its rings back + // to the pool and clear the slot. The original code also returned + // the rings only after the join, because the worker may still poll + // tparams->qp_params.{tx,rx}_ring up until the moment it exits. 
bool found = false; std::thread worker_to_join; + rdma_thread_params* tparams_to_clear = nullptr; for (auto& sp : server_q_params_) { for (auto& thread_params : sp.second) { if (thread_params.client_id == cm_event->id) { - thread_params.ready_to_exit = true; + thread_params.ready_to_exit.store(true); { std::lock_guard lock(threads_mutex_); auto it = worker_threads_.find(cm_event->id); @@ -1242,20 +1251,7 @@ void RdmaMgr::run() { worker_threads_.erase(it); } } - - // Return the TX and RX rings to the pool - if (thread_params.qp_params.tx_ring != nullptr) { - tx_rings_.push(thread_params.qp_params.tx_ring); - tx_rings_map_.erase(cm_event->id); - } - - if (thread_params.qp_params.rx_ring != nullptr) { - rx_rings_.push(thread_params.qp_params.rx_ring); - rx_rings_map_.erase(cm_event->id); - } - - thread_params.client_id = nullptr; - thread_params.active = false; + tparams_to_clear = &thread_params; found = true; break; } @@ -1264,7 +1260,18 @@ void RdmaMgr::run() { } if (worker_to_join.joinable()) { worker_to_join.join(); } - if (found) { + + if (tparams_to_clear != nullptr) { + if (tparams_to_clear->qp_params.tx_ring != nullptr) { + tx_rings_.push(tparams_to_clear->qp_params.tx_ring); + tx_rings_map_.erase(cm_event->id); + } + if (tparams_to_clear->qp_params.rx_ring != nullptr) { + rx_rings_.push(tparams_to_clear->qp_params.rx_ring); + rx_rings_map_.erase(cm_event->id); + } + tparams_to_clear->client_id = nullptr; + tparams_to_clear->active = false; DAQIRI_LOG_INFO("Joined and removed client thread for ID {}", (void*)cm_event->id); } else { DAQIRI_LOG_CRITICAL("Received disconnected event for unknown client ID {}", diff --git a/src/managers/rdma/daqiri_rdma_mgr.h b/src/managers/rdma/daqiri_rdma_mgr.h index a6f3344..f953019 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.h +++ b/src/managers/rdma/daqiri_rdma_mgr.h @@ -51,9 +51,15 @@ struct rdma_thread_params { rdma_qp_params qp_params; int if_idx; int queue_idx; - // Per-connection exit flag (set by DISCONNECTED 
handler, polled by - // rdma_thread). Default-initialised so a fresh vector element is safe. - bool ready_to_exit = false; + // Per-connection exit flag, set by the DISCONNECTED CM handler and polled + // by rdma_thread() in its hot loop. std::atomic so the worker actually + // observes the write (a plain bool would race under the C++ memory model + // and the compiler is free to hoist the read into a register). Matches + // the style of rdma_force_quit. std::atomic is non-copyable/non-movable, + // which makes rdma_thread_params non-copyable and non-movable too; the + // containers holding it (server_q_params_ vectors and client_q_params_) + // must construct elements in place (e.g. try_emplace()), never resize(). + std::atomic<bool> ready_to_exit{false}; }; // Used to spawn a new server thread for a particular client