From d044a83ede163ca2a0d17233b8ce0d42500d7b5d Mon Sep 17 00:00:00 2001
From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com>
Date: Wed, 11 Mar 2026 06:24:22 +0530
Subject: [PATCH 01/10] fix(build): define schema to meet new `bear` validation rules

---
 src/.bear-tidy-config | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/.bear-tidy-config b/src/.bear-tidy-config
index fd12f3d130ba..b4a22b96a696 100644
--- a/src/.bear-tidy-config
+++ b/src/.bear-tidy-config
@@ -1,4 +1,5 @@
 {
+  "schema": "4.0",
   "output": {
     "content": {
       "include_only_existing_source": true,

From 3dfa8ed36dd7097ad1aa0168ed888ae8b3faba87 Mon Sep 17 00:00:00 2001
From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com>
Date: Mon, 9 Mar 2026 21:24:44 +0530
Subject: [PATCH 02/10] fix: sync and pin dependencies between `ci-slim` and `04_install.sh`

---
 ci/lint/04_install.sh                    | 2 ++
 contrib/containers/ci/ci-slim.Dockerfile | 4 ++--
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/ci/lint/04_install.sh b/ci/lint/04_install.sh
index 9ca5ebe37216..64740093ccf9 100755
--- a/ci/lint/04_install.sh
+++ b/ci/lint/04_install.sh
@@ -36,7 +36,9 @@ fi
 # NOTE: BUMP ALSO contrib/containers/ci/ci-slim.Dockerfile
 ${CI_RETRY_EXE} pip3 install codespell==2.2.1
 ${CI_RETRY_EXE} pip3 install flake8==5.0.4
+${CI_RETRY_EXE} pip3 install jinja2==3.1.6
 ${CI_RETRY_EXE} pip3 install lief==0.13.2
+${CI_RETRY_EXE} pip3 install multiprocess==0.70.19
 ${CI_RETRY_EXE} pip3 install mypy==0.981
 ${CI_RETRY_EXE} pip3 install pyzmq==24.0.1
 ${CI_RETRY_EXE} pip3 install vulture==2.6

diff --git a/contrib/containers/ci/ci-slim.Dockerfile b/contrib/containers/ci/ci-slim.Dockerfile
index 5332f8504a57..1ed0eec9e0e0 100644
--- a/contrib/containers/ci/ci-slim.Dockerfile
+++ b/contrib/containers/ci/ci-slim.Dockerfile
@@ -79,9 +79,9 @@ ENV UV_SYSTEM_PYTHON=1
 RUN uv pip install --system --break-system-packages \
     codespell==2.2.1 \
     flake8==5.0.4 \
-    jinja2 \
+    jinja2==3.1.6 \
     lief==0.13.2 \
-    multiprocess \
+    multiprocess==0.70.19 \
     mypy==0.981 \
     pyzmq==24.0.1 \
     vulture==2.6

From c9465e8a495893df28f3cf9e94977b231879b994 Mon Sep 17 00:00:00 2001
From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com>
Date: Mon, 9 Mar 2026 21:25:44 +0530
Subject: [PATCH 03/10] build: add pinned `aiohttp` and `tabulate` Python dependencies

---
 ci/lint/04_install.sh                    | 2 ++
 contrib/containers/ci/ci-slim.Dockerfile | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/ci/lint/04_install.sh b/ci/lint/04_install.sh
index 64740093ccf9..0d8ac8312e53 100755
--- a/ci/lint/04_install.sh
+++ b/ci/lint/04_install.sh
@@ -34,6 +34,7 @@ if [ -z "${SKIP_PYTHON_INSTALL}" ]; then
 fi
 
 # NOTE: BUMP ALSO contrib/containers/ci/ci-slim.Dockerfile
+${CI_RETRY_EXE} pip3 install aiohttp==3.13.3
 ${CI_RETRY_EXE} pip3 install codespell==2.2.1
 ${CI_RETRY_EXE} pip3 install flake8==5.0.4
 ${CI_RETRY_EXE} pip3 install jinja2==3.1.6
@@ -41,6 +42,7 @@ ${CI_RETRY_EXE} pip3 install lief==0.13.2
 ${CI_RETRY_EXE} pip3 install multiprocess==0.70.19
 ${CI_RETRY_EXE} pip3 install mypy==0.981
 ${CI_RETRY_EXE} pip3 install pyzmq==24.0.1
+${CI_RETRY_EXE} pip3 install tabulate==0.10.0
 ${CI_RETRY_EXE} pip3 install vulture==2.6
 
 SHELLCHECK_VERSION=v0.8.0

diff --git a/contrib/containers/ci/ci-slim.Dockerfile b/contrib/containers/ci/ci-slim.Dockerfile
index 1ed0eec9e0e0..a2f13d28ea69 100644
--- a/contrib/containers/ci/ci-slim.Dockerfile
+++ b/contrib/containers/ci/ci-slim.Dockerfile
@@ -77,6 +77,7 @@ ENV UV_SYSTEM_PYTHON=1
 # Install Python packages
 # NOTE: if versions are changed, update ci/lint/04_install.sh
 RUN uv pip install --system --break-system-packages \
+    aiohttp==3.13.3 \
     codespell==2.2.1 \
     flake8==5.0.4 \
     jinja2==3.1.6 \
@@ -84,6 +85,7 @@ RUN uv pip install --system --break-system-packages \
     multiprocess==0.70.19 \
     mypy==0.981 \
     pyzmq==24.0.1 \
+    tabulate==0.10.0 \
     vulture==2.6
 
 # Install packages relied on by tests

From 9e2f44f1b4b9402e20fc904c82d825719e86b70e Mon Sep 17 00:00:00 2001
From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com>
Date: Thu, 12 Mar 2026 20:51:12 +0530
Subject: [PATCH 04/10] test: add bench framework skeleton and example benchmark

---
 Makefile.am                   |   3 +-
 test/bench/bench_framework.py | 126 ++++++++++++++++++++++++++++++++++
 test/bench/example_bench.py   |  44 ++++++++++++
 3 files changed, 172 insertions(+), 1 deletion(-)
 create mode 100755 test/bench/bench_framework.py
 create mode 100755 test/bench/example_bench.py

diff --git a/Makefile.am b/Makefile.am
index ea2cba9fd2fa..051fca740ee1 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -252,6 +252,7 @@ dist_noinst_SCRIPTS = autogen.sh
 EXTRA_DIST = $(DIST_SHARE) $(DIST_CONTRIB) $(WINDOWS_PACKAGING) $(OSX_PACKAGING) $(BIN_CHECKS)
 
 EXTRA_DIST += \
+  test/bench \
   test/functional \
   test/fuzz
 
@@ -319,5 +320,5 @@ clean-docs:
 
 clean-local: clean-docs
 	rm -rf coverage_percent.txt test_dash.coverage/ total.coverage/ fuzz.coverage/ test/tmp/ cache/ $(OSX_APP)
-	rm -rf test/functional/__pycache__ test/functional/test_framework/__pycache__ test/cache share/rpcauth/__pycache__
+	rm -rf test/bench/__pycache__ test/functional/__pycache__ test/functional/test_framework/__pycache__ test/cache share/rpcauth/__pycache__
 	rm -rf dist/

diff --git a/test/bench/bench_framework.py b/test/bench/bench_framework.py
new file mode 100755
index 000000000000..687656019a26
--- /dev/null
+++ b/test/bench/bench_framework.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python3
+# Copyright (c) 2026 The Dash Core developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+import os
+import sys
+import time
+from typing import Dict, List, Optional
+
+# Allow imports from the functional test framework.
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'functional'))
+
+from test_framework.test_framework import BitcoinTestFramework  # noqa: E402
+
+
+class BenchFramework(BitcoinTestFramework):
+
+    def set_test_params(self) -> None:
+        """Initialise benchmark state, then delegate to ``set_bench_params``."""
+        self.warmup_iterations: int = 0
+        self.bench_iterations: int = 1
+        self.bench_name: str = type(self).__name__
+        # Raw latency samples keyed by measurement name.
+        self._samples: Dict[str, List[float]] = {}
+        self._timer_start: Optional[float] = None
+        self.set_bench_params()
+
+    def setup_nodes(self) -> None:
+        """Merge --daemon-args into extra_args just before nodes start."""
+        raw = getattr(self.options, "daemon_args", None) or ""
+        daemon_args = raw.split() if raw else []
+        if daemon_args and self.extra_args is not None:
+            for node_args in self.extra_args:
+                node_args.extend(daemon_args)
+        super().setup_nodes()
+
+    def run_test(self) -> None:
+        """Execute warmup, timed iterations, then report."""
+        self.results_file = getattr(self.options, "results_file", None)
+        if self.warmup_iterations > 0:
+            self.log.info(
+                "Warming up (%d iteration%s)...",
+                self.warmup_iterations,
+                "s" if self.warmup_iterations != 1 else "",
+            )
+            for i in range(self.warmup_iterations):
+                self.log.debug(" warmup %d/%d", i + 1, self.warmup_iterations)
+                self.run_bench()
+            self._samples.clear()
+
+        self.log.info(
+            "Running benchmark (%d iteration%s)...",
+            self.bench_iterations,
+            "s" if self.bench_iterations != 1 else "",
+        )
+        for i in range(self.bench_iterations):
+            self.log.debug(" iteration %d/%d", i + 1, self.bench_iterations)
+            self.run_bench()
+
+        self._report_results()
+
+    def add_options(self, parser) -> None:  # type: ignore[override]
+        """Adds bench-specific args. Subclasses should call super first."""
+        parser.add_argument(
+            "--daemon-args",
+            dest="daemon_args",
+            default=None,
+            help="Extra daemon arguments as a single string "
+                 "(e.g. --daemon-args=\"-rpcworkqueue=1024 -rpcthreads=8\")",
+        )
+
+    def set_bench_params(self) -> None:
+        """Benchmarks must override this to set ``num_nodes``, etc."""
+        raise NotImplementedError
+
+    def run_bench(self) -> None:
+        """Benchmarks must override this to define the workload."""
+        raise NotImplementedError
+
+    def start_timer(self) -> None:
+        """Mark the beginning of a timed section."""
+        if self._timer_start is not None:
+            self.log.warning("start_timer() called twice without stop_timer()")
+        self._timer_start = time.perf_counter()
+
+    def stop_timer(self, name: str) -> float:
+        """Record and return the elapsed time in ms since the last ``start_timer()`` call."""
+        if self._timer_start is None:
+            raise RuntimeError("stop_timer() called without start_timer()")
+        elapsed_ms = (time.perf_counter() - self._timer_start) * 1000.0
+        self._timer_start = None
+        self._samples.setdefault(name, []).append(elapsed_ms)
+        return elapsed_ms
+
+    def record_sample(self, name: str, value_ms: float) -> None:
+        """Directly record a latency sample (ms) without using the timer."""
+        self._samples.setdefault(name, []).append(value_ms)
+
+    def _report_results(self) -> None:
+        """Print a summary of all recorded measurements."""
+        self.log.info("=" * 60)
+        self.log.info("Benchmark: %s", self.bench_name)
+        self.log.info("=" * 60)
+        for name, samples in self._samples.items():
+            n = len(samples)
+            if n == 0:
+                continue
+            samples_sorted = sorted(samples)
+            total = sum(samples_sorted)
+            mean = total / n
+            p50 = samples_sorted[n // 2]
+            p99_idx = min(int(n * 0.99), n - 1)
+            p99 = samples_sorted[p99_idx]
+            self.log.info(
+                " %-30s n=%-6d mean=%8.2fms p50=%8.2fms "
+                "p99=%8.2fms min=%8.2fms max=%8.2fms",
+                name, n, mean, p50, p99,
+                samples_sorted[0], samples_sorted[-1],
+            )
+        self.log.info("=" * 60)
+
+    @property
+    def samples(self) -> Dict[str, List[float]]:
+        """Access the raw sample data."""
+        return self._samples
diff --git a/test/bench/example_bench.py b/test/bench/example_bench.py
new file mode 100755
index 000000000000..928593653c94
---
/dev/null +++ b/test/bench/example_bench.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +from bench_framework import BenchFramework + + +class ExampleBench(BenchFramework): + + def set_test_params(self) -> None: + super().set_test_params() + + def run_test(self) -> None: + super().run_test() + + def set_bench_params(self) -> None: + self.bench_iterations = 3 + self.bench_name = "example_rpc" + self.num_nodes = 1 + self.setup_clean_chain = False + self.warmup_iterations = 1 + + def run_bench(self) -> None: + node = self.nodes[0] + + for _ in range(100): + self.start_timer() + node.getblockcount() + self.stop_timer("getblockcount") + + for _ in range(100): + self.start_timer() + node.getbestblockhash() + self.stop_timer("getbestblockhash") + + for _ in range(50): + self.start_timer() + node.getblockchaininfo() + self.stop_timer("getblockchaininfo") + + +if __name__ == '__main__': + ExampleBench().main() From 9a71d8e6528b0435def9421a17ac5607d155744d Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 12 Mar 2026 20:51:32 +0530 Subject: [PATCH 05/10] test: implement bench runner with pretty-printing --- configure.ac | 1 + test/bench/bench_framework.py | 39 +++++-- test/bench/bench_results.py | 97 ++++++++++++++++ test/bench/bench_runner.py | 208 ++++++++++++++++++++++++++++++++++ 4 files changed, 333 insertions(+), 12 deletions(-) create mode 100755 test/bench/bench_results.py create mode 100755 test/bench/bench_runner.py diff --git a/configure.ac b/configure.ac index 31951e1f7c47..1ead2d6806a4 100644 --- a/configure.ac +++ b/configure.ac @@ -2062,6 +2062,7 @@ AC_CONFIG_LINKS([src/.bear-tidy-config:src/.bear-tidy-config]) AC_CONFIG_LINKS([src/.clang-tidy:src/.clang-tidy]) AC_CONFIG_LINKS([src/ipc/.clang-tidy:src/ipc/.clang-tidy]) AC_CONFIG_LINKS([src/test/.clang-tidy:src/test/.clang-tidy]) +AC_CONFIG_LINKS([test/bench/bench_runner.py:test/bench/bench_runner.py]) AC_CONFIG_LINKS([test/functional/test_runner.py:test/functional/test_runner.py]) AC_CONFIG_LINKS([test/fuzz/test_runner.py:test/fuzz/test_runner.py]) AC_CONFIG_LINKS([test/util/test_runner.py:test/util/test_runner.py]) diff --git a/test/bench/bench_framework.py b/test/bench/bench_framework.py index 687656019a26..82a9b36dd3d8 100755 --- a/test/bench/bench_framework.py +++ b/test/bench/bench_framework.py @@ -11,6 +11,10 @@ # Allow imports from the functional test framework. sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'functional')) +from bench_results import ( # noqa: E402 + BenchResult, + save_results, +) from test_framework.test_framework import BitcoinTestFramework # noqa: E402 @@ -21,6 +25,7 @@ def set_test_params(self) -> None: self.warmup_iterations: int = 0 self.bench_iterations: int = 1 self.bench_name: str = type(self).__name__ + self.results_file: Optional[str] = None # Raw latency samples keyed by measurement name. self._samples: Dict[str, List[float]] = {} self._timer_start: Optional[float] = None @@ -69,6 +74,12 @@ def add_options(self, parser) -> None: # type: ignore[override] help="Extra daemon arguments as a single string " "(e.g. 
--daemon-args=\"-rpcworkqueue=1024 -rpcthreads=8\")", ) + parser.add_argument( + "--results-file", + dest="results_file", + default=None, + help="Save results to a JSON file", + ) def set_bench_params(self) -> None: """Benchmarks must override this to set ``num_nodes``, etc.""" @@ -97,29 +108,33 @@ def record_sample(self, name: str, value_ms: float) -> None: """Directly record a latency sample (ms) without using the timer.""" self._samples.setdefault(name, []).append(value_ms) + def _build_results(self) -> List[BenchResult]: + """Convert raw samples into a list of ``BenchResult`` objects.""" + return [ + BenchResult.from_samples(name, samples) + for name, samples in self._samples.items() + if samples + ] + def _report_results(self) -> None: """Print a summary of all recorded measurements.""" + results = self._build_results() self.log.info("=" * 60) self.log.info("Benchmark: %s", self.bench_name) self.log.info("=" * 60) - for name, samples in self._samples.items(): - n = len(samples) - if n == 0: - continue - samples_sorted = sorted(samples) - total = sum(samples_sorted) - mean = total / n - p50 = samples_sorted[n // 2] - p99_idx = min(int(n * 0.99), n - 1) - p99 = samples_sorted[p99_idx] + for r in results: self.log.info( " %-30s n=%-6d mean=%8.2fms p50=%8.2fms " "p99=%8.2fms min=%8.2fms max=%8.2fms", - name, n, mean, p50, p99, - samples_sorted[0], samples_sorted[-1], + r.name, r.sample_count, r.mean_ms, r.p50_ms, + r.p99_ms, r.min_ms, r.max_ms, ) self.log.info("=" * 60) + if self.results_file: + save_results(results, self.results_file, label=self.bench_name) + self.log.info("Results saved to %s", self.results_file) + @property def samples(self) -> Dict[str, List[float]]: """Access the raw sample data.""" diff --git a/test/bench/bench_results.py b/test/bench/bench_results.py new file mode 100755 index 000000000000..ed14df1a7430 --- /dev/null +++ b/test/bench/bench_results.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+ +import json +import math +import statistics +from dataclasses import asdict, dataclass, field +from typing import Any, Dict, List, Optional + + +def _percentile(data: List[float], p: float) -> float: + """Return the *p*-th percentile (0–100) of a **sorted** list.""" + if not data: + return 0.0 + k = (len(data) - 1) * p / 100.0 + f = math.floor(k) + c = math.ceil(k) + if f == c: + return data[int(k)] + return data[f] * (c - k) + data[c] * (k - f) + + +@dataclass +class BenchResult: + """Statistics computed from a set of latency samples.""" + + name: str + sample_count: int = 0 + mean_ms: float = 0.0 + stddev_ms: float = 0.0 + min_ms: float = 0.0 + p50_ms: float = 0.0 + p90_ms: float = 0.0 + p99_ms: float = 0.0 + p999_ms: float = 0.0 + max_ms: float = 0.0 + total_ms: float = 0.0 + ops_per_sec: float = 0.0 + extra: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_samples( + cls, + name: str, + samples_ms: List[float], + extra: Optional[Dict[str, Any]] = None, + ) -> "BenchResult": + """Compute statistics from a list of latency values in ms.""" + r = cls(name=name) + if not samples_ms: + return r + + data = sorted(samples_ms) + r.sample_count = len(data) + r.total_ms = sum(data) + r.mean_ms = round(r.total_ms / r.sample_count, 3) + r.min_ms = round(data[0], 3) + r.p50_ms = round(_percentile(data, 50), 3) + r.p90_ms = round(_percentile(data, 90), 3) + r.p99_ms = round(_percentile(data, 99), 3) + r.p999_ms = round(_percentile(data, 99.9), 3) + r.max_ms = round(data[-1], 3) + r.total_ms = round(r.total_ms, 3) + if r.sample_count > 1: + r.stddev_ms = round(statistics.stdev(data), 3) + if r.total_ms > 0: + r.ops_per_sec = round(r.sample_count / (r.total_ms / 1000.0), 1) + if extra: + r.extra = extra + return r + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> "BenchResult": + return cls(**d) + + +def save_results( + results: List[BenchResult], + path: str, + label: str = "", + metadata: Optional[Dict[str, Any]] = None, +) -> None: + """Write results to a JSON file.""" + data: Dict[str, Any] = { + "label": label, + "results": [r.to_dict() for r in results], + } + if metadata: + data["metadata"] = metadata + with open(path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + diff --git a/test/bench/bench_runner.py b/test/bench/bench_runner.py new file mode 100755 index 000000000000..b36170171445 --- /dev/null +++ b/test/bench/bench_runner.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+ +import argparse +import configparser +import os +import subprocess +import sys +import time +from glob import glob +from typing import List, Tuple + +TEST_EXIT_SKIPPED = 77 + +DEFAULT, BOLD, GREEN, RED, SKIP = ("", ""), ("", ""), ("", ""), ("", ""), ("", "") +if os.name != "nt" and sys.stdout.isatty(): + DEFAULT = ("\033[0m", "\033[0m") + BOLD = ("\033[0m", "\033[1m") + GREEN = ("\033[0m", "\033[0;32m") + RED = ("\033[0m", "\033[0;31m") + SKIP = ("\033[0m", "\033[0;33m") + +TICK, CROSS = "P ", "x" +try: + "\u2713".encode("utf_8").decode(sys.stdout.encoding) + TICK = "\u2713 " + CROSS = "\u2716 " +except Exception: + pass # Do nothing + + +def _get_build_id() -> str: + """Return daemon version string, or 'unknown' on failure.""" + config = configparser.ConfigParser() + configfile = os.path.join( + os.path.dirname(__file__), "..", "config.ini", + ) + try: + with open(configfile, encoding="utf8") as f: + config.read_file(f) + except (FileNotFoundError, configparser.Error): + return "unknown" + build_dir = config.get("environment", "BUILDDIR", fallback="") + dashd = os.path.join(build_dir, "src", "dashd") + if not os.path.isfile(dashd): + return "unknown" + try: + out = subprocess.check_output( + [dashd, "--version"], text=True, timeout=5, + ) + # First line: "Dash Core version v23.1.0-167-gceab392..." + first_line = out.strip().splitlines()[0] + return first_line.replace("Dash Core version ", "") + except (subprocess.SubprocessError, IndexError): + return "unknown" + + +def discover_benchmarks(bench_dir: str) -> List[str]: + """Return sorted list of benchmark script filenames.""" + pattern = os.path.join(bench_dir, "*_bench.py") + all_files = glob(pattern) + return sorted( + os.path.basename(f) for f in all_files + if not os.path.basename(f).startswith("bench_") + ) + + +def _extract_failure_log(output: str) -> str: + """Extract failure information from benchmark output.""" + lines = output.splitlines() + relevant: List[str] = [] + in_traceback = False + + for line in lines: + if "Traceback (most recent call last)" in line: + in_traceback = True + if in_traceback: + relevant.append(line) + if relevant and not line.startswith(" ") and "Traceback" not in line: + in_traceback = False + elif "(ERROR)" in line: + relevant.append(line) + + return "\n".join(relevant) if relevant else output + + +def run_benchmark( + bench_dir: str, + script: str, + extra_args: List[str], + timeout: int = 600, +) -> Tuple[int, float, str]: + """Run a single benchmark script.""" + cmd = [sys.executable, os.path.join(bench_dir, script)] + extra_args + t0 = time.time() + try: + result = subprocess.run( + cmd, + cwd=os.path.dirname(bench_dir) or ".", + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + timeout=timeout, + ) + elapsed = time.time() - t0 + return result.returncode, elapsed, result.stdout or "" + except subprocess.TimeoutExpired: + elapsed = time.time() - t0 + return 1, elapsed, f"Benchmark timed out after {timeout}s" + + +def main() -> None: + bench_dir = os.path.dirname(os.path.abspath(__file__)) + + # Split on '--': runner args before, benchmark passthrough after. 
+ argv = sys.argv[1:] + if "--" in argv: + split = argv.index("--") + runner_argv = argv[:split] + passthrough = argv[split + 1:] + else: + runner_argv = argv + passthrough = [] + + parser = argparse.ArgumentParser( + description="Run Dash Core benchmarks", + ) + parser.add_argument( + "benchmarks", + nargs="*", + help="Specific benchmark scripts to run (default: all)", + ) + parser.add_argument( + "--list", + action="store_true", + help="List available benchmarks and exit", + ) + parser.add_argument( + "--timeout", + type=int, + default=600, + help="Per-benchmark timeout in seconds (default: 600)", + ) + args = parser.parse_args(runner_argv) + + available = discover_benchmarks(bench_dir) + + if args.list: + print("Available benchmarks:") + for name in available: + print(f" {name}") + return + + to_run = args.benchmarks if args.benchmarks else available + if not to_run: + print("No benchmarks found.", file=sys.stderr) + sys.exit(1) + + for name in to_run: + if name not in available: + print(f"Unknown benchmark: {name}", file=sys.stderr) + print(f"Available: {', '.join(available)}", file=sys.stderr) + sys.exit(1) + + build_id = _get_build_id() + print(f"Running benchmarks for Dash Core {build_id}\n") + + total = len(to_run) + passed = 0 + skipped = 0 + failed_names: List[str] = [] + + for i, name in enumerate(to_run, 1): + rc, elapsed, output = run_benchmark( + bench_dir, name, passthrough, timeout=args.timeout, + ) + duration = int(elapsed) + label = f"{i}/{total} - {BOLD[1]}{name}{BOLD[0]}" + + if rc == 0: + passed += 1 + print(f"{GREEN[1]}{TICK}{label} passed, Duration: {duration} s{GREEN[0]}") + elif rc == TEST_EXIT_SKIPPED: + skipped += 1 + print(f"{SKIP[1]}{TICK}{label} skipped{SKIP[0]}") + else: + failed_names.append(name) + print(f"{RED[1]}{CROSS}{label} failed, Duration: {duration} s{RED[0]}") + print(f"\n{BOLD[1]}Error log:{BOLD[0]}") + print(_extract_failure_log(output)) + print() + + failed = len(failed_names) + print( + f"\n{BOLD[1]}{passed} passed, {failed} failed, " + f"{skipped} skipped{BOLD[0]}" + ) + + if failed_names: + print(f"{RED[1]}Failed: {', '.join(failed_names)}{RED[0]}") + + sys.exit(1 if failed else 0) + + +if __name__ == "__main__": + main() From a9ddac622d133bc34d226738f89036ed9e9d649a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Wed, 11 Mar 2026 06:21:43 +0530 Subject: [PATCH 06/10] test: add support for Markdown table export replacing plain printing --- test/bench/bench_framework.py | 14 +++----------- test/bench/bench_results.py | 34 ++++++++++++++++++++++++++++++++++ test/bench/bench_runner.py | 35 ++++++++++++++++++++++++++++++++++- 3 files changed, 71 insertions(+), 12 deletions(-) diff --git a/test/bench/bench_framework.py b/test/bench/bench_framework.py index 82a9b36dd3d8..19744f133803 100755 --- a/test/bench/bench_framework.py +++ b/test/bench/bench_framework.py @@ -13,6 +13,7 @@ from bench_results import ( # noqa: E402 BenchResult, + results_to_markdown, save_results, ) from test_framework.test_framework import BitcoinTestFramework # noqa: E402 @@ -119,17 +120,8 @@ def _build_results(self) -> List[BenchResult]: def _report_results(self) -> None: """Print a summary of all recorded measurements.""" results = self._build_results() - self.log.info("=" * 60) - self.log.info("Benchmark: %s", self.bench_name) - self.log.info("=" * 60) - for r in results: - self.log.info( - " %-30s n=%-6d mean=%8.2fms p50=%8.2fms " - "p99=%8.2fms min=%8.2fms max=%8.2fms", - r.name, r.sample_count, r.mean_ms, r.p50_ms, - 
r.p99_ms, r.min_ms, r.max_ms, - ) - self.log.info("=" * 60) + md = results_to_markdown(results, title=self.bench_name) + print(md) if self.results_file: save_results(results, self.results_file, label=self.bench_name) diff --git a/test/bench/bench_results.py b/test/bench/bench_results.py index ed14df1a7430..ae2f19df3af4 100755 --- a/test/bench/bench_results.py +++ b/test/bench/bench_results.py @@ -9,6 +9,8 @@ from dataclasses import asdict, dataclass, field from typing import Any, Dict, List, Optional +from tabulate import tabulate + def _percentile(data: List[float], p: float) -> float: """Return the *p*-th percentile (0–100) of a **sorted** list.""" @@ -78,6 +80,38 @@ def to_dict(self) -> Dict[str, Any]: def from_dict(cls, d: Dict[str, Any]) -> "BenchResult": return cls(**d) + def to_row(self) -> List[Any]: + """Return a flat row suitable for ``tabulate``.""" + return [ + self.name, + self.sample_count, + f"{self.ops_per_sec:.1f}", + f"{self.mean_ms:.2f}", + f"{self.p50_ms:.2f}", + f"{self.p90_ms:.2f}", + f"{self.p99_ms:.2f}", + f"{self.max_ms:.2f}", + f"{self.stddev_ms:.2f}", + ] + + @staticmethod + def table_headers() -> List[str]: + return [ + "Name", "N", "ops/s", + "mean(ms)", "p50(ms)", "p90(ms)", "p99(ms)", "max(ms)", + "stddev(ms)", + ] + + +def results_to_markdown( + results: List[BenchResult], + title: str = "Benchmark Results", +) -> str: + """Render a list of ``BenchResult`` objects as a Markdown table.""" + rows = [r.to_row() for r in results] + table = tabulate(rows, headers=BenchResult.table_headers(), tablefmt="pipe") + return f"## {title}\n\n{table}\n" + def save_results( results: List[BenchResult], diff --git a/test/bench/bench_runner.py b/test/bench/bench_runner.py index b36170171445..fafa0ca1d00a 100755 --- a/test/bench/bench_runner.py +++ b/test/bench/bench_runner.py @@ -10,7 +10,7 @@ import sys import time from glob import glob -from typing import List, Tuple +from typing import List, Optional, Tuple TEST_EXIT_SKIPPED = 77 @@ -67,6 +67,31 @@ def discover_benchmarks(bench_dir: str) -> List[str]: ) +def _extract_markdown(output: str) -> Optional[str]: + """Extract Markdown table block from benchmark output.""" + lines = output.splitlines() + md_lines: List[str] = [] + capturing = False + + for raw_line in lines: + line = raw_line.rstrip() + if line.startswith("## "): + capturing = True + md_lines.append(line) + elif capturing: + if line.startswith("|") or line == "": + md_lines.append(line) + else: + capturing = False + + if not md_lines: + return None + # Strip trailing blank lines. 
+ while md_lines and md_lines[-1] == "": + md_lines.pop() + return "\n".join(md_lines) + + def _extract_failure_log(output: str) -> str: """Extract failure information from benchmark output.""" lines = output.splitlines() @@ -171,6 +196,7 @@ def main() -> None: passed = 0 skipped = 0 failed_names: List[str] = [] + markdown_blocks: List[str] = [] for i, name in enumerate(to_run, 1): rc, elapsed, output = run_benchmark( @@ -182,6 +208,9 @@ def main() -> None: if rc == 0: passed += 1 print(f"{GREEN[1]}{TICK}{label} passed, Duration: {duration} s{GREEN[0]}") + md = _extract_markdown(output) + if md: + markdown_blocks.append(md) elif rc == TEST_EXIT_SKIPPED: skipped += 1 print(f"{SKIP[1]}{TICK}{label} skipped{SKIP[0]}") @@ -198,6 +227,10 @@ def main() -> None: f"{skipped} skipped{BOLD[0]}" ) + if markdown_blocks: + print("\n---\n") + print("\n\n".join(markdown_blocks)) + if failed_names: print(f"{RED[1]}Failed: {', '.join(failed_names)}{RED[0]}") From bd8a8e30bcd6169f9b5ce42f3981779cb4babe35 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Wed, 11 Mar 2026 06:21:56 +0530 Subject: [PATCH 07/10] test: introduce mempool submission pressure benchmark --- test/bench/bench_helpers.py | 42 +++++++++++++++++++ test/bench/mempool_bench.py | 83 +++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+) create mode 100755 test/bench/bench_helpers.py create mode 100755 test/bench/mempool_bench.py diff --git a/test/bench/bench_helpers.py b/test/bench/bench_helpers.py new file mode 100755 index 000000000000..db44ba12c901 --- /dev/null +++ b/test/bench/bench_helpers.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +import os +import sys +from typing import List + +# Allow imports from the functional test framework. +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'functional')) + +from test_framework.authproxy import JSONRPCException # noqa: E402 +from test_framework.test_node import TestNode # noqa: E402 +from test_framework.wallet import MiniWallet # noqa: E402 + + +def create_self_transfer_batch( + wallet: MiniWallet, + count: int, +) -> List[str]: + """Create *count* signed self-transfer transactions from *wallet* UTXOs.""" + txs: List[str] = [] + for _ in range(count): + tx = wallet.create_self_transfer() + txs.append(tx["hex"]) + return txs + + +def submit_transactions( + node: TestNode, + tx_hexes: List[str], +) -> List[str]: + """Submit a list of raw transactions to *node* via ``sendrawtransaction``.""" + txids: List[str] = [] + for tx_hex in tx_hexes: + try: + txid = node.sendrawtransaction(tx_hex) + txids.append(txid) + except JSONRPCException: + pass + return txids diff --git a/test/bench/mempool_bench.py b/test/bench/mempool_bench.py new file mode 100755 index 000000000000..39afb1f63855 --- /dev/null +++ b/test/bench/mempool_bench.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+ +import time +from typing import List + +from bench_framework import BenchFramework +from bench_helpers import create_self_transfer_batch + +from test_framework.authproxy import JSONRPCException # noqa: E402 +from test_framework.wallet import MiniWallet # noqa: E402 + + +class MempoolBench(BenchFramework): + + def set_test_params(self) -> None: + super().set_test_params() + + def run_test(self) -> None: + super().run_test() + + def add_options(self, parser) -> None: # type: ignore[override] + super().add_options(parser) + parser.add_argument( + "--tx-count", + dest="tx_count", + type=int, + default=50, + help="Transactions per iteration (default: 50)", + ) + + def set_bench_params(self) -> None: + self._tx_count = 50 + self.bench_iterations = 3 + self.bench_name = "mempool_acceptance" + self.num_nodes = 1 + self.setup_clean_chain = True + self.warmup_iterations = 1 + + def run_bench(self) -> None: + self._tx_count = self.options.tx_count + node = self.nodes[0] + wallet = MiniWallet(node) + + num_blocks = self._tx_count + 110 + self.log.info("Mining %d blocks for funding...", num_blocks) + self.generate(wallet, num_blocks, sync_fun=self.no_op) + + # Pre-create transactions + self.log.info("Creating %d transactions...", self._tx_count) + tx_hexes: List[str] = create_self_transfer_batch(wallet, self._tx_count) + + # Submit and time each one + self.log.info("Submitting %d transactions...", len(tx_hexes)) + accepted = 0 + rejected = 0 + t_batch_start = time.perf_counter() + for tx_hex in tx_hexes: + self.start_timer() + try: + node.sendrawtransaction(tx_hex) + self.stop_timer("sendrawtransaction") + accepted += 1 + except JSONRPCException: + self.stop_timer("sendrawtransaction_rejected") + rejected += 1 + + batch_elapsed_ms = (time.perf_counter() - t_batch_start) * 1000.0 + self.record_sample("batch_total", batch_elapsed_ms) + + self.log.info( + "Submitted %d tx: %d accepted, %d rejected in %.1fms", + len(tx_hexes), accepted, rejected, batch_elapsed_ms, + ) + + # Clear mempool for next iteration by mining + self.generate(node, 1, sync_fun=self.no_op) + + +if __name__ == '__main__': + MempoolBench().main() From 82ba05df219ba6222fb50004260aa37788c90f43 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Wed, 11 Mar 2026 06:22:05 +0530 Subject: [PATCH 08/10] test: add RPC throughput benchmark (sequential, keep-alive, concurrent) --- test/bench/bench_helpers.py | 88 +++++++++++++++++++++++++++++- test/bench/rpc_bench.py | 106 ++++++++++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+), 1 deletion(-) create mode 100755 test/bench/rpc_bench.py diff --git a/test/bench/bench_helpers.py b/test/bench/bench_helpers.py index db44ba12c901..7ffb38000fb6 100755 --- a/test/bench/bench_helpers.py +++ b/test/bench/bench_helpers.py @@ -3,9 +3,15 @@ # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. +import asyncio +import json import os import sys -from typing import List +import time +import urllib.parse +from typing import Any, Dict, List, Optional, Tuple + +import aiohttp # Allow imports from the functional test framework. 
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'functional')) @@ -40,3 +46,83 @@ def submit_transactions( except JSONRPCException: pass return txids + + +def _parse_rpc_url(node: TestNode) -> Tuple[str, Optional[aiohttp.BasicAuth]]: + """Extract base URL and auth from a TestNode's RPC URL.""" + parsed = urllib.parse.urlparse(node.url) + auth: Optional[aiohttp.BasicAuth] = None + if parsed.username: + auth = aiohttp.BasicAuth(parsed.username, parsed.password or "") + base = f"{parsed.scheme}://{parsed.hostname}:{parsed.port}" + return base, auth + + +def _rpc_payload(method: str, params: Optional[List[Any]] = None) -> bytes: + """Build a RPC request body.""" + return json.dumps({ + "version": "1.1", + "id": 1, + "method": method, + "params": params or [], + }).encode() + + +async def async_rpc_flood( + node: TestNode, + method: str, + params: Optional[List[Any]] = None, + concurrency: int = 50, + duration_s: float = 10.0, +) -> Dict[str, Any]: + """Flood a node's RPC endpoint with concurrent requests.""" + base_url, auth = _parse_rpc_url(node) + latencies: List[float] = [] + success = 0 + failed = 0 + status_codes: Dict[str, int] = {} + bytes_rx = 0 + deadline = time.monotonic() + duration_s + + async def worker() -> None: + nonlocal success, failed, bytes_rx + conn = aiohttp.TCPConnector(limit=0, keepalive_timeout=60) + async with aiohttp.ClientSession(connector=conn) as session: + while time.monotonic() < deadline: + payload = _rpc_payload(method, params) + t0 = time.perf_counter() + try: + async with session.post( + base_url, + data=payload, + auth=auth, + headers={"Content-Type": "application/json"}, + timeout=aiohttp.ClientTimeout(total=10), + ) as resp: + body = await resp.read() + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + latencies.append(elapsed_ms) + bytes_rx += len(body) + key = str(resp.status) + status_codes[key] = status_codes.get(key, 0) + 1 + if resp.status == 200: + success += 1 + else: + failed += 1 + except Exception as e: + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + latencies.append(elapsed_ms) + failed += 1 + key = type(e).__name__ + status_codes[key] = status_codes.get(key, 0) + 1 + + tasks = [asyncio.create_task(worker()) for _ in range(concurrency)] + await asyncio.gather(*tasks) + + return { + "latencies_ms": latencies, + "success": success, + "failed": failed, + "status_codes": status_codes, + "bytes_received": bytes_rx, + } diff --git a/test/bench/rpc_bench.py b/test/bench/rpc_bench.py new file mode 100755 index 000000000000..55073dfb6e7c --- /dev/null +++ b/test/bench/rpc_bench.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+ +import asyncio +from typing import List + +from bench_framework import BenchFramework +from bench_helpers import async_rpc_flood +from bench_results import BenchResult + + +class RpcBench(BenchFramework): + + def set_test_params(self) -> None: + super().set_test_params() + + def run_test(self) -> None: + super().run_test() + + def add_options(self, parser) -> None: # type: ignore[override] + super().add_options(parser) + parser.add_argument( + "--duration", + dest="bench_duration", + type=float, + default=10.0, + help="Duration per test in seconds (default: 10)", + ) + parser.add_argument( + "--concurrency", + dest="concurrency", + type=int, + default=100, + help="Max concurrent connections (default: 100)", + ) + + def set_bench_params(self) -> None: + self._concurrency: int = 100 + self._duration: float = 10.0 + self.bench_iterations = 1 + self.bench_name = "rpc_throughput" + self.num_nodes = 1 + self.setup_clean_chain = False + self.warmup_iterations = 0 + + def run_bench(self) -> None: + self._duration = self.options.bench_duration + self._concurrency = self.options.concurrency + node = self.nodes[0] + + # Sequential baseline + self.log.info("[1/3] Sequential baseline (c=1, %ds)...", self._duration) + result = asyncio.run( + async_rpc_flood(node, "getblockcount", concurrency=1, + duration_s=self._duration) + ) + for lat in result["latencies_ms"]: + self.record_sample("sequential", lat) + self.log.info( + " %d requests, %d ok, %d failed", + result["success"] + result["failed"], + result["success"], result["failed"], + ) + + # Sustained keep-alive + self.log.info( + "[2/3] Sustained keep-alive (c=%d, %ds)...", + self._concurrency, self._duration, + ) + result = asyncio.run( + async_rpc_flood(node, "getblockcount", + concurrency=self._concurrency, + duration_s=self._duration) + ) + for lat in result["latencies_ms"]: + self.record_sample("keepalive", lat) + self.log.info( + " %d requests, %d ok, %d failed", + result["success"] + result["failed"], + result["success"], result["failed"], + ) + + # Connection scaling + levels: List[int] = [1, 10, 50, 100, 200] + levels = [c for c in levels if c <= self._concurrency * 2] + scale_duration = max(self._duration / 3, 3.0) + self.log.info("[3/3] Connection scaling %s...", levels) + for c in levels: + result = asyncio.run( + async_rpc_flood(node, "getblockcount", + concurrency=c, + duration_s=scale_duration) + ) + for lat in result["latencies_ms"]: + self.record_sample(f"scaling_c{c}", lat) + br = BenchResult.from_samples(f"c={c}", result["latencies_ms"]) + self.log.info( + " c=%-5d %8.1f ops/s mean=%.2fms p99=%.2fms", + c, br.ops_per_sec, br.mean_ms, br.p99_ms, + ) + + +if __name__ == '__main__': + RpcBench().main() From 8d138828e2025f645d8d484e9c0b0f50d835fecd Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:49:27 +0530 Subject: [PATCH 09/10] test: add ZMQ txhash notification latency benchmark --- test/bench/bench_helpers.py | 26 ++++++++ test/bench/zmq_bench.py | 117 ++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100755 test/bench/zmq_bench.py diff --git a/test/bench/bench_helpers.py b/test/bench/bench_helpers.py index 7ffb38000fb6..1b66f3c10c6c 100755 --- a/test/bench/bench_helpers.py +++ b/test/bench/bench_helpers.py @@ -126,3 +126,29 @@ async def worker() -> None: "status_codes": status_codes, "bytes_received": bytes_rx, } + + +def zmq_subscribe( + address: str, + topic: bytes, + timeout_ms: int = 30000, +) -> Tuple[Any, Any]: + 
"""Create a ZMQ SUB socket connected to *address* with *topic*. + Caller must remember to ``socket.close()`` and ``context.destroy()`` when done. + """ + import zmq + ctx = zmq.Context() + sock = ctx.socket(zmq.SUB) + sock.set(zmq.RCVTIMEO, timeout_ms) + sock.set(zmq.IPV6, 1) + sock.setsockopt(zmq.SUBSCRIBE, topic) + sock.connect(address) + return ctx, sock + + +def zmq_receive_one( + sock: Any, +) -> Tuple[bytes, bytes, float]: + """Receive one ZMQ multipart message from *sock*.""" + topic, body, _seq = sock.recv_multipart() + return topic, body, time.perf_counter() diff --git a/test/bench/zmq_bench.py b/test/bench/zmq_bench.py new file mode 100755 index 000000000000..82d48f1753f6 --- /dev/null +++ b/test/bench/zmq_bench.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +import time +from typing import Any, List + +from bench_framework import BenchFramework +from bench_helpers import create_self_transfer_batch, zmq_subscribe, zmq_receive_one + +from test_framework.authproxy import JSONRPCException # noqa: E402 +from test_framework.util import p2p_port # noqa: E402 +from test_framework.wallet import MiniWallet # noqa: E402 + +# Test may be skipped and not have zmq installed +try: + import zmq +except ImportError: + pass + + +class ZmqBench(BenchFramework): + + def set_test_params(self) -> None: + super().set_test_params() + + def run_test(self) -> None: + super().run_test() + + def add_options(self, parser) -> None: # type: ignore[override] + super().add_options(parser) + parser.add_argument( + "--tx-count", + dest="tx_count", + type=int, + default=50, + help="Transactions to send (default: 50)", + ) + + def skip_test_if_missing_module(self) -> None: + self.skip_if_no_bitcoind_zmq() + self.skip_if_no_py3_zmq() + + def set_bench_params(self) -> None: + self._tx_count = 50 + self._zmq_port = 0 + self.bench_iterations = 1 + self.bench_name = "zmq_notification_latency" + self.num_nodes = 1 + self.setup_clean_chain = True + self.warmup_iterations = 0 + + def setup_network(self) -> None: + self._zmq_port = p2p_port(self.num_nodes + 10) + zmq_addr = f"tcp://127.0.0.1:{self._zmq_port}" + self.extra_args = [[ + f"-zmqpubhashtx={zmq_addr}", + ]] + self.setup_nodes() + + def run_bench(self) -> None: + self._tx_count = self.options.tx_count + node = self.nodes[0] + wallet = MiniWallet(node) + zmq_addr = f"tcp://127.0.0.1:{self._zmq_port}" + + num_blocks = self._tx_count + 110 + self.log.info("Mining %d blocks for funding...", num_blocks) + self.generate(wallet, num_blocks, sync_fun=self.no_op) + + # Pre-create transactions + self.log.info("Creating %d transactions...", self._tx_count) + tx_hexes: List[str] = create_self_transfer_batch(wallet, self._tx_count) + + # Subscribe to hashtx notifications + ctx: Any + sock: Any + ctx, sock = zmq_subscribe(zmq_addr, b"hashtx") + + try: + # Generate a block and consume its notification so the subscriber is fully connected before timing + self.generate(node, 1, sync_fun=self.no_op) + try: + while True: + sock.recv_multipart(flags=zmq.NOBLOCK) + except zmq.Again: + pass + + # Send transactions and measure notification latency + self.log.info("Submitting %d transactions...", len(tx_hexes)) + received = 0 + for tx_hex in tx_hexes: + t_send = time.perf_counter() + try: + node.sendrawtransaction(tx_hex) + except JSONRPCException: + continue + try: + _topic, _body, t_recv = 
zmq_receive_one(sock) + latency_ms = (t_recv - t_send) * 1000.0 + self.record_sample("zmq_hashtx_latency", latency_ms) + received += 1 + except zmq.Again: + self.log.warning("ZMQ timeout waiting for notification") + + self.log.info( + "Received %d/%d ZMQ notifications", + received, len(tx_hexes), + ) + finally: + sock.close() + ctx.destroy(linger=0) + + +if __name__ == '__main__': + ZmqBench().main() From 7f0cebfe19fbc8a825abbadc0c9b312e45af2f72 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 12 Mar 2026 20:54:52 +0530 Subject: [PATCH 10/10] test: add REST benchmarks (sequential, keep-alive, concurrent, mixed) --- test/bench/bench_helpers.py | 136 +++++++++++++++++++++++++ test/bench/rest_bench.py | 192 ++++++++++++++++++++++++++++++++++++ 2 files changed, 328 insertions(+) create mode 100755 test/bench/rest_bench.py diff --git a/test/bench/bench_helpers.py b/test/bench/bench_helpers.py index 1b66f3c10c6c..f350f8c7b4eb 100755 --- a/test/bench/bench_helpers.py +++ b/test/bench/bench_helpers.py @@ -128,6 +128,142 @@ async def worker() -> None: } +async def async_rest_flood( + host: str, + port: int, + path: str, + concurrency: int = 50, + duration_s: float = 10.0, + force_close: bool = False, + connect_burst: int = 0, +) -> Dict[str, Any]: + """Flood a REST endpoint with concurrent HTTP GET requests.""" + url = f"http://{host}:{port}{path}" + latencies: List[float] = [] + success = 0 + failed = 0 + status_codes: Dict[str, int] = {} + bytes_rx = 0 + deadline = time.monotonic() + duration_s + + # Share a single connector across all workers so that connection + # establishment is serialised through aiohttp's pool rather than + # each worker racing to open its own TCP socket. This avoids + # overwhelming the server's accept backlog (SOMAXCONN=128 on macOS). + kwargs: Dict[str, Any] = {"limit": 0, "force_close": force_close} + if not force_close: + kwargs["keepalive_timeout"] = 60 + shared_conn = aiohttp.TCPConnector(**kwargs) + + # Semaphore that gates only the *first* request from each worker (the one + # that opens the TCP connection). After the connection is in the keep-alive + # pool, subsequent requests reuse it and never block on the semaphore. 
+ connect_sem: Optional[asyncio.Semaphore] = ( + asyncio.Semaphore(connect_burst) + if (connect_burst > 0 and not force_close) + else None + ) + + async def worker(session: aiohttp.ClientSession) -> None: + nonlocal success, failed, bytes_rx + needs_connect = connect_sem is not None + while time.monotonic() < deadline: + t0 = time.perf_counter() + try: + if needs_connect: + assert connect_sem is not None + async with connect_sem: + async with session.get( + url, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + body = await resp.read() + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + latencies.append(elapsed_ms) + bytes_rx += len(body) + key = str(resp.status) + status_codes[key] = status_codes.get(key, 0) + 1 + if resp.status == 200: + success += 1 + else: + failed += 1 + needs_connect = False + else: + async with session.get( + url, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + body = await resp.read() + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + latencies.append(elapsed_ms) + bytes_rx += len(body) + key = str(resp.status) + status_codes[key] = status_codes.get(key, 0) + 1 + if resp.status == 200: + success += 1 + else: + failed += 1 + except Exception as e: + elapsed_ms = (time.perf_counter() - t0) * 1000.0 + latencies.append(elapsed_ms) + failed += 1 + key = type(e).__name__ + status_codes[key] = status_codes.get(key, 0) + 1 + + async with aiohttp.ClientSession(connector=shared_conn) as session: + tasks = [asyncio.create_task(worker(session)) for _ in range(concurrency)] + await asyncio.gather(*tasks) + + return { + "latencies_ms": latencies, + "success": success, + "failed": failed, + "status_codes": status_codes, + "bytes_received": bytes_rx, + } + + +async def async_rest_discover( + host: str, + port: int, +) -> Tuple[Optional[str], Optional[str]]: + """Probe a REST server for a light and heavy endpoint.""" + light_path: Optional[str] = None + heavy_path: Optional[str] = None + best_hash: Optional[str] = None + + conn = aiohttp.TCPConnector(force_close=True) + async with aiohttp.ClientSession(connector=conn) as session: + for path in ["/rest/chaininfo.json", "/rest/mempool/info.json"]: + try: + async with session.get( + f"http://{host}:{port}{path}", + timeout=aiohttp.ClientTimeout(total=5), + ) as resp: + if resp.status == 200: + body = await resp.json(content_type=None) + light_path = path + if "bestblockhash" in body: + best_hash = body["bestblockhash"] + break + except Exception: + continue + + if best_hash: + heavy_candidate = f"/rest/block/{best_hash}.json" + try: + async with session.get( + f"http://{host}:{port}{heavy_candidate}", + timeout=aiohttp.ClientTimeout(total=15), + ) as resp: + if resp.status == 200: + heavy_path = heavy_candidate + except Exception: + pass + + return light_path, heavy_path + + def zmq_subscribe( address: str, topic: bytes, diff --git a/test/bench/rest_bench.py b/test/bench/rest_bench.py new file mode 100755 index 000000000000..08b0b537bbb1 --- /dev/null +++ b/test/bench/rest_bench.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 The Dash Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+ +import asyncio +import urllib.parse +from typing import Any, Dict, List, Tuple + +from bench_framework import BenchFramework +from bench_helpers import async_rest_discover, async_rest_flood +from bench_results import BenchResult + + +class RestBench(BenchFramework): + + def set_test_params(self) -> None: + super().set_test_params() + + def run_test(self) -> None: + super().run_test() + + def add_options(self, parser) -> None: # type: ignore[override] + super().add_options(parser) + parser.add_argument( + "--duration", + dest="bench_duration", + type=float, + default=10.0, + help="Duration per test in seconds (default: 10)", + ) + parser.add_argument( + "--concurrency", + dest="concurrency", + type=int, + default=100, + help="Max concurrent connections for storm/keepalive tests (default: 100)", + ) + parser.add_argument( + "--scale-max", + dest="scale_max", + type=int, + default=2000, + help="Max concurrency level for the scaling test (default: 2000)", + ) + parser.add_argument( + "--connect-burst", + dest="connect_burst", + type=int, + default=1024, + help="Max simultaneous initial TCP connections during scaling test (default: 1024)", + ) + + def set_bench_params(self) -> None: + self._concurrency: int = 100 + self._duration: float = 10.0 + self.bench_iterations = 1 + self.bench_name = "rest_throughput" + self.extra_args = [["-rest"]] + self.num_nodes = 1 + self.setup_clean_chain = False + self.warmup_iterations = 0 + + def run_bench(self) -> None: + self._duration = self.options.bench_duration + self._concurrency = self.options.concurrency + self._scale_max: int = self.options.scale_max + self._connect_burst: int = self.options.connect_burst + parsed = urllib.parse.urlparse(self.nodes[0].url) + host = parsed.hostname + port = parsed.port + + self.log.info("Discovering REST endpoints on %s:%d...", host, port) + endpoints = asyncio.run(self._discover_endpoints(host, port)) + if not endpoints.get("light"): + self.log.error("No responsive REST endpoint found") + return + + light_path = endpoints["light"] + heavy_path = endpoints.get("heavy") + self.log.info(" light: %s", light_path) + if heavy_path: + self.log.info(" heavy: %s", heavy_path) + + # Connection storm + self.log.info("[1/4] Connection storm (c=%d, %ds)...", self._concurrency, self._duration) + result = asyncio.run( + async_rest_flood(host, port, light_path, + concurrency=self._concurrency, + duration_s=self._duration, + force_close=True) + ) + for lat in result["latencies_ms"]: + self.record_sample("conn_storm", lat) + self._log_result("conn_storm", result) + + # Sustained keep-alive + self.log.info("[2/4] Sustained keep-alive (c=%d, %ds)...", self._concurrency, self._duration) + result = asyncio.run( + async_rest_flood(host, port, light_path, + concurrency=self._concurrency, + duration_s=self._duration, + connect_burst=self._connect_burst) + ) + for lat in result["latencies_ms"]: + self.record_sample("keepalive", lat) + self._log_result("keepalive", result) + + # Connection scaling + levels: List[int] = [1, 10, 50, 100, 200, 500, 1000, 2000, 5000] + levels = [c for c in levels if c <= self._scale_max] + scale_duration = max(self._duration / 3, 3.0) + self.log.info("[3/4] Connection scaling %s...", levels) + for c in levels: + result = asyncio.run( + async_rest_flood(host, port, light_path, + concurrency=c, + duration_s=scale_duration, + connect_burst=self._connect_burst) + ) + for lat in result["latencies_ms"]: + self.record_sample(f"scaling_c{c}", lat) + br = BenchResult.from_samples(f"c={c}", result["latencies_ms"]) + 
err_rate = "" + if result["failed"] > 0: + total = result["success"] + result["failed"] + pct = result["failed"] / total * 100 if total else 0 + err_rate = f" err={pct:.1f}%" + err_detail = "" + if result["status_codes"]: + err_detail = f" status_codes={result['status_codes']}" + self.log.info( + " c=%-5d %8.1f ops/s mean=%.2fms p99=%.2fms%s%s", + c, br.ops_per_sec, br.mean_ms, br.p99_ms, err_rate, err_detail, + ) + + # Mixed load + if not heavy_path: + self.log.info("[4/4] Mixed load — skipped (no heavy endpoint)") + else: + self.log.info("[4/4] Mixed load (light c=%d, heavy c=10, %ds)...", self._concurrency, self._duration) + light_result, heavy_result = asyncio.run( + self._mixed_load(host, port, light_path, heavy_path) + ) + for lat in light_result["latencies_ms"]: + self.record_sample("mixed_light", lat) + for lat in heavy_result["latencies_ms"]: + self.record_sample("mixed_heavy", lat) + self._log_result("mixed_light", light_result) + self._log_result("mixed_heavy", heavy_result) + + async def _discover_endpoints( + self, host: str, port: int, + ) -> Dict[str, Any]: + """Discover available REST endpoints and return a dict of paths.""" + light_path, heavy_path = await async_rest_discover(host, port) + result: Dict[str, Any] = {} + if light_path: + result["light"] = light_path + if heavy_path: + result["heavy"] = heavy_path + return result + + async def _mixed_load( + self, host: str, port: int, light_path: str, heavy_path: str, + ) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Run light and heavy requests concurrently.""" + light_task = asyncio.create_task( + async_rest_flood(host, port, light_path, + concurrency=self._concurrency, + duration_s=self._duration) + ) + heavy_task = asyncio.create_task( + async_rest_flood(host, port, heavy_path, + concurrency=10, + duration_s=self._duration) + ) + return await light_task, await heavy_task + + def _log_result(self, name: str, result: Dict[str, Any]) -> None: + br = BenchResult.from_samples(name, result["latencies_ms"]) + err_str = "" + if result["failed"] > 0: + err_str = f" errors={result['failed']}" + self.log.info( + " %8.1f ops/s mean=%.2fms p99=%.2fms%s", + br.ops_per_sec, br.mean_ms, br.p99_ms, err_str, + ) + + +if __name__ == '__main__': + RestBench().main()