From 85bad8c723536fb60004e824714f5734f88aa094 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 17 Apr 2026 10:29:17 +0200 Subject: [PATCH 1/7] Predefine statistics formatters --- cpp/CMakeLists.txt | 1 - .../rapidsmpf/coll/gather_statistics.hpp | 57 --- cpp/include/rapidsmpf/statistics.hpp | 234 +++++------ cpp/src/coll/gather_statistics.cpp | 65 --- cpp/src/statistics.cpp | 374 ++++++++++-------- cpp/src/streaming/cudf/parquet.cpp | 14 +- cpp/tests/CMakeLists.txt | 1 - cpp/tests/test_gather_statistics.cpp | 90 ----- cpp/tests/test_statistics.cpp | 238 ++++++----- docs/source/statistics.md | 8 +- .../rapidsmpf/rapidsmpf/coll/CMakeLists.txt | 2 +- python/rapidsmpf/rapidsmpf/coll/__init__.py | 3 +- .../rapidsmpf/coll/gather_statistics.pyi | 12 - .../rapidsmpf/coll/gather_statistics.pyx | 69 ---- python/rapidsmpf/rapidsmpf/statistics.pxd | 18 +- python/rapidsmpf/rapidsmpf/statistics.pyi | 19 +- python/rapidsmpf/rapidsmpf/statistics.pyx | 67 ++++ .../rapidsmpf/tests/test_gather_statistics.py | 58 --- .../rapidsmpf/tests/test_statistics.py | 69 +++- 19 files changed, 657 insertions(+), 742 deletions(-) delete mode 100644 cpp/include/rapidsmpf/coll/gather_statistics.hpp delete mode 100644 cpp/src/coll/gather_statistics.cpp delete mode 100644 cpp/tests/test_gather_statistics.cpp delete mode 100644 python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyi delete mode 100644 python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyx delete mode 100644 python/rapidsmpf/rapidsmpf/tests/test_gather_statistics.py diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 08ae8e704..093d8ceda 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -182,7 +182,6 @@ add_library( rapidsmpf src/coll/allgather.cpp src/coll/allreduce.cpp - src/coll/gather_statistics.cpp src/coll/utils.cpp src/bootstrap/bootstrap.cpp src/bootstrap/file_backend.cpp diff --git a/cpp/include/rapidsmpf/coll/gather_statistics.hpp b/cpp/include/rapidsmpf/coll/gather_statistics.hpp deleted file mode 100644 index 760ef5e2d..000000000 --- a/cpp/include/rapidsmpf/coll/gather_statistics.hpp +++ /dev/null @@ -1,57 +0,0 @@ -/** - * SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - */ -#pragma once - -#include -#include - -#include -#include - -namespace rapidsmpf::coll { - -/** - * @brief Gather statistics from all non-root ranks to the root rank. - * - * Non-root ranks serialize and send their statistics to the root rank. On the - * root rank the @p stats argument is ignored and the return value contains the - * deserialized statistics from every other rank. On non-root ranks the return - * value is an empty vector. - * - * This is a blocking collective: all ranks must call this function. - * - * @note The current implementation is not optimized for performance and should - * not be called on a critical path. - * - * @note It is safe to reuse the @p op_id as soon as this function has returned. - * - * Example usage: - * @code{.cpp} - * Rank root = 0; - * auto others = coll::gather_statistics(comm, op_id, stats, root); - * if (comm->rank() == root) { - * auto global = stats->merge(others); - * std::cout << global->report(); - * } - * @endcode - * - * @param comm The communicator. - * @param op_id Operation ID for tag disambiguation. Must be the same on all ranks. - * @param stats The local statistics to send (ignored on root). Must not be null. - * @param root The root rank that collects the statistics (default 0). - * @note The gathered Statistics objects contain only stats, no memory records - * or formatters. - * - * @return On root: a vector of `nranks - 1` deserialized Statistics from all - * non-root ranks. On non-root ranks: an empty vector. - */ -[[nodiscard]] std::vector> gather_statistics( - std::shared_ptr const& comm, - OpID op_id, - std::shared_ptr const& stats = Statistics::disabled(), - Rank root = 0 -); - -} // namespace rapidsmpf::coll diff --git a/cpp/include/rapidsmpf/statistics.hpp b/cpp/include/rapidsmpf/statistics.hpp index 1727447d8..511cddba7 100644 --- a/cpp/include/rapidsmpf/statistics.hpp +++ b/cpp/include/rapidsmpf/statistics.hpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -34,29 +33,33 @@ class StreamOrderedTiming; * * - **Stat name**: identifies an individual `Stat` accumulator, as passed to * `add_stat()`, `get_stat()`, `add_bytes_stat()`, and `add_duration_stat()`. - * Stats can be accumulated and retrieved without registering a formatter. + * Stats are pure numeric accumulators with no associated rendering + * information. * Examples: `"spill-time"`, `"spill-bytes"`. * - * - **Report entry name**: the label of a formatted line in `report()`, passed - * to `register_formatter()`. An entry may aggregate one or more stats. For - * the single-stat overload of `register_formatter()`, the report entry name - * and stat name are identical. + * - **Report entry name**: the label of a formatted line in `report()`, + * passed to `add_report_entry()`. An entry names one or more stats and + * a `Formatter` that selects how those stats are rendered. When the + * entry covers a single stat, the report entry name and stat name are + * typically identical. * Example: `"spill"` (aggregating `"spill-bytes"` and `"spill-time"`). * + * Formatters are a fixed, predefined set (see `Statistics::Formatter`). + * * @code{.cpp} * Statistics stats; * - * // Register a multi-stat formatter under a single report entry name. - * stats.register_formatter( - * "spill", // report entry name - * {"spill-bytes", "spill-time"}, // stat names - * [](std::ostream& os, auto const& s) { - * os << format_nbytes(s[0].value()) << " in " << format_duration(s[1].value()); - * } + * // Associate two stats with a predefined multi-stat formatter. + * stats.add_report_entry( + * "copy-device-to-host", // report entry name + * {"copy-device-to-host-bytes", + * "copy-device-to-host-time", + * "copy-device-to-host-stream-delay"}, + * Statistics::Formatter::MemCopy * ); * - * stats.add_bytes_stat("spill-bytes", 1024); - * stats.add_duration_stat("spill-time", 0.5); + * stats.add_bytes_stat("spill-bytes", 1024); // helper: registers Bytes entry + * stats.add_duration_stat("spill-time", 0.5s); // helper: registers Duration entry * * auto s = stats.get_stat("spill-bytes"); // retrieve without formatter * std::cout << stats.report(); @@ -64,6 +67,44 @@ class StreamOrderedTiming; */ class Statistics { public: + /** + * @brief Identifies a predefined formatter used by `report()`. + * + * Each formatter consumes a fixed number of `Stat` entries and renders them + * into a human-readable string. + * + * Available formatters (examples): + * + * - Default (1 stat): + * "123" + * + * - Bytes (1 stat): + * "1.2 GiB | avg 300 MiB" + * + * - Duration (1 stat): + * "2.5 ms | avg 600 us" + * + * - HitRate (1 stat): + * "42/100 (hits/lookups)" + * + * - MemCopy (3 stats: bytes, time, delay): + * "1.2 GiB | 2.5 ms | 480 GiB/s | avg-stream-delay 10 us" + * + * - MemAlloc (3 stats: bytes, time, delay): + * "512 MiB | 1.0 ms | 512 GiB/s | avg-stream-delay 5 us" + * + * `_Count` is an internal sentinel — always keep it last. + */ + enum class Formatter : std::uint8_t { + Default = 0, + Bytes, + Duration, + HitRate, + MemCopy, + MemAlloc, + _Count, ///< Sentinel; must remain last. + }; + /** * @brief Constructs a Statistics object without memory profiling. * @@ -107,8 +148,13 @@ class Statistics { ); ~Statistics() noexcept; + + // `Statistics` is owned exclusively through `std::shared_ptr` (see `disabled()` and + // `from_options()`). Statistics(Statistics const&) = delete; Statistics& operator=(Statistics const&) = delete; + Statistics(Statistics&&) = delete; + Statistics& operator=(Statistics&&) = delete; /** * @brief Returns a shared pointer to a disabled (no-op) Statistics instance. @@ -120,29 +166,6 @@ class Statistics { */ static std::shared_ptr disabled(); - /** - * @brief Move constructor. - * - * @param o The Statistics object to move from. - */ - Statistics(Statistics&& o) noexcept - : enabled_(o.enabled()), - stats_{std::move(o.stats_)}, - formatters_{std::move(o.formatters_)} {} - - /** - * @brief Move assignment operator. - * - * @param o The Statistics object to move from. - * @return Reference to this updated instance. - */ - Statistics& operator=(Statistics&& o) noexcept { - enabled_ = o.enabled(); - stats_ = std::move(o.stats_); - formatters_ = std::move(o.formatters_); - return *this; - } - /** * @brief Checks if statistics tracking is enabled. * @@ -169,10 +192,12 @@ class Statistics { /** * @brief Generates a formatted report of all collected statistics. * - * Every registered formatter always produces an entry. If all its required - * statistics have been recorded the formatter renders the values; otherwise the - * entry reads "No data collected". Statistics not covered by any formatter are - * shown as plain numeric values. All entries are sorted alphabetically. + * Every registered report entry always produces a line. If all the stats + * it references have been recorded, the entry's `Formatter` renders + * the values; otherwise the line reads "No data collected". Statistics + * not covered by any report entry are shown with `Formatter::Default` + * (raw numeric value, optionally annotated with the count). All entries + * are sorted alphabetically. * * @note If any statistics are collected via stream-ordered timing (e.g. through * `record_copy()`), all relevant CUDA streams must be synchronized before calling @@ -187,15 +212,14 @@ class Statistics { /** * @brief Writes a JSON representation of all collected statistics to a stream. * - * Values are written as raw numbers (count, sum, max). Registered formatters, - * which produce human-readable strings such as "1.0 KiB" or "3.5 ms" in the - * text report, are not applied so that the output remains machine-parseable - * with consistent numeric types. + * Values are written as raw numbers (count, sum, max). Formatter + * metadata is not emitted — use `report()` for the human-readable + * rendering. * * @param os Output stream to write to. - * @throws std::invalid_argument If any stat name or memory record name contains - * characters that require JSON escaping (double quotes, backslashes, or ASCII - * control characters 0x00–0x1F). + * @throws std::invalid_argument If any stat name or memory record name + * contains characters that require JSON escaping (double quotes, + * backslashes, or ASCII control characters 0x00–0x1F). */ void write_json(std::ostream& os) const; @@ -217,9 +241,9 @@ class Statistics { [[nodiscard]] std::shared_ptr copy() const; /** - * @brief Serializes the stats to a binary byte vector. + * @brief Serializes the stats and report entries to a binary byte vector. * - * @note Memory records and formatters are not serialized. + * @note Memory records are not serialized. * * @return A vector of bytes containing the serialized statistics. */ @@ -228,7 +252,7 @@ class Statistics { /** * @brief Deserializes a Statistics object from a binary byte vector. * - * @note The resulting object has no memory records or formatters. + * @note The resulting object has no memory records. * * @param data The serialized statistics data. * @return A shared pointer to the reconstructed Statistics object. @@ -242,8 +266,9 @@ class Statistics { * @brief Merges this Statistics with another, returning a new Statistics. * * For each stat name present in either object, the result has the summed - * count, summed value, and the maximum of the two maxima. Formatters are - * taken from `*this`. + * count, summed value, and the maximum of the two maxima. Report entries + * are taken from `*this`; entries present only in @p other are copied + * across. * * @note Memory records are not merged. * @@ -257,7 +282,9 @@ class Statistics { /** * @brief Merges this Statistics with multiple others. * - * Equivalent to calling `merge()` repeatedly. Formatters are taken from `*this`. + * Equivalent to calling `merge()` repeatedly. Report entries are taken + * from `*this`; entries present only in one of @p others are copied + * across in iteration order (first-wins on name conflict). * * @note Memory records are not merged. * @@ -378,13 +405,6 @@ class Statistics { double max_{-std::numeric_limits::infinity()}; }; - /** - * @brief Type alias for a statistics formatting function. - * - * The formatter receives all the named stats it declared interest in as a vector. - */ - using Formatter = std::function const&)>; - /** * @brief Retrieves a statistic by name. * @@ -396,7 +416,9 @@ class Statistics { /** * @brief Adds a numeric value to the named statistic. * - * Creates the statistic if it doesn't exist. + * Creates the statistic if it doesn't exist. Does not associate any + * formatter with the stat — use `add_report_entry()` (or a helper like + * `add_bytes_stat()`) for that. * * @param name Name of the statistic. * @param value Value to add. @@ -404,57 +426,49 @@ class Statistics { void add_stat(std::string const& name, double value); /** - * @brief Check whether a report entry name already has a formatter registered. - * - * Intended as a cheap pre-check before constructing arguments to - * `register_formatter()`. + * @brief Associate a formatter with one or more named statistics for + * report rendering. * - * @note The result may be outdated by the time it is acted upon. This method - * should only be used as an optimization hint to avoid unnecessary work, never - * for correctness decisions. Once this method returns `true` for a given name it - * will never return `false` again, because formatters cannot be unregistered. + * First-wins: if a report entry is already registered under + * @p report_entry_name, this call has no effect. The entry appears in + * `report()` as a single line; if any stat it references is missing, + * the line reads "No data collected". * - * @param name Report entry name to look up. - * @return True if a formatter is registered under @p name, otherwise false. + * @param report_entry_name Report entry name. + * @param stat_names Names of the stats this entry aggregates. Caller is + * responsible for passing the number of stats the chosen @p formatter + * expects; a mismatch surfaces as `std::out_of_range` when `report()` + * renders the entry. + * @param formatter Predefined formatter to render the entry with. */ - bool exist_report_entry_name(std::string const& name) const; + void add_report_entry( + std::string const& report_entry_name, + std::vector const& stat_names, + Formatter formatter + ); /** - * @brief Register a formatter for a single named statistic. - * - * If a formatter is already registered under @p name, this call has no effect. - * The formatter is only invoked during `report()` if the named statistic has - * been recorded. + * @brief Check whether a report entry is already registered under @p name. * - * @param name Report entry name (also used as the stat name to collect). - * @param formatter Function used to format this statistic when reporting. - */ - void register_formatter(std::string const& name, Formatter formatter); - - /** - * @brief Register a formatter that takes multiple named statistics. + * Intended as a cheap pre-check so hot-path callers can skip building + * the `stat_names` vector when the entry is already present. * - * If a formatter is already registered under @p report_entry_name, this call has - * no effect. The formatter is invoked during `report()` only if all stats in - * @p stat_names have been recorded; if any are missing the entry reads - * "No data collected". + * @note The result may be stale by the time it is acted upon. It must + * only be used as an optimization hint. Because report entries cannot + * be removed, once this returns `true` for a given @p name it will + * never return `false` again. * - * @param report_entry_name Report entry name. - * @param stat_names Names of the stats to collect and pass to the formatter. - * @param formatter Function called with all collected stats during reporting. + * @param name Report entry name to look up. + * @return True if a report entry is registered under @p name. */ - void register_formatter( - std::string const& report_entry_name, - std::vector const& stat_names, - Formatter formatter - ); + [[nodiscard]] bool has_report_entry(std::string const& name) const; /** * @brief Adds a byte count to the named statistic. * - * Registers a formatter that formats values as human-readable byte sizes if no - * formatter is already registered for @p name, then adds @p nbytes to the - * named statistic. + * Registers a `Formatter::Bytes` report entry named @p name if no + * report entry already exists under that name, then adds @p nbytes to + * the named statistic. * * @param name Name of the statistic. * @param nbytes Number of bytes to add. @@ -464,9 +478,9 @@ class Statistics { /** * @brief Adds a duration to the named statistic. * - * Registers a formatter that formats values as time durations in seconds if no - * formatter is already registered for @p name, then adds @p seconds to the - * named statistic. + * Registers a `Formatter::Duration` report entry named @p name if no + * report entry already exists under that name, then adds @p seconds to + * the named statistic. * * @param name Name of the statistic. * @param seconds Duration in seconds to add. @@ -528,7 +542,7 @@ class Statistics { /** * @brief Clears all statistics. * - * @note Memory profiling records and registered formatters are not cleared. + * @note Memory profiling records and report entries are not cleared. */ void clear(); @@ -607,17 +621,17 @@ class Statistics { private: /** - * @brief Associates a display name with a formatter and the stats it aggregates. + * @brief A report entry describing which stats to aggregate and how to render them. */ - struct FormatterEntry { - std::vector stat_names; ///< Stats to collect and pass to fn. - Formatter fn; + struct ReportEntry { + std::vector stat_names; + Formatter formatter; }; mutable std::mutex mutex_; std::atomic enabled_; std::map stats_; - std::map formatters_; + std::map report_entries_; std::unordered_map memory_records_; RmmResourceAdaptor* mr_; std::shared_ptr diff --git a/cpp/src/coll/gather_statistics.cpp b/cpp/src/coll/gather_statistics.cpp deleted file mode 100644 index 8c6ff1f48..000000000 --- a/cpp/src/coll/gather_statistics.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/** - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. - * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include -#include -#include - -#include -#include -#include - -namespace rapidsmpf::coll { - -std::vector> gather_statistics( - std::shared_ptr const& comm, - OpID op_id, - std::shared_ptr const& stats, - Rank root -) { - RAPIDSMPF_EXPECTS(comm != nullptr, "Communicator must not be null"); - RAPIDSMPF_EXPECTS(stats != nullptr, "Statistics must not be null"); - - auto const nranks = comm->nranks(); - auto const rank = comm->rank(); - Tag const tag{op_id, 0}; - - if (nranks == 1) { - return {}; - } - - if (rank != root) { - // Serialize and send to root. - auto serialized = stats->serialize(); - auto msg = std::make_unique>(std::move(serialized)); - auto future = comm->send(std::move(msg), root, tag); - // Poll until send completes. - while (!comm->test(future)) { - std::this_thread::yield(); - } - return {}; - } - - // Root: receive from all other ranks. - // No ack/barrier is needed because the communicator guarantees no message - // overtaking on the same (rank, tag) pair, so consecutive calls with the - // same op_id cannot interfere. - std::vector> ret; - ret.reserve(safe_cast(nranks - 1)); - - Rank received = 0; - while (received < nranks - 1) { - auto [msg, sender] = comm->recv_any(tag); - if (msg) { - ret.push_back(Statistics::deserialize(*msg)); - ++received; - } - std::this_thread::yield(); - } - return ret; -} - -} // namespace rapidsmpf::coll diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index 7fd8a3a1c..0ff8eca5d 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -4,6 +4,7 @@ */ #include +#include #include #include #include @@ -16,26 +17,87 @@ #include #include +namespace rapidsmpf { namespace { + bool has_json_unsafe_chars(std::string_view s) { return std::ranges::any_of(s, [](unsigned char c) { return c == '"' || c == '\\' || c < 0x20; }); } -// For pre-computed names. +// Pre-computed stat names for record_copy / record_alloc. struct Names { - std::string base; // a string like "alloc-{memtype}" or "copy--to-" + std::string base; // "alloc-{memtype}" or "copy-{src}-to-{dst}" std::string nbytes; // "{base}-bytes" std::string time; // "{base}-time" std::string stream_delay; // "{base}-stream-delay" }; -using NamesArray = std::array; -using Names2DArray = std::array; -} // namespace +using NamesArray = std::array; +using Names2DArray = std::array; + +// Predefined render functions indexed by `Statistics::Formatter`. Per-entry +// rendering description lives on the enum in statistics.hpp. +using FormatterFn = void (*)(std::ostream&, std::vector const&); +constexpr std::array(Statistics::Formatter::_Count)> + FORMATTERS = {{ + // Implement `Statistics::Formatter:Default` + [](std::ostream& os, std::vector const& s) { + os << s.at(0).value(); + if (s.at(0).count() > 1) { + os << " (count " << s.at(0).count() << ")"; + } + }, + // Implement `Statistics::Formatter:Bytes` + [](std::ostream& os, std::vector const& s) { + auto const val = s.at(0).value(); + auto const count = s.at(0).count(); + os << format_nbytes(val); + if (count > 1) { + os << " | avg " << format_nbytes(val / static_cast(count)); + } + }, + // Implement `Statistics::Formatter:Duration` + [](std::ostream& os, std::vector const& s) { + auto const val = s.at(0).value(); + auto const count = s.at(0).count(); + os << format_duration(val); + if (count > 1) { + os << " | avg " << format_duration(val / static_cast(count)); + } + }, + // Implement `Statistics::Formatter:HitRate` + [](std::ostream& os, std::vector const& s) { + os << s.at(0).value() << "/" << s.at(0).count() << " (hits/lookups)"; + }, + // Implement `Statistics::Formatter:MemCopy` + [](std::ostream& os, std::vector const& s) { + RAPIDSMPF_EXPECTS( + s.at(0).count() == s.at(1).count() && s.at(1).count() == s.at(2).count(), + "MemCopy formatter expects the record counters to match" + ); + os << format_nbytes(s.at(0).value()) << " | " + << format_duration(s.at(1).value()) << " | " + << format_nbytes(s.at(0).value() / s.at(1).value()) << "/s" + << " | avg-stream-delay " + << format_duration(s.at(2).value() / static_cast(s.at(1).count())); + }, + // Implement `Statistics::Formatter:MemAlloc` + [](std::ostream& os, std::vector const& s) { + RAPIDSMPF_EXPECTS( + s.at(0).count() == s.at(1).count() && s.at(1).count() == s.at(2).count(), + "MemAlloc formatter expects the record counters to match" + ); + os << format_nbytes(s.at(0).value()) << " | " + << format_duration(s.at(1).value()) << " | " + << format_nbytes(s.at(0).value() / s.at(1).value()) << "/s" + << " | avg-stream-delay " + << format_duration(s.at(2).value() / static_cast(s.at(1).count())); + }, + }}; -namespace rapidsmpf { +} // namespace // -- Stat -------------------------------------------------------------------- @@ -147,56 +209,37 @@ void Statistics::add_stat(std::string const& name, double value) { it->second.add(value); } -bool Statistics::exist_report_entry_name(std::string const& name) const { - std::lock_guard lock(mutex_); - return formatters_.contains(name); -} - -void Statistics::register_formatter(std::string const& name, Formatter formatter) { - if (!enabled() || exist_report_entry_name(name)) { - return; - } - register_formatter(name, {name}, std::move(formatter)); -} - -void Statistics::register_formatter( +void Statistics::add_report_entry( std::string const& report_entry_name, std::vector const& stat_names, Formatter formatter ) { - if (!enabled() || exist_report_entry_name(report_entry_name)) { + if (!enabled()) { return; } std::lock_guard lock(mutex_); - formatters_.try_emplace(report_entry_name, stat_names, std::move(formatter)); + report_entries_.try_emplace( + report_entry_name, ReportEntry{.stat_names = stat_names, .formatter = formatter} + ); +} + +bool Statistics::has_report_entry(std::string const& name) const { + std::lock_guard lock(mutex_); + return report_entries_.contains(name); } void Statistics::add_bytes_stat(std::string const& name, std::size_t nbytes) { - if (!exist_report_entry_name(name)) { - register_formatter(name, [](std::ostream& os, std::vector const& stats) { - auto const val = stats[0].value(); - auto const count = stats[0].count(); - os << format_nbytes(val); - if (count > 1) { - os << " | avg " << format_nbytes(val / count); - } - }); - } add_stat(name, static_cast(nbytes)); + if (!has_report_entry(name)) { + add_report_entry(name, {name}, Formatter::Bytes); + } } void Statistics::add_duration_stat(std::string const& name, Duration seconds) { - if (!exist_report_entry_name(name)) { - register_formatter(name, [](std::ostream& os, std::vector const& stats) { - auto const val = stats[0].value(); - auto const count = stats[0].count(); - os << format_duration(val); - if (count > 1) { - os << " | avg " << format_duration(val / count); - } - }); - } add_stat(name, seconds.count()); + if (!has_report_entry(name)) { + add_report_entry(name, {name}, Formatter::Duration); + } } std::vector Statistics::list_stat_names() const { @@ -261,56 +304,46 @@ std::string Statistics::report(std::string const& header) const { // Reporting strategy: // - // Each registered formatter claims one or more stat names and renders them into a - // single labelled entry line using a custom function. Any stat that is not claimed - // by any formatter is rendered with a plain numeric default entry line. All entry - // lines are then sorted alphabetically by their label and printed together. + // Each report entry claims one or more stat names and renders them into + // a single labelled line using the formatter selected by its + // `Formatter`. Any stat that is not claimed by a report entry is + // rendered with `Formatter::Default`. All entry lines are sorted + // alphabetically and printed together. using EntryLine = std::pair; std::vector lines; std::unordered_set consumed; - // Returns true only if every stat name required by a formatter has been recorded. - // If false, the entry is rendered as "No data collected". auto has_all_stats = [&](auto const& names) { return std::ranges::all_of(names, [&](auto const& sname) { return stats_.contains(sname); }); }; - // Formatter-based lines. Emit "No data collected" if any required stats are missing. - for (auto const& [report_entry_name, entry] : formatters_) { + for (auto const& [report_entry_name, entry] : report_entries_) { if (!has_all_stats(entry.stat_names)) { lines.emplace_back(report_entry_name, "No data collected"); continue; } - - std::vector stat_vec; - stat_vec.reserve(entry.stat_names.size()); - for (auto const& sname : entry.stat_names) { - stat_vec.push_back(stats_.at(sname)); - } - + std::vector args; + args.reserve(entry.stat_names.size()); for (auto const& sname : entry.stat_names) { + args.push_back(stats_.at(sname)); consumed.insert(sname); } - std::ostringstream line; - entry.fn(line, stat_vec); + FORMATTERS.at(static_cast(entry.formatter))(line, args); lines.emplace_back(report_entry_name, std::move(line).str()); } - // Uncovered stats get a default raw-value format. + // Stats not covered by any report entry fall back to Formatter::Default. for (auto const& [name, stat] : stats_) { if (consumed.contains(name)) { continue; } std::ostringstream line; - line << stat.value(); - if (stat.count() > 1) { - line << " (count " << stat.count() << ")"; - } + FORMATTERS.at(static_cast(Formatter::Default))(line, {stat}); lines.emplace_back(name, std::move(line).str()); } @@ -322,8 +355,8 @@ std::string Statistics::report(std::string const& header) const { ss << "\n"; for (auto const& [name, text] : lines) { - ss << " - " << std::setw(max_length + 3) << std::left << name + ": " << text - << "\n"; + ss << " - " << std::setw(safe_cast(max_length) + 3) << std::left + << name + ": " << text << "\n"; } ss << "\n"; @@ -393,20 +426,20 @@ std::string Statistics::report(std::string const& header) const { void Statistics::write_json(std::ostream& os) const { std::lock_guard lock(mutex_); - for (auto const& [name, _] : stats_) { + auto const check_name = [](std::string_view name, char const* context) { RAPIDSMPF_EXPECTS( !has_json_unsafe_chars(name), - "stat name cannot contains characters that require JSON escaping: " + name, + std::string(context) + + " cannot contain characters that require JSON escaping: " + + std::string(name), std::invalid_argument ); + }; + for (auto const& [name, _] : stats_) { + check_name(name, "stat name"); } for (auto const& [name, _] : memory_records_) { - RAPIDSMPF_EXPECTS( - !has_json_unsafe_chars(name), - "memory record name cannot contains characters that require JSON escaping: " - + name, - std::invalid_argument - ); + check_name(name, "memory record name"); } os << "{\n"; @@ -456,73 +489,141 @@ std::shared_ptr Statistics::copy() const { std::lock_guard lock(mutex_); auto ret = std::make_shared(enabled_.load(std::memory_order_acquire)); ret->stats_ = stats_; - ret->formatters_ = formatters_; + ret->report_entries_ = report_entries_; return ret; } +namespace { +// POD writer/reader helpers shared by serialize/deserialize. +void write_pod(std::uint8_t*& ptr, auto const& val) { + std::memcpy(ptr, &val, sizeof(val)); + ptr += sizeof(val); +} + +template +std::span read_pod(std::span buf, T& val) { + RAPIDSMPF_EXPECTS( + buf.size() >= sizeof(T), + "truncated Statistics serialization data", + std::invalid_argument + ); + std::memcpy(&val, buf.data(), sizeof(T)); + return buf.subspan(sizeof(T)); +} +} // namespace + std::vector Statistics::serialize() const { std::lock_guard lock(mutex_); // Binary layout: + // [enabled: uint8] // [num_stats: uint64] - // Per stat (sorted by name): - // [name_length: uint64] [name: char[]] [Stat::serialized_size() bytes] - std::size_t total = sizeof(std::uint64_t); // num_stats + // Per stat (sorted by name): [name_len: uint64] [name] [Stat payload] + // [num_entries: uint64] + // Per entry (sorted by name): [name_len: uint64] [name] + // [formatter: uint8] + // [num_stat_names: uint64] + // Per stat_name: [len: uint64] [bytes] + std::size_t total = sizeof(std::uint8_t); // enabled flag + total += sizeof(std::uint64_t); for (auto const& [name, stat] : stats_) { total += sizeof(std::uint64_t) + name.size() + Stat::serialized_size(); } + total += sizeof(std::uint64_t); + for (auto const& [name, entry] : report_entries_) { + total += sizeof(std::uint64_t) + name.size(); + total += sizeof(std::uint8_t); + total += sizeof(std::uint64_t); + for (auto const& sn : entry.stat_names) { + total += sizeof(std::uint64_t) + sn.size(); + } + } std::vector buf(total); std::uint8_t* ptr = buf.data(); - auto const write = [&ptr](auto const& val) { - std::memcpy(ptr, &val, sizeof(val)); - ptr += sizeof(val); - }; - - write(static_cast(stats_.size())); - + write_pod(ptr, static_cast(enabled_.load(std::memory_order_acquire))); + write_pod(ptr, static_cast(stats_.size())); for (auto const& [name, stat] : stats_) { - write(static_cast(name.size())); + write_pod(ptr, static_cast(name.size())); std::memcpy(ptr, name.data(), name.size()); ptr += name.size(); ptr = stat.serialize(ptr); } + + write_pod(ptr, static_cast(report_entries_.size())); + for (auto const& [name, entry] : report_entries_) { + write_pod(ptr, static_cast(name.size())); + std::memcpy(ptr, name.data(), name.size()); + ptr += name.size(); + write_pod(ptr, static_cast(entry.formatter)); + write_pod(ptr, static_cast(entry.stat_names.size())); + for (auto const& sn : entry.stat_names) { + write_pod(ptr, static_cast(sn.size())); + std::memcpy(ptr, sn.data(), sn.size()); + ptr += sn.size(); + } + } return buf; } std::shared_ptr Statistics::deserialize(std::span data) { - // Read a POD value from the front of `buf` and return the remainder. - auto const read = [](std::span buf, T& val) { + auto const read_string = [&](std::uint64_t len) { RAPIDSMPF_EXPECTS( - buf.size() >= sizeof(T), + data.size() >= len, "truncated Statistics serialization data", std::invalid_argument ); - std::memcpy(&val, buf.data(), sizeof(T)); - return buf.subspan(sizeof(T)); + std::string s(reinterpret_cast(data.data()), len); + data = data.subspan(len); + return s; }; - auto ret = std::make_shared(); + std::uint8_t enabled{}; + data = read_pod(data, enabled); + auto ret = std::make_shared(enabled != 0); std::uint64_t num_stats{}; - data = read(data, num_stats); - + data = read_pod(data, num_stats); for (std::uint64_t i = 0; i < num_stats; ++i) { std::uint64_t name_len{}; - data = read(data, name_len); - RAPIDSMPF_EXPECTS( - data.size() >= name_len, - "truncated Statistics serialization data", - std::invalid_argument - ); - std::string name(reinterpret_cast(data.data()), name_len); - data = data.subspan(name_len); - + data = read_pod(data, name_len); + auto name = read_string(name_len); auto [stat, remaining] = Stat::deserialize(data); data = remaining; ret->stats_.emplace(std::move(name), std::move(stat)); } + + std::uint64_t num_entries{}; + data = read_pod(data, num_entries); + for (std::uint64_t i = 0; i < num_entries; ++i) { + std::uint64_t name_len{}; + data = read_pod(data, name_len); + auto name = read_string(name_len); + + std::uint8_t fmt{}; + data = read_pod(data, fmt); + RAPIDSMPF_EXPECTS( + fmt < static_cast(Formatter::_Count), + "Statistics::deserialize: Formatter value out of range", + std::invalid_argument + ); + auto const formatter = static_cast(fmt); + + std::uint64_t num_stat_names{}; + data = read_pod(data, num_stat_names); + std::vector stat_names; + stat_names.reserve(num_stat_names); + for (std::uint64_t j = 0; j < num_stat_names; ++j) { + std::uint64_t sn_len{}; + data = read_pod(data, sn_len); + stat_names.push_back(read_string(sn_len)); + } + ret->report_entries_.emplace( + std::move(name), + ReportEntry{.stat_names = std::move(stat_names), .formatter = formatter} + ); + } return ret; } @@ -534,7 +635,7 @@ std::shared_ptr Statistics::merge( auto ret = std::make_shared(enabled_.load(std::memory_order_acquire)); ret->stats_ = stats_; - ret->formatters_ = formatters_; + ret->report_entries_ = report_entries_; for (auto const& [name, stat] : other->stats_) { auto [it, inserted] = ret->stats_.try_emplace(name, stat); @@ -542,6 +643,11 @@ std::shared_ptr Statistics::merge( it->second = it->second.merge(stat); } } + // Report entries from `other` fill in any gaps (first-wins matches + // the single-rank behavior of add_report_entry). + for (auto const& [name, entry] : other->report_entries_) { + ret->report_entries_.try_emplace(name, entry); + } return ret; } @@ -580,34 +686,12 @@ void Statistics::record_copy( name_map[static_cast(src)][static_cast(dst)]; timing.stop_and_record(names.time, names.stream_delay); - add_stat(names.nbytes, nbytes); - - if (exist_report_entry_name(names.base)) { - return; // exit early to limit overhead. + add_stat(names.nbytes, static_cast(nbytes)); + if (!has_report_entry(names.base)) { + add_report_entry( + names.base, {names.nbytes, names.time, names.stream_delay}, Formatter::MemCopy + ); } - - register_formatter( - names.base, - {names.nbytes, names.time, names.stream_delay}, - [](std::ostream& os, std::vector const& stats) { - auto const nbytes = stats.at(0); - auto const time = stats.at(1); - auto const stream_delay = stats.at(2); - - RAPIDSMPF_EXPECTS( - nbytes.count() == time.count() && time.count() == stream_delay.count(), - "record_copy() expects the record counters to match" - ); - - os << format_nbytes(nbytes.value()); - os << " | " << format_duration(time.value()); - os << " | " << format_nbytes(nbytes.value() / time.value()) << "/s"; - os << " | avg-stream-delay " - << format_duration( - stream_delay.value() / static_cast(time.count()) - ); - } - ); } void Statistics::record_alloc( @@ -631,34 +715,10 @@ void Statistics::record_alloc( auto const& n = names[static_cast(mem_type)]; timing.stop_and_record(n.time, n.stream_delay); - add_stat(n.nbytes, nbytes); - - if (exist_report_entry_name(n.base)) { - return; // exit early to limit overhead. + add_stat(n.nbytes, static_cast(nbytes)); + if (!has_report_entry(n.base)) { + add_report_entry(n.base, {n.nbytes, n.time, n.stream_delay}, Formatter::MemAlloc); } - - register_formatter( - n.base, - {n.nbytes, n.time, n.stream_delay}, - [](std::ostream& os, std::vector const& stats) { - auto const nbytes = stats.at(0); - auto const time = stats.at(1); - auto const stream_delay = stats.at(2); - - RAPIDSMPF_EXPECTS( - nbytes.count() == time.count() && time.count() == stream_delay.count(), - "record_alloc() expects the record counters to match" - ); - - os << format_nbytes(nbytes.value()); - os << " | " << format_duration(time.value()); - os << " | " << format_nbytes(nbytes.value() / time.value()) << "/s"; - os << " | avg-stream-delay " - << format_duration( - stream_delay.value() / static_cast(time.count()) - ); - } - ); } } // namespace rapidsmpf diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 3310d9725..3aff6fa44 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -106,12 +107,13 @@ class FileCache { std::optional get(std::shared_ptr ctx, Key const& key) const { auto& stats = *ctx->statistics(); - stats.register_formatter( - "unbounded_file_read_cache hits", - [](std::ostream& os, std::vector const& s) { - os << s[0].value() << "/" << s[0].count() << " (hits/lookups)"; - } - ); + if (!stats.has_report_entry("unbounded_file_read_cache hits")) { + stats.add_report_entry( + "unbounded_file_read_cache hits", + {"unbounded_file_read_cache hits"}, + Statistics::Formatter::HitRate + ); + } SpillableMessages::MessageId mid; { diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index 924d50229..7f1b713e1 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -81,7 +81,6 @@ target_sources( test_cudf_utils.cpp test_cupti_monitor.cpp test_error_macros.cpp - test_gather_statistics.cpp test_host_buffer.cpp test_metadata_payload_exchange.cpp test_misc.cpp diff --git a/cpp/tests/test_gather_statistics.cpp b/cpp/tests/test_gather_statistics.cpp deleted file mode 100644 index 4412f61d1..000000000 --- a/cpp/tests/test_gather_statistics.cpp +++ /dev/null @@ -1,90 +0,0 @@ -/** - * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include -#include - -#include - -#include -#include -#include - -#include "environment.hpp" - -using namespace rapidsmpf; - -extern Environment* GlobalEnvironment; - -TEST(GatherStatisticsTest, Basic) { - auto const& comm = GlobalEnvironment->comm_; - auto const rank = comm->rank(); - auto const nranks = comm->nranks(); - - auto stats = std::make_shared(); - stats->add_stat("x", safe_cast(rank)); - - auto others = coll::gather_statistics(comm, 0, stats); - - if (rank == 0) { - // Root receives nranks - 1 Statistics from other ranks. - ASSERT_EQ(others.size(), safe_cast(nranks - 1)); - for (auto const& s : others) { - EXPECT_TRUE(s->enabled()); - EXPECT_EQ(s->list_stat_names().size(), 1); - // Each non-root rank sent stat "x" with its rank value. - EXPECT_EQ(s->get_stat("x").count(), 1); - } - // Merge and verify the total. - auto global = stats->merge(others); - // Sum of ranks: 0 + 1 + 2 + ... + (nranks-1) = nranks*(nranks-1)/2 - double expected_sum = safe_cast(nranks) * (nranks - 1) / 2.0; - EXPECT_EQ(global->get_stat("x").value(), expected_sum); - EXPECT_EQ(global->get_stat("x").count(), safe_cast(nranks)); - } else { - EXPECT_TRUE(others.empty()); - } - - GlobalEnvironment->barrier(); -} - -TEST(GatherStatisticsTest, SingleRank) { - auto const& comm = GlobalEnvironment->comm_; - if (comm->nranks() != 1) { - GTEST_SKIP() << "Test only meaningful with 1 rank"; - } - - auto stats = std::make_shared(); - stats->add_stat("x", 42.0); - - auto others = coll::gather_statistics(comm, 1, stats); - EXPECT_TRUE(others.empty()); -} - -TEST(GatherStatisticsTest, DisjointStatNames) { - auto const& comm = GlobalEnvironment->comm_; - auto const rank = comm->rank(); - auto const nranks = comm->nranks(); - - auto stats = std::make_shared(); - stats->add_stat("rank-" + std::to_string(rank), 1.0); - - auto others = coll::gather_statistics(comm, 2, stats); - - if (rank == 0) { - ASSERT_EQ(others.size(), safe_cast(nranks - 1)); - auto global = stats->merge(others); - // Should have one stat per rank. - EXPECT_EQ(global->list_stat_names().size(), safe_cast(nranks)); - for (Rank r = 0; r < nranks; ++r) { - EXPECT_EQ(global->get_stat("rank-" + std::to_string(r)).value(), 1.0); - } - } else { - EXPECT_TRUE(others.empty()); - } - - GlobalEnvironment->barrier(); -} diff --git a/cpp/tests/test_statistics.cpp b/cpp/tests/test_statistics.cpp index 29bb87da0..021152f77 100644 --- a/cpp/tests/test_statistics.cpp +++ b/cpp/tests/test_statistics.cpp @@ -52,24 +52,30 @@ TEST_F(StatisticsTest, Communication) { EXPECT_THROW(stats.get_stat("unknown-name"), std::out_of_range); - auto custom_formatter = [](std::ostream& os, - std::vector const& s) { - os << s[0].value() << " by custom formatter"; - }; - - stats.register_formatter("custom-formatter", custom_formatter); - stats.add_stat("custom-formatter", 10); - stats.add_stat("custom-formatter", 1); - EXPECT_EQ(stats.get_stat("custom-formatter").count(), 2); - EXPECT_EQ(stats.get_stat("custom-formatter").value(), 11); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("custom-formatter")); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("11 by custom formatter")); + // Default-formatted stat (no report entry needed). + stats.add_stat("plain-stat", 10); + stats.add_stat("plain-stat", 1); + EXPECT_EQ(stats.get_stat("plain-stat").count(), 2); + EXPECT_EQ(stats.get_stat("plain-stat").value(), 11); + EXPECT_THAT(stats.report(), ::testing::HasSubstr("plain-stat")); + EXPECT_THAT(stats.report(), ::testing::HasSubstr("11 (count 2)")); stats.add_bytes_stat("byte-statistics", 20); EXPECT_THAT(stats.report(), ::testing::HasSubstr("byte-statistics")); EXPECT_THAT(stats.report(), ::testing::HasSubstr("20 B")); } +TEST_F(StatisticsTest, AddReportEntryArityMismatchThrowsOnRender) { + rapidsmpf::Statistics stats; + // MemCopy expects 3 stats; passing one is accepted at registration but + // fails when report() tries to render the entry. + stats.add_report_entry( + "bad", {"only-one"}, rapidsmpf::Statistics::Formatter::MemCopy + ); + stats.add_stat("only-one", 1.0); + EXPECT_THROW(std::ignore = stats.report(), std::out_of_range); +} + TEST_F(StatisticsTest, StatMax) { Statistics::Stat s; EXPECT_EQ(s.max(), -std::numeric_limits::infinity()); @@ -84,81 +90,51 @@ TEST_F(StatisticsTest, StatMax) { EXPECT_EQ(s.max(), 10.0); // max stays at 10 } -TEST_F(StatisticsTest, ExistReportEntryName) { +TEST_F(StatisticsTest, AddReportEntryFirstWins) { rapidsmpf::Statistics stats; - - // Unknown name returns false. - EXPECT_FALSE(stats.exist_report_entry_name("foo")); - - // Returns true after registration. - stats.register_formatter( - "foo", [](std::ostream& os, std::vector const& s) { - os << s[0].value(); - } + // The first add_report_entry wins: a Default (count-aware) entry stays + // in place even after add_bytes_stat tries to upgrade it to Bytes. + stats.add_report_entry( + "my-bytes", {"my-bytes"}, rapidsmpf::Statistics::Formatter::Default ); - EXPECT_TRUE(stats.exist_report_entry_name("foo")); - - // Unrelated name is still absent. - EXPECT_FALSE(stats.exist_report_entry_name("bar")); - - // Disabled statistics always returns false (no formatters are ever registered). - rapidsmpf::Statistics disabled(false); - disabled.register_formatter( - "foo", [](std::ostream& os, std::vector const& s) { - os << s[0].value(); - } - ); - EXPECT_FALSE(disabled.exist_report_entry_name("foo")); -} - -TEST_F(StatisticsTest, RegisterFormatterFirstWins) { - rapidsmpf::Statistics stats; - // Register a custom formatter first. - stats.register_formatter( - "my-bytes", - [](std::ostream& os, std::vector const& s) { - os << "custom:" << s[0].value(); - } - ); - // add_bytes_stat tries to register a bytes formatter, but the custom one takes - // precedence because the first registered formatter is always used. stats.add_bytes_stat("my-bytes", 1024); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("custom:1024")); + EXPECT_THAT(stats.report(), ::testing::HasSubstr("my-bytes")); + EXPECT_THAT(stats.report(), ::testing::HasSubstr("1024")); EXPECT_THAT(stats.report(), ::testing::Not(::testing::HasSubstr("KiB"))); } -TEST_F(StatisticsTest, MultiStatFormatter) { +TEST_F(StatisticsTest, MultiStatReportEntry) { rapidsmpf::Statistics stats; - stats.register_formatter( - "spill-summary", - {"spill-bytes", "spill-time"}, - [](std::ostream& os, std::vector const& s) { - os << format_nbytes(s[0].value()) << " in " << format_duration(s[1].value()); - } + // Build a MemCopy-style 3-stat report entry. + stats.add_report_entry( + "copy-summary", + {"copy-summary-bytes", "copy-summary-time", "copy-summary-stream-delay"}, + rapidsmpf::Statistics::Formatter::MemCopy ); - stats.add_stat("spill-bytes", 1024 * 1024); - stats.add_stat("spill-time", 0.001); - EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-summary")); + stats.add_stat("copy-summary-bytes", 1024 * 1024); + stats.add_stat("copy-summary-time", 0.001); + stats.add_stat("copy-summary-stream-delay", 0.0001); + EXPECT_THAT(stats.report(), ::testing::HasSubstr("copy-summary")); EXPECT_THAT(stats.report(), ::testing::HasSubstr("1 MiB")); - // The component stats should not appear as individual report entries. - EXPECT_THAT(stats.report(), ::testing::Not(::testing::HasSubstr("spill-bytes"))); - EXPECT_THAT(stats.report(), ::testing::Not(::testing::HasSubstr("spill-time"))); + // Component stats are consumed by the report entry and don't emit + // their own lines. + EXPECT_THAT( + stats.report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes:")) + ); } TEST_F(StatisticsTest, ReportNoDataCollected) { rapidsmpf::Statistics stats; - stats.register_formatter( + stats.add_report_entry( "spill-summary", - {"spill-bytes", "spill-time"}, - [](std::ostream& os, std::vector const& s) { - os << format_nbytes(s[0].value()) << " in " << format_duration(s[1].value()); - } + {"spill-bytes", "spill-time", "spill-delay"}, + rapidsmpf::Statistics::Formatter::MemCopy ); - // No stats recorded — formatter should still appear with "No data collected". + // No stats recorded — entry should still appear with "No data collected". EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-summary")); EXPECT_THAT(stats.report(), ::testing::HasSubstr("No data collected")); - // Adding only one of the two required stats still yields "No data collected". + // Adding only one of the three required stats still yields "No data collected". stats.add_stat("spill-bytes", 1024 * 1024); EXPECT_THAT(stats.report(), ::testing::HasSubstr("No data collected")); EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-bytes")); // uncovered @@ -167,28 +143,20 @@ TEST_F(StatisticsTest, ReportNoDataCollected) { TEST_F(StatisticsTest, ReportSorting) { rapidsmpf::Statistics stats; - // Register formatter entries for "banana" and "cherry". - stats.register_formatter( - "banana", - [](std::ostream& os, std::vector const& s) { - os << s[0].value(); - } - ); stats.add_stat("banana", 2); - - stats.register_formatter( - "cherry", - [](std::ostream& os, std::vector const& s) { - os << s[0].value(); - } + stats.add_report_entry( + "banana", {"banana"}, rapidsmpf::Statistics::Formatter::Default ); + stats.add_stat("cherry", 3); + stats.add_report_entry( + "cherry", {"cherry"}, rapidsmpf::Statistics::Formatter::Default + ); - // Add uncovered raw stats for "apple" and "date". + // Uncovered raw stats for "apple" and "date". stats.add_stat("apple", 1); stats.add_stat("date", 4); - // All four entries must appear. auto const r = stats.report(); auto const pos_apple = r.find("apple"); auto const pos_banana = r.find("banana"); @@ -200,7 +168,6 @@ TEST_F(StatisticsTest, ReportSorting) { ASSERT_NE(pos_cherry, std::string::npos); ASSERT_NE(pos_date, std::string::npos); - // They must appear in alphabetical order. EXPECT_LT(pos_apple, pos_banana); EXPECT_LT(pos_banana, pos_cherry); EXPECT_LT(pos_cherry, pos_date); @@ -367,6 +334,9 @@ TEST_F(StatisticsTest, JsonStream) { EXPECT_THAT(s, ::testing::HasSubstr(R"("max": 10)")); EXPECT_THAT(s, ::testing::HasSubstr(R"("bar")")); EXPECT_THAT(s, ::testing::Not(::testing::HasSubstr("memory_records"))); + // JSON carries numeric data only — no formatter/report-entry metadata. + EXPECT_THAT(s, ::testing::Not(::testing::HasSubstr("report_entries"))); + EXPECT_THAT(s, ::testing::Not(::testing::HasSubstr("formatter"))); } TEST_F(StatisticsTest, InvalidStatNames) { @@ -464,26 +434,89 @@ TEST_F(StatisticsTest, SerializeMalformed) { std::ignore = rapidsmpf::Statistics::deserialize(empty), std::invalid_argument ); - // Truncated: just one byte, not enough for num_stats. + // Truncated: just one byte, not enough for the num_stats field. std::vector truncated = {1}; EXPECT_THROW( std::ignore = rapidsmpf::Statistics::deserialize(truncated), std::invalid_argument ); } -TEST_F(StatisticsTest, Copy) { +TEST_F(StatisticsTest, DeserializeRejectsOutOfRangeFormatter) { + // Craft a payload with one report-entry whose formatter byte is out of + // range. Layout: [enabled=1][num_stats=0][num_entries=1] + // [name_len=1]["x"][formatter=0xFF][num_stat_names=1] + // [sn_len=1]["y"] + auto const poke_u64 = [](std::vector& v, std::uint64_t x) { + for (int i = 0; i < 8; ++i) + v.push_back(static_cast((x >> (i * 8)) & 0xFF)); + }; + std::vector buf; + buf.push_back(1); // enabled + poke_u64(buf, 0); // num_stats + poke_u64(buf, 1); // num_entries + poke_u64(buf, 1); // name_len + buf.push_back('x'); + buf.push_back(0xFF); // formatter value well past Formatter::_Count + poke_u64(buf, 1); // num_stat_names + poke_u64(buf, 1); // sn_len + buf.push_back('y'); + + EXPECT_THROW( + std::ignore = rapidsmpf::Statistics::deserialize(buf), std::invalid_argument + ); +} + +TEST_F(StatisticsTest, SerializeRoundTripPreservesEnabledFlag) { + // A disabled Statistics should come back disabled after a round-trip. + rapidsmpf::Statistics disabled(false); + auto const bytes = disabled.serialize(); + auto deserialized = rapidsmpf::Statistics::deserialize(bytes); + EXPECT_FALSE(deserialized->enabled()); + + // And an enabled one comes back enabled. + rapidsmpf::Statistics enabled(true); + auto const bytes2 = enabled.serialize(); + auto deserialized2 = rapidsmpf::Statistics::deserialize(bytes2); + EXPECT_TRUE(deserialized2->enabled()); +} + +TEST_F(StatisticsTest, SerializeRoundTripWithReportEntries) { rapidsmpf::Statistics stats; - stats.add_stat("x", 10.0); - stats.register_formatter( - "x", [](std::ostream& os, std::vector const& s) { - os << "fmt:" << s[0].value(); - } + stats.add_bytes_stat("alpha", 2048); // Bytes entry + stats.add_duration_stat("beta", rapidsmpf::Duration{0.005}); // Duration entry + stats.add_report_entry( + "copy", + {"copy-bytes", "copy-time", "copy-delay"}, + rapidsmpf::Statistics::Formatter::MemCopy ); + stats.add_stat("copy-bytes", 1024.0 * 1024.0); + stats.add_stat("copy-time", 0.002); + stats.add_stat("copy-delay", 0.00001); + + auto const bytes = stats.serialize(); + auto deserialized = rapidsmpf::Statistics::deserialize(bytes); + + // Stats round-trip numerically. + EXPECT_EQ(deserialized->get_stat("alpha"), stats.get_stat("alpha")); + EXPECT_EQ(deserialized->get_stat("beta"), stats.get_stat("beta")); + EXPECT_EQ(deserialized->get_stat("copy-bytes"), stats.get_stat("copy-bytes")); + + // And crucially, formatter metadata is preserved: the deserialized + // report renders formatted values, not raw numbers. + auto const deser_report = deserialized->report(); + EXPECT_THAT(deser_report, ::testing::HasSubstr("2 KiB")); // alpha via Bytes + EXPECT_THAT(deser_report, ::testing::HasSubstr("1 MiB")); // copy composite +} + +TEST_F(StatisticsTest, Copy) { + rapidsmpf::Statistics stats; + stats.add_bytes_stat("x", 2048); // registers a Bytes report entry auto copied = stats.copy(); EXPECT_TRUE(copied->enabled()); EXPECT_EQ(copied->get_stat("x"), stats.get_stat("x")); - EXPECT_THAT(copied->report(), ::testing::HasSubstr("fmt:10")); + // The Bytes formatter carried over the copy. + EXPECT_THAT(copied->report(), ::testing::HasSubstr("2 KiB")); } TEST_F(StatisticsTest, MergeOverlapping) { @@ -529,25 +562,20 @@ TEST_F(StatisticsTest, MergeEmpty) { EXPECT_EQ(merged2->list_stat_names().size(), 1); } -TEST_F(StatisticsTest, MergeUsesThisFormatters) { +TEST_F(StatisticsTest, MergeCombinesReportEntries) { auto a = std::make_shared(); - a->register_formatter( - "x", [](std::ostream& os, std::vector const& s) { - os << "custom:" << s[0].value(); - } - ); - a->add_stat("x", 10.0); + a->add_bytes_stat("x", 10); // Bytes report entry auto b = std::make_shared(); - b->add_stat("x", 5.0); + b->add_stat("x", 5.0); // no formatter on this side - // Merging a (has formatter) with b: result uses a's formatter. + // Merging a (has Bytes entry) with b: result uses a's entry. auto merged = a->merge(b); - EXPECT_THAT(merged->report(), ::testing::HasSubstr("custom:15")); + EXPECT_THAT(merged->report(), ::testing::HasSubstr("15 B")); - // Merging b (no formatter) with a: result does not have the formatter. + // Merging b (no entry) with a: entry from a fills in. auto merged2 = b->merge(a); - EXPECT_THAT(merged2->report(), ::testing::Not(::testing::HasSubstr("custom:"))); + EXPECT_THAT(merged2->report(), ::testing::HasSubstr("15 B")); } TEST_F(StatisticsTest, MergeSpan) { diff --git a/docs/source/statistics.md b/docs/source/statistics.md index 877218ad3..b9f231b1c 100644 --- a/docs/source/statistics.md +++ b/docs/source/statistics.md @@ -41,10 +41,10 @@ Statistics: ### JSON (`write_json()`) -JSON output contains raw numeric values for all statistics. Registered -formatters (which produce human-readable strings such as "1.0 KiB" or "3.5 ms" -in the text report) are not applied — values remain as plain numbers to keep -the output machine-parseable. For example, a bytes statistic that reads +JSON output contains raw numeric values for all statistics. Formatters +(which produce human-readable strings such as "1.0 KiB" or "3.5 ms" in the +text report) are not applied — values remain as plain numbers to keep the +output machine-parseable. For example, a bytes statistic that reads `"2.9957e+09"` is roughly three billion bytes; the text report would show `"2.79 GiB"` for the same figure. diff --git a/python/rapidsmpf/rapidsmpf/coll/CMakeLists.txt b/python/rapidsmpf/rapidsmpf/coll/CMakeLists.txt index f850e21e4..ebce60b32 100644 --- a/python/rapidsmpf/rapidsmpf/coll/CMakeLists.txt +++ b/python/rapidsmpf/rapidsmpf/coll/CMakeLists.txt @@ -5,7 +5,7 @@ # cmake-format: on # ================================================================================= -set(cython_modules allgather.pyx gather_statistics.pyx) +set(cython_modules allgather.pyx) rapids_cython_create_modules( CXX diff --git a/python/rapidsmpf/rapidsmpf/coll/__init__.py b/python/rapidsmpf/rapidsmpf/coll/__init__.py index f8a0bfc49..f50ac53a3 100644 --- a/python/rapidsmpf/rapidsmpf/coll/__init__.py +++ b/python/rapidsmpf/rapidsmpf/coll/__init__.py @@ -5,6 +5,5 @@ from __future__ import annotations from rapidsmpf.coll.allgather import AllGather -from rapidsmpf.coll.gather_statistics import gather_statistics -__all__ = ["AllGather", "gather_statistics"] +__all__ = ["AllGather"] diff --git a/python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyi b/python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyi deleted file mode 100644 index 1b4b72d66..000000000 --- a/python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyi +++ /dev/null @@ -1,12 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. -# SPDX-License-Identifier: Apache-2.0 - -from rapidsmpf.communicator.communicator import Communicator -from rapidsmpf.statistics import Statistics - -def gather_statistics( - comm: Communicator, - op_id: int, - stats: Statistics, - root: int = 0, -) -> list[Statistics]: ... diff --git a/python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyx b/python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyx deleted file mode 100644 index fc3977bc3..000000000 --- a/python/rapidsmpf/rapidsmpf/coll/gather_statistics.pyx +++ /dev/null @@ -1,69 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. -# SPDX-License-Identifier: Apache-2.0 - -from libc.stdint cimport int32_t -from libcpp.memory cimport shared_ptr -from libcpp.vector cimport vector - -from rapidsmpf._detail.exception_handling cimport ex_handler -from rapidsmpf.communicator.communicator cimport (Communicator, Rank, - cpp_Communicator) -from rapidsmpf.statistics cimport Statistics, cpp_Statistics - - -cdef extern from "" nogil: - vector[shared_ptr[cpp_Statistics]] cpp_gather_statistics \ - "rapidsmpf::coll::gather_statistics"( - const shared_ptr[cpp_Communicator]& comm, - int32_t op_id, - const shared_ptr[cpp_Statistics]& stats, - Rank root, - ) except +ex_handler - - -def gather_statistics( - Communicator comm not None, - int32_t op_id, - Statistics stats not None, - Rank root = 0, -): - """ - Gather statistics from all non-root ranks to the root rank. - - Non-root ranks serialize and send their statistics to the root rank. - On root, the ``stats`` argument is ignored and the return value contains - the deserialized statistics from every other rank. On non-root ranks the - return value is an empty list. - - This is a blocking collective: all ranks must call this function. - - Parameters - ---------- - comm - The communicator. - op_id - Operation ID for tag disambiguation. - stats - The local statistics to send (ignored on root). - root - The root rank that collects the statistics (default 0). - - Returns - ------- - On root: a list of deserialized Statistics from all non-root ranks. - The gathered Statistics contain only stats, no memory records or formatters. - On non-root ranks: an empty list. - """ - cdef vector[shared_ptr[cpp_Statistics]] cpp_ret - with nogil: - cpp_ret = cpp_gather_statistics( - comm._handle, op_id, stats._handle, root - ) - - cdef list ret = [] - cdef Statistics s - for i in range(cpp_ret.size()): - s = Statistics.__new__(Statistics) - s._handle = cpp_ret[i] - ret.append(s) - return ret diff --git a/python/rapidsmpf/rapidsmpf/statistics.pxd b/python/rapidsmpf/rapidsmpf/statistics.pxd index 7f528d88b..ced1f3762 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pxd +++ b/python/rapidsmpf/rapidsmpf/statistics.pxd @@ -2,7 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 from libc.stddef cimport size_t -from libc.stdint cimport int64_t, uint64_t +from libc.stdint cimport int64_t, uint8_t, uint64_t from libcpp cimport bool from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string @@ -16,6 +16,16 @@ from rapidsmpf.rmm_resource_adaptor cimport (RmmResourceAdaptor, cdef extern from "" nogil: + cpdef enum class Formatter "rapidsmpf::Statistics::Formatter" (uint8_t): + Default + Bytes + Duration + HitRate + MemCopy + MemAlloc + # `_Count` sentinel from the C++ enum is intentionally omitted — + # it's an internal implementation detail not meant for Python callers. + cdef cppclass cpp_Statistics "rapidsmpf::Statistics": bool enabled() except +ex_handler string report() except +ex_handler @@ -23,6 +33,11 @@ cdef extern from "" nogil: string name, double value ) except +ex_handler + void add_report_entry( + string report_entry_name, + vector[string] stat_names, + Formatter formatter + ) except +ex_handler bool is_memory_profiling_enabled() except +ex_handler unordered_map[string, cpp_MemoryRecord] get_memory_records() \ except +ex_handler @@ -30,6 +45,7 @@ cdef extern from "" nogil: shared_ptr[cpp_Statistics] merge_many "merge"( vector[shared_ptr[cpp_Statistics]] others ) except +ex_handler + vector[uint8_t] serialize() except +ex_handler cdef struct cpp_MemoryRecord "rapidsmpf::Statistics::MemoryRecord": cpp_ScopedMemoryRecord scoped diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyi b/python/rapidsmpf/rapidsmpf/statistics.pyi index 39c3bba50..e752c4ff9 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyi +++ b/python/rapidsmpf/rapidsmpf/statistics.pyi @@ -1,9 +1,10 @@ -# SPDX-FileCopyrightText: Copyright (c) 2025, NVIDIA CORPORATION & AFFILIATES. +# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -from collections.abc import Sequence +from collections.abc import Iterable, Sequence from dataclasses import dataclass +from enum import IntEnum from os import PathLike from typing import Any, Self @@ -12,6 +13,14 @@ from rapidsmpf.memory.pinned_memory_resource import PinnedMemoryResource from rapidsmpf.memory.scoped_memory_record import ScopedMemoryRecord from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor +class Formatter(IntEnum): + Default = ... + Bytes = ... + Duration = ... + HitRate = ... + MemCopy = ... + MemAlloc = ... + class Statistics: def __init__( self, @@ -33,6 +42,12 @@ class Statistics: def get_stat(self, name: str) -> dict[str, int | float]: ... def list_stat_names(self) -> list[str]: ... def add_stat(self, name: str, value: float) -> None: ... + def add_report_entry( + self, + name: str, + stat_names: Iterable[str], + formatter: Formatter, + ) -> None: ... @property def memory_profiling_enabled(self) -> bool: ... def get_memory_records(self) -> dict[str, MemoryRecord]: ... diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyx b/python/rapidsmpf/rapidsmpf/statistics.pyx index 14cd2c1c3..4265e1c14 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyx +++ b/python/rapidsmpf/rapidsmpf/statistics.pyx @@ -1,8 +1,11 @@ # SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 +from cpython.bytes cimport PyBytes_FromStringAndSize from cython.operator cimport dereference as deref from cython.operator cimport preincrement +from libc.stdint cimport uint8_t +from libc.string cimport memcpy from libcpp cimport bool as bool_t from libcpp.memory cimport make_shared, make_unique, shared_ptr from libcpp.string cimport string @@ -65,6 +68,14 @@ cdef extern from *: stats.write_json(ss); return ss.str(); } + // Wrap the span-based Statistics::deserialize so Cython can pass a vector. + std::shared_ptr cpp_deserialize_statistics( + std::vector const& v + ) { + return rapidsmpf::Statistics::deserialize( + std::span(v.data(), v.size()) + ); + } """ size_t cpp_get_statistic_count(cpp_Statistics stats, string name) \ except +ex_handler nogil @@ -77,6 +88,9 @@ cdef extern from *: void cpp_write_json(cpp_Statistics stats, string filepath) \ except +ex_handler nogil string cpp_write_json_string(cpp_Statistics stats) except +ex_handler nogil + shared_ptr[cpp_Statistics] cpp_deserialize_statistics( + const vector[uint8_t]& v + ) except +ex_handler nogil cdef class Statistics: """ @@ -228,6 +242,33 @@ cdef class Statistics: with nogil: deref(self._handle).add_stat(name_, value) + def add_report_entry(self, name, stat_names, Formatter formatter): + """ + Associate a predefined formatter with one or more stat names. + + Mirrors the C++ ``rapidsmpf::Statistics::add_report_entry``. + First-wins: if a report entry already exists under ``name``, this + call has no effect. + + Parameters + ---------- + name + Report entry name. Becomes one line in :meth:`report`. + stat_names + Iterable of stat names this entry aggregates. The number of + names must match the arity of ``formatter``. + formatter + A `Formatter` selecting the predefined render function. + """ + cdef string name_ = str.encode(name) + cdef vector[string] cpp_stat_names + for sn in stat_names: + cpp_stat_names.push_back(str.encode(sn)) + with nogil: + deref(self._handle).add_report_entry( + name_, cpp_stat_names, formatter + ) + @property def memory_profiling_enabled(self): """ @@ -394,6 +435,32 @@ cdef class Statistics: result = cpp_write_json_string(deref(self._handle)) return result.decode("utf-8") + def __getstate__(self): + """Serialize stats and report entries for pickling. + + Memory records and the memory-profiling resource pointer are not + included — matching the C++ ``Statistics::serialize()`` contract. + """ + cdef vector[uint8_t] vec + with nogil: + vec = deref(self._handle).serialize() + return PyBytes_FromStringAndSize( + vec.data() if not vec.empty() else NULL, + vec.size() + ) + + def __setstate__(self, bytes state not None): + """Restore stats and report entries from a pickled bytes buffer.""" + cdef Py_ssize_t size = len(state) + cdef const char* src = state + cdef vector[uint8_t] vec + with nogil: + vec.resize(size) + memcpy(vec.data(), src, size) + self._handle = cpp_deserialize_statistics(vec) + # Memory-profiling state is not serialized. + self._mr = None + @dataclass class MemoryRecord: diff --git a/python/rapidsmpf/rapidsmpf/tests/test_gather_statistics.py b/python/rapidsmpf/rapidsmpf/tests/test_gather_statistics.py deleted file mode 100644 index d9cb94b8e..000000000 --- a/python/rapidsmpf/rapidsmpf/tests/test_gather_statistics.py +++ /dev/null @@ -1,58 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION & AFFILIATES. -# SPDX-License-Identifier: Apache-2.0 -"""Tests for gather_statistics.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -from rapidsmpf.coll import gather_statistics -from rapidsmpf.statistics import Statistics - -if TYPE_CHECKING: - from rapidsmpf.communicator.communicator import Communicator - - -def test_gather_basic(comm: Communicator) -> None: - stats = Statistics(enable=True) - stats.add_stat("x", float(comm.rank)) - - others = gather_statistics(comm, 0, stats) - - if comm.rank == 0: - assert len(others) == comm.nranks - 1 - for s in others: - assert s.enabled - assert len(s.list_stat_names()) == 1 - else: - assert len(others) == 0 - - -def test_gather_single_rank(comm: Communicator) -> None: - if comm.nranks != 1: - return - - stats = Statistics(enable=True) - stats.add_stat("x", 42.0) - - others = gather_statistics(comm, 1, stats) - assert len(others) == 0 - - -def test_gather_disjoint_names(comm: Communicator) -> None: - stats = Statistics(enable=True) - stats.add_stat(f"rank-{comm.rank}", 1.0) - - others = gather_statistics(comm, 2, stats) - - if comm.rank == 0: - assert len(others) == comm.nranks - 1 - # Each remote rank contributed a uniquely named stat. - all_names: set[str] = set(stats.list_stat_names()) - for s in others: - all_names.update(s.list_stat_names()) - assert len(all_names) == comm.nranks - for r in range(comm.nranks): - assert f"rank-{r}" in all_names - else: - assert len(others) == 0 diff --git a/python/rapidsmpf/rapidsmpf/tests/test_statistics.py b/python/rapidsmpf/rapidsmpf/tests/test_statistics.py index a331bb2e3..b16393bb5 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_statistics.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_statistics.py @@ -4,12 +4,13 @@ import json import pathlib +import pickle from typing import TYPE_CHECKING import pytest from rapidsmpf.rmm_resource_adaptor import RmmResourceAdaptor -from rapidsmpf.statistics import Statistics +from rapidsmpf.statistics import Formatter, Statistics if TYPE_CHECKING: import rmm.mr @@ -236,3 +237,69 @@ def test_merge_empty() -> None: merged = a.merge([]) assert merged.get_stat("x") == a.get_stat("x") + + +def test_add_report_entry_bytes() -> None: + stats = Statistics(enable=True) + stats.add_stat("payload", 1024.0) + stats.add_report_entry("payload", ["payload"], Formatter.Bytes) + assert "1 KiB" in stats.report() + + +def test_add_report_entry_memcopy() -> None: + stats = Statistics(enable=True) + stats.add_stat("copy-bytes", 1024 * 1024) + stats.add_stat("copy-time", 0.002) + stats.add_stat("copy-delay", 0.00001) + stats.add_report_entry( + "copy", ["copy-bytes", "copy-time", "copy-delay"], Formatter.MemCopy + ) + assert "1 MiB" in stats.report() + + +def test_json_has_no_formatter_info() -> None: + stats = Statistics(enable=True) + stats.add_stat("alpha", 1024.0) + stats.add_report_entry("alpha", ["alpha"], Formatter.Bytes) + + data = json.loads(stats.write_json_string()) + # JSON carries numeric data only — no formatter/report-entry metadata. + assert "report_entries" not in data + assert "formatter" not in data + + +def test_pickle_roundtrip() -> None: + stats = Statistics(enable=True) + stats.add_stat("payload", 2048.0) + stats.add_report_entry("payload", ["payload"], Formatter.Bytes) + stats.add_stat("plain", 7.0) + stats.add_report_entry( + "copy", + ["copy-bytes", "copy-time", "copy-delay"], + Formatter.MemCopy, + ) + stats.add_stat("copy-bytes", 1024 * 1024) + stats.add_stat("copy-time", 0.002) + stats.add_stat("copy-delay", 0.00001) + + unpickled = pickle.loads(pickle.dumps(stats)) + assert isinstance(unpickled, Statistics) + # Stats round-trip numerically. + assert unpickled.get_stat("payload") == stats.get_stat("payload") + assert unpickled.get_stat("plain") == stats.get_stat("plain") + # Formatter metadata round-trips — the report is identical. + assert unpickled.report() == stats.report() + + +def test_pickle_empty() -> None: + stats = Statistics(enable=True) + unpickled = pickle.loads(pickle.dumps(stats)) + assert unpickled.enabled + assert unpickled.list_stat_names() == [] + + +def test_pickle_preserves_disabled_flag() -> None: + stats = Statistics(enable=False) + assert not stats.enabled + unpickled = pickle.loads(pickle.dumps(stats)) + assert not unpickled.enabled From 9541e64d4a433b8d3e362b36414250a4de34f817 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Fri, 17 Apr 2026 17:16:32 +0200 Subject: [PATCH 2/7] merge is now static --- cpp/include/rapidsmpf/statistics.hpp | 44 +++----- cpp/src/statistics.cpp | 72 +++++++----- cpp/tests/test_statistics.cpp | 106 ++++++++++++++++-- python/rapidsmpf/rapidsmpf/statistics.pxd | 3 - python/rapidsmpf/rapidsmpf/statistics.pyi | 3 +- python/rapidsmpf/rapidsmpf/statistics.pyx | 46 ++++++-- .../rapidsmpf/tests/test_statistics.py | 50 +++++++-- 7 files changed, 239 insertions(+), 85 deletions(-) diff --git a/cpp/include/rapidsmpf/statistics.hpp b/cpp/include/rapidsmpf/statistics.hpp index 511cddba7..d8a1a1263 100644 --- a/cpp/include/rapidsmpf/statistics.hpp +++ b/cpp/include/rapidsmpf/statistics.hpp @@ -263,37 +263,29 @@ class Statistics { ); /** - * @brief Merges this Statistics with another, returning a new Statistics. + * @brief Merge a set of Statistics into a new instance. * - * For each stat name present in either object, the result has the summed - * count, summed value, and the maximum of the two maxima. Report entries - * are taken from `*this`; entries present only in @p other are copied - * across. + * For each stat name present across the inputs, the result contains the + * summed count, summed value, and the maximum of the recorded maxima. + * The result's `enabled()` is true if any input is enabled. Memory + * records are not merged. * - * @note Memory records are not merged. + * Report entries are unified by name. If multiple inputs contain the + * same report-entry name, their `Formatter` and `stat_names` must match; + * otherwise, this function throws `std::invalid_argument` to prevent + * silent rendering inconsistencies (especially across + * serialize/deserialize boundaries). * - * @param other The Statistics to merge with. Must not be null. - * @return A shared pointer to a new Statistics containing the merged stats. - */ - [[nodiscard]] std::shared_ptr merge( - std::shared_ptr const& other - ) const; - - /** - * @brief Merges this Statistics with multiple others. + * @param stats Non-empty span of non-null `Statistics` instances to merge. + * @return A new `Statistics` instance containing the merged data. * - * Equivalent to calling `merge()` repeatedly. Report entries are taken - * from `*this`; entries present only in one of @p others are copied - * across in iteration order (first-wins on name conflict). - * - * @note Memory records are not merged. - * - * @param others The Statistics objects to merge with. No element may be null. - * @return A shared pointer to a new Statistics containing the merged stats. + * @throws std::invalid_argument If @p stats is empty, contains a null + * pointer, or if inputs disagree on the formatter or stat-name set for + * a shared report entry. */ - [[nodiscard]] std::shared_ptr merge( - std::span const> others - ) const; + [[nodiscard]] static std::shared_ptr merge( + std::span const> stats + ); /** * @brief Represents a single tracked statistic. diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index 0ff8eca5d..075749da7 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -628,35 +628,57 @@ std::shared_ptr Statistics::deserialize(std::span Statistics::merge( - std::shared_ptr const& other -) const { - RAPIDSMPF_EXPECTS(other != nullptr, "Statistics pointer must not be null"); - std::scoped_lock lock(mutex_, other->mutex_); + std::span const> stats +) { + RAPIDSMPF_EXPECTS( + !stats.empty(), + "Statistics::merge: input span must not be empty", + std::invalid_argument + ); - auto ret = std::make_shared(enabled_.load(std::memory_order_acquire)); - ret->stats_ = stats_; - ret->report_entries_ = report_entries_; + // Snapshot each input under its own mutex. Folding the snapshots + // afterwards avoids having to hold multiple mutexes at once. + struct Snapshot { + std::map stats; + std::map entries; + bool enabled; + }; - for (auto const& [name, stat] : other->stats_) { - auto [it, inserted] = ret->stats_.try_emplace(name, stat); - if (!inserted) { - it->second = it->second.merge(stat); - } - } - // Report entries from `other` fill in any gaps (first-wins matches - // the single-rank behavior of add_report_entry). - for (auto const& [name, entry] : other->report_entries_) { - ret->report_entries_.try_emplace(name, entry); + std::vector snapshots; + snapshots.reserve(stats.size()); + for (auto const& s : stats) { + RAPIDSMPF_EXPECTS( + s != nullptr, + "Statistics::merge: pointer must not be null", + std::invalid_argument + ); + std::lock_guard lock(s->mutex_); + snapshots.push_back({s->stats_, s->report_entries_, s->enabled()}); } - return ret; -} -std::shared_ptr Statistics::merge( - std::span const> others -) const { - auto ret = copy(); - for (auto const& other : others) { - ret = ret->merge(other); + bool const any_enabled = + std::ranges::any_of(snapshots, [](auto const& s) { return s.enabled; }); + auto ret = std::make_shared(any_enabled); + + for (auto const& snap : snapshots) { + for (auto const& [name, stat] : snap.stats) { + auto [it, inserted] = ret->stats_.try_emplace(name, stat); + if (!inserted) { + it->second = it->second.merge(stat); + } + } + for (auto const& [name, entry] : snap.entries) { + auto [it, inserted] = ret->report_entries_.try_emplace(name, entry); + if (!inserted) { + RAPIDSMPF_EXPECTS( + it->second.formatter == entry.formatter + && it->second.stat_names == entry.stat_names, + "Statistics::merge: report entry '" + name + + "' has conflicting formatter or stat_names", + std::invalid_argument + ); + } + } } return ret; } diff --git a/cpp/tests/test_statistics.cpp b/cpp/tests/test_statistics.cpp index 021152f77..e812525be 100644 --- a/cpp/tests/test_statistics.cpp +++ b/cpp/tests/test_statistics.cpp @@ -527,7 +527,8 @@ TEST_F(StatisticsTest, MergeOverlapping) { auto b = std::make_shared(); b->add_stat("x", 7.0); // count=1, value=7, max=7 - auto merged = a->merge(b); + std::vector> inputs{a, b}; + auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); auto s = merged->get_stat("x"); EXPECT_EQ(s.count(), 3); EXPECT_EQ(s.value(), 20.0); @@ -541,23 +542,26 @@ TEST_F(StatisticsTest, MergeDisjoint) { auto b = std::make_shared(); b->add_stat("y", 2.0); - auto merged = a->merge(b); + std::vector> inputs{a, b}; + auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); EXPECT_EQ(merged->list_stat_names().size(), 2); EXPECT_EQ(merged->get_stat("x"), a->get_stat("x")); EXPECT_EQ(merged->get_stat("y"), b->get_stat("y")); } -TEST_F(StatisticsTest, MergeEmpty) { +TEST_F(StatisticsTest, MergeWithEmpty) { auto a = std::make_shared(); a->add_stat("x", 5.0); auto empty = std::make_shared(); - auto merged = a->merge(empty); + std::vector> inputs{a, empty}; + auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); EXPECT_EQ(merged->get_stat("x"), a->get_stat("x")); EXPECT_EQ(merged->list_stat_names().size(), 1); - auto merged2 = empty->merge(a); + std::vector> rev{empty, a}; + auto merged2 = rapidsmpf::Statistics::merge(std::span{rev}); EXPECT_EQ(merged2->get_stat("x"), a->get_stat("x")); EXPECT_EQ(merged2->list_stat_names().size(), 1); } @@ -570,15 +574,17 @@ TEST_F(StatisticsTest, MergeCombinesReportEntries) { b->add_stat("x", 5.0); // no formatter on this side // Merging a (has Bytes entry) with b: result uses a's entry. - auto merged = a->merge(b); + std::vector> inputs{a, b}; + auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); EXPECT_THAT(merged->report(), ::testing::HasSubstr("15 B")); - // Merging b (no entry) with a: entry from a fills in. - auto merged2 = b->merge(a); + // Order doesn't matter for filling in a missing entry. + std::vector> rev{b, a}; + auto merged2 = rapidsmpf::Statistics::merge(std::span{rev}); EXPECT_THAT(merged2->report(), ::testing::HasSubstr("15 B")); } -TEST_F(StatisticsTest, MergeSpan) { +TEST_F(StatisticsTest, MergeMultiple) { auto a = std::make_shared(); a->add_stat("x", 1.0); @@ -588,10 +594,88 @@ TEST_F(StatisticsTest, MergeSpan) { auto c = std::make_shared(); c->add_stat("y", 10.0); - std::vector> others{b, c}; - auto merged = a->merge(std::span{others}); + std::vector> inputs{a, b, c}; + auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); EXPECT_EQ(merged->list_stat_names().size(), 2); EXPECT_EQ(merged->get_stat("x").value(), 3.0); EXPECT_EQ(merged->get_stat("y").value(), 10.0); } + +TEST_F(StatisticsTest, MergeRejectsEmptySpan) { + std::vector> empty; + EXPECT_THROW( + std::ignore = rapidsmpf::Statistics::merge(std::span{empty}), + std::invalid_argument + ); +} + +TEST_F(StatisticsTest, MergeRejectsNullElement) { + auto a = std::make_shared(); + std::vector> inputs{a, nullptr}; + EXPECT_THROW( + std::ignore = rapidsmpf::Statistics::merge(std::span{inputs}), + std::invalid_argument + ); +} + +TEST_F(StatisticsTest, MergeRejectsConflictingFormatter) { + auto a = std::make_shared(); + a->add_report_entry("x", {"x"}, rapidsmpf::Statistics::Formatter::Bytes); + a->add_stat("x", 1.0); + + auto b = std::make_shared(); + b->add_report_entry("x", {"x"}, rapidsmpf::Statistics::Formatter::Duration); + b->add_stat("x", 2.0); + + std::vector> inputs{a, b}; + EXPECT_THROW( + std::ignore = rapidsmpf::Statistics::merge(std::span{inputs}), + std::invalid_argument + ); +} + +TEST_F(StatisticsTest, MergeIdenticalReportEntries) { + // Two inputs with the same report entry (same formatter + stat_names) + // must merge cleanly — no conflict. + auto a = std::make_shared(); + a->add_bytes_stat("x", 10); + + auto b = std::make_shared(); + b->add_bytes_stat("x", 20); + + std::vector> inputs{a, b}; + auto merged = rapidsmpf::Statistics::merge(std::span{inputs}); + EXPECT_THAT(merged->report(), ::testing::HasSubstr("30 B")); +} + +TEST_F(StatisticsTest, MergeEnabledFlagPropagates) { + auto enabled = std::make_shared(true); + auto disabled = std::make_shared(false); + + // disabled + disabled -> disabled. + std::vector> both_off{disabled, disabled}; + EXPECT_FALSE(rapidsmpf::Statistics::merge(std::span{both_off})->enabled()); + + // disabled + enabled -> enabled. + std::vector> mixed{disabled, enabled}; + EXPECT_TRUE(rapidsmpf::Statistics::merge(std::span{mixed})->enabled()); +} + +TEST_F(StatisticsTest, MergeRejectsConflictingStatNames) { + auto a = std::make_shared(); + a->add_report_entry( + "copy", {"b1", "t1", "d1"}, rapidsmpf::Statistics::Formatter::MemCopy + ); + + auto b = std::make_shared(); + b->add_report_entry( + "copy", {"b2", "t2", "d2"}, rapidsmpf::Statistics::Formatter::MemCopy + ); + + std::vector> inputs{a, b}; + EXPECT_THROW( + std::ignore = rapidsmpf::Statistics::merge(std::span{inputs}), + std::invalid_argument + ); +} diff --git a/python/rapidsmpf/rapidsmpf/statistics.pxd b/python/rapidsmpf/rapidsmpf/statistics.pxd index ced1f3762..4fbe1c854 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pxd +++ b/python/rapidsmpf/rapidsmpf/statistics.pxd @@ -42,9 +42,6 @@ cdef extern from "" nogil: unordered_map[string, cpp_MemoryRecord] get_memory_records() \ except +ex_handler shared_ptr[cpp_Statistics] copy() except +ex_handler - shared_ptr[cpp_Statistics] merge_many "merge"( - vector[shared_ptr[cpp_Statistics]] others - ) except +ex_handler vector[uint8_t] serialize() except +ex_handler cdef struct cpp_MemoryRecord "rapidsmpf::Statistics::MemoryRecord": diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyi b/python/rapidsmpf/rapidsmpf/statistics.pyi index e752c4ff9..c94d3e346 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyi +++ b/python/rapidsmpf/rapidsmpf/statistics.pyi @@ -53,7 +53,8 @@ class Statistics: def get_memory_records(self) -> dict[str, MemoryRecord]: ... def memory_profiling(self, name: str) -> MemoryRecorder: ... def copy(self) -> Statistics: ... - def merge(self, others: Sequence[Statistics]) -> Statistics: ... + @staticmethod + def merge(stats: Sequence[Statistics]) -> Statistics: ... def clear(self) -> None: ... def write_json(self, filepath: str | PathLike[str]) -> None: ... def write_json_string(self) -> str: ... diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyx b/python/rapidsmpf/rapidsmpf/statistics.pyx index 4265e1c14..172bddda5 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyx +++ b/python/rapidsmpf/rapidsmpf/statistics.pyx @@ -76,6 +76,16 @@ cdef extern from *: std::span(v.data(), v.size()) ); } + // Wrap the span-based Statistics::merge so Cython can pass a vector. + std::shared_ptr cpp_merge_statistics( + std::vector> const& v + ) { + return rapidsmpf::Statistics::merge( + std::span const>( + v.data(), v.size() + ) + ); + } """ size_t cpp_get_statistic_count(cpp_Statistics stats, string name) \ except +ex_handler nogil @@ -91,6 +101,9 @@ cdef extern from *: shared_ptr[cpp_Statistics] cpp_deserialize_statistics( const vector[uint8_t]& v ) except +ex_handler nogil + shared_ptr[cpp_Statistics] cpp_merge_statistics( + const vector[shared_ptr[cpp_Statistics]]& v + ) except +ex_handler nogil cdef class Statistics: """ @@ -391,29 +404,38 @@ cdef class Statistics: ret._handle = deref(self._handle).copy() return ret - def merge(self, others): + @staticmethod + def merge(stats): """ - Merges this Statistics with a sequence of others. + Merge a sequence of Statistics into a new one. - For each stat name present in any object, the result has the summed - count, summed value, and the maximum of the two maxima. Formatters are - taken from this object. Memory records are not merged. + For each stat name present in any input, the result has the summed + count, summed value, and the maximum of the maxes. Report entries + with the same name must agree on formatter and stat-name list; + otherwise the call raises ``ValueError``. Memory records are not + merged. Parameters ---------- - others - A sequence of Statistics to merge with. + stats + A non-empty sequence of :class:`Statistics` to merge. Returns ------- - A new Statistics containing the merged stats. + A new :class:`Statistics` containing the merged data. + + Raises + ------ + ValueError + If ``stats`` is empty or two inputs have conflicting report + entries. """ cdef Statistics ret = Statistics.__new__(Statistics) - cdef vector[shared_ptr[cpp_Statistics]] cpp_others - for item in others: - cpp_others.push_back((item)._handle) + cdef vector[shared_ptr[cpp_Statistics]] v + for item in stats: + v.push_back((item)._handle) with nogil: - ret._handle = deref(self._handle).merge_many(cpp_others) + ret._handle = cpp_merge_statistics(v) return ret def write_json_string(self) -> str: diff --git a/python/rapidsmpf/rapidsmpf/tests/test_statistics.py b/python/rapidsmpf/rapidsmpf/tests/test_statistics.py index b16393bb5..f500ebd95 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_statistics.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_statistics.py @@ -195,7 +195,7 @@ def test_merge_overlapping() -> None: b = Statistics(enable=True) b.add_stat("x", 7.0) - merged = a.merge([b]) + merged = Statistics.merge([a, b]) s = merged.get_stat("x") assert s["count"] == 3 assert s["value"] == 20.0 @@ -209,7 +209,7 @@ def test_merge_disjoint() -> None: b = Statistics(enable=True) b.add_stat("y", 2.0) - merged = a.merge([b]) + merged = Statistics.merge([a, b]) assert len(merged.list_stat_names()) == 2 assert merged.get_stat("x") == a.get_stat("x") assert merged.get_stat("y") == b.get_stat("y") @@ -225,18 +225,54 @@ def test_merge_multiple() -> None: c = Statistics(enable=True) c.add_stat("y", 10.0) - merged = a.merge([b, c]) + merged = Statistics.merge([a, b, c]) assert len(merged.list_stat_names()) == 2 assert merged.get_stat("x")["value"] == 3.0 assert merged.get_stat("y")["value"] == 10.0 -def test_merge_empty() -> None: +def test_merge_rejects_empty() -> None: + with pytest.raises(ValueError): + Statistics.merge([]) + + +def test_merge_rejects_conflicting_formatter() -> None: a = Statistics(enable=True) - a.add_stat("x", 5.0) + a.add_report_entry("x", ["x"], Formatter.Bytes) + a.add_stat("x", 1.0) - merged = a.merge([]) - assert merged.get_stat("x") == a.get_stat("x") + b = Statistics(enable=True) + b.add_report_entry("x", ["x"], Formatter.Duration) + b.add_stat("x", 2.0) + + with pytest.raises(ValueError): + Statistics.merge([a, b]) + + +def test_merge_rejects_conflicting_stat_names() -> None: + a = Statistics(enable=True) + a.add_report_entry("copy", ["b1", "t1", "d1"], Formatter.MemCopy) + + b = Statistics(enable=True) + b.add_report_entry("copy", ["b2", "t2", "d2"], Formatter.MemCopy) + + with pytest.raises(ValueError): + Statistics.merge([a, b]) + + +def test_merge_preserves_report_entry_formatter() -> None: + # Both sides declare the same Bytes report entry; the merged report + # renders with the Bytes formatter. + a = Statistics(enable=True) + a.add_report_entry("payload", ["payload"], Formatter.Bytes) + a.add_stat("payload", 1024.0) + + b = Statistics(enable=True) + b.add_report_entry("payload", ["payload"], Formatter.Bytes) + b.add_stat("payload", 1024.0) + + merged = Statistics.merge([a, b]) + assert "2 KiB" in merged.report() def test_add_report_entry_bytes() -> None: From f82126ed5c5e9806e9a366429e428f5f96ff7a27 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 20 Apr 2026 09:58:24 +0200 Subject: [PATCH 3/7] remove has_report_entry --- cpp/include/rapidsmpf/statistics.hpp | 25 ++++++++------- cpp/src/statistics.cpp | 46 +++++++++++++++++----------- cpp/src/streaming/cudf/parquet.cpp | 12 +++----- 3 files changed, 45 insertions(+), 38 deletions(-) diff --git a/cpp/include/rapidsmpf/statistics.hpp b/cpp/include/rapidsmpf/statistics.hpp index d8a1a1263..9a63a861b 100644 --- a/cpp/include/rapidsmpf/statistics.hpp +++ b/cpp/include/rapidsmpf/statistics.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #include #include +#include #include #include @@ -435,25 +437,22 @@ class Statistics { */ void add_report_entry( std::string const& report_entry_name, - std::vector const& stat_names, + std::initializer_list stat_names, Formatter formatter ); + // clang-format off /** - * @brief Check whether a report entry is already registered under @p name. + * @copydoc add_report_entry(std::string const&,std::initializer_list, Formatter) * - * Intended as a cheap pre-check so hot-path callers can skip building - * the `stat_names` vector when the entry is already present. - * - * @note The result may be stale by the time it is acted upon. It must - * only be used as an optimization hint. Because report entries cannot - * be removed, once this returns `true` for a given @p name it will - * never return `false` again. - * - * @param name Report entry name to look up. - * @return True if a report entry is registered under @p name. + * Overload for callers whose stat names come from a runtime container (e.g. the Python bindings). */ - [[nodiscard]] bool has_report_entry(std::string const& name) const; + // clang-format on + void add_report_entry( + std::string const& report_entry_name, + std::vector stat_names, + Formatter formatter + ); /** * @brief Adds a byte count to the named statistic. diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index 075749da7..0932fbe9f 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -211,35 +211,49 @@ void Statistics::add_stat(std::string const& name, double value) { void Statistics::add_report_entry( std::string const& report_entry_name, - std::vector const& stat_names, + std::initializer_list stat_names, Formatter formatter ) { if (!enabled()) { return; } std::lock_guard lock(mutex_); + if (report_entries_.contains(report_entry_name)) { + return; + } + std::vector names(stat_names.begin(), stat_names.end()); report_entries_.try_emplace( - report_entry_name, ReportEntry{.stat_names = stat_names, .formatter = formatter} + report_entry_name, + ReportEntry{.stat_names = std::move(names), .formatter = formatter} ); } -bool Statistics::has_report_entry(std::string const& name) const { +void Statistics::add_report_entry( + std::string const& report_entry_name, + std::vector stat_names, + Formatter formatter +) { + if (!enabled()) { + return; + } std::lock_guard lock(mutex_); - return report_entries_.contains(name); + if (report_entries_.contains(report_entry_name)) { + return; + } + report_entries_.try_emplace( + report_entry_name, + ReportEntry{.stat_names = std::move(stat_names), .formatter = formatter} + ); } void Statistics::add_bytes_stat(std::string const& name, std::size_t nbytes) { add_stat(name, static_cast(nbytes)); - if (!has_report_entry(name)) { - add_report_entry(name, {name}, Formatter::Bytes); - } + add_report_entry(name, {name}, Formatter::Bytes); } void Statistics::add_duration_stat(std::string const& name, Duration seconds) { add_stat(name, seconds.count()); - if (!has_report_entry(name)) { - add_report_entry(name, {name}, Formatter::Duration); - } + add_report_entry(name, {name}, Formatter::Duration); } std::vector Statistics::list_stat_names() const { @@ -709,11 +723,9 @@ void Statistics::record_copy( timing.stop_and_record(names.time, names.stream_delay); add_stat(names.nbytes, static_cast(nbytes)); - if (!has_report_entry(names.base)) { - add_report_entry( - names.base, {names.nbytes, names.time, names.stream_delay}, Formatter::MemCopy - ); - } + add_report_entry( + names.base, {names.nbytes, names.time, names.stream_delay}, Formatter::MemCopy + ); } void Statistics::record_alloc( @@ -738,9 +750,7 @@ void Statistics::record_alloc( timing.stop_and_record(n.time, n.stream_delay); add_stat(n.nbytes, static_cast(nbytes)); - if (!has_report_entry(n.base)) { - add_report_entry(n.base, {n.nbytes, n.time, n.stream_delay}, Formatter::MemAlloc); - } + add_report_entry(n.base, {n.nbytes, n.time, n.stream_delay}, Formatter::MemAlloc); } } // namespace rapidsmpf diff --git a/cpp/src/streaming/cudf/parquet.cpp b/cpp/src/streaming/cudf/parquet.cpp index 3aff6fa44..2edd16625 100644 --- a/cpp/src/streaming/cudf/parquet.cpp +++ b/cpp/src/streaming/cudf/parquet.cpp @@ -107,13 +107,11 @@ class FileCache { std::optional get(std::shared_ptr ctx, Key const& key) const { auto& stats = *ctx->statistics(); - if (!stats.has_report_entry("unbounded_file_read_cache hits")) { - stats.add_report_entry( - "unbounded_file_read_cache hits", - {"unbounded_file_read_cache hits"}, - Statistics::Formatter::HitRate - ); - } + stats.add_report_entry( + "unbounded_file_read_cache hits", + {"unbounded_file_read_cache hits"}, + Statistics::Formatter::HitRate + ); SpillableMessages::MessageId mid; { From cb07e2a1dba1a47d8174716b66a0930154fccdb3 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 20 Apr 2026 11:15:49 +0200 Subject: [PATCH 4/7] Apply suggestions from code review Co-authored-by: Peter Andreas Entschev --- cpp/src/statistics.cpp | 2 +- cpp/tests/test_statistics.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index 0932fbe9f..83a613507 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -42,7 +42,7 @@ using Names2DArray = std::array; using FormatterFn = void (*)(std::ostream&, std::vector const&); constexpr std::array(Statistics::Formatter::_Count)> FORMATTERS = {{ - // Implement `Statistics::Formatter:Default` + // Implement `Statistics::Formatter::Default` [](std::ostream& os, std::vector const& s) { os << s.at(0).value(); if (s.at(0).count() > 1) { diff --git a/cpp/tests/test_statistics.cpp b/cpp/tests/test_statistics.cpp index e812525be..48248dd32 100644 --- a/cpp/tests/test_statistics.cpp +++ b/cpp/tests/test_statistics.cpp @@ -119,7 +119,7 @@ TEST_F(StatisticsTest, MultiStatReportEntry) { // Component stats are consumed by the report entry and don't emit // their own lines. EXPECT_THAT( - stats.report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes:")) + stats.report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes")) ); } From 3a61ede5c16b18291dedcfdedd30bfaf0aa1e57c Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 20 Apr 2026 11:22:15 +0200 Subject: [PATCH 5/7] MemCopyAllocFormatter --- cpp/src/statistics.cpp | 43 +++++++++++++++++++----------------------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index 83a613507..056f5b0f9 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -37,9 +37,24 @@ struct Names { using NamesArray = std::array; using Names2DArray = std::array; -// Predefined render functions indexed by `Statistics::Formatter`. Per-entry -// rendering description lives on the enum in statistics.hpp. +// Predefined render functions. using FormatterFn = void (*)(std::ostream&, std::vector const&); + +// Shared by the `MemCopy` and `MemAlloc` formatters, both render identically. +constexpr FormatterFn MemCopyAllocFormatter = [](std::ostream& os, + std::vector const& s) { + RAPIDSMPF_EXPECTS( + s.at(0).count() == s.at(1).count() && s.at(1).count() == s.at(2).count(), + "memory copy and allocation formatters expect the record counters to match" + ); + os << format_nbytes(s.at(0).value()) << " | " << format_duration(s.at(1).value()) + << " | " << format_nbytes(s.at(0).value() / s.at(1).value()) << "/s" + << " | avg-stream-delay " + << format_duration(s.at(2).value() / static_cast(s.at(1).count())); +}; + +// Formatters indexed by `Statistics::Formatter`. Per-entry rendering description lives on +// the enum `Statistics::Formatter` in statistics.hpp. constexpr std::array(Statistics::Formatter::_Count)> FORMATTERS = {{ // Implement `Statistics::Formatter::Default` @@ -72,29 +87,9 @@ constexpr std::array(Statistics::Formatter os << s.at(0).value() << "/" << s.at(0).count() << " (hits/lookups)"; }, // Implement `Statistics::Formatter:MemCopy` - [](std::ostream& os, std::vector const& s) { - RAPIDSMPF_EXPECTS( - s.at(0).count() == s.at(1).count() && s.at(1).count() == s.at(2).count(), - "MemCopy formatter expects the record counters to match" - ); - os << format_nbytes(s.at(0).value()) << " | " - << format_duration(s.at(1).value()) << " | " - << format_nbytes(s.at(0).value() / s.at(1).value()) << "/s" - << " | avg-stream-delay " - << format_duration(s.at(2).value() / static_cast(s.at(1).count())); - }, + MemCopyAllocFormatter, // Implement `Statistics::Formatter:MemAlloc` - [](std::ostream& os, std::vector const& s) { - RAPIDSMPF_EXPECTS( - s.at(0).count() == s.at(1).count() && s.at(1).count() == s.at(2).count(), - "MemAlloc formatter expects the record counters to match" - ); - os << format_nbytes(s.at(0).value()) << " | " - << format_duration(s.at(1).value()) << " | " - << format_nbytes(s.at(0).value() / s.at(1).value()) << "/s" - << " | avg-stream-delay " - << format_duration(s.at(2).value() / static_cast(s.at(1).count())); - }, + MemCopyAllocFormatter, }}; } // namespace From 6034086e14ac60d0d38821f9d1a28b936ea1f7ef Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 20 Apr 2026 11:26:17 +0200 Subject: [PATCH 6/7] sty --- cpp/src/statistics.cpp | 2 +- cpp/tests/test_statistics.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index 056f5b0f9..540bfbfca 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -57,7 +57,7 @@ constexpr FormatterFn MemCopyAllocFormatter = [](std::ostream& os, // the enum `Statistics::Formatter` in statistics.hpp. constexpr std::array(Statistics::Formatter::_Count)> FORMATTERS = {{ - // Implement `Statistics::Formatter::Default` + // Implement `Statistics::Formatter::Default` [](std::ostream& os, std::vector const& s) { os << s.at(0).value(); if (s.at(0).count() > 1) { diff --git a/cpp/tests/test_statistics.cpp b/cpp/tests/test_statistics.cpp index 48248dd32..79260de48 100644 --- a/cpp/tests/test_statistics.cpp +++ b/cpp/tests/test_statistics.cpp @@ -119,7 +119,7 @@ TEST_F(StatisticsTest, MultiStatReportEntry) { // Component stats are consumed by the report entry and don't emit // their own lines. EXPECT_THAT( - stats.report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes")) + stats.report(), ::testing::Not(::testing::HasSubstr("copy-summary-bytes")) ); } From a0b81012b84c8f1d9362ad34240c22139e62698c Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Mon, 20 Apr 2026 11:37:26 +0200 Subject: [PATCH 7/7] MemoryThroughput --- cpp/include/rapidsmpf/statistics.hpp | 11 ++--- cpp/src/statistics.cpp | 43 +++++++++---------- cpp/tests/test_statistics.cpp | 16 +++---- python/rapidsmpf/rapidsmpf/statistics.pxd | 3 +- python/rapidsmpf/rapidsmpf/statistics.pyi | 3 +- .../rapidsmpf/tests/test_statistics.py | 8 ++-- 6 files changed, 39 insertions(+), 45 deletions(-) diff --git a/cpp/include/rapidsmpf/statistics.hpp b/cpp/include/rapidsmpf/statistics.hpp index 9a63a861b..ade742503 100644 --- a/cpp/include/rapidsmpf/statistics.hpp +++ b/cpp/include/rapidsmpf/statistics.hpp @@ -57,7 +57,7 @@ class StreamOrderedTiming; * {"copy-device-to-host-bytes", * "copy-device-to-host-time", * "copy-device-to-host-stream-delay"}, - * Statistics::Formatter::MemCopy + * Statistics::Formatter::MemoryThroughput * ); * * stats.add_bytes_stat("spill-bytes", 1024); // helper: registers Bytes entry @@ -89,12 +89,10 @@ class Statistics { * - HitRate (1 stat): * "42/100 (hits/lookups)" * - * - MemCopy (3 stats: bytes, time, delay): + * - MemoryThroughput (3 stats: bytes, time, stream-delay), where `stream-delay` is + * the wall-clock gap between CPU submission and GPU execution of the operation: * "1.2 GiB | 2.5 ms | 480 GiB/s | avg-stream-delay 10 us" * - * - MemAlloc (3 stats: bytes, time, delay): - * "512 MiB | 1.0 ms | 512 GiB/s | avg-stream-delay 5 us" - * * `_Count` is an internal sentinel — always keep it last. */ enum class Formatter : std::uint8_t { @@ -102,8 +100,7 @@ class Statistics { Bytes, Duration, HitRate, - MemCopy, - MemAlloc, + MemoryThroughput, _Count, ///< Sentinel; must remain last. }; diff --git a/cpp/src/statistics.cpp b/cpp/src/statistics.cpp index 540bfbfca..d388e59b3 100644 --- a/cpp/src/statistics.cpp +++ b/cpp/src/statistics.cpp @@ -40,19 +40,6 @@ using Names2DArray = std::array; // Predefined render functions. using FormatterFn = void (*)(std::ostream&, std::vector const&); -// Shared by the `MemCopy` and `MemAlloc` formatters, both render identically. -constexpr FormatterFn MemCopyAllocFormatter = [](std::ostream& os, - std::vector const& s) { - RAPIDSMPF_EXPECTS( - s.at(0).count() == s.at(1).count() && s.at(1).count() == s.at(2).count(), - "memory copy and allocation formatters expect the record counters to match" - ); - os << format_nbytes(s.at(0).value()) << " | " << format_duration(s.at(1).value()) - << " | " << format_nbytes(s.at(0).value() / s.at(1).value()) << "/s" - << " | avg-stream-delay " - << format_duration(s.at(2).value() / static_cast(s.at(1).count())); -}; - // Formatters indexed by `Statistics::Formatter`. Per-entry rendering description lives on // the enum `Statistics::Formatter` in statistics.hpp. constexpr std::array(Statistics::Formatter::_Count)> @@ -64,7 +51,7 @@ constexpr std::array(Statistics::Formatter os << " (count " << s.at(0).count() << ")"; } }, - // Implement `Statistics::Formatter:Bytes` + // Implement `Statistics::Formatter::Bytes` [](std::ostream& os, std::vector const& s) { auto const val = s.at(0).value(); auto const count = s.at(0).count(); @@ -73,7 +60,7 @@ constexpr std::array(Statistics::Formatter os << " | avg " << format_nbytes(val / static_cast(count)); } }, - // Implement `Statistics::Formatter:Duration` + // Implement `Statistics::Formatter::Duration` [](std::ostream& os, std::vector const& s) { auto const val = s.at(0).value(); auto const count = s.at(0).count(); @@ -82,14 +69,22 @@ constexpr std::array(Statistics::Formatter os << " | avg " << format_duration(val / static_cast(count)); } }, - // Implement `Statistics::Formatter:HitRate` + // Implement `Statistics::Formatter::HitRate` [](std::ostream& os, std::vector const& s) { os << s.at(0).value() << "/" << s.at(0).count() << " (hits/lookups)"; }, - // Implement `Statistics::Formatter:MemCopy` - MemCopyAllocFormatter, - // Implement `Statistics::Formatter:MemAlloc` - MemCopyAllocFormatter, + // Implement `Statistics::Formatter::MemoryThroughput` + [](std::ostream& os, std::vector const& s) { + RAPIDSMPF_EXPECTS( + s.at(0).count() == s.at(1).count() && s.at(1).count() == s.at(2).count(), + "MemoryThroughput formatter expects the record counters to match" + ); + os << format_nbytes(s.at(0).value()) << " | " + << format_duration(s.at(1).value()) << " | " + << format_nbytes(s.at(0).value() / s.at(1).value()) << "/s" + << " | avg-stream-delay " + << format_duration(s.at(2).value() / static_cast(s.at(1).count())); + }, }}; } // namespace @@ -719,7 +714,9 @@ void Statistics::record_copy( timing.stop_and_record(names.time, names.stream_delay); add_stat(names.nbytes, static_cast(nbytes)); add_report_entry( - names.base, {names.nbytes, names.time, names.stream_delay}, Formatter::MemCopy + names.base, + {names.nbytes, names.time, names.stream_delay}, + Formatter::MemoryThroughput ); } @@ -745,7 +742,9 @@ void Statistics::record_alloc( timing.stop_and_record(n.time, n.stream_delay); add_stat(n.nbytes, static_cast(nbytes)); - add_report_entry(n.base, {n.nbytes, n.time, n.stream_delay}, Formatter::MemAlloc); + add_report_entry( + n.base, {n.nbytes, n.time, n.stream_delay}, Formatter::MemoryThroughput + ); } } // namespace rapidsmpf diff --git a/cpp/tests/test_statistics.cpp b/cpp/tests/test_statistics.cpp index 79260de48..ff577e80e 100644 --- a/cpp/tests/test_statistics.cpp +++ b/cpp/tests/test_statistics.cpp @@ -67,10 +67,10 @@ TEST_F(StatisticsTest, Communication) { TEST_F(StatisticsTest, AddReportEntryArityMismatchThrowsOnRender) { rapidsmpf::Statistics stats; - // MemCopy expects 3 stats; passing one is accepted at registration but + // MemoryThroughput expects 3 stats; passing one is accepted at registration but // fails when report() tries to render the entry. stats.add_report_entry( - "bad", {"only-one"}, rapidsmpf::Statistics::Formatter::MemCopy + "bad", {"only-one"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); stats.add_stat("only-one", 1.0); EXPECT_THROW(std::ignore = stats.report(), std::out_of_range); @@ -105,11 +105,11 @@ TEST_F(StatisticsTest, AddReportEntryFirstWins) { TEST_F(StatisticsTest, MultiStatReportEntry) { rapidsmpf::Statistics stats; - // Build a MemCopy-style 3-stat report entry. + // Build a MemoryThroughput-style 3-stat report entry. stats.add_report_entry( "copy-summary", {"copy-summary-bytes", "copy-summary-time", "copy-summary-stream-delay"}, - rapidsmpf::Statistics::Formatter::MemCopy + rapidsmpf::Statistics::Formatter::MemoryThroughput ); stats.add_stat("copy-summary-bytes", 1024 * 1024); stats.add_stat("copy-summary-time", 0.001); @@ -128,7 +128,7 @@ TEST_F(StatisticsTest, ReportNoDataCollected) { stats.add_report_entry( "spill-summary", {"spill-bytes", "spill-time", "spill-delay"}, - rapidsmpf::Statistics::Formatter::MemCopy + rapidsmpf::Statistics::Formatter::MemoryThroughput ); // No stats recorded — entry should still appear with "No data collected". EXPECT_THAT(stats.report(), ::testing::HasSubstr("spill-summary")); @@ -487,7 +487,7 @@ TEST_F(StatisticsTest, SerializeRoundTripWithReportEntries) { stats.add_report_entry( "copy", {"copy-bytes", "copy-time", "copy-delay"}, - rapidsmpf::Statistics::Formatter::MemCopy + rapidsmpf::Statistics::Formatter::MemoryThroughput ); stats.add_stat("copy-bytes", 1024.0 * 1024.0); stats.add_stat("copy-time", 0.002); @@ -665,12 +665,12 @@ TEST_F(StatisticsTest, MergeEnabledFlagPropagates) { TEST_F(StatisticsTest, MergeRejectsConflictingStatNames) { auto a = std::make_shared(); a->add_report_entry( - "copy", {"b1", "t1", "d1"}, rapidsmpf::Statistics::Formatter::MemCopy + "copy", {"b1", "t1", "d1"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); auto b = std::make_shared(); b->add_report_entry( - "copy", {"b2", "t2", "d2"}, rapidsmpf::Statistics::Formatter::MemCopy + "copy", {"b2", "t2", "d2"}, rapidsmpf::Statistics::Formatter::MemoryThroughput ); std::vector> inputs{a, b}; diff --git a/python/rapidsmpf/rapidsmpf/statistics.pxd b/python/rapidsmpf/rapidsmpf/statistics.pxd index 4fbe1c854..71d41d9bc 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pxd +++ b/python/rapidsmpf/rapidsmpf/statistics.pxd @@ -21,8 +21,7 @@ cdef extern from "" nogil: Bytes Duration HitRate - MemCopy - MemAlloc + MemoryThroughput # `_Count` sentinel from the C++ enum is intentionally omitted — # it's an internal implementation detail not meant for Python callers. diff --git a/python/rapidsmpf/rapidsmpf/statistics.pyi b/python/rapidsmpf/rapidsmpf/statistics.pyi index c94d3e346..23713e4ff 100644 --- a/python/rapidsmpf/rapidsmpf/statistics.pyi +++ b/python/rapidsmpf/rapidsmpf/statistics.pyi @@ -18,8 +18,7 @@ class Formatter(IntEnum): Bytes = ... Duration = ... HitRate = ... - MemCopy = ... - MemAlloc = ... + MemoryThroughput = ... class Statistics: def __init__( diff --git a/python/rapidsmpf/rapidsmpf/tests/test_statistics.py b/python/rapidsmpf/rapidsmpf/tests/test_statistics.py index f500ebd95..68f9b8bd2 100644 --- a/python/rapidsmpf/rapidsmpf/tests/test_statistics.py +++ b/python/rapidsmpf/rapidsmpf/tests/test_statistics.py @@ -251,10 +251,10 @@ def test_merge_rejects_conflicting_formatter() -> None: def test_merge_rejects_conflicting_stat_names() -> None: a = Statistics(enable=True) - a.add_report_entry("copy", ["b1", "t1", "d1"], Formatter.MemCopy) + a.add_report_entry("copy", ["b1", "t1", "d1"], Formatter.MemoryThroughput) b = Statistics(enable=True) - b.add_report_entry("copy", ["b2", "t2", "d2"], Formatter.MemCopy) + b.add_report_entry("copy", ["b2", "t2", "d2"], Formatter.MemoryThroughput) with pytest.raises(ValueError): Statistics.merge([a, b]) @@ -288,7 +288,7 @@ def test_add_report_entry_memcopy() -> None: stats.add_stat("copy-time", 0.002) stats.add_stat("copy-delay", 0.00001) stats.add_report_entry( - "copy", ["copy-bytes", "copy-time", "copy-delay"], Formatter.MemCopy + "copy", ["copy-bytes", "copy-time", "copy-delay"], Formatter.MemoryThroughput ) assert "1 MiB" in stats.report() @@ -312,7 +312,7 @@ def test_pickle_roundtrip() -> None: stats.add_report_entry( "copy", ["copy-bytes", "copy-time", "copy-delay"], - Formatter.MemCopy, + Formatter.MemoryThroughput, ) stats.add_stat("copy-bytes", 1024 * 1024) stats.add_stat("copy-time", 0.002)