From b6603ab6194337c3dfbe53b7fa2d4a26a72433da Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Mon, 1 Jun 2026 13:08:16 -0700 Subject: [PATCH 1/7] #111 - Add opt-in OpenTelemetry metrics Signed-off-by: Cliff Burdick --- AGENTS.md | 1 + CMakeLists.txt | 4 + Dockerfile | 26 +- README.md | 7 + cmake/daqiriConfig.cmake.in | 4 + docs/api-reference/cpp.md | 20 + docs/getting-started.md | 12 + examples/CMakeLists.txt | 13 + examples/daqiri_bench_raw_tx_rx.yaml | 4 +- examples/grafana/README.md | 62 +++ examples/grafana/compose.yaml | 51 +++ .../grafana/dashboards/daqiri-metrics.json | 215 +++++++++++ .../provisioning/dashboards/daqiri.yml | 11 + .../provisioning/datasources/prometheus.yml | 11 + examples/grafana/otel_prometheus.cpp | 87 +++++ examples/grafana/otel_prometheus.h | 37 ++ examples/grafana/prometheus.yml | 10 + examples/grafana/run-benchmark.sh | 16 + examples/raw_gpudirect_bench.cpp | 6 + scripts/build-container.sh | 2 + src/CMakeLists.txt | 17 + src/common.cpp | 2 + src/managers/dpdk/daqiri_dpdk_stats.cpp | 106 +++++- src/managers/dpdk/daqiri_dpdk_stats.h | 12 + src/managers/rdma/daqiri_rdma_mgr.cpp | 68 ++++ src/managers/socket/daqiri_socket_mgr.cpp | 41 +- src/managers/socket/daqiri_socket_mgr.h | 4 + src/metrics.cpp | 352 ++++++++++++++++++ src/metrics.h | 76 ++++ 29 files changed, 1251 insertions(+), 26 deletions(-) create mode 100644 examples/grafana/README.md create mode 100644 examples/grafana/compose.yaml create mode 100644 examples/grafana/grafana/dashboards/daqiri-metrics.json create mode 100644 examples/grafana/grafana/provisioning/dashboards/daqiri.yml create mode 100644 examples/grafana/grafana/provisioning/datasources/prometheus.yml create mode 100644 examples/grafana/otel_prometheus.cpp create mode 100644 examples/grafana/otel_prometheus.h create mode 100644 examples/grafana/prometheus.yml create mode 100755 examples/grafana/run-benchmark.sh create mode 100644 src/metrics.cpp create mode 100644 src/metrics.h diff --git a/AGENTS.md b/AGENTS.md index c422328..0d03bf5 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -18,6 +18,7 @@ CMake options (full table in `docs/getting-started.md`): - `DAQIRI_MGR` — space-separated backend list. Valid values: `dpdk`, `socket`, `rdma`. Default in `src/CMakeLists.txt:137` is `"dpdk socket"` (which, due to the rule below, effectively builds all three). - `DAQIRI_BUILD_PYTHON` — builds `pybind11` bindings from `python/`. - `DAQIRI_BUILD_EXAMPLES` — builds the benchmark executables (default `ON`). +- `DAQIRI_ENABLE_OTEL_METRICS` — enables OpenTelemetry metrics instrumentation (default `OFF`). - `DAQIRI_REORDER_GPU_PROFILE` — enable CUDA event timing in the DPDK reorder kernels (off by default). CUDA architectures are hardcoded to `80;90;121` (A100, H100, GB10) in `src/CMakeLists.txt:25`. Change this when targeting other GPUs. diff --git a/CMakeLists.txt b/CMakeLists.txt index aa07234..310442d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,6 +21,7 @@ find_package(CUDAToolkit REQUIRED) option(DAQIRI_BUILD_PYTHON "Build Python bindings" OFF) option(DAQIRI_BUILD_EXAMPLES "Build standalone examples" ON) +option(DAQIRI_ENABLE_OTEL_METRICS "Enable OpenTelemetry metrics instrumentation" OFF) set(DAQIRI_MGR "dpdk socket" CACHE STRING "Manager backend list") add_subdirectory(src) @@ -83,6 +84,9 @@ endif() set(DAQIRI_PC_REQUIRES "") set(DAQIRI_PC_REQUIRES_PRIVATE "libdpdk") set(DAQIRI_PC_LIBS "-lyaml-cpp") +if(DAQIRI_ENABLE_OTEL_METRICS) + string(APPEND DAQIRI_PC_LIBS " -lopentelemetry_api") +endif() separate_arguments(DAQIRI_PC_MGR_LIST UNIX_COMMAND "${DAQIRI_MGR}") list(FIND DAQIRI_PC_MGR_LIST "socket" DAQIRI_PC_HAS_SOCKET_IDX) list(FIND DAQIRI_PC_MGR_LIST "rdma" DAQIRI_PC_HAS_RDMA_IDX) diff --git a/Dockerfile b/Dockerfile index 8c9bb92..fc3035b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,7 @@ ARG DAQIRI_BASE_TARGET=dpdk ARG DAQIRI_MGR="dpdk socket" ARG DAQIRI_BUILD_PYTHON=OFF ARG BUILD_SHARED_LIBS=ON +ARG DAQIRI_ENABLE_OTEL_METRICS=OFF ARG DAQIRI_OS_BASE_IMAGE=nvcr.io/nvidia/cuda:13.1.0-devel-ubuntu24.04 # ============================================================ @@ -172,6 +173,26 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ mft \ && rm -rf /var/lib/apt/lists/* +ARG DAQIRI_ENABLE_OTEL_METRICS +ARG OPENTELEMETRY_CPP_VERSION=v1.27.0 +RUN if [ "${DAQIRI_ENABLE_OTEL_METRICS}" = "ON" ]; then \ + git clone --depth 1 --branch "${OPENTELEMETRY_CPP_VERSION}" \ + https://github.com/open-telemetry/opentelemetry-cpp.git /tmp/opentelemetry-cpp \ + && cmake -S /tmp/opentelemetry-cpp -B /tmp/opentelemetry-cpp-build \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_INSTALL_PREFIX=/usr/local \ + -DBUILD_TESTING=OFF \ + -DWITH_EXAMPLES=OFF \ + -DWITH_OTLP=OFF \ + -DWITH_PROMETHEUS=ON \ + -DWITH_ZIPKIN=OFF \ + -DWITH_ABSEIL=OFF \ + -DWITH_STL=CXX17 \ + && cmake --build /tmp/opentelemetry-cpp-build --target install -j "$(nproc)" \ + && ldconfig \ + && rm -rf /tmp/opentelemetry-cpp /tmp/opentelemetry-cpp-build; \ + fi + # ============================================================== # rdma: Named target for consistent per-manager container builds. # Identical to dpdk (which already includes RDMA/ibverbs deps). @@ -181,7 +202,7 @@ FROM dpdk AS rdma # ============================================================== # gpunetio: Add DOCA SDK packages for GPUNetIO support # ============================================================== -FROM dpdk AS gpunetio +FROM rdma AS gpunetio # Install DOCA SDK packages required for GPUNetIO # (DOCA repo is already configured in dpdk stage) @@ -281,6 +302,7 @@ FROM ${DAQIRI_BASE_TARGET} AS daqiri-build ARG DAQIRI_MGR ARG DAQIRI_BUILD_PYTHON ARG BUILD_SHARED_LIBS +ARG DAQIRI_ENABLE_OTEL_METRICS WORKDIR /workspace/daqiri COPY . . @@ -292,6 +314,7 @@ RUN cmake -S . -B build \ -DCMAKE_CUDA_ARCHITECTURES=all-major \ -DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS} \ -DDAQIRI_BUILD_PYTHON=${DAQIRI_BUILD_PYTHON} \ + -DDAQIRI_ENABLE_OTEL_METRICS=${DAQIRI_ENABLE_OTEL_METRICS} \ -DDAQIRI_MGR="${DAQIRI_MGR}" \ && cmake --build build -j "$(nproc)" \ && cmake --install build @@ -304,4 +327,5 @@ FROM ${DAQIRI_BASE_TARGET} AS runtime COPY --from=daqiri-build /opt/daqiri /opt/daqiri ENV CMAKE_PREFIX_PATH=/opt/daqiri ENV LD_LIBRARY_PATH=/opt/daqiri/lib +EXPOSE 9464 WORKDIR /opt/daqiri diff --git a/README.md b/README.md index 85e00b8..d0e6d7c 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,8 @@ DAQIRI provides direct NIC hardware access in userspace, bypassing the Linux ker - **Flow Steering** — Configure the NIC's hardware flow engine to route packets by UDP source/destination port. - **RDMA** — RDMA verbs (READ, WRITE, SEND) over RoCE on Ethernet NICs or InfiniBand. +- **Optional OpenTelemetry metrics** — Expose per-interface or per-queue packet, + byte, and drop counters when built with `DAQIRI_ENABLE_OTEL_METRICS=ON`. ### Backends @@ -71,6 +73,11 @@ Container build: BASE_TARGET=dpdk DAQIRI_MGR="dpdk rdma" scripts/build-container.sh ``` +OpenTelemetry metrics are opt-in. Build with `-DDAQIRI_ENABLE_OTEL_METRICS=ON` +for CMake builds or `DAQIRI_ENABLE_OTEL_METRICS=ON` for container builds. DAQIRI +registers the instruments, while applications configure the OpenTelemetry SDK and +exporters. + See [Getting Started](https://nvidia.github.io/daqiri/getting-started/) for requirements, CMake options, and running the benchmarks. diff --git a/cmake/daqiriConfig.cmake.in b/cmake/daqiriConfig.cmake.in index a5c068d..c0c89f8 100644 --- a/cmake/daqiriConfig.cmake.in +++ b/cmake/daqiriConfig.cmake.in @@ -2,5 +2,9 @@ include(CMakeFindDependencyMacro) find_dependency(CUDAToolkit) +set(DAQIRI_ENABLE_OTEL_METRICS @DAQIRI_ENABLE_OTEL_METRICS@) +if(DAQIRI_ENABLE_OTEL_METRICS) + find_dependency(opentelemetry-cpp CONFIG COMPONENTS api) +endif() include("${CMAKE_CURRENT_LIST_DIR}/daqiriTargets.cmake") diff --git a/docs/api-reference/cpp.md b/docs/api-reference/cpp.md index 2cdf189..0442d85 100644 --- a/docs/api-reference/cpp.md +++ b/docs/api-reference/cpp.md @@ -348,6 +348,26 @@ daqiri::print_stats(); daqiri::shutdown(); ``` +## OpenTelemetry Metrics + +OpenTelemetry metrics are disabled by default and add no runtime instrumentation when +DAQIRI is built without `DAQIRI_ENABLE_OTEL_METRICS=ON`. Metrics-enabled builds register +observable counters through the OpenTelemetry C++ API: + +| Metric | Unit | +| --- | --- | +| `daqiri.rx.packets` | `{packet}` | +| `daqiri.tx.packets` | `{packet}` | +| `daqiri.rx.bytes` | `By` | +| `daqiri.tx.bytes` | `By` | +| `daqiri.dropped.packets` | `{packet}` | + +All metrics include `daqiri.backend`, `daqiri.interface.name`, `daqiri.port.id`, and +`daqiri.queue.id`. Drop metrics also include `daqiri.drop.reason`. + +DAQIRI only owns library instrumentation. Applications remain responsible for +configuring the OpenTelemetry C++ SDK, metric readers, and exporters. + ## Function Reference This section summarizes the C++ functions available through `daqiri/daqiri.h`. The diff --git a/docs/getting-started.md b/docs/getting-started.md index 6decaf8..3abba62 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -87,6 +87,12 @@ Then build the DAQIRI library: BASE_IMAGE=torch BASE_TARGET=dpdk DAQIRI_MGR="dpdk socket rdma" scripts/build-container.sh ``` + OpenTelemetry metrics are optional. Enable them with: + + ```bash + DAQIRI_ENABLE_OTEL_METRICS=ON BASE_TARGET=dpdk DAQIRI_MGR="dpdk socket rdma" scripts/build-container.sh + ``` + === "CMake build (bare-metal)" ```bash @@ -128,6 +134,7 @@ Both methods use the same public C++ include: | `DAQIRI_BUILD_PYTHON` | `OFF` | Build pybind11 Python bindings. | | `DAQIRI_BUILD_EXAMPLES` | `ON` | Build benchmark executables. | | `DAQIRI_ENABLE_GDS` | `OFF` | Enable cuFile-backed burst file writes from CUDA device memory. Host-memory writes use POSIX APIs without GDS. | +| `DAQIRI_ENABLE_OTEL_METRICS` | `OFF` | Enable OpenTelemetry C++ metrics instrumentation. When enabled, OpenTelemetry C++ API package metadata must be available to CMake. | | `BUILD_SHARED_LIBS` | — | Build as shared library. | CUDA architectures are hardcoded to `80;90;121` (A100, H100, GB10) in `src/CMakeLists.txt`. @@ -146,6 +153,11 @@ GDS-supported filesystem such as XFS. If `nvidia-fs` is not loaded, or the desti storage is not supported, DAQIRI returns `NOT_SUPPORTED` for CUDA device-backed burst writes. Host-backed burst writes continue to use POSIX APIs and do not require GDS. +OpenTelemetry metrics builds register observable counters for received packets, +transmitted packets, received bytes, transmitted bytes, and dropped packets. DAQIRI +does not configure an SDK reader or exporter; applications that want exported data +must configure the OpenTelemetry C++ SDK before or during DAQIRI initialization. + ## Next Steps Once DAQIRI is built, follow the tutorials to configure your system and run your first benchmark: diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 9d55f25..efabe1d 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -67,6 +67,19 @@ endfunction() add_daqiri_raw_bench(daqiri_bench_raw_hds raw_hds_bench.cpp) add_daqiri_raw_bench(daqiri_bench_raw_gpudirect raw_gpudirect_bench.cpp) +if(DAQIRI_ENABLE_OTEL_METRICS) + find_package(opentelemetry-cpp CONFIG QUIET COMPONENTS sdk exporters_prometheus) + if(TARGET opentelemetry-cpp::prometheus_exporter) + target_sources(daqiri_bench_raw_gpudirect PRIVATE grafana/otel_prometheus.cpp) + target_compile_definitions(daqiri_bench_raw_gpudirect PRIVATE DAQIRI_GRAFANA_PROMETHEUS=1) + target_link_libraries(daqiri_bench_raw_gpudirect PRIVATE + opentelemetry-cpp::sdk + opentelemetry-cpp::prometheus_exporter + ) + else() + message(STATUS "OpenTelemetry Prometheus exporter not found; Grafana live metrics example disabled") + endif() +endif() add_daqiri_raw_bench(daqiri_bench_raw_reorder_seq raw_reorder_seq_bench.cpp) add_daqiri_raw_bench(daqiri_bench_raw_reorder_quantize raw_reorder_quantize_bench.cpp) add_daqiri_raw_bench(daqiri_example_gds_write gds_write_example.cpp) diff --git a/examples/daqiri_bench_raw_tx_rx.yaml b/examples/daqiri_bench_raw_tx_rx.yaml index 60480e4..e1cbe9b 100644 --- a/examples/daqiri_bench_raw_tx_rx.yaml +++ b/examples/daqiri_bench_raw_tx_rx.yaml @@ -23,7 +23,7 @@ daqiri: interfaces: - name: "tx_port" - address: <0000:00:00.0> + address: 0000:01:00.0 tx: queues: - name: "tx_q_0" @@ -35,7 +35,7 @@ daqiri: offloads: - "tx_eth_src" - name: "rx_port" - address: <0000:00:00.0> + address: 0000:01:00.1 rx: flow_isolation: true queues: diff --git a/examples/grafana/README.md b/examples/grafana/README.md new file mode 100644 index 0000000..a07fd76 --- /dev/null +++ b/examples/grafana/README.md @@ -0,0 +1,62 @@ +# DAQIRI Grafana Metrics Example + +This example runs `daqiri_bench_raw_gpudirect` for 60 seconds with DAQIRI's +OpenTelemetry metrics exposed through a Prometheus pull endpoint, scraped by +Prometheus, and visualized in a local Grafana dashboard. + +## Ports + +| Service | URL | +| --- | --- | +| Grafana | | +| Prometheus | | +| DAQIRI metrics | | + +Grafana is provisioned with the `DAQIRI OpenTelemetry Metrics` dashboard and a +Prometheus datasource. The default Grafana login is `admin` / `daqiri`; anonymous +viewer access is also enabled for the local example. + +## Build + +Build `daqiri:local` with DAQIRI metrics and the OpenTelemetry Prometheus exporter: + +```bash +DAQIRI_ENABLE_OTEL_METRICS=ON DAQIRI_MGR="dpdk socket rdma" scripts/build-container.sh +``` + +## Run + +Update `examples/daqiri_bench_raw_tx_rx.yaml` for your NIC, GPU, MAC, IP, and CPU +core values. Then start the stack: + +```bash +cd examples/grafana +docker compose up +``` + +The DAQIRI benchmark container follows the repository run requirements from +`AGENTS.md`: it runs as root with `privileged: true`, host networking, all NVIDIA +GPUs exposed through the NVIDIA runtime, and `/dev/hugepages` mounted from the +host. + +The DAQIRI service runs: + +```bash +/opt/daqiri/bin/daqiri_bench_raw_gpudirect /workspace/daqiri/examples/daqiri_bench_raw_tx_rx.yaml --seconds 60 +``` + +To use a different config or binary, override the environment variables: + +```bash +DAQIRI_CONFIG=/workspace/daqiri/examples/daqiri_bench_raw_tx_rx.yaml docker compose up +``` + +Stop the stack with: + +```bash +docker compose down +``` + +The DAQIRI and Prometheus services use host networking so DPDK and the Prometheus +scrape path can use the same host-visible network namespace. Grafana exposes port +`3000` through Docker port mapping. diff --git a/examples/grafana/compose.yaml b/examples/grafana/compose.yaml new file mode 100644 index 0000000..93fe2a5 --- /dev/null +++ b/examples/grafana/compose.yaml @@ -0,0 +1,51 @@ +services: + daqiri: + image: daqiri:local + container_name: daqiri-grafana-bench + depends_on: + - prometheus + runtime: nvidia + privileged: true + network_mode: host + environment: + NVIDIA_VISIBLE_DEVICES: "all" + NVIDIA_DRIVER_CAPABILITIES: "all" + DAQIRI_OTEL_PROMETHEUS_ENDPOINT: "0.0.0.0:9464" + DAQIRI_SECONDS: "${DAQIRI_SECONDS:-60}" + DAQIRI_CONFIG: "${DAQIRI_CONFIG:-/workspace/daqiri/examples/daqiri_bench_raw_tx_rx.yaml}" + DAQIRI_BIN: "${DAQIRI_BIN:-/opt/daqiri/bin/daqiri_bench_raw_gpudirect}" + volumes: + - /dev/hugepages:/dev/hugepages + - ../..:/workspace/daqiri:ro + working_dir: /workspace/daqiri + command: ["bash", "/workspace/daqiri/examples/grafana/run-benchmark.sh"] + + prometheus: + image: prom/prometheus:latest + container_name: daqiri-grafana-prometheus + network_mode: host + volumes: + - ./prometheus.yml:/etc/prometheus/prometheus.yml:ro + command: + - "--config.file=/etc/prometheus/prometheus.yml" + - "--web.listen-address=0.0.0.0:9090" + - "--storage.tsdb.retention.time=2h" + + grafana: + image: grafana/grafana:latest + container_name: daqiri-grafana + depends_on: + - prometheus + ports: + - "3000:3000" + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + GF_SECURITY_ADMIN_USER: admin + GF_SECURITY_ADMIN_PASSWORD: daqiri + GF_AUTH_ANONYMOUS_ENABLED: "true" + GF_AUTH_ANONYMOUS_ORG_ROLE: Viewer + GF_DASHBOARDS_MIN_REFRESH_INTERVAL: 1s + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning:ro + - ./grafana/dashboards:/var/lib/grafana/dashboards:ro diff --git a/examples/grafana/grafana/dashboards/daqiri-metrics.json b/examples/grafana/grafana/dashboards/daqiri-metrics.json new file mode 100644 index 0000000..ae52d6d --- /dev/null +++ b/examples/grafana/grafana/dashboards/daqiri-metrics.json @@ -0,0 +1,215 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "pps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "legend": { + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_packets_total[5s]))", + "legendFormat": "rx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", + "refId": "A" + }, + { + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_packets_total[5s]))", + "legendFormat": "tx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", + "refId": "B" + } + ], + "title": "Packet Rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "gbps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 2, + "options": { + "legend": { + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_bytes_total[5s])) * 8 / 1000000000", + "legendFormat": "rx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", + "refId": "A" + }, + { + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_bytes_total[5s])) * 8 / 1000000000", + "legendFormat": "tx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", + "refId": "B" + } + ], + "title": "Throughput", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "pps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 3, + "options": { + "legend": { + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id, daqiri_drop_reason) (rate(daqiri_dropped_packets_total[5s]))", + "legendFormat": "{{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}} {{daqiri_drop_reason}}", + "refId": "A" + } + ], + "title": "Drop Rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "Prometheus" + }, + "fieldConfig": { + "defaults": { + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 4, + "options": { + "legend": { + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (daqiri_rx_packets_total)", + "legendFormat": "rx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", + "refId": "A" + }, + { + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (daqiri_tx_packets_total)", + "legendFormat": "tx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", + "refId": "B" + } + ], + "title": "Total Packets", + "type": "timeseries" + } + ], + "refresh": "1s", + "schemaVersion": 39, + "tags": [ + "daqiri", + "opentelemetry" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "DAQIRI OpenTelemetry Metrics", + "uid": "daqiri-otel-metrics", + "version": 1, + "weekStart": "" +} diff --git a/examples/grafana/grafana/provisioning/dashboards/daqiri.yml b/examples/grafana/grafana/provisioning/dashboards/daqiri.yml new file mode 100644 index 0000000..10cbce6 --- /dev/null +++ b/examples/grafana/grafana/provisioning/dashboards/daqiri.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +providers: + - name: DAQIRI + orgId: 1 + folder: DAQIRI + type: file + disableDeletion: false + editable: true + options: + path: /var/lib/grafana/dashboards diff --git a/examples/grafana/grafana/provisioning/datasources/prometheus.yml b/examples/grafana/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 0000000..7ec9e3a --- /dev/null +++ b/examples/grafana/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://host.docker.internal:9090 + isDefault: true + editable: false + jsonData: + timeInterval: 1s diff --git a/examples/grafana/otel_prometheus.cpp b/examples/grafana/otel_prometheus.cpp new file mode 100644 index 0000000..12896f1 --- /dev/null +++ b/examples/grafana/otel_prometheus.cpp @@ -0,0 +1,87 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "otel_prometheus.h" + +#if defined(DAQIRI_GRAFANA_PROMETHEUS) && DAQIRI_GRAFANA_PROMETHEUS + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace daqiri::bench::grafana { +namespace { + +namespace metrics_api = opentelemetry::metrics; +namespace metrics_exporter = opentelemetry::exporter::metrics; +namespace metrics_sdk = opentelemetry::sdk::metrics; + +bool g_metrics_initialized = false; + +} // namespace + +bool init_prometheus_metrics_from_env() { + const char *endpoint = std::getenv("DAQIRI_OTEL_PROMETHEUS_ENDPOINT"); + if (endpoint == nullptr || std::string(endpoint).empty()) { return false; } + + metrics_exporter::PrometheusExporterOptions options; + options.url = endpoint; + options.without_otel_scope = true; + + auto reader = metrics_exporter::PrometheusExporterFactory::Create(options); + auto provider = metrics_sdk::MeterProviderFactory::Create(); + auto *sdk_provider = static_cast(provider.get()); + sdk_provider->AddMetricReader(std::move(reader)); + + std::shared_ptr api_provider(std::move(provider)); + metrics_sdk::Provider::SetMeterProvider(api_provider); + g_metrics_initialized = true; + + std::cout << "DAQIRI OpenTelemetry Prometheus metrics listening on " << endpoint << "\n"; + return true; +} + +void shutdown_prometheus_metrics() { + if (!g_metrics_initialized) { return; } + std::shared_ptr none; + metrics_sdk::Provider::SetMeterProvider(none); + g_metrics_initialized = false; +} + +} // namespace daqiri::bench::grafana + +#else + +namespace daqiri::bench::grafana { + +bool init_prometheus_metrics_from_env() { + return false; +} + +void shutdown_prometheus_metrics() {} + +} // namespace daqiri::bench::grafana + +#endif diff --git a/examples/grafana/otel_prometheus.h b/examples/grafana/otel_prometheus.h new file mode 100644 index 0000000..f2fe3e6 --- /dev/null +++ b/examples/grafana/otel_prometheus.h @@ -0,0 +1,37 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace daqiri::bench::grafana { + +#if defined(DAQIRI_GRAFANA_PROMETHEUS) && DAQIRI_GRAFANA_PROMETHEUS + +bool init_prometheus_metrics_from_env(); +void shutdown_prometheus_metrics(); + +#else + +inline bool init_prometheus_metrics_from_env() { + return false; +} + +inline void shutdown_prometheus_metrics() {} + +#endif + +} // namespace daqiri::bench::grafana diff --git a/examples/grafana/prometheus.yml b/examples/grafana/prometheus.yml new file mode 100644 index 0000000..2cd8de5 --- /dev/null +++ b/examples/grafana/prometheus.yml @@ -0,0 +1,10 @@ +global: + scrape_interval: 1s + evaluation_interval: 1s + +scrape_configs: + - job_name: daqiri + metrics_path: /metrics + static_configs: + - targets: + - 127.0.0.1:9464 diff --git a/examples/grafana/run-benchmark.sh b/examples/grafana/run-benchmark.sh new file mode 100755 index 0000000..4cae3b9 --- /dev/null +++ b/examples/grafana/run-benchmark.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash +set -euo pipefail + +DAQIRI_BIN="${DAQIRI_BIN:-/opt/daqiri/bin/daqiri_bench_raw_gpudirect}" +DAQIRI_CONFIG="${DAQIRI_CONFIG:-/opt/daqiri/bin/daqiri_bench_raw_tx_rx.yaml}" +DAQIRI_SECONDS="${DAQIRI_SECONDS:-60}" +DAQIRI_OTEL_PROMETHEUS_ENDPOINT="${DAQIRI_OTEL_PROMETHEUS_ENDPOINT:-0.0.0.0:9464}" +export DAQIRI_OTEL_PROMETHEUS_ENDPOINT + +echo "Starting DAQIRI Grafana metrics run" +echo " binary: ${DAQIRI_BIN}" +echo " config: ${DAQIRI_CONFIG}" +echo " seconds: ${DAQIRI_SECONDS}" +echo " metrics: http://${DAQIRI_OTEL_PROMETHEUS_ENDPOINT}/metrics" + +exec "${DAQIRI_BIN}" "${DAQIRI_CONFIG}" --seconds "${DAQIRI_SECONDS}" diff --git a/examples/raw_gpudirect_bench.cpp b/examples/raw_gpudirect_bench.cpp index 69cd4f0..45e9011 100644 --- a/examples/raw_gpudirect_bench.cpp +++ b/examples/raw_gpudirect_bench.cpp @@ -30,6 +30,7 @@ #include #include +#include "grafana/otel_prometheus.h" #include "raw_bench_common.h" #include @@ -143,6 +144,8 @@ int main(int argc, char **argv) { return 1; } + const auto prometheus_metrics = + daqiri::bench::grafana::init_prometheus_metrics_from_env(); const int run_seconds = daqiri::bench::parse_run_seconds(argc, argv); const auto root = YAML::LoadFile(argv[1]); @@ -195,5 +198,8 @@ int main(int argc, char **argv) { daqiri::print_stats(); daqiri::shutdown(); + if (prometheus_metrics) { + daqiri::bench::grafana::shutdown_prometheus_metrics(); + } return 0; } diff --git a/scripts/build-container.sh b/scripts/build-container.sh index 8267b27..9987ebb 100755 --- a/scripts/build-container.sh +++ b/scripts/build-container.sh @@ -7,6 +7,7 @@ BASE_IMAGE="${BASE_IMAGE:-cuda}" DAQIRI_MGR="${DAQIRI_MGR:-dpdk socket}" DAQIRI_BUILD_PYTHON="${DAQIRI_BUILD_PYTHON:-OFF}" BUILD_SHARED_LIBS="${BUILD_SHARED_LIBS:-ON}" +DAQIRI_ENABLE_OTEL_METRICS="${DAQIRI_ENABLE_OTEL_METRICS:-OFF}" case "${BASE_IMAGE}" in cuda) @@ -28,6 +29,7 @@ docker build \ --build-arg DAQIRI_MGR="${DAQIRI_MGR}" \ --build-arg DAQIRI_BUILD_PYTHON="${DAQIRI_BUILD_PYTHON}" \ --build-arg BUILD_SHARED_LIBS="${BUILD_SHARED_LIBS}" \ + --build-arg DAQIRI_ENABLE_OTEL_METRICS="${DAQIRI_ENABLE_OTEL_METRICS}" \ -t "${IMAGE_TAG}" \ . diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 67578b3..93b79d7 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -31,8 +31,16 @@ if(DAQIRI_REORDER_GPU_PROFILE) add_compile_definitions(DAQIRI_REORDER_GPU_PROFILE=1) endif() option(DAQIRI_ENABLE_GDS "Enable cuFile-backed burst file writes from CUDA device memory" OFF) +option(DAQIRI_ENABLE_OTEL_METRICS "Enable OpenTelemetry metrics instrumentation" OFF) set(DOCA_PATH /opt/mellanox/doca) +if(DAQIRI_ENABLE_OTEL_METRICS) + add_compile_definitions(DAQIRI_ENABLE_OTEL_METRICS=1) + find_package(opentelemetry-cpp CONFIG REQUIRED COMPONENTS api) +else() + add_compile_definitions(DAQIRI_ENABLE_OTEL_METRICS=0) +endif() + set(DAQIRI_USING_VENDORED_YAML_CPP OFF) if(NOT TARGET yaml-cpp AND NOT TARGET yaml-cpp::yaml-cpp) if(EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/../third_party/yaml-cpp/CMakeLists.txt") @@ -78,6 +86,9 @@ add_library(daqiri_common manager.cpp logging.cpp ) +if(DAQIRI_ENABLE_OTEL_METRICS) + target_sources(daqiri_common PRIVATE metrics.cpp) +endif() target_include_directories(daqiri_common PUBLIC $ @@ -103,6 +114,12 @@ target_link_libraries(daqiri_common ${DPDK_LIBRARIES} PRIVATE ) +if(DAQIRI_ENABLE_OTEL_METRICS) + target_compile_definitions(daqiri_common PUBLIC DAQIRI_ENABLE_OTEL_METRICS=1) + target_link_libraries(daqiri_common PRIVATE opentelemetry-cpp::api) +else() + target_compile_definitions(daqiri_common PUBLIC DAQIRI_ENABLE_OTEL_METRICS=0) +endif() if(DAQIRI_ENABLE_GDS) find_path(DAQIRI_CUFILE_INCLUDE_DIR NAMES cufile.h diff --git a/src/common.cpp b/src/common.cpp index 4127c00..bca3d31 100644 --- a/src/common.cpp +++ b/src/common.cpp @@ -25,6 +25,7 @@ #include #include "src/manager.h" +#include "src/metrics.h" #include #include #if DAQIRI_MGR_DPDK @@ -299,6 +300,7 @@ void* get_packet_ptr(BurstParams* burst, int idx) { void shutdown() { ASSERT_DAQIRI_MGR_INITIALIZED(); g_daqiri_mgr->shutdown(); + metrics::shutdown(); } Status send_tx_burst(BurstParams* burst) { diff --git a/src/managers/dpdk/daqiri_dpdk_stats.cpp b/src/managers/dpdk/daqiri_dpdk_stats.cpp index b0d4d6c..492cf69 100644 --- a/src/managers/dpdk/daqiri_dpdk_stats.cpp +++ b/src/managers/dpdk/daqiri_dpdk_stats.cpp @@ -25,6 +25,8 @@ namespace daqiri { void DpdkStats::Init(const NetworkConfig &cfg) { cfg_ = cfg; init_ = true; + port_metrics_.clear(); + queue_metrics_.clear(); if (cfg_.common_.loopback_ == LoopbackType::LOOPBACK_TYPE_SW) { DAQIRI_LOG_INFO("Skipping DPDK stats initialization for software loopback mode"); @@ -37,6 +39,8 @@ void DpdkStats::Init(const NetworkConfig &cfg) { port_queue_memory_regions_.clear(); for (const auto &intf : cfg_.ifs_) { int port_id = intf.port_id_; + port_metrics_[port_id] = + metrics::get_or_create_queue("dpdk", intf.name_, port_id, "all"); // Process RX queues for (const auto &q : intf.rx_.queues_) { @@ -52,9 +56,20 @@ void DpdkStats::Init(const NetworkConfig &cfg) { } port_queue_memory_regions_[key] = mr_names; + queue_metrics_[key] = + metrics::get_or_create_queue("dpdk", intf.name_, port_id, + std::to_string(q.common_.id_)); DAQIRI_LOG_INFO("Port {}, Queue {}: Memory regions: {}", port_id, q.common_.id_, mr_names); } + + for (const auto &q : intf.tx_.queues_) { + uint32_t key = (port_id << 16) | q.common_.id_; + queue_metrics_.try_emplace( + key, + metrics::get_or_create_queue("dpdk", intf.name_, port_id, + std::to_string(q.common_.id_))); + } } // Initialize xstats for each port @@ -115,21 +130,33 @@ void DpdkStats::Init(const NetworkConfig &cfg) { port_stats.rx_mbuf_allocation_errors_idx = i; } - // Check for rx_q*_errors counters (queue-specific error counters) - const char* rx_q_errors_prefix = "rx_q"; - const char* rx_q_errors_suffix = "_errors"; - if (strncmp(xstats_names[i].name, rx_q_errors_prefix, strlen(rx_q_errors_prefix)) == 0) { - const char* queue_num_str = xstats_names[i].name + strlen(rx_q_errors_prefix); - char* endptr; - int queue_id = strtol(queue_num_str, &endptr, 10); - - // Verify this is a valid rx_q*_errors counter - if (endptr != queue_num_str && strcmp(endptr, rx_q_errors_suffix) == 0) { - if (queue_id >= 0 && queue_id < port_stats.MAX_QUEUE_COUNT) { - port_stats.rx_queue_errors_idx[queue_id] = i; - DAQIRI_LOG_INFO("Found rx_q{}_errors counter at index {}", queue_id, i); - } - } + const std::string xstat_name = xstats_names[i].name; + auto parse_queue_xstat = [&xstat_name](const char* prefix, + const char* suffix, + int* queue_id) { + if (strncmp(xstat_name.c_str(), prefix, strlen(prefix)) != 0) { return false; } + const char* queue_num_str = xstat_name.c_str() + strlen(prefix); + char* endptr = nullptr; + const long parsed_queue = strtol(queue_num_str, &endptr, 10); + if (endptr == queue_num_str || strcmp(endptr, suffix) != 0) { return false; } + if (parsed_queue < 0 || parsed_queue >= PortXStats::MAX_QUEUE_COUNT) { return false; } + *queue_id = static_cast(parsed_queue); + return true; + }; + + int queue_id = -1; + if (parse_queue_xstat("rx_q", "_errors", &queue_id)) { + port_stats.rx_queue_errors_idx[queue_id] = i; + port_stats.queue_xstats[queue_id].rx_errors_idx = i; + DAQIRI_LOG_INFO("Found rx_q{}_errors counter at index {}", queue_id, i); + } else if (parse_queue_xstat("rx_q", "_packets", &queue_id)) { + port_stats.queue_xstats[queue_id].rx_packets_idx = i; + } else if (parse_queue_xstat("tx_q", "_packets", &queue_id)) { + port_stats.queue_xstats[queue_id].tx_packets_idx = i; + } else if (parse_queue_xstat("rx_q", "_bytes", &queue_id)) { + port_stats.queue_xstats[queue_id].rx_bytes_idx = i; + } else if (parse_queue_xstat("tx_q", "_bytes", &queue_id)) { + port_stats.queue_xstats[queue_id].tx_bytes_idx = i; } } @@ -226,10 +253,59 @@ void DpdkStats::Run() { continue; } + struct rte_eth_stats eth_stats {}; + const bool eth_stats_valid = rte_eth_stats_get(port_id, ð_stats) == 0; + const auto port_metrics = port_metrics_[port_id]; + bool has_queue_rx_packets = false; + bool has_queue_tx_packets = false; + bool has_queue_rx_bytes = false; + bool has_queue_tx_bytes = false; + + for (const auto& [queue_id, xstats] : port_stats.queue_xstats) { + const uint32_t key = (port_id << 16) | queue_id; + const auto metrics_it = queue_metrics_.find(key); + if (metrics_it == queue_metrics_.end()) { continue; } + + const auto& queue_metrics = metrics_it->second; + if (xstats.rx_packets_idx >= 0) { + metrics::set_rx_packets(queue_metrics, port_stats.xstats[xstats.rx_packets_idx].value); + has_queue_rx_packets = true; + } + if (xstats.tx_packets_idx >= 0) { + metrics::set_tx_packets(queue_metrics, port_stats.xstats[xstats.tx_packets_idx].value); + has_queue_tx_packets = true; + } + if (xstats.rx_bytes_idx >= 0) { + metrics::set_rx_bytes(queue_metrics, port_stats.xstats[xstats.rx_bytes_idx].value); + has_queue_rx_bytes = true; + } + if (xstats.tx_bytes_idx >= 0) { + metrics::set_tx_bytes(queue_metrics, port_stats.xstats[xstats.tx_bytes_idx].value); + has_queue_tx_bytes = true; + } + if (xstats.rx_errors_idx >= 0) { + metrics::set_dropped(queue_metrics, + "rx_queue_errors", + port_stats.xstats[xstats.rx_errors_idx].value); + } + } + + if (eth_stats_valid) { + if (!has_queue_rx_packets) { metrics::set_rx_packets(port_metrics, eth_stats.ipackets); } + if (!has_queue_tx_packets) { metrics::set_tx_packets(port_metrics, eth_stats.opackets); } + if (!has_queue_rx_bytes) { metrics::set_rx_bytes(port_metrics, eth_stats.ibytes); } + if (!has_queue_tx_bytes) { metrics::set_tx_bytes(port_metrics, eth_stats.obytes); } + metrics::set_dropped(port_metrics, "rx_missed", eth_stats.imissed); + metrics::set_dropped(port_metrics, "rx_nombuf", eth_stats.rx_nombuf); + metrics::set_dropped(port_metrics, "rx_errors", eth_stats.ierrors); + metrics::set_dropped(port_metrics, "tx_errors", eth_stats.oerrors); + } + // Check if rx_mbuf_alloc_err counter has increased if (port_stats.rx_mbuf_allocation_errors_idx >= 0) { uint64_t rx_mbuf_alloc_err = port_stats.xstats[port_stats.rx_mbuf_allocation_errors_idx].value; + metrics::set_dropped(port_metrics, "rx_mbuf_allocation_errors", rx_mbuf_alloc_err); uint64_t old_rx_mbuf_alloc_err = port_stats.old_xstats[port_stats.rx_mbuf_allocation_errors_idx].value; diff --git a/src/managers/dpdk/daqiri_dpdk_stats.h b/src/managers/dpdk/daqiri_dpdk_stats.h index 131eea3..0dfc778 100644 --- a/src/managers/dpdk/daqiri_dpdk_stats.h +++ b/src/managers/dpdk/daqiri_dpdk_stats.h @@ -21,6 +21,7 @@ #include #include #include +#include "src/metrics.h" namespace daqiri { @@ -62,6 +63,15 @@ class DpdkStats { static constexpr int MAX_QUEUE_COUNT = 128; // Maximum number of queues supported std::unordered_map rx_queue_errors_idx; // Indices for rx_q*_errors counters + struct QueueXStats { + int rx_packets_idx = -1; + int tx_packets_idx = -1; + int rx_bytes_idx = -1; + int tx_bytes_idx = -1; + int rx_errors_idx = -1; + }; + std::unordered_map queue_xstats; + PortXStats() : len(0), xstats(nullptr), old_xstats(nullptr), rx_missed_idx(-1), rx_mbuf_allocation_errors_idx(-1) { // No need to initialize the map, it starts empty @@ -78,6 +88,8 @@ class DpdkStats { // Map to store memory region names for each port/queue combination // Key: (port_id << 16) | queue_id, Value: comma-separated list of memory region names std::unordered_map port_queue_memory_regions_; + std::unordered_map> port_metrics_; + std::unordered_map> queue_metrics_; }; } // namespace daqiri diff --git a/src/managers/rdma/daqiri_rdma_mgr.cpp b/src/managers/rdma/daqiri_rdma_mgr.cpp index 5b78675..0d781fb 100644 --- a/src/managers/rdma/daqiri_rdma_mgr.cpp +++ b/src/managers/rdma/daqiri_rdma_mgr.cpp @@ -25,6 +25,7 @@ #include #include #include "src/dpdk_log.h" +#include "src/metrics.h" #include "daqiri_rdma_mgr.h" /* The ordering of most RDMA/CM setup follows the ordering specified here: @@ -48,6 +49,43 @@ void reset_rdma_burst_metadata(BurstParams* burst) { burst->event = nullptr; } +std::shared_ptr get_rdma_metrics(const NetworkConfig& cfg, + const rdma_thread_params& params) { + if (params.if_idx < 0 || params.if_idx >= static_cast(cfg.ifs_.size())) { return nullptr; } + + const auto& intf = cfg.ifs_[params.if_idx]; + auto queue_id = params.queue_idx; + if (params.queue_idx >= 0 && params.queue_idx < static_cast(intf.tx_.queues_.size())) { + queue_id = intf.tx_.queues_[params.queue_idx].common_.id_; + } + + return metrics::get_or_create_queue("rdma", + intf.name_.empty() ? intf.address_ : intf.name_, + intf.port_id_, + std::to_string(queue_id)); +} + +std::shared_ptr get_rdma_metrics_for_connection( + const NetworkConfig& cfg, + const std::unordered_map& client_params, + const std::unordered_map>& server_params, + struct rdma_cm_id* conn_id, + bool is_server) { + if (is_server) { + for (const auto& [server_id, params_list] : server_params) { + (void)server_id; + for (const auto& params : params_list) { + if (params.client_id == conn_id) { return get_rdma_metrics(cfg, params); } + } + } + return nullptr; + } + + const auto params = client_params.find(conn_id); + if (params == client_params.end()) { return nullptr; } + return get_rdma_metrics(cfg, params->second); +} + } // namespace bool RdmaMgr::set_config_and_initialize(const NetworkConfig& cfg) { @@ -379,6 +417,13 @@ Status RdmaMgr::send_tx_burst(BurstParams* burst) { ring = ri->second; if (rte_ring_enqueue(ring, reinterpret_cast(burst)) != 0) { + auto counters = get_rdma_metrics_for_connection(cfg_, + client_q_params_, + server_q_params_, + reinterpret_cast(conn_id), + burst->transport_hdr.server); + const auto dropped_packets = burst->transport_hdr.num_pkts > 0 ? burst->transport_hdr.num_pkts : 1; + metrics::add_dropped(counters, "tx_enqueue_failure", dropped_packets); free_tx_burst(burst); DAQIRI_LOG_CRITICAL("Failed to enqueue TX work"); return Status::NO_SPACE_AVAILABLE; @@ -415,6 +460,14 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { struct rte_ring* rx_ring = tparams->qp_params.rx_ring; std::unordered_map outstanding_send_wr_ids; std::unordered_map outstanding_receive_wr_ids; + auto counters = get_rdma_metrics(cfg_, *tparams); + auto packet_length_for_wr = [](BurstParams* burst, uint64_t wr_id) -> uint64_t { + if (burst == nullptr || burst->pkt_lens[0] == nullptr) { return 0; } + if (wr_id < burst->transport_hdr.wr_id) { return 0; } + const auto idx = static_cast(wr_id - burst->transport_hdr.wr_id); + if (idx >= burst->transport_hdr.num_pkts) { return 0; } + return burst->pkt_lens[0][idx]; + }; if (set_affinity(cpu_core) != 0) { DAQIRI_LOG_CRITICAL("Failed to set RDMA core affinity"); @@ -437,6 +490,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { (int)wc.status, (int64_t)wc.wr_id, (int)wc.opcode); + metrics::add_dropped(counters, "rx_completion_error", 1); continue; } @@ -444,10 +498,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { auto it = outstanding_receive_wr_ids.find(wc.wr_id); if (it == outstanding_receive_wr_ids.end()) { DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding RECEIVE WR IDs", wc.wr_id); + metrics::add_dropped(counters, "rx_unknown_wr_id", 1); continue; } msg = it->second; + metrics::add_rx(counters, 1, static_cast(wc.byte_len)); const auto conn_id = get_connection_id(msg); const auto expected_conn_id = reinterpret_cast(tparams->client_id); @@ -474,6 +530,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); + metrics::add_dropped(counters, "rx_completion_enqueue_failure", 1); free_tx_burst(msg); return; } @@ -492,6 +549,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { (int)wc.status, (int64_t)wc.wr_id, (int)wc.opcode); + metrics::add_dropped(counters, "tx_completion_error", 1); continue; } @@ -499,10 +557,12 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { auto it = outstanding_send_wr_ids.find(wc.wr_id); if (it == outstanding_send_wr_ids.end()) { DAQIRI_LOG_CRITICAL("WR ID {} not found in outstanding SEND WR IDs", wc.wr_id); + metrics::add_dropped(counters, "tx_unknown_wr_id", 1); continue; } msg = it->second; + metrics::add_tx(counters, 1, packet_length_for_wr(msg, wc.wr_id)); const auto conn_id = get_connection_id(msg); const auto expected_conn_id = reinterpret_cast(tparams->client_id); @@ -529,6 +589,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (rte_ring_enqueue(rx_ring, reinterpret_cast(msg)) != 0) { DAQIRI_LOG_CRITICAL("Failed to enqueue RX completion message"); + metrics::add_dropped(counters, "tx_completion_enqueue_failure", 1); free_tx_burst(msg); return; } @@ -547,6 +608,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (local_mr == mrs_.end()) { DAQIRI_LOG_CRITICAL("Couldn't find MR with name {} in registry", burst->transport_hdr.local_mr_name); + metrics::add_dropped(counters, "missing_memory_region", burst->transport_hdr.num_pkts); free_tx_burst(burst); continue; } @@ -557,6 +619,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { auto pd = pd_map_.find(tparams->client_id->verbs); if (pd == pd_map_.end()) { DAQIRI_LOG_CRITICAL("Couldn't find PD for client"); + metrics::add_dropped(counters, "missing_protection_domain", burst->transport_hdr.num_pkts); free_tx_burst(burst); continue; } @@ -566,6 +629,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (lkey == local_mr->second.ctx_mr_map_.end()) { DAQIRI_LOG_CRITICAL("Couldn't find MR with name {} in registry", burst->transport_hdr.local_mr_name); + metrics::add_dropped(counters, "missing_memory_key", burst->transport_hdr.num_pkts); free_tx_burst(burst); continue; } @@ -588,6 +652,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { int ret = ibv_post_send(tparams->client_id->qp, &wr, &bad_wr); if (ret != 0) { DAQIRI_LOG_CRITICAL("Failed to post SEND request, errno: {}", strerror(errno)); + metrics::add_dropped(counters, "tx_post_failure", 1); free_tx_burst(burst); continue; } @@ -602,6 +667,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { auto pd = pd_map_.find(tparams->client_id->verbs); if (pd == pd_map_.end()) { DAQIRI_LOG_CRITICAL("Couldn't find PD for client"); + metrics::add_dropped(counters, "missing_protection_domain", burst->transport_hdr.num_pkts); free_tx_burst(burst); continue; } @@ -611,6 +677,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { if (lkey == local_mr->second.ctx_mr_map_.end()) { DAQIRI_LOG_CRITICAL("Couldn't find MR with name {} in registry", burst->transport_hdr.local_mr_name); + metrics::add_dropped(counters, "missing_memory_key", burst->transport_hdr.num_pkts); free_tx_burst(burst); continue; } @@ -638,6 +705,7 @@ void RdmaMgr::rdma_thread(bool is_server, rdma_thread_params* tparams) { ret = ibv_post_recv(tparams->client_id->qp, &recv_wr, &bad_wr); if (ret) { DAQIRI_LOG_CRITICAL("ibv_post_recv failed: {}", strerror(errno)); + metrics::add_dropped(counters, "rx_post_failure", 1); free_tx_burst(burst); continue; } diff --git a/src/managers/socket/daqiri_socket_mgr.cpp b/src/managers/socket/daqiri_socket_mgr.cpp index 642f9a6..4361544 100644 --- a/src/managers/socket/daqiri_socket_mgr.cpp +++ b/src/managers/socket/daqiri_socket_mgr.cpp @@ -142,6 +142,16 @@ void SocketMgr::initialize() { ep->tx_batch_size = select_batch_size(if_cfg.tx_.queues_); ep->max_packet_size = static_cast(std::max(1, select_max_packet_size(if_cfg))); ep->rx_queue_state = get_or_create_rx_queue(ep->port, ep->rx_queue); + ep->rx_metrics = metrics::get_or_create_queue("socket", + if_cfg.name_.empty() ? if_cfg.address_ + : if_cfg.name_, + ep->port, + std::to_string(ep->rx_queue)); + ep->tx_metrics = metrics::get_or_create_queue("socket", + if_cfg.name_.empty() ? if_cfg.address_ + : if_cfg.name_, + ep->port, + std::to_string(ep->tx_queue)); endpoints_.push_back(std::move(ep)); } @@ -844,30 +854,35 @@ Status SocketMgr::send_tx_burst(BurstParams* burst) { } Status status = Status::SUCCESS; + size_t sent_pkts = 0; + uint64_t sent_bytes = 0; + const auto requested_pkts = static_cast(burst->hdr.hdr.num_pkts); if (cfg_.common_.protocol == SocketProtocol::UDP) { - size_t sent_pkts = 0; - uint64_t sent_bytes = 0; if (!send_udp_burst(*ep, burst, &sent_pkts, &sent_bytes)) { status = Status::CONNECT_FAILURE; } - if (sent_pkts > 0) { tx_pkts_.fetch_add(sent_pkts); } - if (sent_bytes > 0) { tx_bytes_.fetch_add(sent_bytes); } } else if (cfg_.common_.protocol == SocketProtocol::TCP) { if (conn == nullptr) { DAQIRI_LOG_ERROR("No active TCP connection for port {}", ep->port); status = Status::CONNECT_FAILURE; } else { - size_t sent_pkts = 0; - uint64_t sent_bytes = 0; if (!send_tcp_burst(conn->fd, burst, &sent_pkts, &sent_bytes)) { status = Status::CONNECT_FAILURE; } - if (sent_pkts > 0) { tx_pkts_.fetch_add(sent_pkts); } - if (sent_bytes > 0) { tx_bytes_.fetch_add(sent_bytes); } } } + if (sent_pkts > 0) { + tx_pkts_.fetch_add(sent_pkts); + metrics::add_tx(ep->tx_metrics, sent_pkts, sent_bytes); + } + if (sent_bytes > 0) { tx_bytes_.fetch_add(sent_bytes); } + if (status != Status::SUCCESS) { + const uint64_t dropped = requested_pkts > sent_pkts ? requested_pkts - sent_pkts : 1; + metrics::add_dropped(ep->tx_metrics, "send_failure", dropped); + } + free_all_packets(burst); free_tx_burst(burst); return status; @@ -970,6 +985,14 @@ std::shared_ptr SocketMgr::register_connection( conn->is_udp = is_udp; conn->rx_queue = rx_queue; conn->running.store(start_rx_thread); + if (if_index >= 0 && if_index < static_cast(cfg_.ifs_.size())) { + const auto& if_cfg = cfg_.ifs_[if_index]; + conn->rx_metrics = metrics::get_or_create_queue("socket", + if_cfg.name_.empty() ? if_cfg.address_ + : if_cfg.name_, + port, + std::to_string(queue)); + } { std::lock_guard lock(state_mutex_); @@ -1248,6 +1271,7 @@ void SocketMgr::tcp_rx_loop(std::shared_ptr conn) { push_rx_burst(conn->rx_queue, burst); rx_pkts_.fetch_add(1); rx_bytes_.fetch_add(static_cast(rx)); + metrics::add_rx(conn->rx_metrics, 1, static_cast(rx)); } conn->running.store(false); @@ -1319,6 +1343,7 @@ void SocketMgr::udp_rx_loop(int if_index) { push_rx_burst(ep->rx_queue_state, burst); rx_pkts_.fetch_add(1); rx_bytes_.fetch_add(static_cast(rx)); + metrics::add_rx(ep->rx_metrics, 1, static_cast(rx)); } } } diff --git a/src/managers/socket/daqiri_socket_mgr.h b/src/managers/socket/daqiri_socket_mgr.h index 0027ba1..569100d 100644 --- a/src/managers/socket/daqiri_socket_mgr.h +++ b/src/managers/socket/daqiri_socket_mgr.h @@ -29,6 +29,7 @@ #include #include "src/manager.h" +#include "src/metrics.h" namespace daqiri { @@ -127,6 +128,7 @@ class SocketMgr : public Manager { std::atomic running{false}; std::thread rx_thread; std::shared_ptr rx_queue; + std::shared_ptr rx_metrics; }; struct EndpointState { @@ -144,6 +146,8 @@ class SocketMgr : public Manager { std::thread accept_thread; std::thread io_thread; std::shared_ptr rx_queue_state; + std::shared_ptr rx_metrics; + std::shared_ptr tx_metrics; sockaddr_in udp_peer_addr{}; bool udp_peer_valid = false; uintptr_t primary_conn_id = 0; diff --git a/src/metrics.cpp b/src/metrics.cpp new file mode 100644 index 0000000..0a98283 --- /dev/null +++ b/src/metrics.cpp @@ -0,0 +1,352 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "metrics.h" + +#if DAQIRI_ENABLE_OTEL_METRICS + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace daqiri::metrics { + +namespace { + +namespace otel = opentelemetry; + +using IntObserver = + otel::nostd::shared_ptr>; + +struct DropCounter { + std::string reason; + std::atomic packets{0}; +}; + +struct QueueLabels { + std::string backend; + std::string interface_name; + std::string port_id; + std::string queue_id; +}; + +enum class MetricKind { RX_PACKETS, TX_PACKETS, RX_BYTES, TX_BYTES, DROPPED_PACKETS }; + +std::string normalize_interface_name(const std::string& interface_name, uint16_t port_id) { + if (!interface_name.empty()) { return interface_name; } + return "port" + std::to_string(port_id); +} + +std::string make_key(const std::string& backend, + const std::string& interface_name, + uint16_t port_id, + const std::string& queue_id) { + return backend + "|" + interface_name + "|" + std::to_string(port_id) + "|" + queue_id; +} + +otel::nostd::string_view view(const std::string& value) { + return otel::nostd::string_view(value.data(), value.size()); +} + +template +void observe(IntObserver observer, + uint64_t value, + const std::array, N>& attrs) { + if (observer == nullptr) { return; } + observer->Observe(static_cast(value), attrs); +} + +} // namespace + +struct CounterSet { + QueueLabels labels; + std::atomic rx_packets{0}; + std::atomic tx_packets{0}; + std::atomic rx_bytes{0}; + std::atomic tx_bytes{0}; + std::mutex drops_mutex; + std::unordered_map> drops; + + void observe_drops(IntObserver observer) { + std::vector> snapshot; + { + std::lock_guard guard(drops_mutex); + snapshot.reserve(drops.size()); + for (const auto& [reason, drop] : drops) { + (void)reason; + snapshot.push_back(drop); + } + } + + for (const auto& drop : snapshot) { + if (drop == nullptr) { continue; } + std::array, 5> attrs = {{ + {"daqiri.backend", view(labels.backend)}, + {"daqiri.interface.name", view(labels.interface_name)}, + {"daqiri.port.id", view(labels.port_id)}, + {"daqiri.queue.id", view(labels.queue_id)}, + {"daqiri.drop.reason", view(drop->reason)}, + }}; + observe(observer, drop->packets.load(std::memory_order_relaxed), attrs); + } + } + + std::shared_ptr get_drop_counter(const std::string& reason) { + std::lock_guard guard(drops_mutex); + auto it = drops.find(reason); + if (it != drops.end()) { return it->second; } + + auto drop = std::make_shared(); + drop->reason = reason; + drops.emplace(reason, drop); + return drop; + } +}; + +namespace { + +class Registry { + public: + std::shared_ptr get_or_create_queue(const std::string& backend, + const std::string& interface_name, + uint16_t port_id, + const std::string& queue_id) { + std::lock_guard guard(mutex_); + initialize_locked(); + + const auto normalized_name = normalize_interface_name(interface_name, port_id); + const auto key = make_key(backend, normalized_name, port_id, queue_id); + auto it = by_key_.find(key); + if (it != by_key_.end()) { return it->second; } + + auto counters = std::make_shared(); + counters->labels.backend = backend; + counters->labels.interface_name = normalized_name; + counters->labels.port_id = std::to_string(port_id); + counters->labels.queue_id = queue_id; + + by_key_.emplace(key, counters); + counters_.push_back(counters); + return counters; + } + + void shutdown() { + std::lock_guard guard(mutex_); + if (rx_packets_) { rx_packets_->RemoveCallback(&Registry::observe_rx_packets, this); } + if (tx_packets_) { tx_packets_->RemoveCallback(&Registry::observe_tx_packets, this); } + if (rx_bytes_) { rx_bytes_->RemoveCallback(&Registry::observe_rx_bytes, this); } + if (tx_bytes_) { tx_bytes_->RemoveCallback(&Registry::observe_tx_bytes, this); } + if (dropped_packets_) { + dropped_packets_->RemoveCallback(&Registry::observe_dropped_packets, this); + } + + rx_packets_.reset(); + tx_packets_.reset(); + rx_bytes_.reset(); + tx_bytes_.reset(); + dropped_packets_.reset(); + counters_.clear(); + by_key_.clear(); + initialized_ = false; + } + + private: + void initialize_locked() { + if (initialized_) { return; } + + auto provider = otel::metrics::Provider::GetMeterProvider(); + auto meter = provider->GetMeter("daqiri", "0.1.0"); + rx_packets_ = meter->CreateInt64ObservableCounter( + "daqiri.rx.packets", "Packets received by DAQIRI", "{packet}"); + tx_packets_ = meter->CreateInt64ObservableCounter( + "daqiri.tx.packets", "Packets transmitted by DAQIRI", "{packet}"); + rx_bytes_ = meter->CreateInt64ObservableCounter( + "daqiri.rx.bytes", "Bytes received by DAQIRI", "By"); + tx_bytes_ = meter->CreateInt64ObservableCounter( + "daqiri.tx.bytes", "Bytes transmitted by DAQIRI", "By"); + dropped_packets_ = meter->CreateInt64ObservableCounter( + "daqiri.dropped.packets", "Packets dropped by DAQIRI or the active backend", "{packet}"); + + rx_packets_->AddCallback(&Registry::observe_rx_packets, this); + tx_packets_->AddCallback(&Registry::observe_tx_packets, this); + rx_bytes_->AddCallback(&Registry::observe_rx_bytes, this); + tx_bytes_->AddCallback(&Registry::observe_tx_bytes, this); + dropped_packets_->AddCallback(&Registry::observe_dropped_packets, this); + initialized_ = true; + } + + static void observe_rx_packets(otel::metrics::ObserverResult result, void* state) noexcept { + static_cast(state)->observe_counter(result, MetricKind::RX_PACKETS); + } + + static void observe_tx_packets(otel::metrics::ObserverResult result, void* state) noexcept { + static_cast(state)->observe_counter(result, MetricKind::TX_PACKETS); + } + + static void observe_rx_bytes(otel::metrics::ObserverResult result, void* state) noexcept { + static_cast(state)->observe_counter(result, MetricKind::RX_BYTES); + } + + static void observe_tx_bytes(otel::metrics::ObserverResult result, void* state) noexcept { + static_cast(state)->observe_counter(result, MetricKind::TX_BYTES); + } + + static void observe_dropped_packets(otel::metrics::ObserverResult result, + void* state) noexcept { + static_cast(state)->observe_counter(result, MetricKind::DROPPED_PACKETS); + } + + std::vector> snapshot_counters() { + std::lock_guard guard(mutex_); + return counters_; + } + + void observe_counter(otel::metrics::ObserverResult result, MetricKind kind) noexcept { + auto observer_ptr = otel::nostd::get_if(&result); + if (observer_ptr == nullptr || *observer_ptr == nullptr) { return; } + + try { + const auto snapshot = snapshot_counters(); + for (const auto& counters : snapshot) { + if (counters == nullptr) { continue; } + + if (kind == MetricKind::DROPPED_PACKETS) { + counters->observe_drops(*observer_ptr); + continue; + } + + uint64_t value = 0; + switch (kind) { + case MetricKind::RX_PACKETS: + value = counters->rx_packets.load(std::memory_order_relaxed); + break; + case MetricKind::TX_PACKETS: + value = counters->tx_packets.load(std::memory_order_relaxed); + break; + case MetricKind::RX_BYTES: + value = counters->rx_bytes.load(std::memory_order_relaxed); + break; + case MetricKind::TX_BYTES: + value = counters->tx_bytes.load(std::memory_order_relaxed); + break; + case MetricKind::DROPPED_PACKETS: + break; + } + + std::array, 4> attrs = {{ + {"daqiri.backend", view(counters->labels.backend)}, + {"daqiri.interface.name", view(counters->labels.interface_name)}, + {"daqiri.port.id", view(counters->labels.port_id)}, + {"daqiri.queue.id", view(counters->labels.queue_id)}, + }}; + observe(*observer_ptr, value, attrs); + } + } catch (...) { + return; + } + } + + bool initialized_ = false; + std::mutex mutex_; + std::vector> counters_; + std::unordered_map> by_key_; + otel::nostd::shared_ptr rx_packets_; + otel::nostd::shared_ptr tx_packets_; + otel::nostd::shared_ptr rx_bytes_; + otel::nostd::shared_ptr tx_bytes_; + otel::nostd::shared_ptr dropped_packets_; +}; + +Registry& registry() { + static Registry registry; + return registry; +} + +} // namespace + +std::shared_ptr get_or_create_queue(const std::string& backend, + const std::string& interface_name, + uint16_t port_id, + const std::string& queue_id) { + return registry().get_or_create_queue(backend, interface_name, port_id, queue_id); +} + +void add_rx(const std::shared_ptr& counters, uint64_t packets, uint64_t bytes) { + if (counters == nullptr) { return; } + counters->rx_packets.fetch_add(packets, std::memory_order_relaxed); + counters->rx_bytes.fetch_add(bytes, std::memory_order_relaxed); +} + +void add_tx(const std::shared_ptr& counters, uint64_t packets, uint64_t bytes) { + if (counters == nullptr) { return; } + counters->tx_packets.fetch_add(packets, std::memory_order_relaxed); + counters->tx_bytes.fetch_add(bytes, std::memory_order_relaxed); +} + +void add_dropped(const std::shared_ptr& counters, + const std::string& reason, + uint64_t packets) { + if (counters == nullptr || packets == 0) { return; } + counters->get_drop_counter(reason)->packets.fetch_add(packets, std::memory_order_relaxed); +} + +void set_rx_packets(const std::shared_ptr& counters, uint64_t packets) { + if (counters == nullptr) { return; } + counters->rx_packets.store(packets, std::memory_order_relaxed); +} + +void set_tx_packets(const std::shared_ptr& counters, uint64_t packets) { + if (counters == nullptr) { return; } + counters->tx_packets.store(packets, std::memory_order_relaxed); +} + +void set_rx_bytes(const std::shared_ptr& counters, uint64_t bytes) { + if (counters == nullptr) { return; } + counters->rx_bytes.store(bytes, std::memory_order_relaxed); +} + +void set_tx_bytes(const std::shared_ptr& counters, uint64_t bytes) { + if (counters == nullptr) { return; } + counters->tx_bytes.store(bytes, std::memory_order_relaxed); +} + +void set_dropped(const std::shared_ptr& counters, + const std::string& reason, + uint64_t packets) { + if (counters == nullptr) { return; } + counters->get_drop_counter(reason)->packets.store(packets, std::memory_order_relaxed); +} + +void shutdown() { + registry().shutdown(); +} + +} // namespace daqiri::metrics + +#endif // DAQIRI_ENABLE_OTEL_METRICS diff --git a/src/metrics.h b/src/metrics.h new file mode 100644 index 0000000..440fd69 --- /dev/null +++ b/src/metrics.h @@ -0,0 +1,76 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#ifndef DAQIRI_ENABLE_OTEL_METRICS +#define DAQIRI_ENABLE_OTEL_METRICS 0 +#endif + +namespace daqiri::metrics { + +#if DAQIRI_ENABLE_OTEL_METRICS + +struct CounterSet; + +std::shared_ptr get_or_create_queue(const std::string& backend, + const std::string& interface_name, + uint16_t port_id, + const std::string& queue_id); + +void add_rx(const std::shared_ptr& counters, uint64_t packets, uint64_t bytes); +void add_tx(const std::shared_ptr& counters, uint64_t packets, uint64_t bytes); +void add_dropped(const std::shared_ptr& counters, + const std::string& reason, + uint64_t packets); +void set_rx_packets(const std::shared_ptr& counters, uint64_t packets); +void set_tx_packets(const std::shared_ptr& counters, uint64_t packets); +void set_rx_bytes(const std::shared_ptr& counters, uint64_t bytes); +void set_tx_bytes(const std::shared_ptr& counters, uint64_t bytes); +void set_dropped(const std::shared_ptr& counters, + const std::string& reason, + uint64_t packets); +void shutdown(); + +#else + +struct CounterSet {}; + +inline std::shared_ptr get_or_create_queue(const std::string&, + const std::string&, + uint16_t, + const std::string&) { + return nullptr; +} + +inline void add_rx(const std::shared_ptr&, uint64_t, uint64_t) {} +inline void add_tx(const std::shared_ptr&, uint64_t, uint64_t) {} +inline void add_dropped(const std::shared_ptr&, const std::string&, uint64_t) {} +inline void set_rx_packets(const std::shared_ptr&, uint64_t) {} +inline void set_tx_packets(const std::shared_ptr&, uint64_t) {} +inline void set_rx_bytes(const std::shared_ptr&, uint64_t) {} +inline void set_tx_bytes(const std::shared_ptr&, uint64_t) {} +inline void set_dropped(const std::shared_ptr&, const std::string&, uint64_t) {} +inline void shutdown() {} + +#endif + +} // namespace daqiri::metrics From 825d7d7a724b63f3f0682c682e4b907353a0391c Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Mon, 1 Jun 2026 21:05:40 +0000 Subject: [PATCH 2/7] #111 - Clarify Grafana dashboard metric display Signed-off-by: Cliff Burdick --- .../grafana/dashboards/daqiri-metrics.json | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/grafana/grafana/dashboards/daqiri-metrics.json b/examples/grafana/grafana/dashboards/daqiri-metrics.json index ae52d6d..0217506 100644 --- a/examples/grafana/grafana/dashboards/daqiri-metrics.json +++ b/examples/grafana/grafana/dashboards/daqiri-metrics.json @@ -52,12 +52,12 @@ }, "targets": [ { - "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_packets_total[5s]))", + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_packets_total{daqiri_queue_id!=\"all\"}[5s])) != 0", "legendFormat": "rx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "A" }, { - "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_packets_total[5s]))", + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_packets_total{daqiri_queue_id!=\"all\"}[5s])) != 0", "legendFormat": "tx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "B" } @@ -72,7 +72,7 @@ }, "fieldConfig": { "defaults": { - "unit": "gbps" + "unit": "suffix: Gb/s" }, "overrides": [] }, @@ -96,12 +96,12 @@ }, "targets": [ { - "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_bytes_total[5s])) * 8 / 1000000000", + "expr": "(sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_bytes_total{daqiri_queue_id!=\"all\"}[5s])) != 0) * 8 / 1000000000", "legendFormat": "rx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "A" }, { - "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_bytes_total[5s])) * 8 / 1000000000", + "expr": "(sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_bytes_total{daqiri_queue_id!=\"all\"}[5s])) != 0) * 8 / 1000000000", "legendFormat": "tx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "B" } @@ -140,7 +140,7 @@ }, "targets": [ { - "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id, daqiri_drop_reason) (rate(daqiri_dropped_packets_total[5s]))", + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id, daqiri_drop_reason) (rate(daqiri_dropped_packets_total{daqiri_queue_id!=\"all\"}[5s]))", "legendFormat": "{{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}} {{daqiri_drop_reason}}", "refId": "A" } @@ -179,12 +179,12 @@ }, "targets": [ { - "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (daqiri_rx_packets_total)", + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (daqiri_rx_packets_total{daqiri_queue_id!=\"all\"}) != 0", "legendFormat": "rx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "A" }, { - "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (daqiri_tx_packets_total)", + "expr": "sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (daqiri_tx_packets_total{daqiri_queue_id!=\"all\"}) != 0", "legendFormat": "tx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "B" } From c8a35bff03960d45cc8b700dfd5f1d49efb0ca5f Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Mon, 1 Jun 2026 21:40:29 +0000 Subject: [PATCH 3/7] #111 - Clean up Grafana rate window and GPUNetIO references Signed-off-by: Cliff Burdick --- Dockerfile | 24 ------------------- docs/tutorials/system_configuration.md | 4 ---- .../grafana/dashboards/daqiri-metrics.json | 4 ++-- 3 files changed, 2 insertions(+), 30 deletions(-) diff --git a/Dockerfile b/Dockerfile index fc3035b..a322896 100644 --- a/Dockerfile +++ b/Dockerfile @@ -199,30 +199,6 @@ RUN if [ "${DAQIRI_ENABLE_OTEL_METRICS}" = "ON" ]; then \ # ============================================================== FROM dpdk AS rdma -# ============================================================== -# gpunetio: Add DOCA SDK packages for GPUNetIO support -# ============================================================== -FROM rdma AS gpunetio - -# Install DOCA SDK packages required for GPUNetIO -# (DOCA repo is already configured in dpdk stage) -# - libdoca-sdk-gpunetio-dev: for gpunetio backend (doca-gpunetio module) -# - libdoca-sdk-eth-dev: for gpunetio backend (doca-eth module) -# - libdoca-sdk-flow-dev: for gpunetio backend (doca-flow module) -RUN apt-get update && apt-get install -y --no-install-recommends \ - mlnx-dpdk-dev \ - libdoca-sdk-gpunetio-dev \ - libdoca-sdk-eth-dev \ - libdoca-sdk-flow-dev \ - mlnx-ofed-kernel-utils \ - && rm -rf /var/lib/apt/lists/* - -RUN git clone https://github.com/NVIDIA/gdrcopy.git /opt/mellanox/gdrcopy \ - && cd /opt/mellanox/gdrcopy \ - && make lib - -ENV GDRCOPY_PATH_L=/opt/mellanox/gdrcopy/src - # ============================== # Rivermax Target # This stage is only built when --target rivermax is specified. It installs and configures Rivermax SDK. diff --git a/docs/tutorials/system_configuration.md b/docs/tutorials/system_configuration.md index 8dc67e5..689d05e 100644 --- a/docs/tutorials/system_configuration.md +++ b/docs/tutorials/system_configuration.md @@ -812,10 +812,6 @@ DAQIRI requires an [**NVIDIA SmartNIC**](https://www.nvidia.com/en-us/networking ### Step 5: Isolate CPU cores - !!! note - - This optimization is less impactful when using the `gpunetio` backend since the GPU polls the NIC. - The CPU interacting with the NIC to route packets is sensitive to perturbations, especially with smaller packet/batch sizes requiring more frequent work. Isolating a CPU in Linux prevents unwanted user or kernel threads from running on it, reducing context switching and latency spikes from noisy neighbors. We recommend isolating the CPU cores you will select to interact with the NIC (defined in the `daqiri` configuration [described in the configuration reference](configuration-walkthrough.md) in this tutorial). This is done by setting additional flags on the kernel bootline. diff --git a/examples/grafana/grafana/dashboards/daqiri-metrics.json b/examples/grafana/grafana/dashboards/daqiri-metrics.json index 0217506..783b025 100644 --- a/examples/grafana/grafana/dashboards/daqiri-metrics.json +++ b/examples/grafana/grafana/dashboards/daqiri-metrics.json @@ -96,12 +96,12 @@ }, "targets": [ { - "expr": "(sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_bytes_total{daqiri_queue_id!=\"all\"}[5s])) != 0) * 8 / 1000000000", + "expr": "(sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_rx_bytes_total{daqiri_queue_id!=\"all\"}[30s])) != 0) * 8 / 1000000000", "legendFormat": "rx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "A" }, { - "expr": "(sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_bytes_total{daqiri_queue_id!=\"all\"}[5s])) != 0) * 8 / 1000000000", + "expr": "(sum by (daqiri_backend, daqiri_interface_name, daqiri_queue_id) (rate(daqiri_tx_bytes_total{daqiri_queue_id!=\"all\"}[30s])) != 0) * 8 / 1000000000", "legendFormat": "tx {{daqiri_backend}} {{daqiri_interface_name}} q{{daqiri_queue_id}}", "refId": "B" } From cddef1a34c5b6223211afb6b13d7da503cfe6ee4 Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Mon, 1 Jun 2026 21:41:44 +0000 Subject: [PATCH 4/7] #111 - Restore raw benchmark PCI placeholders Signed-off-by: Cliff Burdick --- examples/daqiri_bench_raw_tx_rx.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/daqiri_bench_raw_tx_rx.yaml b/examples/daqiri_bench_raw_tx_rx.yaml index e1cbe9b..60480e4 100644 --- a/examples/daqiri_bench_raw_tx_rx.yaml +++ b/examples/daqiri_bench_raw_tx_rx.yaml @@ -23,7 +23,7 @@ daqiri: interfaces: - name: "tx_port" - address: 0000:01:00.0 + address: <0000:00:00.0> tx: queues: - name: "tx_q_0" @@ -35,7 +35,7 @@ daqiri: offloads: - "tx_eth_src" - name: "rx_port" - address: 0000:01:00.1 + address: <0000:00:00.0> rx: flow_isolation: true queues: From 2adfc002d21585243c10ced73037252f6f527840 Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Mon, 1 Jun 2026 22:18:20 +0000 Subject: [PATCH 5/7] #111 - Address Greptile PR feedback Signed-off-by: Cliff Burdick --- docs/api-reference/configuration.md | 5 ++ docs/tutorials/benchmarking_examples.md | 29 +++++++++++ docs/tutorials/configuration-walkthrough.md | 4 ++ examples/grafana/run-benchmark.sh | 2 +- src/metrics.cpp | 55 ++++++++++++--------- 5 files changed, 70 insertions(+), 25 deletions(-) diff --git a/docs/api-reference/configuration.md b/docs/api-reference/configuration.md index 7e7c76f..30c6b7c 100644 --- a/docs/api-reference/configuration.md +++ b/docs/api-reference/configuration.md @@ -7,6 +7,11 @@ want to interoperate with existing configuration code. See `examples/daqiri_bench_*.yaml` for complete working examples. +OpenTelemetry metrics do not add YAML fields. Metrics-enabled builds use the +same interface, queue, and flow names from the active configuration as metric +labels, and applications are still responsible for configuring the OpenTelemetry +SDK/exporter before running DAQIRI. + ## Common Configuration These settings apply globally to both TX and RX: diff --git a/docs/tutorials/benchmarking_examples.md b/docs/tutorials/benchmarking_examples.md index 803c92b..17004e3 100644 --- a/docs/tutorials/benchmarking_examples.md +++ b/docs/tutorials/benchmarking_examples.md @@ -169,6 +169,35 @@ After having modified the configuration file, ensure you have connected an SFP c By default the application runs for 10 seconds and then exits. You can change the duration by passing `--seconds ` after the YAML path, or stop it gracefully at any time with `Ctrl-C`. +## Watch live OpenTelemetry metrics in Grafana + +DAQIRI can expose the raw benchmark counters through OpenTelemetry when metrics +support is enabled at build time. The Grafana example uses the same benchmark +binary and YAML files as the loopback test above, then starts Prometheus and +Grafana beside the benchmark process. + +Build the container with metrics enabled: + +```bash +DAQIRI_ENABLE_OTEL_METRICS=ON DAQIRI_MGR="dpdk socket rdma" scripts/build-container.sh +``` + +Before starting the stack, fill in the required `` in the benchmark +YAML you plan to run. You can also pass a machine-local copy through +`DAQIRI_CONFIG` so the tracked example YAML keeps its placeholder syntax. + +```bash +cd examples/grafana +DAQIRI_CONFIG=/workspace/daqiri/examples/daqiri_bench_raw_tx_rx.yaml \ +DAQIRI_SECONDS=60 \ +docker compose up +``` + +Prometheus scrapes `http://localhost:9464/metrics`, and Grafana serves the +`DAQIRI OpenTelemetry Metrics` dashboard at `http://localhost:3000`. The +throughput panel reports payload counter rates in `Gb/s` for each active +interface and queue. + ??? abstract "See an example output" ```log diff --git a/docs/tutorials/configuration-walkthrough.md b/docs/tutorials/configuration-walkthrough.md index 5357691..af4abcc 100644 --- a/docs/tutorials/configuration-walkthrough.md +++ b/docs/tutorials/configuration-walkthrough.md @@ -24,6 +24,10 @@ With a backend in mind, read down the questions below and stop at the first one - **DGX Spark / GB10** (prefilled) — [`daqiri_bench_raw_tx_rx_spark.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_tx_rx_spark.yaml). `kind: host_pinned` for the integrated GPU; cores, PCIe addresses, and IPs are prefilled. See the [Spark profile callout](benchmarking_examples.md#update-the-loopback-configuration) for run details. - **No physical NIC available** — [`daqiri_bench_raw_sw_loopback.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_raw_sw_loopback.yaml). `loopback: "sw"`, no NIC required. Useful for first-time build verification, not representative of production performance. + To watch the same raw loopback benchmark with live Prometheus and Grafana + counters, use the Grafana compose stack described in + [Watch live OpenTelemetry metrics in Grafana](benchmarking_examples.md#watch-live-opentelemetry-metrics-in-grafana). + **RDMA / RoCE** — runs on `daqiri_bench_rdma` (use `--mode {tx,rx,both}`). Configs use `kind: host_pinned` regardless of platform. - **Generic** (template — replace IPs) — [`daqiri_bench_rdma_tx_rx.yaml`](https://github.com/nvidia/daqiri/blob/main/examples/daqiri_bench_rdma_tx_rx.yaml). diff --git a/examples/grafana/run-benchmark.sh b/examples/grafana/run-benchmark.sh index 4cae3b9..82a31e8 100755 --- a/examples/grafana/run-benchmark.sh +++ b/examples/grafana/run-benchmark.sh @@ -2,7 +2,7 @@ set -euo pipefail DAQIRI_BIN="${DAQIRI_BIN:-/opt/daqiri/bin/daqiri_bench_raw_gpudirect}" -DAQIRI_CONFIG="${DAQIRI_CONFIG:-/opt/daqiri/bin/daqiri_bench_raw_tx_rx.yaml}" +DAQIRI_CONFIG="${DAQIRI_CONFIG:-/workspace/daqiri/examples/daqiri_bench_raw_tx_rx.yaml}" DAQIRI_SECONDS="${DAQIRI_SECONDS:-60}" DAQIRI_OTEL_PROMETHEUS_ENDPOINT="${DAQIRI_OTEL_PROMETHEUS_ENDPOINT:-0.0.0.0:9464}" export DAQIRI_OTEL_PROMETHEUS_ENDPOINT diff --git a/src/metrics.cpp b/src/metrics.cpp index 0a98283..3e064dd 100644 --- a/src/metrics.cpp +++ b/src/metrics.cpp @@ -17,12 +17,11 @@ #include "metrics.h" -#if DAQIRI_ENABLE_OTEL_METRICS - #include #include #include #include +#include #include #include @@ -42,6 +41,8 @@ namespace otel = opentelemetry; using IntObserver = otel::nostd::shared_ptr>; +using ObservableInstrumentPtr = + otel::nostd::shared_ptr; struct DropCounter { std::string reason; @@ -157,23 +158,31 @@ class Registry { } void shutdown() { - std::lock_guard guard(mutex_); - if (rx_packets_) { rx_packets_->RemoveCallback(&Registry::observe_rx_packets, this); } - if (tx_packets_) { tx_packets_->RemoveCallback(&Registry::observe_tx_packets, this); } - if (rx_bytes_) { rx_bytes_->RemoveCallback(&Registry::observe_rx_bytes, this); } - if (tx_bytes_) { tx_bytes_->RemoveCallback(&Registry::observe_tx_bytes, this); } - if (dropped_packets_) { - dropped_packets_->RemoveCallback(&Registry::observe_dropped_packets, this); + ObservableInstrumentPtr rx_packets; + ObservableInstrumentPtr tx_packets; + ObservableInstrumentPtr rx_bytes; + ObservableInstrumentPtr tx_bytes; + ObservableInstrumentPtr dropped_packets; + + { + std::lock_guard guard(mutex_); + rx_packets = std::move(rx_packets_); + tx_packets = std::move(tx_packets_); + rx_bytes = std::move(rx_bytes_); + tx_bytes = std::move(tx_bytes_); + dropped_packets = std::move(dropped_packets_); + counters_.clear(); + by_key_.clear(); + initialized_ = false; } - rx_packets_.reset(); - tx_packets_.reset(); - rx_bytes_.reset(); - tx_bytes_.reset(); - dropped_packets_.reset(); - counters_.clear(); - by_key_.clear(); - initialized_ = false; + if (rx_packets) { rx_packets->RemoveCallback(&Registry::observe_rx_packets, this); } + if (tx_packets) { tx_packets->RemoveCallback(&Registry::observe_tx_packets, this); } + if (rx_bytes) { rx_bytes->RemoveCallback(&Registry::observe_rx_bytes, this); } + if (tx_bytes) { tx_bytes->RemoveCallback(&Registry::observe_tx_bytes, this); } + if (dropped_packets) { + dropped_packets->RemoveCallback(&Registry::observe_dropped_packets, this); + } } private: @@ -276,11 +285,11 @@ class Registry { std::mutex mutex_; std::vector> counters_; std::unordered_map> by_key_; - otel::nostd::shared_ptr rx_packets_; - otel::nostd::shared_ptr tx_packets_; - otel::nostd::shared_ptr rx_bytes_; - otel::nostd::shared_ptr tx_bytes_; - otel::nostd::shared_ptr dropped_packets_; + ObservableInstrumentPtr rx_packets_; + ObservableInstrumentPtr tx_packets_; + ObservableInstrumentPtr rx_bytes_; + ObservableInstrumentPtr tx_bytes_; + ObservableInstrumentPtr dropped_packets_; }; Registry& registry() { @@ -348,5 +357,3 @@ void shutdown() { } } // namespace daqiri::metrics - -#endif // DAQIRI_ENABLE_OTEL_METRICS From 5129d27acaf3a7be407db89ce6b2b3a7756ec217 Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Tue, 2 Jun 2026 01:03:07 +0000 Subject: [PATCH 6/7] #111 - Clean up metrics CMake definitions Signed-off-by: Cliff Burdick --- src/CMakeLists.txt | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 93b79d7..4a3e9de 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -115,10 +115,7 @@ target_link_libraries(daqiri_common PRIVATE ) if(DAQIRI_ENABLE_OTEL_METRICS) - target_compile_definitions(daqiri_common PUBLIC DAQIRI_ENABLE_OTEL_METRICS=1) target_link_libraries(daqiri_common PRIVATE opentelemetry-cpp::api) -else() - target_compile_definitions(daqiri_common PUBLIC DAQIRI_ENABLE_OTEL_METRICS=0) endif() if(DAQIRI_ENABLE_GDS) find_path(DAQIRI_CUFILE_INCLUDE_DIR From 134ad5e3abfc821caa0bc6ac2fb359460d702c97 Mon Sep 17 00:00:00 2001 From: Denis Leshchev Date: Tue, 2 Jun 2026 16:18:06 -0400 Subject: [PATCH 7/7] #111 - Populate DPDK port metrics Signed-off-by: Denis Leshchev --- src/managers/dpdk/daqiri_dpdk_stats.cpp | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/managers/dpdk/daqiri_dpdk_stats.cpp b/src/managers/dpdk/daqiri_dpdk_stats.cpp index 492cf69..90dd9db 100644 --- a/src/managers/dpdk/daqiri_dpdk_stats.cpp +++ b/src/managers/dpdk/daqiri_dpdk_stats.cpp @@ -256,10 +256,6 @@ void DpdkStats::Run() { struct rte_eth_stats eth_stats {}; const bool eth_stats_valid = rte_eth_stats_get(port_id, ð_stats) == 0; const auto port_metrics = port_metrics_[port_id]; - bool has_queue_rx_packets = false; - bool has_queue_tx_packets = false; - bool has_queue_rx_bytes = false; - bool has_queue_tx_bytes = false; for (const auto& [queue_id, xstats] : port_stats.queue_xstats) { const uint32_t key = (port_id << 16) | queue_id; @@ -269,19 +265,15 @@ void DpdkStats::Run() { const auto& queue_metrics = metrics_it->second; if (xstats.rx_packets_idx >= 0) { metrics::set_rx_packets(queue_metrics, port_stats.xstats[xstats.rx_packets_idx].value); - has_queue_rx_packets = true; } if (xstats.tx_packets_idx >= 0) { metrics::set_tx_packets(queue_metrics, port_stats.xstats[xstats.tx_packets_idx].value); - has_queue_tx_packets = true; } if (xstats.rx_bytes_idx >= 0) { metrics::set_rx_bytes(queue_metrics, port_stats.xstats[xstats.rx_bytes_idx].value); - has_queue_rx_bytes = true; } if (xstats.tx_bytes_idx >= 0) { metrics::set_tx_bytes(queue_metrics, port_stats.xstats[xstats.tx_bytes_idx].value); - has_queue_tx_bytes = true; } if (xstats.rx_errors_idx >= 0) { metrics::set_dropped(queue_metrics, @@ -291,10 +283,10 @@ void DpdkStats::Run() { } if (eth_stats_valid) { - if (!has_queue_rx_packets) { metrics::set_rx_packets(port_metrics, eth_stats.ipackets); } - if (!has_queue_tx_packets) { metrics::set_tx_packets(port_metrics, eth_stats.opackets); } - if (!has_queue_rx_bytes) { metrics::set_rx_bytes(port_metrics, eth_stats.ibytes); } - if (!has_queue_tx_bytes) { metrics::set_tx_bytes(port_metrics, eth_stats.obytes); } + metrics::set_rx_packets(port_metrics, eth_stats.ipackets); + metrics::set_tx_packets(port_metrics, eth_stats.opackets); + metrics::set_rx_bytes(port_metrics, eth_stats.ibytes); + metrics::set_tx_bytes(port_metrics, eth_stats.obytes); metrics::set_dropped(port_metrics, "rx_missed", eth_stats.imissed); metrics::set_dropped(port_metrics, "rx_nombuf", eth_stats.rx_nombuf); metrics::set_dropped(port_metrics, "rx_errors", eth_stats.ierrors);