From 2b821365782e72e57f7f831fca19cce52c587b4d Mon Sep 17 00:00:00 2001 From: Denis Leshchev Date: Wed, 3 Jun 2026 13:27:01 -0400 Subject: [PATCH 1/2] Add DAQIRI Holoscan integration tutorial Signed-off-by: Denis Leshchev --- AGENTS.md | 2 +- README.md | 1 + docs/tutorials/daqiri-holoscan-integration.md | 330 ++++++++++++++++++ mkdocs.yml | 1 + 4 files changed, 333 insertions(+), 1 deletion(-) create mode 100644 docs/tutorials/daqiri-holoscan-integration.md diff --git a/AGENTS.md b/AGENTS.md index effe7ff..9dbad01 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -96,7 +96,7 @@ The web docs live in `docs/` and are built with [MkDocs Material](https://squidf - `docs/concepts.md` — terminology glossary (stream types and protocols, GPUDirect, packet/burst/segment, flow/queue, memory region, zero-copy ownership, RX reorder). Meant to be opened in parallel with the rest of the docs. - `docs/api-reference/index.md` — API guide (6-step application lifecycle, configuration-first model) - `docs/api-reference/configuration.md`, `docs/api-reference/cpp.md`, `docs/api-reference/python.md` — YAML schema, C++ API, and Python bindings docs -- `docs/tutorials/` — tutorial walkthroughs (system config, config-file walkthrough) +- `docs/tutorials/` — tutorial walkthroughs (system config, config-file walkthrough, Holoscan integration) - `docs/tutorials/benchmarking_examples.md` — surfaced as a top-level "Benchmarks" nav entry in `mkdocs.yml` and `docs/index.html`; file kept at its original path for inbound-link stability - `docs/stylesheets/extra.css` — custom theme overrides diff --git a/README.md b/README.md index 44867bd..31a062c 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ Step-by-step walkthroughs to get hands-on: - [System Configuration](https://nvidia.github.io/daqiri/tutorials/system_configuration/) — NIC drivers, link layers, GPUDirect, hugepages, CPU isolation, GPU clocks - [Benchmarking Examples](https://nvidia.github.io/daqiri/tutorials/benchmarking_examples/) — run `daqiri_bench_raw_gpudirect` with a loopback test - [Understanding the Configuration File](https://nvidia.github.io/daqiri/tutorials/configuration-walkthrough/) — annotated YAML walkthrough +- [DAQIRI + Holoscan Integration](https://nvidia.github.io/daqiri/tutorials/daqiri-holoscan-integration/) — use DAQIRI RX bursts from a Holoscan source operator ## License diff --git a/docs/tutorials/daqiri-holoscan-integration.md b/docs/tutorials/daqiri-holoscan-integration.md new file mode 100644 index 0000000..1e4f209 --- /dev/null +++ b/docs/tutorials/daqiri-holoscan-integration.md @@ -0,0 +1,330 @@ +# DAQIRI + Holoscan Integration + +DAQIRI can be used as the IO layer inside an NVIDIA Holoscan application. The +recommended pattern is to initialize DAQIRI once during application startup, let a +Holoscan source operator poll DAQIRI RX queues, aggregate packets into +application-owned tensor storage, and emit a `holoscan::TensorMap` to downstream +operators. + +This tutorial shows a minimal C++ integration skeleton. It is not a complete +runnable app in this repository. The exact tensor and DLPack wrapping code depends +on the Holoscan SDK version and should live in a runnable Holohub example. + +Before integrating with Holoscan, make sure the DAQIRI YAML works with the +standalone benchmarks in [Benchmarking Examples](benchmarking_examples.md), and +review the [configuration walkthrough](configuration-walkthrough.md). + +## Application Lifecycle + +The Holoscan application owns the graph. DAQIRI owns NIC setup, packet memory, +and RX burst lifetimes. + +```cpp +#include +#include + +#include + +class DaqiriHoloscanApp : public holoscan::Application { + public: + void compose() override { + auto rx = make_operator("daqiri_rx", from_config("bench_rx")); + auto sink = make_operator("tensor_sink"); + + add_flow(rx, sink, {{"out", "in"}}); + } +}; + +int main(int argc, char** argv) { + if (argc != 2) { + return 1; + } + + const std::filesystem::path config_path{argv[1]}; + + auto status = daqiri::daqiri_init(config_path.string()); + if (status != daqiri::Status::SUCCESS) { + return 1; + } + + auto app = holoscan::make_application(); + app->config(config_path.string()); + app->run(); + + daqiri::print_stats(); + daqiri::shutdown(); + return 0; +} +``` + +The shared YAML file can contain both Holoscan parameters and the `daqiri` block +passed to `daqiri_init(...)`. DAQIRI ignores Holoscan-only keys, and Holoscan +operators read their own parameter blocks with `from_config(...)`. + +```yaml +scheduler: + type: greedy + +daqiri: + cfg: + version: 1 + stream_type: "raw" + master_core: 3 + log_level: "info" + memory_regions: + - name: rx_payload + kind: device + affinity: 0 + num_bufs: 32768 + buf_size: 4096 + interfaces: + - name: rx_port + address: "<0000:00:00.0>" + rx: + queues: + - name: rx_q0 + id: 0 + cpu_core: 9 + batch_size: 64 + memory_regions: [rx_payload] + +bench_rx: + interface_name: rx_port + batch_size: 64 + max_packet_size: 4096 + header_size: 64 + gpu_direct: true + split_boundary: 0 +``` + +Replace placeholder PCIe addresses, CPU cores, queue fields, and memory regions +with values for your system. RX flow rules are omitted from this compact excerpt; +real raw Ethernet RX configs need a `flows:` block to steer packets to a queue. +See the [configuration walkthrough](configuration-walkthrough.md) and +[Configuration YAML Reference](../api-reference/configuration.md). + +## RX Operator Skeleton + +The source operator resolves the configured DAQIRI interface once, allocates any +CUDA resources it needs, polls all RX queues on each `compute()` call, and emits a +Holoscan tensor batch after aggregation. + +```cpp +#include +#include + +#include + +#include +#include +#include +#include + +class DaqiriRxOp : public holoscan::Operator { + public: + HOLOSCAN_OPERATOR_FORWARD_ARGS(DaqiriRxOp) + + ~DaqiriRxOp() override { + if (batch_ready_event_ != nullptr) { + cudaEventDestroy(batch_ready_event_); + } + if (stream_ != nullptr) { + cudaStreamDestroy(stream_); + } + } + + void setup(holoscan::OperatorSpec& spec) override { + spec.param(interface_name_, "interface_name", "Interface name", + "Name of the DAQIRI interface to receive from"); + spec.param(batch_size_, "batch_size", "Batch size", + "Number of packets to aggregate before emitting"); + spec.param(max_packet_size_, "max_packet_size", "Max packet size", + "Maximum packet bytes copied into one output slot"); + spec.param(header_size_, "header_size", "Header size", + "Header bytes used when HDS is enabled"); + spec.param(gpu_direct_, "gpu_direct", "GPU direct", + "Whether RX payloads are expected in CUDA-addressable memory"); + spec.param(split_boundary_, "split_boundary", "Split boundary", + "Payload segment index for header-data split; 0 for single segment"); + + spec.output("out"); + } + + void initialize() override { + holoscan::Operator::initialize(); + + port_id_ = daqiri::get_port_id(interface_name_.get()); + if (port_id_ < 0) { + throw std::runtime_error("DAQIRI interface was not found"); + } + + cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking); + cudaEventCreateWithFlags(&batch_ready_event_, cudaEventDisableTiming); + + allocate_batch_buffers(batch_size_.get(), max_packet_size_.get()); + } + + void compute(holoscan::InputContext&, + holoscan::OutputContext& op_output, + holoscan::ExecutionContext&) override { + const auto queue_count = daqiri::get_num_rx_queues(port_id_); + + for (uint16_t queue_id = 0; queue_id < queue_count; ++queue_id) { + daqiri::BurstParams* burst = nullptr; + const auto status = daqiri::get_rx_burst(&burst, port_id_, queue_id); + if (status != daqiri::Status::SUCCESS || burst == nullptr) { + continue; + } + + consume_burst_into_batch(burst); + daqiri::free_all_packets_and_burst_rx(burst); + } + + if (!batch_is_ready()) { + return; + } + + cudaEventRecord(batch_ready_event_, stream_); + cudaEventSynchronize(batch_ready_event_); + + holoscan::TensorMap out_message; + out_message["rx_tensor"] = wrap_current_batch_as_tensor(); + op_output.emit(out_message, "out"); + + reset_batch(); + } + + private: + void consume_burst_into_batch(daqiri::BurstParams* burst) { + const int packet_count = daqiri::get_num_packets(burst); + + for (int i = 0; i < packet_count; ++i) { + if (split_boundary_.get() == 0) { + void* packet = daqiri::get_packet_ptr(burst, i); + uint32_t packet_len = daqiri::get_packet_length(burst, i); + append_single_segment_packet(packet, packet_len, stream_); + } else { + void* header = daqiri::get_segment_packet_ptr(burst, 0, i); + uint32_t header_len = daqiri::get_segment_packet_length(burst, 0, i); + + const int payload_segment = split_boundary_.get(); + void* payload = daqiri::get_segment_packet_ptr(burst, payload_segment, i); + uint32_t payload_len = + daqiri::get_segment_packet_length(burst, payload_segment, i); + + append_hds_packet(header, header_len, payload, payload_len, stream_); + } + } + } + + void allocate_batch_buffers(int batch_size, int max_packet_size); + bool batch_is_ready() const; + void reset_batch(); + + void append_single_segment_packet(void* packet, uint32_t packet_len, + cudaStream_t stream); + void append_hds_packet(void* header, uint32_t header_len, + void* payload, uint32_t payload_len, + cudaStream_t stream); + + std::shared_ptr wrap_current_batch_as_tensor(); + + holoscan::Parameter interface_name_; + holoscan::Parameter batch_size_; + holoscan::Parameter max_packet_size_; + holoscan::Parameter header_size_; + holoscan::Parameter gpu_direct_; + holoscan::Parameter split_boundary_; + + int port_id_ = -1; + cudaStream_t stream_ = nullptr; + cudaEvent_t batch_ready_event_ = nullptr; +}; +``` + +For a single-segment GPU RX configuration, `daqiri::get_packet_ptr(...)` returns +the packet pointer and `daqiri::get_packet_length(...)` returns its length. For +header-data split, segment `0` is commonly the CPU header segment and the payload +segment is retrieved with `daqiri::get_segment_packet_ptr(...)` and +`daqiri::get_segment_packet_length(...)`. + +The skeleton above copies or aggregates packet contents into operator-owned batch +storage before emitting. That means it can return the RX buffers to DAQIRI +immediately with `daqiri::free_all_packets_and_burst_rx(burst)`. + +If your operator wraps DAQIRI-owned packet buffers directly in a tensor, the tensor +lifetime must call the matching DAQIRI cleanup function when downstream consumers +are finished. For normal RX bursts, use +`daqiri::free_all_packets_and_burst_rx(burst)`. For HDS or segmented ownership +where only one segment is transferred to downstream ownership, use +`daqiri::free_segment_packets_and_burst(burst, segment_id)` for the segment being +released. Freeing only the burst metadata is insufficient when RX packet buffers +are still owned by the RX path; missed packet-buffer frees eventually drain the +pool and cause RX drops. + +See [C++ API Usage](../api-reference/cpp.md#receiving-packets) for packet access +helpers and [RX Step 3 - Free buffers](../api-reference/cpp.md#rx-step-3-free-buffers) +for the complete cleanup API list. + +## Tensor Emission + +The source operator emits a `holoscan::TensorMap` on port `out`. The example uses +a single entry named `rx_tensor`: + +```cpp +holoscan::TensorMap out_message; +out_message["rx_tensor"] = wrap_current_batch_as_tensor(); +op_output.emit(out_message, "out"); +``` + +The tensor can represent any layout your downstream pipeline expects, such as +`[batch, bytes]`, `[batch, header_bytes]` plus `[batch, payload_bytes]`, or a +metadata tensor paired with a payload tensor. The important boundary is ownership: +either the operator emits tensors backed by its own batch storage, or it emits +zero-copy wrappers with a release callback that frees the DAQIRI packet buffers. + +The complete SDK-version-specific tensor and DLPack wrapping belongs in the +Holohub runnable example, where it can be compiled and tested against a concrete +Holoscan SDK release. + +## Sink Operator Skeleton + +A downstream sink receives the same `TensorMap` and can log, forward, or pass the +tensor metadata into additional processing operators. + +```cpp +#include + +class TensorSinkOp : public holoscan::Operator { + public: + HOLOSCAN_OPERATOR_FORWARD_ARGS(TensorSinkOp) + + void setup(holoscan::OperatorSpec& spec) override { + spec.input("in"); + } + + void compute(holoscan::InputContext& op_input, + holoscan::OutputContext&, + holoscan::ExecutionContext&) override { + auto maybe_message = op_input.receive("in"); + if (!maybe_message) { + return; + } + + const auto& tensor_map = maybe_message.value(); + auto it = tensor_map.find("rx_tensor"); + if (it == tensor_map.end()) { + return; + } + + const auto& tensor = it->second; + HOLOSCAN_LOG_INFO("Received rx_tensor at {}", static_cast(tensor.get())); + } +}; +``` + +## References + +- [Holohub PR #1553](https://github.com/nvidia-holoscan/holohub/pull/1553) is the current reference while the Holohub example is under review. +- After merge, the expected Holohub app path is `applications/daqiri_raw_ethernet_bench`. +- DAQIRI [configuration walkthrough](configuration-walkthrough.md) and [C++ API Usage](../api-reference/cpp.md) cover the DAQIRI-side configuration, packet access, and cleanup calls. diff --git a/mkdocs.yml b/mkdocs.yml index 65d9e7d..709f53c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -59,6 +59,7 @@ nav: - System Configuration: tutorials/system_configuration.md - Bare-Metal CMake Build: tutorials/bare-metal-cmake-build.md - Configuration YAML Walkthrough: tutorials/configuration-walkthrough.md + - DAQIRI + Holoscan Integration: tutorials/daqiri-holoscan-integration.md markdown_extensions: - admonition From 768c75b409c1477d9a600acb2d0a666ac44cb363 Mon Sep 17 00:00:00 2001 From: Denis Leshchev Date: Fri, 5 Jun 2026 16:42:17 -0400 Subject: [PATCH 2/2] Update DAQIRI Holoscan integration tutorial Signed-off-by: Denis Leshchev --- docs/tutorials/daqiri-holoscan-integration.md | 438 +++++++++++------- 1 file changed, 282 insertions(+), 156 deletions(-) diff --git a/docs/tutorials/daqiri-holoscan-integration.md b/docs/tutorials/daqiri-holoscan-integration.md index 1e4f209..8105cc5 100644 --- a/docs/tutorials/daqiri-holoscan-integration.md +++ b/docs/tutorials/daqiri-holoscan-integration.md @@ -1,23 +1,36 @@ # DAQIRI + Holoscan Integration -DAQIRI can be used as the IO layer inside an NVIDIA Holoscan application. The -recommended pattern is to initialize DAQIRI once during application startup, let a -Holoscan source operator poll DAQIRI RX queues, aggregate packets into -application-owned tensor storage, and emit a `holoscan::TensorMap` to downstream -operators. - -This tutorial shows a minimal C++ integration skeleton. It is not a complete -runnable app in this repository. The exact tensor and DLPack wrapping code depends -on the Holoscan SDK version and should live in a runnable Holohub example. - -Before integrating with Holoscan, make sure the DAQIRI YAML works with the -standalone benchmarks in [Benchmarking Examples](benchmarking_examples.md), and -review the [configuration walkthrough](configuration-walkthrough.md). +This tutorial demonstrates how the DAQIRI library can be integrated into an NVIDIA +Holoscan application. The [Holoscan SDK](https://developer.nvidia.com/holoscan-sdk) +is NVIDIA's sensor-processing platform optimized for real-time GPU processing and +AI inferencing, with applications in scientific computing, healthcare, medical +robotics, and more. + +This tutorial shows a minimal C++ example demonstrating the main principles of +using DAQIRI in a Holoscan application: it ingests a raw Ethernet stream, uses +DAQIRI's GPU reorder/quantize plan to turn it into sequence-ordered, quantized +batches, and emits each batch as a GPU tensor into a Holoscan pipeline. The snippets +below link against both `libdaqiri` and `libholoscan`; a complete, build-tested +project with a working CMake setup lives in Holohub (see [References](#references)). + +To achieve peak IO performance, be sure to review the +[system configuration](system_configuration.md) tutorial. It is also a good idea to +confirm the DAQIRI YAML works with the standalone benchmarks in +[Benchmarking Examples](benchmarking_examples.md) and to review the +[configuration walkthrough](configuration-walkthrough.md) before wiring DAQIRI into +Holoscan. ## Application Lifecycle -The Holoscan application owns the graph. DAQIRI owns NIC setup, packet memory, -and RX burst lifetimes. +A Holoscan application is a directed acyclic graph of operators — the fundamental +units of work — connected by data flows and run by a scheduler. DAQIRI owns NIC +setup, packet memory, and RX burst lifetimes; the operators in the graph pull +received data from DAQIRI and pass it downstream as tensors. + +The recommended pattern is to initialize DAQIRI once during application startup, +configure a GPU reorder/quantize plan in the DAQIRI config, let a Holoscan source +operator poll the reordered RX bursts, and emit each completed — sequence-ordered +and quantized — batch as a `holoscan::TensorMap` to downstream operators. ```cpp #include @@ -27,43 +40,72 @@ and RX burst lifetimes. class DaqiriHoloscanApp : public holoscan::Application { public: + // compose() wires up the operator graph. Holoscan calls it once when the + // application starts, before the scheduler begins running operators. void compose() override { - auto rx = make_operator("daqiri_rx", from_config("bench_rx")); + // Drive the graph with a multithreaded scheduler. High-bandwidth DAQIRI + // pipelines must overlap IO with processing: the RX source operator has to keep + // polling the NIC while downstream operators work on earlier batches. A + // single-threaded (greedy) scheduler would serialize them. Size the worker + // count to the number of operators that can run at once / available cores. + scheduler(make_scheduler( + "multithread", from_config("scheduler"))); + + // Source operator: polls DAQIRI for completed reordered+quantized batches. Its + // parameters come from the `reorder_rx` block of the YAML below. + auto rx = make_operator("daqiri_rx", from_config("reorder_rx")); + // Downstream operator that consumes the emitted tensors. auto sink = make_operator("tensor_sink"); + // Connect rx's "out" port to sink's "in" port, forming a two-node graph. add_flow(rx, sink, {{"out", "in"}}); } }; int main(int argc, char** argv) { + // Expect a single argument: the path to the shared YAML config file. if (argc != 2) { return 1; } const std::filesystem::path config_path{argv[1]}; + // Initialize DAQIRI once, up front: this sets up the NIC, packet memory pools, + // RX queues, and the GPU reorder/quantize plan described in the `daqiri` block. + // Must happen before the operators (which call into DAQIRI) start running. auto status = daqiri::daqiri_init(config_path.string()); if (status != daqiri::Status::SUCCESS) { return 1; } + // Build the Holoscan application and hand it the same YAML file. Holoscan reads + // its own keys (scheduler, operator parameter blocks) and ignores the `daqiri` + // block. auto app = holoscan::make_application(); app->config(config_path.string()); + // Runs the operator graph until the application is stopped. See "Running and + // stopping" below for how a polling RX source terminates. app->run(); + // Print DAQIRI's RX/TX counters, then tear down the NIC and free packet memory. daqiri::print_stats(); daqiri::shutdown(); return 0; } ``` -The shared YAML file can contain both Holoscan parameters and the `daqiri` block -passed to `daqiri_init(...)`. DAQIRI ignores Holoscan-only keys, and Holoscan -operators read their own parameter blocks with `from_config(...)`. +The shared YAML file carries Holoscan parameters (the `scheduler` block and each +operator's parameter block) alongside the `daqiri` block passed to +`daqiri_init(...)`. DAQIRI ignores Holoscan-only keys, and Holoscan operators read +their own parameter blocks with `from_config(...)`. ```yaml +# Multithreaded scheduler: runs the IO source operator and downstream processing on +# separate worker threads so ingestion and compute overlap. scheduler: - type: greedy + worker_thread_number: 4 # size to the operators that can run concurrently + stop_on_deadlock: true # return from run() once the graph goes idle + stop_on_deadlock_timeout: 500 # ms to wait before declaring the graph idle daqiri: cfg: @@ -72,233 +114,308 @@ daqiri: master_core: 3 log_level: "info" memory_regions: - - name: rx_payload + - name: rx_gpu # raw RX packet buffers the NIC DMAs into kind: device affinity: 0 - num_bufs: 32768 - buf_size: 4096 + num_bufs: 16384 + buf_size: 2048 + - name: reorder_gpu # holds one reordered + quantized output batch each + kind: device + affinity: 0 + num_bufs: 128 + buf_size: 4063232 # >= packets_per_batch * per-packet output bytes interfaces: - name: rx_port - address: "<0000:00:00.0>" + address: "<0000:00:00.0>" # put the correct PCIe address of the NIC here rx: + flow_isolation: true queues: - name: rx_q0 id: 0 cpu_core: 9 - batch_size: 64 - memory_regions: [rx_payload] - -bench_rx: + batch_size: 256 + timeout_us: 20000 + memory_regions: + - rx_gpu + flows: + - name: flow_0 # steer matching UDP packets to queue 0 + id: 201 + action: + type: queue + id: 0 + match: + udp_src: 5000 + udp_dst: 5000 + # GPU reorder + quantize plan. DAQIRI reorders packets by their in-payload + # sequence/batch number into a contiguous, gap-filled buffer and converts + # the payload from int4 to fp32 on the GPU, writing one batch into + # reorder_gpu. The operator just consumes the finished batch. + reorder_configs: + - name: rx_reorder_quantize + reorder_type: gpu + memory_region: reorder_gpu + payload_byte_offset: 64 # header bytes before the payload + flow_ids: + - 201 + data_types: + input_type: int4 # on-the-wire payload format + output_type: fp32 # quantized output dtype + endianness: host + method: + seq_batch_number: + sequence_number: # bits locating the per-packet sequence no. + bit_offset: 128 + bit_width: 10 + batch_number: # bits locating the batch no. + bit_offset: 144 + bit_width: 2 + +# Parameters for the DaqiriReorderRxOp operator, read via from_config("reorder_rx"). +reorder_rx: interface_name: rx_port - batch_size: 64 - max_packet_size: 4096 - header_size: 64 - gpu_direct: true - split_boundary: 0 + reorder_name: rx_reorder_quantize # must match a reorder_configs[].name above ``` -Replace placeholder PCIe addresses, CPU cores, queue fields, and memory regions -with values for your system. RX flow rules are omitted from this compact excerpt; -real raw Ethernet RX configs need a `flows:` block to steer packets to a queue. -See the [configuration walkthrough](configuration-walkthrough.md) and -[Configuration YAML Reference](../api-reference/configuration.md). +Replace the placeholder PCIe address, CPU cores, flow match fields, and bit-field +offsets with values for your stream. The reorder plan does the heavy lifting; the +operator's own parameter block (`reorder_rx`) is just the interface name and the +name of the reorder config whose CUDA stream it drives. See the +[configuration walkthrough](configuration-walkthrough.md) and +[Configuration YAML Reference](../api-reference/configuration.md) for the full +reorder/quantize schema. + +### Running and stopping + +The RX operator is a *source*: it has no input ports, so under the multithreaded +scheduler its `compute()` is invoked continuously on a worker thread, polling the +NIC while other workers run downstream operators. With `stop_on_deadlock: true` the +scheduler returns from `app->run()` once no operator can make progress for +`stop_on_deadlock_timeout` ms (e.g. the stream ends); `SIGINT` (Ctrl-C) also stops +it. Either way control returns to `main()` and the teardown prints stats and shuts +DAQIRI down. To bound a run explicitly — for a smoke test — attach a stop condition +such as `make_condition(num_batches)` to the operator in +`compose()`. ## RX Operator Skeleton -The source operator resolves the configured DAQIRI interface once, allocates any -CUDA resources it needs, polls all RX queues on each `compute()` call, and emits a -Holoscan tensor batch after aggregation. +The source operator resolves the configured DAQIRI interface, binds its CUDA stream +to the named reorder plan, and on each `compute()` call dequeues one RX burst. When +the burst is a completed reorder batch, it fences on the burst's CUDA event, reads +the batch metadata, and emits the reordered+quantized device buffer as a tensor. ```cpp #include #include #include +#include #include #include #include #include -class DaqiriRxOp : public holoscan::Operator { +// Emits each completed, sequence-ordered, quantized batch from DAQIRI's GPU reorder +// plan as a Holoscan tensor. +class DaqiriReorderRxOp : public holoscan::Operator { public: - HOLOSCAN_OPERATOR_FORWARD_ARGS(DaqiriRxOp) + // Boilerplate that lets Holoscan construct the operator from the YAML config. + HOLOSCAN_OPERATOR_FORWARD_ARGS(DaqiriReorderRxOp) - ~DaqiriRxOp() override { - if (batch_ready_event_ != nullptr) { - cudaEventDestroy(batch_ready_event_); - } + // Release the CUDA stream created in initialize(). + ~DaqiriReorderRxOp() override { if (stream_ != nullptr) { cudaStreamDestroy(stream_); } } + // setup() declares the operator's parameters (read from the `reorder_rx` block) + // and its output port. It runs before initialize(). void setup(holoscan::OperatorSpec& spec) override { spec.param(interface_name_, "interface_name", "Interface name", "Name of the DAQIRI interface to receive from"); - spec.param(batch_size_, "batch_size", "Batch size", - "Number of packets to aggregate before emitting"); - spec.param(max_packet_size_, "max_packet_size", "Max packet size", - "Maximum packet bytes copied into one output slot"); - spec.param(header_size_, "header_size", "Header size", - "Header bytes used when HDS is enabled"); - spec.param(gpu_direct_, "gpu_direct", "GPU direct", - "Whether RX payloads are expected in CUDA-addressable memory"); - spec.param(split_boundary_, "split_boundary", "Split boundary", - "Payload segment index for header-data split; 0 for single segment"); + spec.param(reorder_name_, "reorder_name", "Reorder config name", + "reorder_configs[].name whose CUDA stream this operator drives"); + // Single output port named "out" carrying a map of named tensors. spec.output("out"); } + // initialize() runs once when the graph is composed. DAQIRI is already + // initialized in main(), so here we resolve the interface and attach our CUDA + // stream to the reorder plan. void initialize() override { holoscan::Operator::initialize(); + // Resolve the configured interface name to a DAQIRI port id. A negative id + // means the name was not found. port_id_ = daqiri::get_port_id(interface_name_.get()); if (port_id_ < 0) { throw std::runtime_error("DAQIRI interface was not found"); } + // The GPU reorder+quantize kernel runs on a CUDA stream we own. Binding our + // stream to the named reorder plan means every reordered burst we dequeue was + // produced on this stream, and its completion event is recorded on it. cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking); - cudaEventCreateWithFlags(&batch_ready_event_, cudaEventDisableTiming); - - allocate_batch_buffers(batch_size_.get(), max_packet_size_.get()); + if (daqiri::set_reorder_cuda_stream(interface_name_.get(), reorder_name_.get(), + stream_) != daqiri::Status::SUCCESS) { + throw std::runtime_error("set_reorder_cuda_stream failed"); + } } + // compute() is invoked continuously by the scheduler. Each call drains at most one + // burst and, if it is a finished reorder batch, emits it as a tensor. void compute(holoscan::InputContext&, holoscan::OutputContext& op_output, holoscan::ExecutionContext&) override { - const auto queue_count = daqiri::get_num_rx_queues(port_id_); - - for (uint16_t queue_id = 0; queue_id < queue_count; ++queue_id) { - daqiri::BurstParams* burst = nullptr; - const auto status = daqiri::get_rx_burst(&burst, port_id_, queue_id); - if (status != daqiri::Status::SUCCESS || burst == nullptr) { - continue; - } + // Dequeue one RX burst from queue 0. With a reorder plan active, completed + // reorder batches are delivered as bursts flagged DAQIRI_BURST_FLAG_REORDERED. + daqiri::BurstParams* burst = nullptr; + if (daqiri::get_rx_burst(&burst, port_id_, 0) != daqiri::Status::SUCCESS || + burst == nullptr) { + return; // Nothing ready this tick. + } - consume_burst_into_batch(burst); + // Skip anything that is not a finished reorder batch (e.g. raw passthrough + // bursts). The flag lives in the burst header. + const bool reordered = + (burst->hdr.hdr.burst_flags & daqiri::DAQIRI_BURST_FLAG_REORDERED) != 0U; + if (!reordered) { daqiri::free_all_packets_and_burst_rx(burst); + return; } - if (!batch_is_ready()) { + // The reorder/quantize kernel filled the batch asynchronously on our stream; the + // burst carries the completion event. Fence on it before reading the data. + if (burst->event != nullptr) { + cudaEventSynchronize(burst->event); + } + + // Batch metadata: how many sequence slots, the per-slot output length, and the + // total reordered buffer size. + daqiri::ReorderBurstInfo info{}; + if (daqiri::get_reorder_burst_info(burst, &info) != daqiri::Status::SUCCESS) { + daqiri::free_all_packets_and_burst_rx(burst); return; } - cudaEventRecord(batch_ready_event_, stream_); - cudaEventSynchronize(batch_ready_event_); + // The reordered + quantized batch is one contiguous *device* buffer, exposed as + // "packet 0" of the burst (its length equals info.aggregate_len). + void* batch = daqiri::get_packet_ptr(burst, 0); + // Wrap that device buffer as a tensor and emit it zero-copy. The tensor takes + // ownership of the burst and frees it via the DLPack deleter once downstream is + // done — so we do NOT free the burst here. holoscan::TensorMap out_message; - out_message["rx_tensor"] = wrap_current_batch_as_tensor(); + out_message["rx_tensor"] = wrap_reorder_output_as_tensor(burst, batch, info); op_output.emit(out_message, "out"); - - reset_batch(); } private: - void consume_burst_into_batch(daqiri::BurstParams* burst) { - const int packet_count = daqiri::get_num_packets(burst); - - for (int i = 0; i < packet_count; ++i) { - if (split_boundary_.get() == 0) { - void* packet = daqiri::get_packet_ptr(burst, i); - uint32_t packet_len = daqiri::get_packet_length(burst, i); - append_single_segment_packet(packet, packet_len, stream_); - } else { - void* header = daqiri::get_segment_packet_ptr(burst, 0, i); - uint32_t header_len = daqiri::get_segment_packet_length(burst, 0, i); - - const int payload_segment = split_boundary_.get(); - void* payload = daqiri::get_segment_packet_ptr(burst, payload_segment, i); - uint32_t payload_len = - daqiri::get_segment_packet_length(burst, payload_segment, i); - - append_hds_packet(header, header_len, payload, payload_len, stream_); - } - } - } - - void allocate_batch_buffers(int batch_size, int max_packet_size); - bool batch_is_ready() const; - void reset_batch(); - - void append_single_segment_packet(void* packet, uint32_t packet_len, - cudaStream_t stream); - void append_hds_packet(void* header, uint32_t header_len, - void* payload, uint32_t payload_len, - cudaStream_t stream); - - std::shared_ptr wrap_current_batch_as_tensor(); + // Defined in the Tensor Emission section below. + std::shared_ptr wrap_reorder_output_as_tensor( + daqiri::BurstParams* burst, void* batch, + const daqiri::ReorderBurstInfo& info); + // Parameters populated from the YAML `reorder_rx` block. holoscan::Parameter interface_name_; - holoscan::Parameter batch_size_; - holoscan::Parameter max_packet_size_; - holoscan::Parameter header_size_; - holoscan::Parameter gpu_direct_; - holoscan::Parameter split_boundary_; - - int port_id_ = -1; - cudaStream_t stream_ = nullptr; - cudaEvent_t batch_ready_event_ = nullptr; + holoscan::Parameter reorder_name_; + + // Runtime state owned by the operator. + int port_id_ = -1; // resolved DAQIRI interface id + cudaStream_t stream_ = nullptr; // stream the reorder/quantize kernel runs on }; ``` -For a single-segment GPU RX configuration, `daqiri::get_packet_ptr(...)` returns -the packet pointer and `daqiri::get_packet_length(...)` returns its length. For -header-data split, segment `0` is commonly the CPU header segment and the payload -segment is retrieved with `daqiri::get_segment_packet_ptr(...)` and -`daqiri::get_segment_packet_length(...)`. - -The skeleton above copies or aggregates packet contents into operator-owned batch -storage before emitting. That means it can return the RX buffers to DAQIRI -immediately with `daqiri::free_all_packets_and_burst_rx(burst)`. - -If your operator wraps DAQIRI-owned packet buffers directly in a tensor, the tensor -lifetime must call the matching DAQIRI cleanup function when downstream consumers -are finished. For normal RX bursts, use -`daqiri::free_all_packets_and_burst_rx(burst)`. For HDS or segmented ownership -where only one segment is transferred to downstream ownership, use -`daqiri::free_segment_packets_and_burst(burst, segment_id)` for the segment being -released. Freeing only the burst metadata is insufficient when RX packet buffers -are still owned by the RX path; missed packet-buffer frees eventually drain the -pool and cause RX drops. - -See [C++ API Usage](../api-reference/cpp.md#receiving-packets) for packet access -helpers and [RX Step 3 - Free buffers](../api-reference/cpp.md#rx-step-3-free-buffers) -for the complete cleanup API list. +Because the DAQIRI reorder plan reorders and quantizes the payloads on the GPU, the +operator never touches individual packets: it dequeues a burst, checks the +`DAQIRI_BURST_FLAG_REORDERED` flag, waits on the burst's CUDA event, and reads one +contiguous output buffer. `daqiri::get_reorder_burst_info(...)` returns the batch +layout — `packets_per_batch` sequence slots, `payload_len` output bytes per slot, +`aggregate_len` total — so the operator can shape the tensor correctly. + +To process the stream differently — a different reorder layout, a custom +quantization, or any other per-batch GPU transform — you can add your own CUDA kernel +instead of (or after) the built-in plan: run it on the operator's stream over the raw +RX payloads and emit its output buffer the same way. DAQIRI's reorder kernels live in +`src/kernels.cu` (build with `-DDAQIRI_REORDER_GPU_PROFILE=ON` for CUDA event timing) +and are a useful starting point. + +### Buffer ownership + +The tensor emitted above is a zero-copy view of DAQIRI's `reorder_gpu` buffer, so the +burst must stay alive until every downstream consumer is finished. That is why the +burst is freed in the tensor's release callback rather than in `compute()`: +`daqiri::free_all_packets_and_burst_rx(burst)` returns both the burst metadata and its +packet buffers to the pool. Freeing only the burst metadata, or freeing in `compute()` +while a downstream operator still reads the tensor, leads to drops or use-after-free. +See [RX Step 3 - Free buffers](../api-reference/cpp.md#rx-step-3-free-buffers) for the +full cleanup API. ## Tensor Emission -The source operator emits a `holoscan::TensorMap` on port `out`. The example uses -a single entry named `rx_tensor`: +The source operator emits a `holoscan::TensorMap` on port `out` with a single entry +named `rx_tensor`. `holoscan::Tensor` interoperates through +[DLPack](https://dmlc.github.io/dlpack/latest/), so `wrap_reorder_output_as_tensor` +describes the reordered device buffer with a `DLManagedTensor` and constructs a tensor +from it (`DLPack` types come from `dlpack/dlpack.h`, bundled with the Holoscan SDK): ```cpp -holoscan::TensorMap out_message; -out_message["rx_tensor"] = wrap_current_batch_as_tensor(); -op_output.emit(out_message, "out"); +std::shared_ptr DaqiriReorderRxOp::wrap_reorder_output_as_tensor( + daqiri::BurstParams* burst, void* batch, + const daqiri::ReorderBurstInfo& info) { + // Reordered, sequence-ordered, fp32 layout: one row per sequence slot, one column + // per payload element. shape must outlive the tensor, so the DLManagedTensor owns + // it and the deleter frees it. + auto* shape = new int64_t[2]; + shape[0] = info.packets_per_batch; // number of sequence slots in the batch + shape[1] = info.payload_len / sizeof(float); // fp32 elements per slot + + auto* dl = new DLManagedTensor{}; + dl->dl_tensor.data = batch; // DAQIRI-owned device buffer (reorder_gpu) + dl->dl_tensor.device = DLDevice{kDLCUDA, 0}; // device memory, GPU 0 + dl->dl_tensor.ndim = 2; + dl->dl_tensor.dtype = DLDataType{kDLFloat, 32, 1}; // matches output_type: fp32 + dl->dl_tensor.shape = shape; + dl->dl_tensor.strides = nullptr; // row-major / contiguous + dl->dl_tensor.byte_offset = 0; + + // The deleter is the ownership hook: it runs when the last downstream consumer + // drops the tensor. Because the tensor is a zero-copy view of DAQIRI's buffer, this + // is where the burst (and its packet buffers) is returned to DAQIRI. + dl->manager_ctx = burst; + dl->deleter = [](DLManagedTensor* self) { + daqiri::free_all_packets_and_burst_rx( + static_cast(self->manager_ctx)); + delete[] self->dl_tensor.shape; + delete self; + }; + + return std::make_shared(dl); // explicit Tensor(DLManagedTensor*) +} ``` -The tensor can represent any layout your downstream pipeline expects, such as -`[batch, bytes]`, `[batch, header_bytes]` plus `[batch, payload_bytes]`, or a -metadata tensor paired with a payload tensor. The important boundary is ownership: -either the operator emits tensors backed by its own batch storage, or it emits -zero-copy wrappers with a release callback that frees the DAQIRI packet buffers. - -The complete SDK-version-specific tensor and DLPack wrapping belongs in the -Holohub runnable example, where it can be compiled and tested against a concrete -Holoscan SDK release. +Downstream operators receive an fp32 `[packets_per_batch, elements_per_packet]` tensor +on the GPU, ready for inference or further processing with no host copy. The exact +DLPack and tensor-construction details track the Holoscan SDK version; a complete, +build-tested implementation lives in the Holohub example (see [References](#references)). ## Sink Operator Skeleton A downstream sink receives the same `TensorMap` and can log, forward, or pass the -tensor metadata into additional processing operators. +tensor into additional processing operators. ```cpp #include class TensorSinkOp : public holoscan::Operator { public: + // Boilerplate that lets Holoscan construct the operator from the YAML config. HOLOSCAN_OPERATOR_FORWARD_ARGS(TensorSinkOp) + // Declare a single input port named "in"; it must match the downstream end of the + // add_flow(rx, sink, {{"out", "in"}}) edge created in compose(). void setup(holoscan::OperatorSpec& spec) override { spec.input("in"); } @@ -306,15 +423,19 @@ class TensorSinkOp : public holoscan::Operator { void compute(holoscan::InputContext& op_input, holoscan::OutputContext&, holoscan::ExecutionContext&) override { + // Try to receive a message from the upstream operator. Empty means no data is + // available on this tick. auto maybe_message = op_input.receive("in"); if (!maybe_message) { return; } + // Look up the named tensor the RX operator emitted ("rx_tensor"). A real sink + // would run inference or further processing here instead of just logging. const auto& tensor_map = maybe_message.value(); auto it = tensor_map.find("rx_tensor"); if (it == tensor_map.end()) { - return; + return; // The expected tensor was not present in the map. } const auto& tensor = it->second; @@ -325,6 +446,11 @@ class TensorSinkOp : public holoscan::Operator { ## References +- The standalone `daqiri_bench_raw_reorder_quantize` benchmark + (`examples/raw_reorder_quantize_bench.cpp`, config + `daqiri_bench_raw_tx_rx_reorder_quantize_seq_batch.yaml`) exercises the same + reorder/quantize path outside Holoscan — a good way to validate the DAQIRI config + first. See [Benchmarking Examples](benchmarking_examples.md). - [Holohub PR #1553](https://github.com/nvidia-holoscan/holohub/pull/1553) is the current reference while the Holohub example is under review. - After merge, the expected Holohub app path is `applications/daqiri_raw_ethernet_bench`. - DAQIRI [configuration walkthrough](configuration-walkthrough.md) and [C++ API Usage](../api-reference/cpp.md) cover the DAQIRI-side configuration, packet access, and cleanup calls.