From f90b6fa20b8b0c10e059fb5bb6fe6fcf4bab5e0d Mon Sep 17 00:00:00 2001 From: Cliff Burdick Date: Thu, 4 Jun 2026 22:39:41 +0000 Subject: [PATCH] Add async raw S3 packet writes Signed-off-by: Cliff Burdick --- AGENTS.md | 1 + CMakeLists.txt | 3 + Dockerfile | 27 ++ README.md | 8 + cmake/daqiriConfig.cmake.in | 4 + docs/api-reference/cpp.md | 69 ++++ docs/getting-started.md | 10 + examples/README.md | 7 + include/daqiri/common.h | 96 +++++ python/daqiri_common_pybind.cpp | 190 +++++++++ scripts/build-container.sh | 4 + src/CMakeLists.txt | 10 + src/common_s3.cpp | 660 ++++++++++++++++++++++++++++++++ 13 files changed, 1089 insertions(+) create mode 100644 src/common_s3.cpp diff --git a/AGENTS.md b/AGENTS.md index 08dee4e..f041224 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -20,6 +20,7 @@ CMake options (full table in `docs/getting-started.md`): - `DAQIRI_BUILD_EXAMPLES` — builds the benchmark executables (default `ON`). - `DAQIRI_ENABLE_OTEL_METRICS` — enables OpenTelemetry metrics instrumentation (default `OFF`). - `DAQIRI_REORDER_GPU_PROFILE` — enable CUDA event timing in the DPDK reorder kernels (off by default). +- `DAQIRI_ENABLE_S3` — enable AWS SDK-backed asynchronous raw packet writes to S3 (off by default). CUDA architectures default to `80;90` (A100, H100), with `121` (GB10) added when configuring with CUDA Toolkit 13.0 or newer. Override `CMAKE_CUDA_ARCHITECTURES` when targeting other GPUs. 
diff --git a/CMakeLists.txt b/CMakeLists.txt index 2a15ff2..d1bc375 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,6 +126,9 @@ set(DAQIRI_PC_LIBS_PRIVATE "-lcudart -lcuda") if(CUDAToolkit_LIBRARY_DIR) string(PREPEND DAQIRI_PC_LIBS_PRIVATE "-L${CUDAToolkit_LIBRARY_DIR} ") endif() +if(DAQIRI_ENABLE_S3) + string(APPEND DAQIRI_PC_LIBS_PRIVATE " -laws-cpp-sdk-s3 -laws-cpp-sdk-core") +endif() configure_file( ${CMAKE_CURRENT_SOURCE_DIR}/cmake/daqiri.pc.in diff --git a/Dockerfile b/Dockerfile index a322896..f5836a2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,8 +18,10 @@ ARG DAQIRI_BASE_TARGET=dpdk ARG DAQIRI_MGR="dpdk socket" ARG DAQIRI_BUILD_PYTHON=OFF +ARG DAQIRI_ENABLE_S3=OFF ARG BUILD_SHARED_LIBS=ON ARG DAQIRI_ENABLE_OTEL_METRICS=OFF +ARG AWS_SDK_CPP_VERSION=1.11.822 ARG DAQIRI_OS_BASE_IMAGE=nvcr.io/nvidia/cuda:13.1.0-devel-ubuntu24.04 # ============================================================ @@ -63,6 +65,7 @@ ARG TARGETARCH ARG CACHEBUST=1 ARG DEBIAN_FRONTEND=noninteractive ARG DOCA_VERSION=3.2.1 +ARG AWS_SDK_CPP_VERSION WORKDIR /opt @@ -100,8 +103,30 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ ibverbs-utils \ python3-dev \ pybind11-dev \ + libcurl4-openssl-dev \ + libssl-dev \ + uuid-dev \ + zlib1g-dev \ && rm -rf /var/lib/apt/lists/* +# Build only the AWS SDK for C++ S3 component. DAQIRI links this SDK only when +# configured with -DDAQIRI_ENABLE_S3=ON, but installing it here keeps the +# recommended container build path self-contained. 
+RUN git clone --depth 1 --recurse-submodules --shallow-submodules --branch "${AWS_SDK_CPP_VERSION}" \ + https://github.com/aws/aws-sdk-cpp.git /tmp/aws-sdk-cpp \ + && cmake -S /tmp/aws-sdk-cpp -B /tmp/aws-sdk-cpp/build \ + -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_INSTALL_PREFIX=/usr/local \ + -DBUILD_ONLY=s3 \ + -DBUILD_SHARED_LIBS=ON \ + -DENABLE_TESTING=OFF \ + -DAUTORUN_UNIT_TESTS=OFF \ + -DCUSTOM_MEMORY_MANAGEMENT=OFF \ + && cmake --build /tmp/aws-sdk-cpp/build -j "$(nproc)" \ + && cmake --install /tmp/aws-sdk-cpp/build \ + && ldconfig \ + && rm -rf /tmp/aws-sdk-cpp + # PIP installs # - pytest: test harness # - pyyaml: to parse yaml configs in tests @@ -277,6 +302,7 @@ FROM ${DAQIRI_BASE_TARGET} AS daqiri-build ARG DAQIRI_MGR ARG DAQIRI_BUILD_PYTHON +ARG DAQIRI_ENABLE_S3 ARG BUILD_SHARED_LIBS ARG DAQIRI_ENABLE_OTEL_METRICS @@ -291,6 +317,7 @@ RUN cmake -S . -B build \ -DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS} \ -DDAQIRI_BUILD_PYTHON=${DAQIRI_BUILD_PYTHON} \ -DDAQIRI_ENABLE_OTEL_METRICS=${DAQIRI_ENABLE_OTEL_METRICS} \ + -DDAQIRI_ENABLE_S3=${DAQIRI_ENABLE_S3} \ -DDAQIRI_MGR="${DAQIRI_MGR}" \ && cmake --build build -j "$(nproc)" \ && cmake --install build diff --git a/README.md b/README.md index 617ed90..21002c0 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,8 @@ DAQIRI provides direct NIC hardware access in userspace, bypassing the Linux ker - *Batched GPU*: Entire packets to GPU memory (maximum bandwidth, GPU-side parsing required). - **Burst file writes** — Write received bursts as raw packet files or appendable PCAP captures. Host-backed buffers use POSIX writes; CUDA device-backed buffers can use cuFile/GDS. +- **S3 raw object writes** — Optionally upload raw burst packets to Amazon S3 or an + S3-compatible object store through the AWS SDK for C++. - **Flow Steering** — Configure the NIC's hardware flow engine to route packets by UDP source/destination port. - **RDMA** — RDMA verbs (READ, WRITE, SEND) over RoCE on Ethernet NICs or InfiniBand. 
@@ -67,10 +69,16 @@ and `libcufile` in the build environment. At runtime, regular GDS writes through NVIDIA's `nvidia-fs` path require the `nvidia-fs` kernel module to be loaded and the target storage stack to be reported as supported by `gdscheck.py -p`. +Enable raw packet uploads to S3 with `-DDAQIRI_ENABLE_S3=ON`. The recommended +container build installs AWS SDK for C++ with S3 support; bare-metal builds need +`aws-cpp-sdk-core` and `aws-cpp-sdk-s3` discoverable by CMake. S3 credentials are +resolved through the AWS SDK provider chain. + Container build: ```bash BASE_TARGET=dpdk DAQIRI_MGR="dpdk socket rdma" scripts/build-container.sh +DAQIRI_ENABLE_S3=ON DAQIRI_BUILD_PYTHON=ON BASE_TARGET=dpdk scripts/build-container.sh ``` OpenTelemetry metrics are opt-in. Build with `-DDAQIRI_ENABLE_OTEL_METRICS=ON` diff --git a/cmake/daqiriConfig.cmake.in b/cmake/daqiriConfig.cmake.in index c0c89f8..af18099 100644 --- a/cmake/daqiriConfig.cmake.in +++ b/cmake/daqiriConfig.cmake.in @@ -6,5 +6,9 @@ set(DAQIRI_ENABLE_OTEL_METRICS @DAQIRI_ENABLE_OTEL_METRICS@) if(DAQIRI_ENABLE_OTEL_METRICS) find_dependency(opentelemetry-cpp CONFIG COMPONENTS api) endif() +set(DAQIRI_ENABLE_S3 @DAQIRI_ENABLE_S3@) +if(DAQIRI_ENABLE_S3) + find_dependency(AWSSDK COMPONENTS s3) +endif() include("${CMAKE_CURRENT_LIST_DIR}/daqiriTargets.cmake") diff --git a/docs/api-reference/cpp.md b/docs/api-reference/cpp.md index 0af6728..06f4499 100644 --- a/docs/api-reference/cpp.md +++ b/docs/api-reference/cpp.md @@ -329,6 +329,69 @@ writes raw or pcap output with the synchronous API, the asynchronous API, or bot to send real Ethernet/IPv4/UDP frames out of a NIC and receive them back through a hardware RX port. +### Writing Raw Packets to S3 + +Build with `DAQIRI_ENABLE_S3=ON` to upload raw packet objects through AWS SDK +for C++. This path uses normal S3 `PutObject` requests, so it can target Amazon +S3 or an S3-compatible service. It is not a cuObject/RDMA path. 
+ +Before creating the writer, choose a bucket and region, configure AWS +credentials through the SDK provider chain, grant `s3:PutObject` on the target +prefix, and make sure the host can reach the S3 endpoint. For S3-compatible +stores, set `endpoint_override` and `path_style` if that service requires them. + +```cpp +daqiri::S3WriterConfig cfg; +cfg.bucket = "daqiri-captures"; +cfg.region = "us-west-2"; +cfg.max_inflight_uploads = 8; + +daqiri::S3Writer *writer = nullptr; +auto st = daqiri::daqiri_s3_writer_create(cfg, &writer); + +daqiri::S3WriteHandle *handle = nullptr; +if (st == daqiri::Status::SUCCESS) { + st = daqiri::daqiri_write_raw_to_s3_objects_async( + writer, + burst, + "runs/run42/packet", + 60, + &handle); +} + +daqiri::S3WriteStatus s3_status{}; +if (st == daqiri::Status::SUCCESS) { + st = daqiri::daqiri_s3_write_wait(handle, &s3_status); + daqiri::daqiri_s3_write_destroy(handle); +} +daqiri::daqiri_s3_writer_destroy(writer); +``` + +Object keys mirror raw file naming: `object_prefix_<packet_index>`. DAQIRI +copies each packet's post-offset logical bytes into owned host staging memory +before submission, so the burst may be released after +`daqiri_write_raw_to_s3_objects_async()` succeeds. Header-data split and other +multi-segment packets are concatenated into one object. The first S3 version +uses one single-part `PutObject` per packet; objects larger than 5 GiB return +`NOT_SUPPORTED`, and multipart/burst aggregation is future work. + +The Python bindings expose the same C++ writer when both +`DAQIRI_BUILD_PYTHON=ON` and `DAQIRI_ENABLE_S3=ON` are used: + +```python +cfg = daqiri.S3WriterConfig() +cfg.bucket = "daqiri-captures" +cfg.region = "us-west-2" + +writer = daqiri.S3Writer(cfg) +status = writer.write_raw_objects( + burst, + "runs/run42/packet", + packet_data_offset=60, +) +writer.destroy() +``` + ## Utility Functions ```cpp @@ -468,6 +531,12 @@ workflow sections above show the common call order and ownership rules. 
| `daqiri_file_write_poll(handle, &status)` | Poll an asynchronous file-write handle. | | `daqiri_file_write_wait(handle, &status)` | Wait for asynchronous file writes to complete. | | `daqiri_file_write_destroy(handle)` | Release asynchronous file-write resources. | +| `daqiri_s3_writer_create(config, &writer)` | Create an AWS SDK-backed S3 raw object writer. | +| `daqiri_write_raw_to_s3_objects_async(writer, burst, object_prefix, packet_data_offset, &handle)` | Submit asynchronous raw packet uploads to S3. | +| `daqiri_s3_write_poll(handle, &status)` | Poll asynchronous S3 uploads. | +| `daqiri_s3_write_wait(handle, &status)` | Wait for asynchronous S3 uploads to complete. | +| `daqiri_s3_write_destroy(handle)` | Release asynchronous S3 upload resources. | +| `daqiri_s3_writer_destroy(writer)` | Release an S3 writer. | ### Ports, Traffic, Socket, and RDMA diff --git a/docs/getting-started.md b/docs/getting-started.md index 36d45da..f5e96ac 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -18,6 +18,7 @@ DAQIRI's baseline requirements depend on which [stream type](concepts.md#stream- | **DPDK** | Included in the DAQIRI container (patched for dma-buf, so `nvidia-peermem` is **not required** inside the container); see [bare-metal dependencies](#bare-metal-dependencies) below for the host build. | | **RoCE** | `libibverbs` and `librdmacm` (for `stream_type: "socket"`, `protocol: "roce"`). | | **GDS** | Optional `cufile.h` and `libcufile` for file writes from CUDA device memory. Runtime device-memory writes require a working cuFile installation; for regular `nvidia-fs` mode, the `nvidia-fs` kernel module must be loaded and the destination storage stack must be supported. | +| **S3** | Optional AWS SDK for C++ with the `s3` component for raw packet uploads to Amazon S3 or S3-compatible object stores. The DAQIRI container builds this SDK from source. 
| Supported platforms include [NVIDIA Data Center](https://www.nvidia.com/en-us/data-center/) systems, edge systems like [NVIDIA IGX](https://www.nvidia.com/en-us/edge-computing/products/igx/) and [NVIDIA DGX Spark](https://www.nvidia.com/en-us/products/workstations/dgx-spark/), and `x86_64` systems with the above components. @@ -194,6 +195,7 @@ Both methods use the same public C++ include: | `DAQIRI_BUILD_EXAMPLES` | `ON` | Build benchmark executables. | | `DAQIRI_ENABLE_GDS` | `OFF` | Enable cuFile-backed burst file writes from CUDA device memory. Host-memory writes use POSIX APIs without GDS. | | `DAQIRI_ENABLE_OTEL_METRICS` | `OFF` | Enable OpenTelemetry C++ metrics instrumentation. When enabled, OpenTelemetry C++ API package metadata must be available to CMake. | +| `DAQIRI_ENABLE_S3` | `OFF` | Enable AWS SDK-backed asynchronous raw packet writes to S3. | | `BUILD_SHARED_LIBS` | — | Build as shared library. | CUDA architectures default to `80;90` (A100, H100), with `121` (GB10) added @@ -219,6 +221,14 @@ transmitted packets, received bytes, transmitted bytes, and dropped packets. DAQ does not configure an SDK reader or exporter; applications that want exported data must configure the OpenTelemetry C++ SDK before or during DAQIRI initialization. +When using `DAQIRI_ENABLE_S3=ON`, the container build installs AWS SDK for C++ +with S3 support. Bare-metal builds must provide `aws-cpp-sdk-core` and +`aws-cpp-sdk-s3` so CMake can resolve `find_package(AWSSDK COMPONENTS s3)`. +Configure credentials through the AWS SDK provider chain, such as environment +variables, a shared AWS profile, container credentials, or an EC2 instance role. +DAQIRI writes one object per packet with a single `PutObject`; multipart uploads +and PCAP output are not part of the S3 path. 
+ ## Next Steps Once DAQIRI is built, follow the tutorials to configure your system and run your first benchmark: diff --git a/examples/README.md b/examples/README.md index 2f24ff5..97f4e0c 100644 --- a/examples/README.md +++ b/examples/README.md @@ -29,6 +29,13 @@ cmake -S . -B build -DDAQIRI_BUILD_EXAMPLES=ON -DDAQIRI_ENABLE_GDS=ON -DDAQIRI_M cmake --build build -j ``` +Build with S3 raw object upload support: + +```bash +cmake -S . -B build -DDAQIRI_BUILD_EXAMPLES=ON -DDAQIRI_ENABLE_S3=ON -DDAQIRI_MGR="dpdk socket" +cmake --build build -j +``` + For CUDA device-memory output, the runtime must have a working cuFile/GDS stack. In regular `nvidia-fs` mode, verify that the kernel module is loaded and the destination storage is supported before running the example: diff --git a/include/daqiri/common.h b/include/daqiri/common.h index dee1b5d..1aaa729 100644 --- a/include/daqiri/common.h +++ b/include/daqiri/common.h @@ -49,6 +49,25 @@ struct FileWriteStatus { uint64_t bytes_written = 0; }; +struct S3Writer; +struct S3WriteHandle; + +struct S3WriterConfig { + std::string bucket; + std::string region; + std::string endpoint_override; + bool path_style = false; + bool aws_sdk_already_initialized = false; + uint32_t max_inflight_uploads = 8; + uint64_t max_staged_bytes = 1ULL << 30; +}; + +struct S3WriteStatus { + uint32_t completed_objects = 0; + uint32_t failed_objects = 0; + uint64_t bytes_uploaded = 0; +}; + static constexpr uint32_t DEFAULT_TX_META_BUFFERS = 1UL << 8; static constexpr uint32_t DEFAULT_RX_META_BUFFERS = 1UL << 8; namespace detail { @@ -457,6 +476,83 @@ Status daqiri_file_write_wait(FileWriteHandle *handle, FileWriteStatus *status); */ Status daqiri_file_write_destroy(FileWriteHandle *handle); +/** + * @brief Create an S3 raw object writer. + * + * Credentials are resolved by the AWS SDK provider chain. endpoint_override and + * path_style are only needed for S3-compatible services that require them. 
+ * Returns NOT_SUPPORTED when DAQIRI was built without DAQIRI_ENABLE_S3=ON. + * + * @param config S3 writer configuration + * @param writer Output writer handle + * @return Status indicating success or failure + */ +Status daqiri_s3_writer_create(const S3WriterConfig &config, + S3Writer **writer); + +/** + * @brief Asynchronously write each packet in a burst to a separate S3 object. + * + * Object keys are object_prefix_<packet_index>. The write path stages each + * packet's logical bytes into DAQIRI-owned host memory before returning, so the + * caller may free the burst after successful submission. No multipart upload is + * performed; objects larger than the S3 single-PUT limit are not supported. + * + * @param writer Writer returned by daqiri_s3_writer_create() + * @param burst Burst structure containing packets + * @param object_prefix Prefix used for each S3 object key + * @param packet_data_offset Bytes to skip from the start of each logical packet + * @param handle Output handle for polling, waiting, and cleanup + * @return Status indicating whether submission succeeded + */ +Status daqiri_write_raw_to_s3_objects_async(S3Writer *writer, + BurstParams *burst, + const std::string &object_prefix, + uint64_t packet_data_offset, + S3WriteHandle **handle); + +/** + * @brief Poll an asynchronous S3 write. + * + * @param handle Handle returned by daqiri_write_raw_to_s3_objects_async() + * @param status Optional output status summary + * @return SUCCESS when all uploads are complete, NOT_READY while pending, or an + * error status + */ +Status daqiri_s3_write_poll(S3WriteHandle *handle, S3WriteStatus *status); + +/** + * @brief Wait for asynchronous S3 writes to complete. 
+ * + * @param handle Handle returned by daqiri_write_raw_to_s3_objects_async() + * @param status Optional output status summary + * @return SUCCESS when all uploads are complete or an error status + */ +Status daqiri_s3_write_wait(S3WriteHandle *handle, S3WriteStatus *status); + +/** + * @brief Destroy an asynchronous S3 write handle. + * + * If uploads are still pending, this call waits for completion before + * releasing staging buffers and request resources. + * + * @param handle Handle returned by daqiri_write_raw_to_s3_objects_async() + * @return SUCCESS when resources are released or an error status + */ +Status daqiri_s3_write_destroy(S3WriteHandle *handle); + +/** + * @brief Destroy an S3 raw object writer. + * + * The caller must not use the writer after this call. S3WriteHandle instances + * keep their own client references, so destroying a writer does not invalidate + * already submitted asynchronous writes. + * + * @param writer Writer returned by daqiri_s3_writer_create() + * @return SUCCESS when resources are released + */ +Status daqiri_s3_writer_destroy(S3Writer *writer); + /** * @brief Frees all segments of a single packet * diff --git a/python/daqiri_common_pybind.cpp b/python/daqiri_common_pybind.cpp index fac4ab7..739b0b3 100644 --- a/python/daqiri_common_pybind.cpp +++ b/python/daqiri_common_pybind.cpp @@ -27,6 +27,8 @@ #include #include #include +#include +#include #include #include @@ -287,6 +289,159 @@ Status daqiri_init_from_python(py::object config_obj) { } } +const char *status_to_string(Status status) { + switch (status) { + case Status::SUCCESS: + return "SUCCESS"; + case Status::NULL_PTR: + return "NULL_PTR"; + case Status::NO_FREE_BURST_BUFFERS: + return "NO_FREE_BURST_BUFFERS"; + case Status::NO_FREE_PACKET_BUFFERS: + return "NO_FREE_PACKET_BUFFERS"; + case Status::NOT_READY: + return "NOT_READY"; + case Status::INVALID_PARAMETER: + return "INVALID_PARAMETER"; + case Status::NO_SPACE_AVAILABLE: + return "NO_SPACE_AVAILABLE"; + case 
Status::NOT_SUPPORTED: + return "NOT_SUPPORTED"; + case Status::GENERIC_FAILURE: + return "GENERIC_FAILURE"; + case Status::CONNECT_FAILURE: + return "CONNECT_FAILURE"; + case Status::INTERNAL_ERROR: + return "INTERNAL_ERROR"; + } + return "UNKNOWN"; +} + +void throw_if_error(Status status, const char *operation) { + if (status == Status::SUCCESS) { + return; + } + throw std::runtime_error(std::string(operation) + + " failed with DAQIRI status " + + status_to_string(status)); +} + +struct PyS3WriteHandle { + S3WriteHandle *handle = nullptr; + + PyS3WriteHandle() = default; + explicit PyS3WriteHandle(S3WriteHandle *raw_handle) : handle(raw_handle) {} + PyS3WriteHandle(const PyS3WriteHandle &) = delete; + PyS3WriteHandle &operator=(const PyS3WriteHandle &) = delete; + + ~PyS3WriteHandle() { + if (handle != nullptr) { + daqiri_s3_write_destroy(handle); + handle = nullptr; + } + } + + py::tuple poll() { + if (handle == nullptr) { + throw std::runtime_error("S3 write handle has already been destroyed"); + } + S3WriteStatus status{}; + const Status rc = daqiri_s3_write_poll(handle, &status); + return py::make_tuple(rc, status); + } + + S3WriteStatus wait() { + if (handle == nullptr) { + throw std::runtime_error("S3 write handle has already been destroyed"); + } + S3WriteStatus status{}; + Status rc = Status::SUCCESS; + { + py::gil_scoped_release release; + rc = daqiri_s3_write_wait(handle, &status); + } + throw_if_error(rc, "daqiri_s3_write_wait"); + return status; + } + + Status destroy() { + if (handle == nullptr) { + return Status::SUCCESS; + } + Status rc = Status::SUCCESS; + { + py::gil_scoped_release release; + rc = daqiri_s3_write_destroy(handle); + } + handle = nullptr; + return rc; + } +}; + +struct PyS3Writer { + S3Writer *writer = nullptr; + + explicit PyS3Writer(const S3WriterConfig &config) { + Status rc = Status::SUCCESS; + { + py::gil_scoped_release release; + rc = daqiri_s3_writer_create(config, &writer); + } + throw_if_error(rc, 
"daqiri_s3_writer_create"); + } + + PyS3Writer(const PyS3Writer &) = delete; + PyS3Writer &operator=(const PyS3Writer &) = delete; + + ~PyS3Writer() { + if (writer != nullptr) { + daqiri_s3_writer_destroy(writer); + writer = nullptr; + } + } + + std::unique_ptr + write_raw_objects_async(BurstParams *burst, const std::string &object_prefix, + uint64_t packet_data_offset) { + if (writer == nullptr) { + throw std::runtime_error("S3 writer has already been destroyed"); + } + S3WriteHandle *handle = nullptr; + Status rc = Status::SUCCESS; + { + py::gil_scoped_release release; + rc = daqiri_write_raw_to_s3_objects_async( + writer, burst, object_prefix, packet_data_offset, &handle); + } + throw_if_error(rc, "daqiri_write_raw_to_s3_objects_async"); + return std::make_unique(handle); + } + + S3WriteStatus write_raw_objects(BurstParams *burst, + const std::string &object_prefix, + uint64_t packet_data_offset) { + auto handle = + write_raw_objects_async(burst, object_prefix, packet_data_offset); + S3WriteStatus status = handle->wait(); + const Status destroy_status = handle->destroy(); + throw_if_error(destroy_status, "daqiri_s3_write_destroy"); + return status; + } + + Status destroy() { + if (writer == nullptr) { + return Status::SUCCESS; + } + Status rc = Status::SUCCESS; + { + py::gil_scoped_release release; + rc = daqiri_s3_writer_destroy(writer); + } + writer = nullptr; + return rc; + } +}; + void bind_enums(py::module_ &m) { py::enum_(m, "Status") .value("SUCCESS", Status::SUCCESS) @@ -460,6 +615,41 @@ void bind_config_types(py::module_ &m) { burst.transport_hdr.wr_id = wr_id; }); + py::class_(m, "S3WriterConfig") + .def(py::init<>()) + .def_readwrite("bucket", &S3WriterConfig::bucket) + .def_readwrite("region", &S3WriterConfig::region) + .def_readwrite("endpoint_override", &S3WriterConfig::endpoint_override) + .def_readwrite("path_style", &S3WriterConfig::path_style) + .def_readwrite("aws_sdk_already_initialized", + &S3WriterConfig::aws_sdk_already_initialized) + 
.def_readwrite("max_inflight_uploads", + &S3WriterConfig::max_inflight_uploads) + .def_readwrite("max_staged_bytes", &S3WriterConfig::max_staged_bytes); + + py::class_(m, "S3WriteStatus") + .def(py::init<>()) + .def_readwrite("completed_objects", &S3WriteStatus::completed_objects) + .def_readwrite("failed_objects", &S3WriteStatus::failed_objects) + .def_readwrite("bytes_uploaded", &S3WriteStatus::bytes_uploaded); + + py::class_(m, "S3WriteHandle") + .def("poll", &PyS3WriteHandle::poll, + "Return (Status, S3WriteStatus) for an asynchronous S3 write") + .def("wait", &PyS3WriteHandle::wait, + "Wait for asynchronous S3 writes and return S3WriteStatus") + .def("destroy", &PyS3WriteHandle::destroy, + "Release asynchronous S3 write resources"); + + py::class_(m, "S3Writer") + .def(py::init(), "config"_a) + .def("write_raw_objects_async", &PyS3Writer::write_raw_objects_async, + "burst"_a, "object_prefix"_a, "packet_data_offset"_a = 0) + .def("write_raw_objects", &PyS3Writer::write_raw_objects, "burst"_a, + "object_prefix"_a, "packet_data_offset"_a = 0) + .def("destroy", &PyS3Writer::destroy, + "Release S3 writer resources"); + py::class_(m, "RDMAConfig") .def(py::init<>()) .def_readwrite("mode", &RDMAConfig::mode_) diff --git a/scripts/build-container.sh b/scripts/build-container.sh index 9987ebb..127d718 100755 --- a/scripts/build-container.sh +++ b/scripts/build-container.sh @@ -6,8 +6,10 @@ BASE_TARGET="${BASE_TARGET:-dpdk}" BASE_IMAGE="${BASE_IMAGE:-cuda}" DAQIRI_MGR="${DAQIRI_MGR:-dpdk socket}" DAQIRI_BUILD_PYTHON="${DAQIRI_BUILD_PYTHON:-OFF}" +DAQIRI_ENABLE_S3="${DAQIRI_ENABLE_S3:-OFF}" BUILD_SHARED_LIBS="${BUILD_SHARED_LIBS:-ON}" DAQIRI_ENABLE_OTEL_METRICS="${DAQIRI_ENABLE_OTEL_METRICS:-OFF}" +AWS_SDK_CPP_VERSION="${AWS_SDK_CPP_VERSION:-1.11.822}" case "${BASE_IMAGE}" in cuda) @@ -28,8 +30,10 @@ docker build \ --build-arg DAQIRI_OS_BASE_IMAGE="${DAQIRI_OS_BASE_IMAGE}" \ --build-arg DAQIRI_MGR="${DAQIRI_MGR}" \ --build-arg DAQIRI_BUILD_PYTHON="${DAQIRI_BUILD_PYTHON}" 
\ + --build-arg DAQIRI_ENABLE_S3="${DAQIRI_ENABLE_S3}" \ --build-arg BUILD_SHARED_LIBS="${BUILD_SHARED_LIBS}" \ --build-arg DAQIRI_ENABLE_OTEL_METRICS="${DAQIRI_ENABLE_OTEL_METRICS}" \ + --build-arg AWS_SDK_CPP_VERSION="${AWS_SDK_CPP_VERSION}" \ -t "${IMAGE_TAG}" \ . diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index a1f6a9c..95f44b3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -40,6 +40,7 @@ if(DAQIRI_REORDER_GPU_PROFILE) endif() option(DAQIRI_ENABLE_GDS "Enable cuFile-backed burst file writes from CUDA device memory" OFF) option(DAQIRI_ENABLE_OTEL_METRICS "Enable OpenTelemetry metrics instrumentation" OFF) +option(DAQIRI_ENABLE_S3 "Enable AWS SDK-backed raw packet writes to S3" OFF) set(DOCA_PATH /opt/mellanox/doca) if(DAQIRI_ENABLE_OTEL_METRICS) @@ -89,6 +90,7 @@ endif() add_library(daqiri_common common.cpp common_gds.cpp + common_s3.cpp dpdk_log.cpp kernels.cu manager.cpp @@ -156,6 +158,14 @@ if(DAQIRI_ENABLE_GDS) else() target_compile_definitions(daqiri_common PUBLIC DAQIRI_ENABLE_GDS=0) endif() +if(DAQIRI_ENABLE_S3) + find_package(AWSSDK REQUIRED COMPONENTS s3) + target_compile_definitions(daqiri_common PUBLIC DAQIRI_ENABLE_S3=1) + target_include_directories(daqiri_common PRIVATE ${AWSSDK_INCLUDE_DIRS}) + target_link_libraries(daqiri_common PRIVATE ${AWSSDK_LINK_LIBRARIES}) +else() + target_compile_definitions(daqiri_common PUBLIC DAQIRI_ENABLE_S3=0) +endif() set_target_properties(daqiri_common PROPERTIES CUDA_SEPARABLE_COMPILATION ON CUDA_RESOLVE_DEVICE_SYMBOLS ON diff --git a/src/common_s3.cpp b/src/common_s3.cpp new file mode 100644 index 0000000..b2a54fa --- /dev/null +++ b/src/common_s3.cpp @@ -0,0 +1,660 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. + * All rights reserved. SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#if DAQIRI_ENABLE_S3 +#include +#include +#include +#include +#include +#include +#include +#endif + +namespace daqiri { + +namespace { + +static constexpr uint64_t kMaxSinglePutObjectBytes = + 5ULL * 1024ULL * 1024ULL * 1024ULL; + +enum class PointerKind { + HOST, + CUDA, +}; + +struct HostStagingBuffer { + uint8_t *data = nullptr; + size_t size = 0; + + HostStagingBuffer() = default; + ~HostStagingBuffer() { reset(); } + + HostStagingBuffer(const HostStagingBuffer &) = delete; + HostStagingBuffer &operator=(const HostStagingBuffer &) = delete; + + HostStagingBuffer(HostStagingBuffer &&other) noexcept + : data(other.data), size(other.size) { + other.data = nullptr; + other.size = 0; + } + + HostStagingBuffer &operator=(HostStagingBuffer &&other) noexcept { + if (this != &other) { + reset(); + data = other.data; + size = other.size; + other.data = nullptr; + other.size = 0; + } + return *this; + } + + Status allocate(size_t nbytes) { + reset(); + size = nbytes; + if (size == 0) { + return Status::SUCCESS; + } + + void *raw_data = nullptr; + const cudaError_t err = cudaMallocHost(&raw_data, size); + if (err != cudaSuccess) { + DAQIRI_LOG_ERROR("cudaMallocHost for S3 staging failed: {}", + cudaGetErrorString(err)); + data = nullptr; + size = 0; + return Status::GENERIC_FAILURE; + } + + data = static_cast(raw_data); + return Status::SUCCESS; + } + + void reset() { + if (data == nullptr) { + size = 0; + return; + } + + const 
cudaError_t err = cudaFreeHost(data); + if (err != cudaSuccess) { + DAQIRI_LOG_ERROR("cudaFreeHost for S3 staging failed: {}", + cudaGetErrorString(err)); + } + data = nullptr; + size = 0; + } +}; + +PointerKind classify_pointer(const void *ptr) { + cudaPointerAttributes attrs{}; + const cudaError_t err = cudaPointerGetAttributes(&attrs, ptr); + if (err != cudaSuccess) { + cudaGetLastError(); + return PointerKind::HOST; + } + +#if defined(CUDART_VERSION) && CUDART_VERSION >= 10000 + if (attrs.type == cudaMemoryTypeDevice || + attrs.type == cudaMemoryTypeManaged) { + return PointerKind::CUDA; + } +#else + if (attrs.memoryType == cudaMemoryTypeDevice) { + return PointerKind::CUDA; + } +#endif + + return PointerKind::HOST; +} + +uint64_t packet_length_after_offset(BurstParams *burst, uint32_t packet_index, + uint64_t packet_data_offset) { + uint64_t packet_len = 0; + for (int seg = 0; seg < burst->hdr.hdr.num_segs; ++seg) { + packet_len += static_cast( + get_segment_packet_length(burst, seg, packet_index)); + } + + if (packet_data_offset >= packet_len) { + return 0; + } + return packet_len - packet_data_offset; +} + +Status validate_s3_write_request(S3Writer *writer, BurstParams *burst, + const std::string &object_prefix, + S3WriteHandle **handle, + uint32_t *num_packets) { + if (writer == nullptr || burst == nullptr || handle == nullptr || + num_packets == nullptr) { + return Status::NULL_PTR; + } + *handle = nullptr; + + if (object_prefix.empty()) { + DAQIRI_LOG_ERROR("S3 object prefix must not be empty"); + return Status::INVALID_PARAMETER; + } + + const int64_t packet_count = get_num_packets(burst); + if (packet_count < 0 || + static_cast(packet_count) > + std::numeric_limits::max()) { + DAQIRI_LOG_ERROR("Invalid packet count for S3 write: {}", packet_count); + return Status::INVALID_PARAMETER; + } + + const int num_segs = burst->hdr.hdr.num_segs; + if (num_segs <= 0 || num_segs > MAX_NUM_SEGS) { + DAQIRI_LOG_ERROR("Invalid segment count for S3 write: {}", 
num_segs); + return Status::INVALID_PARAMETER; + } + + *num_packets = static_cast(packet_count); + return Status::SUCCESS; +} + +Status copy_segment_to_host(const void *src, size_t nbytes, uint8_t *dst) { + if (nbytes == 0) { + return Status::SUCCESS; + } + if (src == nullptr || dst == nullptr) { + return Status::NULL_PTR; + } + + if (classify_pointer(src) == PointerKind::CUDA) { + const cudaError_t err = cudaMemcpy(dst, src, nbytes, cudaMemcpyDefault); + if (err != cudaSuccess) { + DAQIRI_LOG_ERROR("cudaMemcpy for S3 staging failed: {}", + cudaGetErrorString(err)); + return Status::GENERIC_FAILURE; + } + } else { + std::memcpy(dst, src, nbytes); + } + + return Status::SUCCESS; +} + +Status copy_packet_to_staging(BurstParams *burst, uint32_t packet_index, + uint64_t packet_data_offset, + HostStagingBuffer &staging) { + const uint64_t output_len = + packet_length_after_offset(burst, packet_index, packet_data_offset); + if (output_len > kMaxSinglePutObjectBytes || + output_len > std::numeric_limits::max()) { + return Status::NOT_SUPPORTED; + } + + const Status alloc_status = staging.allocate(static_cast(output_len)); + if (alloc_status != Status::SUCCESS || output_len == 0) { + return alloc_status; + } + + uint64_t bytes_to_skip = packet_data_offset; + size_t dst_offset = 0; + for (int seg = 0; seg < burst->hdr.hdr.num_segs; ++seg) { + const auto seg_len = + static_cast(get_segment_packet_length(burst, seg, + packet_index)); + if (seg_len == 0) { + continue; + } + + if (bytes_to_skip >= seg_len) { + bytes_to_skip -= seg_len; + continue; + } + + auto *seg_ptr = + static_cast(get_segment_packet_ptr(burst, seg, + packet_index)); + if (seg_ptr == nullptr) { + DAQIRI_LOG_ERROR("Null packet segment pointer for packet {} segment {}", + packet_index, seg); + return Status::NULL_PTR; + } + + const auto seg_offset = static_cast(bytes_to_skip); + const auto copy_len = static_cast(seg_len - bytes_to_skip); + bytes_to_skip = 0; + + const Status status = 
copy_segment_to_host(seg_ptr + seg_offset, copy_len, + staging.data + dst_offset); + if (status != Status::SUCCESS) { + return status; + } + dst_offset += copy_len; + } + + return Status::SUCCESS; +} + +#if DAQIRI_ENABLE_S3 + +static constexpr const char *kAwsAllocTag = "daqiri_s3"; + /* Lease on the AWS SDK. When constructed with owns_sdk set to true the destructor calls release_aws_sdk(); otherwise destruction does nothing. Copying is deleted. */ +class AwsSdkLease { +public: + explicit AwsSdkLease(bool owns_sdk) : owns_sdk_(owns_sdk) {} + ~AwsSdkLease(); + + AwsSdkLease(const AwsSdkLease &) = delete; + AwsSdkLease &operator=(const AwsSdkLease &) = delete; + +private: + bool owns_sdk_ = false; +}; + /* Reference count of SDK-owning leases and the options passed to InitAPI and ShutdownAPI; g_aws_sdk_mutex is held whenever release_aws_sdk() or acquire_aws_sdk() touches the count. */ +std::mutex g_aws_sdk_mutex; +uint32_t g_aws_sdk_refcount = 0; +Aws::SDKOptions g_aws_sdk_options; + /* Drops one reference and calls Aws::ShutdownAPI when the count reaches zero. A call made while the count is already zero is ignored. */ +void release_aws_sdk() { + std::lock_guard lock(g_aws_sdk_mutex); + if (g_aws_sdk_refcount == 0) { + return; + } + --g_aws_sdk_refcount; + if (g_aws_sdk_refcount == 0) { + Aws::ShutdownAPI(g_aws_sdk_options); + } +} + +AwsSdkLease::~AwsSdkLease() { + if (owns_sdk_) { + release_aws_sdk(); + } +} + /* Stores a new lease in *lease (NULL_PTR if lease is null). When already_initialized is true the lease does not own the SDK and the reference count is left alone; otherwise Aws::InitAPI runs when the count is zero, the count is incremented, and the lease owns that reference. */ +Status acquire_aws_sdk(bool already_initialized, + std::shared_ptr *lease) { + if (lease == nullptr) { + return Status::NULL_PTR; + } + + if (already_initialized) { + *lease = std::make_shared(false); + return Status::SUCCESS; + } + + std::lock_guard lock(g_aws_sdk_mutex); + if (g_aws_sdk_refcount == 0) { + Aws::InitAPI(g_aws_sdk_options); + } + ++g_aws_sdk_refcount; + *lease = std::make_shared(true); /* NOTE(review): if this allocation throws, the increment on the previous line is not undone; confirm whether that matters. */ + return Status::SUCCESS; +} + /* Per-packet upload record: packet index, object key, staged bytes, the request body and request, and the submitted and done flags. */ +struct S3UploadEntry { + uint32_t packet_index = 0; + std::string key; + HostStagingBuffer staging; + std::shared_ptr body; + Aws::S3::Model::PutObjectRequest request; + bool submitted = false; + bool done = false; +}; + +#endif + +} // namespace + +#if DAQIRI_ENABLE_S3 + /* Writer state: the configuration copy, the SDK lease and the S3 client. */ +struct S3Writer { + S3WriterConfig config; + std::shared_ptr sdk_lease; + std::shared_ptr client; +}; + /* State of one asynchronous burst write. The functions in this file read and update the counters and flags while holding mutex, and notify cv as entries complete. */ +struct S3WriteHandle { + std::shared_ptr sdk_lease; + std::shared_ptr client; + std::vector entries; + std::mutex mutex; + std::condition_variable cv; + S3WriteStatus status; + uint32_t max_inflight_uploads = 1; + size_t next_to_submit =
0; + size_t inflight = 0; + size_t completed = 0; + bool done = false; + Status final_status = Status::SUCCESS; +}; + +namespace { + /* Copies the handle status into *status when status is non-null. The callers in this file invoke it with handle.mutex held. */ +void populate_status(const S3WriteHandle &handle, S3WriteStatus *status) { + if (status != nullptr) { + *status = handle.status; + } +} + +void submit_ready_uploads(S3WriteHandle &handle); + /* Records the completion of entry index at most once: an out-of-range index or an entry already marked done is ignored. Updates the in-flight and completed counters and the success or failure statistics under handle.mutex. */ +void mark_upload_complete(S3WriteHandle &handle, size_t index, bool success) { + bool should_submit_more = false; + { + std::lock_guard lock(handle.mutex); + if (index >= handle.entries.size() || handle.entries[index].done) { + return; + } + + handle.entries[index].done = true; + if (handle.inflight > 0) { + --handle.inflight; + } + ++handle.completed; + + if (success) { + ++handle.status.completed_objects; + handle.status.bytes_uploaded += + static_cast(handle.entries[index].staging.size); + } else { + ++handle.status.failed_objects; + } + /* Last completion: mark the handle done; the final status is SUCCESS only when no object failed. */ + if (handle.completed == handle.entries.size()) { + handle.done = true; + handle.final_status = handle.status.failed_objects == 0 + ?
Status::SUCCESS + : Status::GENERIC_FAILURE; + handle.cv.notify_all(); + return; + } + + should_submit_more = true; + } + /* NOTE(review): possible use-after-free. handle.mutex is no longer held here; if another thread completes the final entry first, a thread blocked in daqiri_s3_write_destroy can wake and delete the handle before the calls below touch handle.cv and handle.mutex. Confirm, and consider keeping the handle alive until every callback has returned. */ + handle.cv.notify_all(); + if (should_submit_more) { + submit_ready_uploads(handle); + } +} + /* Starts queued uploads until max_inflight_uploads are in flight, every entry has been submitted, or the handle is done. handle.mutex is held only while an entry is claimed; PutObjectAsync is called outside the lock. A std::exception thrown by PutObjectAsync is logged and the entry is marked failed. */ +void submit_ready_uploads(S3WriteHandle &handle) { + while (true) { + size_t index = 0; + { + std::lock_guard lock(handle.mutex); + if (handle.done || + handle.inflight >= handle.max_inflight_uploads || + handle.next_to_submit >= handle.entries.size()) { + return; + } + + index = handle.next_to_submit++; + ++handle.inflight; + handle.entries[index].submitted = true; + } + /* The completion callback captures a raw pointer to the handle, so the handle has to outlive every in-flight upload. NOTE(review): the loop locks handle.mutex again after PutObjectAsync returns; when this runs on a callback thread the same lifetime concern applies. */ + auto *handle_ptr = &handle; + try { + handle.client->PutObjectAsync( + handle.entries[index].request, + [handle_ptr, index]( + const Aws::S3::S3Client *, + const Aws::S3::Model::PutObjectRequest &, + const Aws::S3::Model::PutObjectOutcome &outcome, + const std::shared_ptr &) { + if (!outcome.IsSuccess()) { + const auto &error = outcome.GetError(); + DAQIRI_LOG_ERROR("S3 PutObject failed for object {}: {}", + handle_ptr->entries[index].key, + error.GetMessage().c_str()); + } + mark_upload_complete(*handle_ptr, index, outcome.IsSuccess()); + }, + nullptr); + } catch (const std::exception &e) { + DAQIRI_LOG_ERROR("S3 PutObjectAsync failed for object {}: {}", + handle.entries[index].key, e.what()); + mark_upload_complete(handle, index, false); + } + } +} + /* Builds the PutObject request for one entry: creates the body stream, writes the staged bytes into it when there are any, rewinds it, then sets bucket, key, content length and body. Always returns SUCCESS. */ +Status prepare_s3_request(S3Writer &writer, S3UploadEntry &entry) { + entry.body = Aws::MakeShared(kAwsAllocTag); + if (entry.staging.size != 0) { + entry.body->write(reinterpret_cast(entry.staging.data), + static_cast(entry.staging.size)); + } + entry.body->seekg(0); + + entry.request.SetBucket(writer.config.bucket.c_str()); + entry.request.SetKey(entry.key.c_str()); + entry.request.SetContentLength(static_cast(entry.staging.size)); + entry.request.SetBody(entry.body); + return Status::SUCCESS; +} + /* Status query that does not wait for completion: fills *status under handle.mutex and returns the final status when the handle is done, NOT_READY otherwise. */ +Status poll_handle(S3WriteHandle &handle, S3WriteStatus *status) { + std::lock_guard lock(handle.mutex); + populate_status(handle,
status); + if (handle.done) { + return handle.final_status; + } + return Status::NOT_READY; +} + +} // namespace + /* Creates an S3 writer and hands ownership of it to the caller through *writer. Returns NULL_PTR for a null out-pointer, INVALID_PARAMETER when bucket or region is empty or max_inflight_uploads is 0, the acquire_aws_sdk status when that fails, and GENERIC_FAILURE when a std::exception is thrown during construction. *writer is nullptr on every failure path past the null check. */ +Status daqiri_s3_writer_create(const S3WriterConfig &config, + S3Writer **writer) { + if (writer == nullptr) { + return Status::NULL_PTR; + } + *writer = nullptr; + + if (config.bucket.empty() || config.region.empty() || + config.max_inflight_uploads == 0) { + return Status::INVALID_PARAMETER; + } + + try { + auto s3_writer = std::make_unique(); + s3_writer->config = config; + + Status status = acquire_aws_sdk(config.aws_sdk_already_initialized, + &s3_writer->sdk_lease); + if (status != Status::SUCCESS) { + return status; + } + + Aws::Client::ClientConfiguration client_config; + client_config.region = config.region.c_str(); + if (!config.endpoint_override.empty()) { + client_config.endpointOverride = config.endpoint_override.c_str(); + } + /* NOTE(review): the negated path_style is presumably the client's virtual-addressing flag; confirm against the AWS SDK S3Client constructor. */ + s3_writer->client = std::make_shared( + client_config, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent, + !config.path_style); + *writer = s3_writer.release(); + return Status::SUCCESS; + } catch (const std::exception &e) { + DAQIRI_LOG_ERROR("Failed to create S3 writer: {}", e.what()); + return Status::GENERIC_FAILURE; + } +} + /* Stages every packet of the burst in host memory and uploads each one as its own S3 object, keyed by object_prefix, an underscore and the packet index. On success *out_handle receives a handle the caller owns. packet_data_offset bytes are skipped at the start of every packet. */ +Status daqiri_write_raw_to_s3_objects_async(S3Writer *writer, + BurstParams *burst, + const std::string &object_prefix, + uint64_t packet_data_offset, + S3WriteHandle **out_handle) { + uint32_t num_packets = 0; + Status status = validate_s3_write_request(writer, burst, object_prefix, + out_handle, &num_packets); + if (status != Status::SUCCESS) { + return status; + } + /* First pass checks sizes only: each object against kMaxSinglePutObjectBytes and the running total against config.max_staged_bytes, before any staging buffer is allocated. */ + uint64_t staged_bytes = 0; + for (uint32_t pkt = 0; pkt < num_packets; ++pkt) { + const uint64_t object_len = + packet_length_after_offset(burst, pkt, packet_data_offset); + if (object_len > kMaxSinglePutObjectBytes) { + DAQIRI_LOG_ERROR("S3 object for packet {} is larger than 5 GiB", pkt); + return Status::NOT_SUPPORTED; + } + if (object_len > writer->config.max_staged_bytes || + staged_bytes >
writer->config.max_staged_bytes - object_len) { /* The subtraction cannot wrap because object_len was just checked to be no larger than max_staged_bytes. */ + DAQIRI_LOG_ERROR("S3 staging bytes exceed configured limit"); + return Status::NO_SPACE_AVAILABLE; + } + staged_bytes += object_len; + } + /* The handle takes its own references to the writer's SDK lease and client. */ + auto handle = std::make_unique(); + handle->sdk_lease = writer->sdk_lease; + handle->client = writer->client; + handle->max_inflight_uploads = + std::max(1, writer->config.max_inflight_uploads); + handle->entries.reserve(num_packets); + + try { + for (uint32_t pkt = 0; pkt < num_packets; ++pkt) { + S3UploadEntry entry; + entry.packet_index = pkt; + entry.key = object_prefix + "_" + std::to_string(pkt); + + status = copy_packet_to_staging(burst, pkt, packet_data_offset, + entry.staging); + if (status != Status::SUCCESS) { + return status; + } + + status = prepare_s3_request(*writer, entry); + if (status != Status::SUCCESS) { + return status; + } + + handle->entries.push_back(std::move(entry)); + } + /* A burst without packets is complete immediately; otherwise the first uploads are started here, after every entry has been staged. */ + if (handle->entries.empty()) { + handle->done = true; + handle->final_status = Status::SUCCESS; + } else { + submit_ready_uploads(*handle); + } + + *out_handle = handle.release(); + return Status::SUCCESS; + } catch (const std::exception &e) { + DAQIRI_LOG_ERROR("Failed to submit S3 uploads: {}", e.what()); + return Status::GENERIC_FAILURE; + } +} + /* Returns NULL_PTR for a null handle; otherwise fills *status and returns the final status when the write is done, NOT_READY while it is still running. */ +Status daqiri_s3_write_poll(S3WriteHandle *handle, S3WriteStatus *status) { + if (handle == nullptr) { + return Status::NULL_PTR; + } + return poll_handle(*handle, status); +} + /* Blocks on the handle's condition variable until done is set, then fills *status and returns the final status. Returns NULL_PTR for a null handle. */ +Status daqiri_s3_write_wait(S3WriteHandle *handle, S3WriteStatus *status) { + if (handle == nullptr) { + return Status::NULL_PTR; + } + + std::unique_lock lock(handle->mutex); + handle->cv.wait(lock, [&handle]() { return handle->done; }); + populate_status(*handle, status); + return handle->final_status; +} + /* Waits for the write to finish, deletes the handle and returns the final status. Returns NULL_PTR for a null handle. */ +Status daqiri_s3_write_destroy(S3WriteHandle *handle) { + if (handle == nullptr) { + return Status::NULL_PTR; + } + + S3WriteStatus status{}; + const Status wait_status = daqiri_s3_write_wait(handle, &status); + delete handle; + return wait_status; +} +
+Status daqiri_s3_writer_destroy(S3Writer *writer) { /* Deletes the writer and returns SUCCESS; returns NULL_PTR for a null argument. */ + if (writer == nullptr) { + return Status::NULL_PTR; + } + delete writer; + return Status::SUCCESS; +} + +#else + /* Build without DAQIRI_ENABLE_S3: the types are empty and every entry point below is a stub. The stubs return NOT_SUPPORTED, except that writer_create returns NULL_PTR for a null out-pointer. writer_create clears *writer and the async write clears *handle when it is non-null. */ +struct S3Writer {}; +struct S3WriteHandle {}; + +Status daqiri_s3_writer_create(const S3WriterConfig &, S3Writer **writer) { + if (writer == nullptr) { + return Status::NULL_PTR; + } + *writer = nullptr; + return Status::NOT_SUPPORTED; +} + +Status daqiri_write_raw_to_s3_objects_async(S3Writer *, BurstParams *, + const std::string &, uint64_t, + S3WriteHandle **handle) { + if (handle != nullptr) { + *handle = nullptr; + } + return Status::NOT_SUPPORTED; +} + +Status daqiri_s3_write_poll(S3WriteHandle *, S3WriteStatus *) { + return Status::NOT_SUPPORTED; +} + +Status daqiri_s3_write_wait(S3WriteHandle *, S3WriteStatus *) { + return Status::NOT_SUPPORTED; +} + +Status daqiri_s3_write_destroy(S3WriteHandle *) { + return Status::NOT_SUPPORTED; +} + +Status daqiri_s3_writer_destroy(S3Writer *) { return Status::NOT_SUPPORTED; } + +#endif + +} // namespace daqiri