diff --git a/interop/README.md b/interop/README.md index 78c96e144..4dc4cc640 100644 --- a/interop/README.md +++ b/interop/README.md @@ -56,6 +56,12 @@ The test app (`perf_test.py`) implements the [libp2p perf protocol](https://gith - Dialer runs upload/download/latency iterations and outputs YAML results to stdout - All logging goes to stderr (stdout is reserved for results) +**Listener lifecycle:** The listener polls `get_live_peers()` and exits after consecutive empty polls once a peer has connected (replacing an indefinite wait). The connect deadline (`TEST_TIMEOUT_SECS`) applies only until the first peer is seen, so slow ws+yamux benchmarks are not cut off mid-transfer. For **ws+mplex**, an additional idle window (30s without live peers) avoids false teardown while mplex I/O runs over WebSocket. The listener also marks the dialer as connected when the first inbound `/perf/1.0.0` stream opens, because `get_live_peers()` can stay empty during ws+mplex benchmarks in Docker. + +**WebSocket perf:** Dialer/listener listen addresses omit loopback (`127.0.0.1`/`::1`) so Identify does not advertise unreachable addrs in Docker; outbound dials to loopback are denied via `ConnectionConfig.deny_list`. + +**Local runs without Redis:** Set `PERF_LOCAL_ADDR_FILE` to a path; the listener writes its multiaddr there and the dialer polls the file. Use [`scripts/perf/run_local_perf.py`](../scripts/perf/run_local_perf.py) and [`scripts/perf/README.md`](../scripts/perf/README.md) for Docker-free matrix runs and `PY_YAMUX_*` throughput tuning. + ### Transport Tests (`interop/transport/`) Transport tests verify that py-libp2p can establish connections and exchange protocols with other implementations over various transport, secure channel, and muxer combinations (TCP, QUIC, WebSocket, Noise, TLS, yamux, mplex). diff --git a/interop/perf/perf_test.py b/interop/perf/perf_test.py index ae376b2d5..91b6c3697 100644 --- a/interop/perf/perf_test.py +++ b/interop/perf/perf_test.py @@ -13,12 +13,15 @@ from datetime import datetime, timedelta, timezone import logging import os +from pathlib import Path import ssl import sys import tempfile import time from typing import Any +# Interop perf: default logging is quiet. DEBUG=true enables targeted interop +# loggers (fast). LIBP2P_DEBUG enables full py-libp2p logging at import (slow). from cryptography import x509 from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa @@ -36,6 +39,7 @@ from libp2p.crypto.ed25519 import create_new_key_pair from libp2p.crypto.x25519 import create_new_key_pair as create_new_x25519_key_pair from libp2p.custom_types import TProtocol +from libp2p.network.config import ConnectionConfig from libp2p.peer.peerinfo import info_from_p2p_addr from libp2p.perf import PROTOCOL_NAME, PerfService from libp2p.security.insecure.transport import PLAINTEXT_PROTOCOL_ID, InsecureTransport @@ -47,17 +51,64 @@ PROTOCOL_ID as TLS_PROTOCOL_ID, TLSTransport, ) +from libp2p.transport.quic.config import QUICTransportConfig from libp2p.utils.address_validation import get_available_interfaces MAX_TEST_TIMEOUT = 300 logger = logging.getLogger("libp2p.perf_test") -def configure_logging() -> None: - """Configure logging based on DEBUG environment variable.""" - debug_value = os.getenv("DEBUG") or "false" - debug_enabled = debug_value.upper() in ["DEBUG", "1", "TRUE", "YES"] +def _env_int(name: str, default: int, minimum: int | None = None) -> int: + raw = os.getenv(name) + if not raw: + value = default + else: + try: + value = int(raw) + except ValueError: + value = default + if minimum is not None: + value = max(value, minimum) + return value + + +def _env_float(name: str, default: float, minimum: float | None = None) -> float: + raw = os.getenv(name) + if not raw: + value = default + else: + try: + value = float(raw) + except ValueError: + value = default + if minimum is not None: + value = max(value, minimum) + return value + + +class _UnbufferedStream: + """Flush after every write so debug logs appear immediately in Docker.""" + + def __init__(self, stream: Any) -> None: + self._stream = stream + + def write(self, data: str | bytes) -> int: + n = self._stream.write(data) + self._stream.flush() + return n + + def flush(self) -> None: + self._stream.flush() + + def __getattr__(self, name: str) -> Any: + return getattr(self._stream, name) + + +def _libp2p_debug_enabled() -> bool: + return bool(os.getenv("LIBP2P_DEBUG", "").strip()) + +def _quiet_multiaddr_loggers() -> None: for logger_name in [ "multiaddr", "multiaddr.transforms", @@ -66,10 +117,46 @@ def configure_logging() -> None: ]: logging.getLogger(logger_name).setLevel(logging.WARNING) + +def configure_logging() -> None: + """Configure perf logging: LIBP2P_DEBUG (full), DEBUG (targeted), or default.""" + _quiet_multiaddr_loggers() + + if _libp2p_debug_enabled(): + print( + "Full libp2p logging via LIBP2P_DEBUG " + f"({os.getenv('LIBP2P_DEBUG')!r}; configured at import, not capped)", + file=sys.stderr, + ) + sys.stderr.flush() + return + + debug_value = os.getenv("DEBUG") or "false" + debug_enabled = debug_value.upper() in ["DEBUG", "1", "TRUE", "YES"] + if debug_enabled: - for name in ["", "libp2p.perf_test", "libp2p", "libp2p.perf"]: + stream = _UnbufferedStream(sys.stderr) + logging.basicConfig( + level=logging.DEBUG, + stream=stream, + format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + force=True, + ) + for name in [ + "libp2p.perf_test", + "libp2p.perf", + "libp2p.security.tls", + "libp2p.stream_muxer.mplex", + "libp2p.stream_muxer.yamux", + ]: logging.getLogger(name).setLevel(logging.DEBUG) - print("Debug logging enabled", file=sys.stderr) + # Keep core libp2p at INFO to avoid multi-GB logs on yamux perf runs. + logging.getLogger("libp2p").setLevel(logging.INFO) + logging.getLogger("libp2p.network").setLevel(logging.WARNING) + logging.getLogger("libp2p.transport").setLevel(logging.WARNING) + print("Targeted perf debug logging enabled (DEBUG=true)", file=sys.stderr) + sys.stderr.flush() else: logging.getLogger().setLevel(logging.INFO) logging.getLogger("libp2p.perf_test").setLevel(logging.INFO) @@ -91,16 +178,78 @@ def _percentile(sorted_values: list[float], p: float) -> float: return sorted_values[lower] * (1.0 - weight) + sorted_values[upper] * weight -def _is_connection_closed_error(exc: BaseException) -> bool: - """True if this is the expected 'Connection closed' from swarm/mplex on shutdown.""" +# Phrases when mplex/TLS read loops exit after peer disconnect (tcp+tls+mplex). +_SHUTDOWN_ERROR_PHRASES = ( + "connection closed", + "connection is closed", + "cannot read: tls connection is closed", + "tls connection is closed", + "broken pipe", + "connection reset", + "stream reset", + "stream eof", + "end of file", + "eof", + "closed resource", + "broken resource", +) + +# Extra settle time so dialer can exit before listener during interop teardown. +_LISTENER_POST_PERF_GRACE_TLS_MPLEX_SECS = 3.0 +_LISTENER_POST_PERF_GRACE_SECS = 0.5 +_DIALER_DISCONNECT_GRACE_TLS_MPLEX_SECS = 2.0 +_DIALER_DISCONNECT_GRACE_SECS = 0.5 +# Require several consecutive empty get_live_peers() polls before listener +# teardown (avoids false disconnect during slow muxer/transport stacks). +_LISTENER_EMPTY_PEER_POLLS_REQUIRED = 3 +_LISTENER_PEER_POLL_INTERVAL_SECS = 0.5 +# ws+mplex can report no live peers for seconds while the perf stream is active. +_LISTENER_WS_MPLEX_TEARDOWN_IDLE_SECS = 30.0 +# Perf runs are 1:1; do not auto-dial extra peers or retry loopback from Identify. +_PERF_LOOPBACK_DENY_LIST = ("127.0.0.0/8", "::1/128") + + +def _is_connection_closed_error(exc: BaseException | None) -> bool: + """True if this is an expected connection-closed error during muxer/TLS shutdown.""" + if exc is None: + return False + msg = str(exc).lower() - if "connection closed" in msg: + if any(phrase in msg for phrase in _SHUTDOWN_ERROR_PHRASES): return True + if isinstance(exc, ExceptionGroup): + if not exc.exceptions: + return False return all(_is_connection_closed_error(e) for e in exc.exceptions) + + if exc.__cause__ is not None and _is_connection_closed_error(exc.__cause__): + return True + if ( + exc.__context__ is not None + and exc.__context__ is not exc.__cause__ + and _is_connection_closed_error(exc.__context__) + ): + return True + return False +def _log_perf_phase(role: str, phase: str, **details: Any) -> None: + extra = " ".join(f"{k}={v}" for k, v in details.items()) + msg = f"[PERF_PHASE] {role} {phase}" + (f" {extra}" if extra else "") + print(msg, file=sys.stderr) + logger.info(msg) + + +def _log_ignored_shutdown_error(role: str, exc: BaseException) -> None: + print( + f"{role} completed (connection closed during cleanup)", + file=sys.stderr, + ) + logger.info("Ignored shutdown error: %s", exc) + + def _compute_stats(samples: list[float], is_latency: bool = False) -> dict[str, Any]: """Compute min, q1, median, q3, max, outliers, samples (IQR-based).""" if not samples: @@ -163,9 +312,14 @@ def __init__(self) -> None: self.is_dialer = is_dialer_val == "true" self.ip = os.getenv("LISTENER_IP") or "0.0.0.0" + self.local_addr_file = os.getenv("PERF_LOCAL_ADDR_FILE") self.redis_addr = os.getenv("REDIS_ADDR") - if not self.redis_addr: - raise ValueError("REDIS_ADDR environment variable is required") + if not self.local_addr_file and not self.redis_addr: + raise ValueError( + "REDIS_ADDR is required unless PERF_LOCAL_ADDR_FILE is set" + ) + if self.local_addr_file and not self.redis_addr: + self.redis_addr = "local:0" if ":" in self.redis_addr: self.redis_host, port = self.redis_addr.split(":", 1) self.redis_port = int(port) @@ -185,10 +339,30 @@ def __init__(self) -> None: timeout_val = os.getenv("TEST_TIMEOUT_SECS") or "180" self.test_timeout_seconds = min(int(timeout_val), MAX_TEST_TIMEOUT) + # 64 KiB blocks align with yamux half-window batching; stay under Noise limits. + self.write_block_size = _env_int("PERF_WRITE_BLOCK_SIZE", 65536, minimum=1024) + self.dial_timeout_seconds = _env_float("DIAL_TIMEOUT_SECS", 30.0, minimum=1.0) + self.upgrade_timeout_seconds = _env_float( + "UPGRADE_TIMEOUT_SECS", 30.0, minimum=1.0 + ) + self.stream_negotiate_timeout_seconds = _env_float( + "STREAM_NEGOTIATE_TIMEOUT_SECS", 30.0, minimum=1.0 + ) + self.negotiate_timeout_seconds = _env_int( + "NEGOTIATE_TIMEOUT_SECS", 30, minimum=1 + ) + self.quic_connection_timeout_seconds = _env_float( + "QUIC_CONNECTION_TIMEOUT_SECS", 30.0, minimum=1.0 + ) + self.quic_idle_timeout_seconds = _env_float( + "QUIC_IDLE_TIMEOUT_SECS", 60.0, minimum=1.0 + ) self.host: Any = None self.redis_client: redis.Redis[str] | None = None self.perf_service: PerfService | None = None + self._benchmarks_complete = False + self._listener_served_peer = False def validate_configuration(self) -> None: valid_transports = ["tcp", "ws", "wss", "quic-v1"] @@ -211,6 +385,148 @@ def validate_configuration(self) -> None: f"Unsupported muxer: {self.muxer}. Supported: {valid_muxers}" ) + def _is_tls_mplex(self) -> bool: + return self.security == "tls" and self.muxer == "mplex" + + def _listener_shutdown_grace_secs(self) -> float: + if self._is_tls_mplex(): + return _LISTENER_POST_PERF_GRACE_TLS_MPLEX_SECS + return _LISTENER_POST_PERF_GRACE_SECS + + def _dialer_disconnect_grace_secs(self) -> float: + if self._is_tls_mplex(): + return _DIALER_DISCONNECT_GRACE_TLS_MPLEX_SECS + return _DIALER_DISCONNECT_GRACE_SECS + + def _listener_teardown_idle_secs(self) -> float: + """Seconds without live peers before listener treats dialer as gone.""" + if self.transport in ("ws", "wss") and self.muxer == "mplex": + return _LISTENER_WS_MPLEX_TEARDOWN_IDLE_SECS + return _LISTENER_EMPTY_PEER_POLLS_REQUIRED * _LISTENER_PEER_POLL_INTERVAL_SECS + + def _should_ignore_shutdown_error(self, exc: BaseException) -> bool: + """Swallow connection-closed errors only during post-benchmark cleanup.""" + if not _is_connection_closed_error(exc): + _log_perf_phase( + self._role_label(), + "shutdown_error_not_ignored", + reason="not_connection_closed", + exc=type(exc).__name__, + ) + return False + if self.is_dialer: + ignore = self._benchmarks_complete + else: + ignore = self._listener_served_peer + _log_perf_phase( + self._role_label(), + "shutdown_ignore" if ignore else "shutdown_error_not_ignored", + benchmarks_complete=self._benchmarks_complete, + listener_served_peer=self._listener_served_peer, + exc=type(exc).__name__, + ) + return ignore + + def _role_label(self) -> str: + return "dialer" if self.is_dialer else "listener" + + def _close_redis(self) -> None: + if self.redis_client: + try: + self.redis_client.close() + except Exception: + pass + + async def _stop_perf_service(self) -> None: + if self.perf_service is None: + return + try: + await self.perf_service.stop() + except Exception as e: + logger.debug("PerfService.stop: %s", e) + + def _connection_config(self) -> ConnectionConfig: + return ConnectionConfig( + dial_timeout=self.dial_timeout_seconds, + inbound_upgrade_timeout=self.upgrade_timeout_seconds, + outbound_upgrade_timeout=self.upgrade_timeout_seconds, + outbound_stream_protocol_negotiation_timeout=( + self.stream_negotiate_timeout_seconds + ), + inbound_stream_protocol_negotiation_timeout=( + self.stream_negotiate_timeout_seconds + ), + # Single peer benchmark: no connmgr background dials. + min_connections=0, + low_watermark=0, + max_connections_per_peer=2, + deny_list=list(_PERF_LOOPBACK_DENY_LIST), + ) + + def _without_loopback_listen_addrs( + self, addresses: list[multiaddr.Multiaddr] + ) -> list[multiaddr.Multiaddr]: + """Drop loopback binds so Identify does not advertise 127.0.0.1/::1.""" + filtered: list[multiaddr.Multiaddr] = [] + for addr in addresses: + ip_value = self._get_ip_value(addr) + if ip_value in ("127.0.0.1", "::1"): + continue + filtered.append(addr) + return filtered + + def _fallback_listen_addr(self) -> multiaddr.Multiaddr: + if self.transport == "ws": + return multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0/ws") + if self.transport == "wss": + return multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0/wss") + if self.transport == "quic-v1": + return self._build_quic_addr("0.0.0.0", 0) + return multiaddr.Multiaddr("/ip4/0.0.0.0/tcp/0") + + def _perf_listen_addresses(self, port: int = 0) -> list[multiaddr.Multiaddr]: + addrs = self._without_loopback_listen_addrs(self.create_listen_addresses(port)) + return addrs if addrs else [self._fallback_listen_addr()] + + def _dialer_ws_listen_addrs(self) -> list[multiaddr.Multiaddr]: + """WS dialer binds for transport registration (no loopback Identify).""" + return self._perf_listen_addresses(0) + + def _install_listener_perf_lifecycle_handler(self) -> None: + """Mark peer connected on first inbound perf stream (ws+mplex polls lie).""" + assert self.perf_service is not None + original = self.perf_service._handle_message + + async def marking_handler(stream: Any) -> None: + if not self._listener_served_peer: + self._listener_served_peer = True + _log_perf_phase("listener", "perf_stream_connected") + await original(stream) + + self.host.set_stream_handler(self.perf_service._protocol, marking_handler) + + def _listener_peer_present(self) -> bool: + """True when dialer opened perf or swarm lists a live peer (connect detect).""" + if self._listener_served_peer: + return True + return bool(self.host.get_live_peers()) + + def _quic_transport_config(self) -> QUICTransportConfig: + return QUICTransportConfig( + connection_timeout=self.quic_connection_timeout_seconds, + idle_timeout=self.quic_idle_timeout_seconds, + dial_timeout=self.dial_timeout_seconds, + inbound_upgrade_timeout=self.upgrade_timeout_seconds, + outbound_upgrade_timeout=self.upgrade_timeout_seconds, + outbound_stream_protocol_negotiation_timeout=( + self.stream_negotiate_timeout_seconds + ), + inbound_stream_protocol_negotiation_timeout=( + self.stream_negotiate_timeout_seconds + ), + NEGOTIATE_TIMEOUT=self.stream_negotiate_timeout_seconds, + ) + def create_security_options(self) -> tuple[dict[TProtocol, Any], Any]: standalone = ["quic-v1"] if self.transport in standalone: @@ -304,7 +620,22 @@ def create_tls_server_config(self) -> ssl.SSLContext | None: return None def _get_ip_value(self, addr: multiaddr.Multiaddr) -> str | None: - return addr.value_for_protocol("ip4") or addr.value_for_protocol("ip6") + for protocol in ("ip4", "ip6"): + try: + value = addr.value_for_protocol(protocol) + except Exception: + value = None + if value: + return value + return None + + def _safe_value_for_protocol( + self, addr: multiaddr.Multiaddr, protocol: str + ) -> str | None: + try: + return addr.value_for_protocol(protocol) + except Exception: + return None def _get_protocol_names(self, addr: multiaddr.Multiaddr) -> list[str]: return [p.name for p in addr.protocols()] @@ -338,13 +669,19 @@ def create_listen_addresses(self, port: int = 0) -> list[multiaddr.Multiaddr]: if self.transport == "quic-v1": out = [] for addr in base_addrs: - ip_value = self._get_ip_value(addr) - tcp_port = addr.value_for_protocol("tcp") or port - if ip_value: - qa = self._build_quic_addr(ip_value, tcp_port) + try: + ip_value = self._get_ip_value(addr) + tcp_port = self._safe_value_for_protocol(addr, "tcp") + if not ip_value: + continue + qa = self._build_quic_addr( + ip_value, int(tcp_port) if tcp_port else port + ) _, p2p = self._extract_and_preserve_p2p(addr) qa = self._encapsulate_with_p2p(qa, p2p) out.append(qa) + except Exception: + continue return out if out else [self._build_quic_addr("0.0.0.0", port)] if self.transport == "ws": out = [] @@ -422,8 +759,7 @@ def _replace_loopback_ip(self, addr: multiaddr.Multiaddr) -> str: if ip_value not in ["127.0.0.1", "0.0.0.0", "::1", "::"]: return str(addr) actual = self.get_container_ip() - names = self._get_protocol_names(addr) - is_ipv6 = "ip6" in names + is_ipv6 = ":" in actual parts = [f"/ip6/{actual}" if is_ipv6 else f"/ip4/{actual}"] found = False for proto, value in addr.items(): @@ -447,6 +783,12 @@ def _get_publishable_address(self, addresses: list[multiaddr.Multiaddr]) -> str: async def _connect_redis_with_retry( self, max_retries: int = 10, retry_delay: float = 1.0 ) -> None: + if self.local_addr_file: + print( + f"Local coordination via {self.local_addr_file}", + file=sys.stderr, + ) + return print("Connecting to Redis...", file=sys.stderr) for attempt in range(max_retries): try: @@ -470,7 +812,7 @@ async def run_listener(self) -> None: sec_opt, key_pair = self.create_security_options() muxer_opt = self.create_muxer_options() - listen_addrs = self.create_listen_addresses(0) + listen_addrs = self._perf_listen_addresses(0) tls_client = self.create_tls_client_config() tls_server = self.create_tls_server_config() @@ -479,30 +821,115 @@ async def run_listener(self) -> None: sec_opt=sec_opt, muxer_opt=muxer_opt, listen_addrs=listen_addrs, + negotiate_timeout=self.negotiate_timeout_seconds, enable_quic=(self.transport == "quic-v1"), + quic_transport_opt=self._quic_transport_config() + if self.transport == "quic-v1" + else None, tls_client_config=tls_client, tls_server_config=tls_server, + connection_config=self._connection_config(), + ) + self.perf_service = PerfService( + self.host, {"write_block_size": self.write_block_size} ) - self.perf_service = PerfService(self.host) await self.perf_service.start() + self._install_listener_perf_lifecycle_handler() print(f"Perf service started (protocol {PROTOCOL_NAME})", file=sys.stderr) - async with self.host.run(listen_addrs=listen_addrs): - all_addrs = self.host.get_addrs() - if not all_addrs: - raise RuntimeError("No listen addresses available") - actual_addr = self._get_publishable_address(all_addrs) - print(f"Publishing address: {actual_addr}", file=sys.stderr) - redis_key = f"{self.test_key}_listener_multiaddr" - assert self.redis_client is not None - self.redis_client.set(redis_key, actual_addr) - print("Listener ready, waiting for dialer...", file=sys.stderr) - await trio.sleep_forever() + listener_ready = False + try: + async with self.host.run(listen_addrs=listen_addrs): + all_addrs = self.host.get_addrs() + if not all_addrs: + raise RuntimeError("No listen addresses available") + actual_addr = self._get_publishable_address(all_addrs) + print(f"Publishing address: {actual_addr}", file=sys.stderr) + if self.local_addr_file: + Path(self.local_addr_file).write_text(actual_addr, encoding="utf-8") + else: + redis_key = f"{self.test_key}_listener_multiaddr" + assert self.redis_client is not None + self.redis_client.set(redis_key, actual_addr) + print("Listener ready, waiting for dialer...", file=sys.stderr) + _log_perf_phase( + "listener", + "ready", + test_timeout_seconds=self.test_timeout_seconds, + muxer=self.muxer, + security=self.security, + transport=self.transport, + ) + listener_ready = True + + connect_deadline = time.monotonic() + self.test_timeout_seconds + saw_peer = False + empty_peer_polls = 0 + last_peer_seen_at = time.monotonic() + teardown_idle_secs = self._listener_teardown_idle_secs() + while True: + if not saw_peer: + if self._listener_peer_present(): + saw_peer = True + empty_peer_polls = 0 + last_peer_seen_at = time.monotonic() + elif time.monotonic() >= connect_deadline: + raise RuntimeError( + f"Timeout: dialer never connected within " + f"{self.test_timeout_seconds}s" + ) + elif self.host.get_live_peers(): + empty_peer_polls = 0 + last_peer_seen_at = time.monotonic() + else: + empty_peer_polls += 1 + peer_idle_secs = time.monotonic() - last_peer_seen_at + if ( + empty_peer_polls >= _LISTENER_EMPTY_PEER_POLLS_REQUIRED + and peer_idle_secs >= teardown_idle_secs + ): + print( + "Listener: peer disconnected, shutting down", + file=sys.stderr, + ) + _log_perf_phase( + "listener", + "teardown_start", + peer_idle_secs=f"{peer_idle_secs:.1f}", + ) + await trio.sleep(self._listener_shutdown_grace_secs()) + break + + await trio.sleep(_LISTENER_PEER_POLL_INTERVAL_SECS) + + await self._stop_perf_service() + self._close_redis() + except ExceptionGroup as eg: + if listener_ready and self._should_ignore_shutdown_error(eg): + _log_ignored_shutdown_error("Listener", eg) + return + raise + except BaseException as e: + if listener_ready and self._should_ignore_shutdown_error(e): + _log_ignored_shutdown_error("Listener", e) + return + raise async def _wait_for_listener_addr(self) -> str: - redis_key = f"{self.test_key}_listener_multiaddr" timeout = min(self.test_timeout_seconds, MAX_TEST_TIMEOUT) deadline = time.monotonic() + timeout + if self.local_addr_file: + addr_path = Path(self.local_addr_file) + while time.monotonic() < deadline: + if addr_path.is_file(): + text = addr_path.read_text(encoding="utf-8").strip() + if text: + return text + await trio.sleep(0.1) + raise RuntimeError( + f"Timeout waiting for listener address in {addr_path} after {timeout}s" + ) + redis_key = f"{self.test_key}_listener_multiaddr" assert self.redis_client is not None while time.monotonic() < deadline: addr = self.redis_client.get(redis_key) @@ -541,7 +968,7 @@ async def run_dialer(self) -> None: # Dialer needs listen_addrs for ws/wss so transport is registered; # for quic/tcp pass [] (host.run still starts swarm/nursery) dialer_listen_addrs = ( - self.create_listen_addresses(0) if self.transport in ["ws", "wss"] else None + self._dialer_ws_listen_addrs() if self.transport in ["ws", "wss"] else None ) tls_client = self.create_tls_client_config() tls_server = None @@ -550,14 +977,21 @@ async def run_dialer(self) -> None: "key_pair": key_pair, "sec_opt": sec_opt, "muxer_opt": muxer_opt, + "negotiate_timeout": self.negotiate_timeout_seconds, "enable_quic": (self.transport == "quic-v1"), + "quic_transport_opt": self._quic_transport_config() + if self.transport == "quic-v1" + else None, "tls_client_config": tls_client, "tls_server_config": tls_server, + "connection_config": self._connection_config(), } if dialer_listen_addrs: kw["listen_addrs"] = dialer_listen_addrs self.host = new_host(**kw) - self.perf_service = PerfService(self.host) + self.perf_service = PerfService( + self.host, {"write_block_size": self.write_block_size} + ) await self.perf_service.start() # Must run host inside host.run() so swarm/nursery are active @@ -572,8 +1006,19 @@ async def run_dialer(self) -> None: listener_peer_id = info.peer_id await self.host.connect(info) print("Connected to listener", file=sys.stderr) + _log_perf_phase( + "dialer", + "connected", + test_timeout_seconds=self.test_timeout_seconds, + muxer=self.muxer, + security=self.security, + transport=self.transport, + ) upload_samples: list[float] = [] + _log_perf_phase( + "dialer", "upload_start", iterations=self.upload_iterations + ) for i in range(self.upload_iterations): elapsed = await self._one_measurement(self.upload_bytes, 0) gbps = ( @@ -588,6 +1033,9 @@ async def run_dialer(self) -> None: ) download_samples: list[float] = [] + _log_perf_phase( + "dialer", "download_start", iterations=self.download_iterations + ) for i in range(self.download_iterations): elapsed = await self._one_measurement(0, self.download_bytes) gbps = ( @@ -606,6 +1054,9 @@ async def run_dialer(self) -> None: elapsed = await self._one_measurement(1, 1) latency_samples.append(elapsed * 1000.0) print("Latency iterations done", file=sys.stderr) + _log_perf_phase( + "dialer", "latency_done", iterations=self.latency_iterations + ) u = _compute_stats(upload_samples, is_latency=False) d = _compute_stats(download_samples, is_latency=False) @@ -643,47 +1094,84 @@ async def run_dialer(self) -> None: print(f" samples: {lat['samples']}") print(" unit: ms") + self._benchmarks_complete = True + _log_perf_phase("dialer", "benchmarks_complete") + # Graceful close: disconnect listener so it sees a clean # close, then stop services + _log_perf_phase("dialer", "teardown_start") try: await self.host.disconnect(listener_peer_id) - await trio.sleep(0.5) except Exception as e: logger.debug("Disconnect: %s", e) - try: - await self.perf_service.stop() - except Exception as e: - logger.debug("PerfService.stop: %s", e) - if self.redis_client: - try: - self.redis_client.close() - except Exception: - pass + await trio.sleep(self._dialer_disconnect_grace_secs()) + await self._stop_perf_service() + self._close_redis() + except ExceptionGroup as eg: + if self._should_ignore_shutdown_error(eg): + _log_ignored_shutdown_error("Dialer", eg) + return + raise except BaseException as e: - # Swarm/mplex may raise "Connection closed" on disconnect; - # treat as success - if not _is_connection_closed_error(e): - raise + if self._should_ignore_shutdown_error(e): + _log_ignored_shutdown_error("Dialer", e) + return + raise async def run(self) -> None: try: - await self._connect_redis_with_retry() if self.is_dialer: await self.run_dialer() else: await self.run_listener() - except Exception as e: + except ExceptionGroup as eg: + if self._should_ignore_shutdown_error(eg): + _log_ignored_shutdown_error("Perf", eg) + return + print(f"Error: {eg}", file=sys.stderr) + import traceback + + traceback.print_exc(file=sys.stderr) + sys.exit(1) + except BaseException as e: + if self._should_ignore_shutdown_error(e): + _log_ignored_shutdown_error("Perf", e) + return print(f"Error: {e}", file=sys.stderr) import traceback traceback.print_exc(file=sys.stderr) - if self.redis_client: - self.redis_client.close() sys.exit(1) + finally: + self._close_redis() async def main() -> None: + print( + "[PERF_IMAGE_MARKER] perf-test entrypoint starting", + file=sys.stderr, + ) + sys.stderr.flush() + configure_logging() + + if _libp2p_debug_enabled(): + msg = ( + "[PERF_IMAGE_MARKER] full libp2p logging via LIBP2P_DEBUG " + f"({os.getenv('LIBP2P_DEBUG')!r})" + ) + logger.info(msg) + print(msg, file=sys.stderr) + sys.stderr.flush() + elif os.getenv("DEBUG", "").upper() in ("1", "TRUE", "YES", "DEBUG"): + msg = ( + "[PERF_IMAGE_MARKER] targeted logging (TLS/mplex/yamux at DEBUG; " + "set LIBP2P_DEBUG for full libp2p trace)" + ) + logger.info(msg) + print(msg, file=sys.stderr) + sys.stderr.flush() + test = PerfTest() await test.run() diff --git a/libp2p/stream_muxer/yamux/yamux.py b/libp2p/stream_muxer/yamux/yamux.py index e48126875..cc2a85f8c 100644 --- a/libp2p/stream_muxer/yamux/yamux.py +++ b/libp2p/stream_muxer/yamux/yamux.py @@ -10,6 +10,7 @@ ) import inspect import logging +import os import struct from types import ( TracebackType, @@ -57,6 +58,61 @@ # Configure logger for this module logger = logging.getLogger(__name__) +_PERF_YAMUX_DEBUG_LOG_INTERVAL = 500 +_DEFAULT_ASSUME_RTT_MS = 1.0 + +# Optional perf tuning via PY_YAMUX_* environment variables (read at call time). +# RELEASE_ON_READ defaults on for go-yamux-like credit release on partial reads. +# Unified-testing and scripts/perf/run_local_perf.py forward these to perf +# containers. Full variable list and examples: scripts/perf/README.md + + +def _py_yamux_env(name: str, default: str = "") -> str: + """Read PY_YAMUX_* tuning env (perf harness or local runner).""" + return os.getenv(f"PY_YAMUX_{name}", default) + + +def _py_yamux_env_set(name: str) -> bool: + return f"PY_YAMUX_{name}" in os.environ + + +def _perf_yamux_debug_enabled() -> bool: + return _py_yamux_env("DEBUG", "").upper() in ("1", "TRUE", "YES") + + +def _yamux_disable_hysteresis() -> bool: + """Debug escape hatch: treat any positive delta as a full GrowTo.""" + return _py_yamux_env("DISABLE_HYSTERESIS", "").upper() in ( + "1", + "TRUE", + "YES", + ) + + +def _yamux_release_on_read() -> bool: + """When true, send WINDOW_UPDATE for bytes_read if hysteresis defers GrowTo.""" + return _py_yamux_env("RELEASE_ON_READ", "1").upper() not in ( + "0", + "FALSE", + "NO", + ) + + +def _yamux_batch_threshold_divisor() -> int: + """GrowTo / pending batch threshold = target_recv_window // divisor (default 2).""" + raw = _py_yamux_env("BATCH_THRESHOLD_DIV", "2") + try: + div = int(raw) + except ValueError: + div = 2 + return max(div, 1) + + +def _perf_yamux_log(msg: str) -> None: + if _perf_yamux_debug_enabled(): + logger.debug("[YAMUX_PERF] %s", msg) + + PROTOCOL_ID = "/yamux/1.0.0" TYPE_DATA = 0x0 TYPE_WINDOW_UPDATE = 0x1 @@ -99,6 +155,31 @@ def __init__(self, stream_id: int, conn: "Yamux", is_initiator: bool) -> None: self.epoch_start = 0.0 # trio.current_time() of last window update self.rw_lock = ReadWriteLock() self.close_lock = trio.Lock() + self._zero_window_waits = 0 + self._window_update_hysteresis_skips = 0 + self._window_update_bytes_read_only = 0 + self._window_update_full_grow = 0 + self._window_update_pending_flush = 0 + self._window_update_partial_slack = 0 + self._pending_recv_release = 0 + + def _effective_rtt(self) -> float: + """Measured RTT, or PY_YAMUX_ASSUME_RTT_MS / default when ping RTT is 0.""" + rtt = self.conn.rtt() + if rtt > 0: + return rtt + if _py_yamux_env_set("ASSUME_RTT_MS"): + raw = _py_yamux_env("ASSUME_RTT_MS", "").strip() + try: + return max(float(raw), 0.0) / 1000.0 + except ValueError: + return 0.0 + return _DEFAULT_ASSUME_RTT_MS / 1000.0 + + def _grow_threshold(self) -> int: + if _yamux_disable_hysteresis(): + return 0 + return self.target_recv_window // _yamux_batch_threshold_divisor() async def __aenter__(self) -> "YamuxStream": """Enter the async context manager.""" @@ -150,6 +231,17 @@ async def write(self, data: bytes) -> None: if frame is not None: break + self._zero_window_waits += 1 + if ( + _perf_yamux_debug_enabled() + and self._zero_window_waits % _PERF_YAMUX_DEBUG_LOG_INTERVAL == 1 + ): + _perf_yamux_log( + f"stream={self.stream_id} zero_window_waits=" + f"{self._zero_window_waits} send_window={self.send_window} " + f"recv_window={self.recv_window} " + f"target={self.target_recv_window}" + ) logger.debug( f"Stream {self.stream_id}: Window is zero, waiting for update" ) @@ -246,55 +338,106 @@ async def _do_window_update() -> None: _ = skip_lock await _do_window_update() - async def _auto_tune_and_send_window_update(self: "YamuxStream") -> None: + def _grow_to_delta(self, buffered: int) -> int: + """Apply GrowTo slack (caller checks hysteresis threshold).""" + delta = self.target_recv_window - (self.recv_window + buffered) + if delta <= 0: + return 0 + self.recv_window += delta + return delta + + def _apply_autotune_epoch(self, buffered: int) -> int: + """Double target within 4×RTT when sending an update; return extra increment.""" + rtt = self._effective_rtt() + now = trio.current_time() + if rtt <= 0: + self.epoch_start = now + return 0 + + if self.epoch_start > 0 and (now - self.epoch_start) >= rtt * 4: + self.epoch_start = now + return 0 + + extra_delta = 0 + new_target = min(self.target_recv_window * 2, MAX_WINDOW_SIZE) + if new_target > self.target_recv_window: + self.target_recv_window = new_target + new_current = self.recv_window + buffered + grow = self.target_recv_window - new_current + if grow > 0: + self.recv_window += grow + extra_delta = grow + + self.epoch_start = now + logger.debug( + f"Stream {self.stream_id}: Auto-tune extra_delta={extra_delta}, " + f"target={self.target_recv_window}" + ) + return extra_delta + + async def _auto_tune_and_send_window_update( + self: "YamuxStream", *, bytes_read: int = 0 + ) -> None: """ - Auto-tune receive window size based on RTT and send window update. + Auto-tune receive window size and send WINDOW_UPDATE (go-yamux semantics). - Ports go-yamux's two-pass GrowTo + sendWindowUpdate logic: - - Pass 1: GrowTo(current_target) — restore window to current target - - Auto-tune: if within 4x RTT of last epoch, double the target - - Pass 2: GrowTo(new_target, force=True) — grow to new target - - Only the final delta is sent to the peer (matches go-yamux behavior) + Mirrors go-yamux ``sendWindowUpdate`` + ``GrowTo`` hysteresis (half window), + with ``_pending_recv_release`` so several smaller reads batch into one + large update like repeated ``Read()`` calls in Go. """ - total_delta: int + total_delta = 0 async with self.window_lock: - # Match go-yamux GrowTo: currentWindow = cap + len - buffered = len(self.conn.stream_buffers.get(self.stream_id, b"")) - current_window = self.recv_window + buffered - - # Pass 1: GrowTo(target_recv_window) — like go's first GrowTo call - delta = self.target_recv_window - current_window - if delta <= 0: - return - # Hysteresis: skip if delta < 50% of target (matches go-yamux GrowTo) - if delta < self.target_recv_window // 2: - return - # Apply first pass growth to recv_window (like go's cap += delta) - self.recv_window += delta - - # Auto-tune: if within 4x RTT of last epoch, double the target - now = trio.current_time() - rtt = self.conn.rtt() - if rtt > 0 and self.epoch_start > 0 and (now - self.epoch_start) < rtt * 4: - new_target = min(self.target_recv_window * 2, MAX_WINDOW_SIZE) - if new_target > self.target_recv_window: - self.target_recv_window = new_target - # Pass 2: GrowTo(new_target, force=True) — incremental - # Recompute current_window after pass 1 growth - new_current = self.recv_window + buffered - extra_delta = self.target_recv_window - new_current - if extra_delta > 0: - self.recv_window += extra_delta - delta += extra_delta # Send total delta (pass 1 + pass 2) - - self.epoch_start = now - logger.debug( - f"Stream {self.stream_id}: Auto-tune window update " - f"delta={delta}, target={self.target_recv_window}" - ) - total_delta = delta + if bytes_read > 0: + self._pending_recv_release += bytes_read - await self.send_window_update(total_delta, skip_lock=True) + buffered = len(self.conn.stream_buffers.get(self.stream_id, b"")) + delta = self.target_recv_window - (self.recv_window + buffered) + threshold = self._grow_threshold() + pending = self._pending_recv_release + + if delta >= threshold and delta > 0: + self._window_update_full_grow += 1 + total_delta = self._grow_to_delta(buffered) + total_delta += self._apply_autotune_epoch(buffered) + self._pending_recv_release = 0 + elif pending >= threshold: + self.recv_window += pending + total_delta = pending + self._pending_recv_release = 0 + self._window_update_pending_flush += 1 + total_delta += self._apply_autotune_epoch(buffered) + elif 0 < delta < threshold and bytes_read > 0: + self._window_update_partial_slack += 1 + self.recv_window += delta + bytes_read + total_delta = delta + bytes_read + self._pending_recv_release -= bytes_read + total_delta += self._apply_autotune_epoch(buffered) + elif _yamux_release_on_read() and bytes_read > 0: + self.recv_window += bytes_read + total_delta = bytes_read + self._pending_recv_release -= bytes_read + self._window_update_bytes_read_only += 1 + total_delta += self._apply_autotune_epoch(buffered) + elif delta < threshold and bytes_read == 0: + if _perf_yamux_debug_enabled(): + self._window_update_hysteresis_skips += 1 + + if total_delta > 0: + await self.send_window_update(total_delta, skip_lock=True) + + def _log_perf_yamux_summary(self) -> None: + if not _perf_yamux_debug_enabled(): + return + _perf_yamux_log( + f"stream={self.stream_id} summary " + f"zero_window_waits={self._zero_window_waits} " + f"hysteresis_skips={self._window_update_hysteresis_skips} " + f"bytes_read_updates={self._window_update_bytes_read_only} " + f"partial_slack={self._window_update_partial_slack} " + f"full_grow_updates={self._window_update_full_grow} " + f"pending_flush={self._window_update_pending_flush} " + f"target={self.target_recv_window}" + ) async def read(self, n: int | None = -1) -> bytes: """ @@ -344,14 +487,14 @@ async def read(self, n: int | None = -1) -> bytes: return data raise MuxedStreamEOF("Stream buffer closed") - # If we have data in buffer, process it - if len(buffer) > 0: - chunk = bytes(buffer) - buffer.clear() + # If we have data in buffer, process in MAX_MESSAGE_SIZE slices + # (like bounded Read in go-yamux) so hysteresis can batch credit. + while len(buffer) > 0: + take = min(len(buffer), MAX_MESSAGE_SIZE) + chunk = bytes(buffer[:take]) + del buffer[:take] data += chunk - - # Auto-tune and send window update for the chunk we just read - await self._auto_tune_and_send_window_update() + await self._auto_tune_and_send_window_update(bytes_read=len(chunk)) # Check for reset if self.reset_received: @@ -396,10 +539,11 @@ async def read(self, n: int | None = -1) -> bytes: return b"" else: data = await self.conn.read_stream(self.stream_id, n) - await self._auto_tune_and_send_window_update() + await self._auto_tune_and_send_window_update(bytes_read=len(data)) return data async def close(self) -> None: + self._log_perf_yamux_summary() async with self.close_lock: if not self.send_closed: logger.debug(f"Half-closing stream {self.stream_id} (local end)") diff --git a/newsfragments/1344.bugfix.rst b/newsfragments/1344.bugfix.rst new file mode 100644 index 000000000..92bd7401e --- /dev/null +++ b/newsfragments/1344.bugfix.rst @@ -0,0 +1 @@ +Fixed Python perf interop teardown for tls+mplex and listener lifecycle so slow ws+yamux benchmarks complete without false failures during shutdown. diff --git a/newsfragments/1344.performance.rst b/newsfragments/1344.performance.rst new file mode 100644 index 000000000..ea679835e --- /dev/null +++ b/newsfragments/1344.performance.rst @@ -0,0 +1 @@ +Improved yamux receive-window updates during partial reads to reduce round-trips in large perf transfers. diff --git a/scripts/perf/README.md b/scripts/perf/README.md new file mode 100644 index 000000000..642aed0a5 --- /dev/null +++ b/scripts/perf/README.md @@ -0,0 +1,110 @@ +# Local Python perf runner + +`run_local_perf.py` runs the same `interop/perf/perf_test.py` binary as the unified-testing Docker perf harness, but coordinates listener and dialer with a temp file (`PERF_LOCAL_ADDR_FILE`) instead of Redis. + +## Setup + +From the py-libp2p repo root (the script auto-uses `.venv/bin/python` when present): + +```bash +python -m venv .venv +uv pip install --python .venv/bin/python -e . +uv pip install --python .venv/bin/python redis cryptography +chmod +x scripts/perf/run_local_perf.py +``` + +`redis` is only used when `PERF_LOCAL_ADDR_FILE` is unset; local runs still import it today. + +## Usage + +```bash +# Default: tcp + noise + yamux +./scripts/perf/run_local_perf.py + +# All 8 python self-test stacks +./scripts/perf/run_local_perf.py --matrix + +# One matrix row +./scripts/perf/run_local_perf.py --stack-index 0 + +# Faster iteration +./scripts/perf/run_local_perf.py --quick -t tcp -s noise -m yamux + +# Yamux tuning (forwarded from the shell) +PY_YAMUX_DISABLE_HYSTERESIS=1 ./scripts/perf/run_local_perf.py +PY_YAMUX_ASSUME_RTT_MS=1 PY_YAMUX_BATCH_THRESHOLD_DIV=2 ./scripts/perf/run_local_perf.py +PERF_WRITE_BLOCK_SIZE=65536 ./scripts/perf/run_local_perf.py --debug + +# Full libp2p trace (slow on yamux; use for deep debugging only) +LIBP2P_DEBUG=DEBUG ./scripts/perf/run_local_perf.py --quick -t tcp -s noise -m yamux +LIBP2P_DEBUG=libp2p.stream_muxer.yamux:DEBUG ./scripts/perf/run_local_perf.py --quick +``` + +Dialer YAML results are printed to stdout; a summary table is printed to stderr at the end (median upload/download/latency per stack). + +## Logging + +`perf_test.py` supports three levels (mutually prioritized): + +| Mode | How to enable | What you get | +| -------------------- | ----------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------ | +| **Default** | (nothing) | Minimal stderr; suitable for throughput runs | +| **Targeted interop** | `DEBUG=true`, `./run.sh --debug`, or `run_local_perf.py --debug` | DEBUG on perf/TLS/mplex/yamux loggers; core `libp2p` stays capped. Also sets `PY_YAMUX_DEBUG=1` locally / on python Docker sides | +| **Full libp2p** | `LIBP2P_DEBUG=…` in the environment **before** the process starts | Standard py-libp2p logging (`libp2p.utils.logging` at import). Perf does not cap levels. **Very verbose and slow** on yamux 1GB runs | + +`LIBP2P_DEBUG` uses the same syntax as elsewhere in py-libp2p (see `docs/getting_started.rst`), for example: + +- `LIBP2P_DEBUG=DEBUG` — all libp2p modules +- `LIBP2P_DEBUG=stream_muxer.yamux:DEBUG,transport:INFO` — module-specific levels +- `LIBP2P_DEBUG_FILE=/path/to/log` — optional file sink (with `LIBP2P_DEBUG` set) + +When both are set, **`LIBP2P_DEBUG` wins** (full logging); `DEBUG=true` alone does not imply `LIBP2P_DEBUG`. + +For Docker perf: export on the host before `./run.sh`, e.g. `LIBP2P_DEBUG=DEBUG ./run.sh --impl-select python --yes` (not recommended for throughput benchmarking). + +## Environment variables + +| Variable | Role | +| ---------------------------------------------------------------- | ----------------------------------------------------------- | +| `TRANSPORT`, `SECURE_CHANNEL`, `MUXER` | Stack (CLI overrides defaults) | +| `IS_DIALER` | Set by the script (`true` / `false`) | +| `PERF_LOCAL_ADDR_FILE` | Set by the script (coordination file) | +| `UPLOAD_BYTES`, `DOWNLOAD_BYTES` | Payload sizes (dialer) | +| `UPLOAD_ITERATIONS`, `DOWNLOAD_ITERATIONS`, `LATENCY_ITERATIONS` | Iteration counts | +| `TEST_TIMEOUT_SECS` | In-process wait for listener addr / peer (inside perf_test) | +| `PERF_TEST_TIMEOUT_SECS` | Harness/subprocess timeout (same as `run.sh --timeout`) | +| `PERF_WRITE_BLOCK_SIZE` | Perf service write block size | +| `PY_YAMUX_*` | Python yamux hysteresis / autotune knobs (see below) | +| `DEBUG` | Targeted interop perf logging (see [Logging](#logging)) | +| `LIBP2P_DEBUG`, `LIBP2P_DEBUG_FILE` | Full py-libp2p logging (see [Logging](#logging)) | + +### `PY_YAMUX_*` tuning (python only) + +| Variable | Default in code | Purpose | +| ------------------------------ | -------------------------------- | ----------------------------------------------------------- | +| `PY_YAMUX_RELEASE_ON_READ` | on (`1`) | Release window credit on read when hysteresis defers GrowTo | +| `PY_YAMUX_ASSUME_RTT_MS` | `1` when unset and ping RTT is 0 | Bootstrap autotune before measured RTT | +| `PY_YAMUX_BATCH_THRESHOLD_DIV` | `2` | GrowTo threshold = target // divisor | +| `PY_YAMUX_DISABLE_HYSTERESIS` | off | Treat any positive delta as full GrowTo | +| `PY_YAMUX_DEBUG` | off | Targeted yamux perf logs | + +#### Example scenarios + +| Goal | Command | +| -------------------------------------- | -------------------------------------------------------------------------------------------- | +| Baseline throughput (default autotune) | `./scripts/perf/run_local_perf.py --quick -t tcp -s noise -m yamux` | +| Compare without hysteresis | `PY_YAMUX_DISABLE_HYSTERESIS=1 ./scripts/perf/run_local_perf.py --quick` | +| WAN-like bootstrap before ping RTT | `PY_YAMUX_ASSUME_RTT_MS=50 ./scripts/perf/run_local_perf.py --quick` | +| Larger / earlier GrowTo batches | `PY_YAMUX_BATCH_THRESHOLD_DIV=1 ./scripts/perf/run_local_perf.py --quick` | +| Defer read-side WINDOW_UPDATE | `PY_YAMUX_RELEASE_ON_READ=0 ./scripts/perf/run_local_perf.py --quick` | +| Window/autotune traces (low overhead) | `PY_YAMUX_DEBUG=1 ./scripts/perf/run_local_perf.py --quick` or `--debug` | +| Full yamux module trace (slow) | `LIBP2P_DEBUG=stream_muxer.yamux:DEBUG ./scripts/perf/run_local_perf.py --quick` | +| Docker harness, same tuning | `PY_YAMUX_ASSUME_RTT_MS=1 ./perf/run.sh --impl-select python --yes -t tcp -s noise -m yamux` | + +Combine knobs as needed, e.g. `PY_YAMUX_ASSUME_RTT_MS=10 PY_YAMUX_BATCH_THRESHOLD_DIV=1 ./scripts/perf/run_local_perf.py --matrix`. + +Subprocess timeout: `--timeout SECS`, or `PERF_TEST_TIMEOUT_SECS` / `TEST_TIMEOUT_SECS` from the environment. + +## Matrix stacks (`--list-stacks`) + +Same as `python-v0.x` in unified-testing `perf/images.yaml`: tcp/ws × noise/tls × yamux/mplex (8 combinations). diff --git a/scripts/perf/run_local_perf.py b/scripts/perf/run_local_perf.py new file mode 100755 index 000000000..9551e95b8 --- /dev/null +++ b/scripts/perf/run_local_perf.py @@ -0,0 +1,574 @@ +#!/usr/bin/env python3 +r""" +Run Python↔Python perf tests locally (no Docker, no Redis). + +Spawns listener + dialer subprocesses using interop/perf/perf_test.py with +PERF_LOCAL_ADDR_FILE for address handoff. Matches perf harness env vars where +possible so yamux/mplex tuning can be iterated quickly. + +Examples: + # tcp + noise + yamux (default stack) + ./scripts/perf/run_local_perf.py + + # Full python self-matrix (8 stacks from perf/images.yaml) + ./scripts/perf/run_local_perf.py --matrix + + # Shorter run for debugging + UPLOAD_BYTES=67108864 UPLOAD_ITERATIONS=2 ./scripts/perf/run_local_perf.py --quick + + # Yamux A/B + PY_YAMUX_DISABLE_HYSTERESIS=1 ./scripts/perf/run_local_perf.py \\ + -t tcp -s noise -m yamux + PY_YAMUX_ASSUME_RTT_MS=1 PY_YAMUX_BATCH_THRESHOLD_DIV=2 \\ + ./scripts/perf/run_local_perf.py + +From repo root (editable install recommended): + pip install -e . + python scripts/perf/run_local_perf.py --matrix + +""" + +from __future__ import annotations + +import argparse +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +import os +from pathlib import Path +import subprocess +import sys +import tempfile +import time + +# scripts/perf -> repo root +REPO_ROOT = Path(__file__).resolve().parents[2] +PERF_ENTRY = REPO_ROOT / "interop" / "perf" / "perf_test.py" + + +def _python_executable() -> str: + """Prefer repo .venv when present (avoids system Python missing deps).""" + venv_py = REPO_ROOT / ".venv" / "bin" / "python" + if venv_py.is_file(): + return str(venv_py) + return sys.executable + + +# Same stacks as python-v0.x in unified-testing perf/images.yaml +# (tcp/ws × noise/tls × yamux/mplex) +PYTHON_PERF_STACKS: tuple[tuple[str, str | None, str | None], ...] = ( + ("tcp", "noise", "yamux"), + ("tcp", "noise", "mplex"), + ("tcp", "tls", "yamux"), + ("tcp", "tls", "mplex"), + ("ws", "noise", "yamux"), + ("ws", "noise", "mplex"), + ("ws", "tls", "yamux"), + ("ws", "tls", "mplex"), +) + +# Passed through from the parent environment when set (perf harness / shell exports). +PASSTHROUGH_ENV_KEYS: tuple[str, ...] = ( + "DEBUG", + "LIBP2P_DEBUG", + "LIBP2P_DEBUG_FILE", + "PY_YAMUX_DEBUG", + "PY_YAMUX_DISABLE_HYSTERESIS", + "PY_YAMUX_RELEASE_ON_READ", + "PY_YAMUX_ASSUME_RTT_MS", + "PY_YAMUX_BATCH_THRESHOLD_DIV", + "PERF_WRITE_BLOCK_SIZE", + "UPLOAD_BYTES", + "DOWNLOAD_BYTES", + "UPLOAD_ITERATIONS", + "DOWNLOAD_ITERATIONS", + "LATENCY_ITERATIONS", + "TEST_TIMEOUT_SECS", + "DIAL_TIMEOUT_SECS", + "UPGRADE_TIMEOUT_SECS", + "STREAM_NEGOTIATE_TIMEOUT_SECS", + "NEGOTIATE_TIMEOUT_SECS", + "QUIC_CONNECTION_TIMEOUT_SECS", + "QUIC_IDLE_TIMEOUT_SECS", + "LISTENER_IP", +) + +QUICK_DEFAULTS: Mapping[str, str] = { + "UPLOAD_BYTES": "67108864", + "DOWNLOAD_BYTES": "67108864", + "UPLOAD_ITERATIONS": "2", + "DOWNLOAD_ITERATIONS": "2", + "LATENCY_ITERATIONS": "10", +} + + +@dataclass(frozen=True) +class MetricSummary: + min: float + median: float + max: float + unit: str + + def format_median(self) -> str: + if self.unit == "ms": + return f"{self.median:.3f} {self.unit}" + return f"{self.median:.2f} {self.unit}" + + +@dataclass +class TestResult: + stack: Stack + rc: int + elapsed_secs: float + upload: MetricSummary | None = None + download: MetricSummary | None = None + latency: MetricSummary | None = None + + @property + def passed(self) -> bool: + return self.rc == 0 + + @property + def status(self) -> str: + if self.rc == 0: + return "PASS" + if self.rc == 124: + return "TIMEOUT" + return f"FAIL({self.rc})" + + +@dataclass(frozen=True) +class Stack: + transport: str + secure: str | None + muxer: str | None + + @property + def label(self) -> str: + parts = [self.transport] + if self.secure: + parts.append(self.secure) + if self.muxer: + parts.append(self.muxer) + return "+".join(parts) + + +def _parse_stack(transport: str, secure: str | None, muxer: str | None) -> Stack: + if transport == "quic-v1": + return Stack(transport, secure or None, muxer or None) + if not secure or not muxer: + raise SystemExit( + f"{transport} requires --secure and --muxer (e.g. -t tcp -s noise -m yamux)" + ) + return Stack(transport, secure, muxer) + + +def _build_role_env( + *, + stack: Stack, + is_dialer: bool, + addr_file: Path, + test_key: str, + debug: bool, + extra: Mapping[str, str], +) -> dict[str, str]: + env = {k: v for k, v in os.environ.items() if k in PASSTHROUGH_ENV_KEYS and v} + env.update(extra) + env.update( + { + "IS_DIALER": "true" if is_dialer else "false", + "PERF_LOCAL_ADDR_FILE": str(addr_file), + "TEST_KEY": test_key, + "TRANSPORT": stack.transport, + "LISTENER_IP": env.get("LISTENER_IP", "0.0.0.0"), + "DEBUG": "true" if debug else env.get("DEBUG", "false"), + } + ) + if stack.secure: + env["SECURE_CHANNEL"] = stack.secure + if stack.muxer: + env["MUXER"] = stack.muxer + if debug: + env["PY_YAMUX_DEBUG"] = "1" + if is_dialer: + env.setdefault("UPLOAD_BYTES", "1073741824") + env.setdefault("DOWNLOAD_BYTES", "1073741824") + env.setdefault("UPLOAD_ITERATIONS", "10") + env.setdefault("DOWNLOAD_ITERATIONS", "10") + env.setdefault("LATENCY_ITERATIONS", "100") + env.setdefault("TEST_TIMEOUT_SECS", "300") + return env + + +def _parse_perf_stdout( + stdout: str, +) -> tuple[MetricSummary | None, MetricSummary | None, MetricSummary | None]: + """Parse upload/download/latency blocks from perf_test dialer stdout.""" + + def _section(name: str) -> MetricSummary | None: + in_section = False + values: dict[str, str] = {} + for line in stdout.splitlines(): + if line == f"{name}:": + in_section = True + values = {} + continue + if in_section: + if not line.startswith(" "): + break + key, _, raw = line.strip().partition(": ") + values[key] = raw + if "median" not in values or "unit" not in values: + return None + return MetricSummary( + min=float(values.get("min", values["median"])), + median=float(values["median"]), + max=float(values.get("max", values["median"])), + unit=values["unit"], + ) + + return _section("upload"), _section("download"), _section("latency") + + +def _format_duration(secs: float) -> str: + if secs < 60: + return f"{secs:.1f}s" + minutes, remainder = divmod(int(secs), 60) + if remainder: + return f"{minutes}m{remainder}s" + return f"{minutes}m" + + +def _active_yamux_env() -> list[str]: + keys = ( + "PY_YAMUX_DISABLE_HYSTERESIS", + "PY_YAMUX_RELEASE_ON_READ", + "PY_YAMUX_ASSUME_RTT_MS", + "PY_YAMUX_BATCH_THRESHOLD_DIV", + "PY_YAMUX_DEBUG", + "PERF_WRITE_BLOCK_SIZE", + ) + return [f"{k}={os.environ[k]}" for k in keys if os.environ.get(k)] + + +def _print_summary(results: list[TestResult]) -> None: + passed = sum(1 for r in results if r.passed) + total = len(results) + total_elapsed = sum(r.elapsed_secs for r in results) + + print("", file=sys.stderr) + print("=" * 88, file=sys.stderr) + print( + f" PERF SUMMARY — {passed}/{total} passed, " + f"total {_format_duration(total_elapsed)}", + file=sys.stderr, + ) + yamux_env = _active_yamux_env() + if yamux_env: + print(f" tuning: {', '.join(yamux_env)}", file=sys.stderr) + print("=" * 88, file=sys.stderr) + + label_w = max(len(r.stack.label) for r in results) + label_w = max(label_w, len("Stack")) + header = ( + f"{'Stack':<{label_w}} {'Status':<8} {'Time':>7} " + f"{'Upload':>12} {'Download':>12} {'Latency':>12}" + ) + print(header, file=sys.stderr) + print("-" * len(header), file=sys.stderr) + + for r in results: + upload = r.upload.format_median() if r.upload else "—" + download = r.download.format_median() if r.download else "—" + latency = r.latency.format_median() if r.latency else "—" + print( + f"{r.stack.label:<{label_w}} {r.status:<8} " + f"{_format_duration(r.elapsed_secs):>7} " + f"{upload:>12} {download:>12} {latency:>12}", + file=sys.stderr, + ) + + print("=" * 88, file=sys.stderr) + + for r in results: + if not r.passed: + continue + if r.upload and r.download and r.latency: + print( + f" {r.stack.label}: upload {r.upload.min:.2f}–{r.upload.max:.2f} " + f"{r.upload.unit} (median {r.upload.median:.2f}), " + f"download {r.download.min:.2f}–{r.download.max:.2f} " + f"{r.download.unit} (median {r.download.median:.2f}), " + f"latency {r.latency.min:.3f}–{r.latency.max:.3f} " + f"{r.latency.unit} (median {r.latency.median:.3f})", + file=sys.stderr, + ) + print("", file=sys.stderr) + + +def _run_subprocess( + role: str, + env: dict[str, str], + *, + timeout: float | None, + log_dir: Path | None, +) -> subprocess.Popen[str]: + if log_dir: + err_path = log_dir / f"{role}.stderr" + err_f = err_path.open("w", encoding="utf-8") + else: + err_f = None + cmd = [_python_executable(), str(PERF_ENTRY)] + print(f"[run_local_perf] starting {role}: {' '.join(cmd)}", file=sys.stderr) + return subprocess.Popen( + cmd, + env=env, + stdout=subprocess.PIPE if role == "dialer" else None, + stderr=err_f if err_f else None, + text=True, + ) + + +def run_one_stack( + stack: Stack, + *, + timeout_secs: int, + debug: bool, + quick: bool, + log_dir: Path | None, + extra_env: Mapping[str, str], +) -> TestResult: + started = time.monotonic() + if not PERF_ENTRY.is_file(): + print(f"perf entry not found: {PERF_ENTRY}", file=sys.stderr) + return TestResult(stack=stack, rc=1, elapsed_secs=0.0) + + overrides = dict(extra_env) + if quick: + for k, v in QUICK_DEFAULTS.items(): + overrides.setdefault(k, v) + + with tempfile.TemporaryDirectory(prefix="py-libp2p-perf-") as tmp: + addr_file = Path(tmp) / "listener_multiaddr" + addr_file.touch() + test_key = "local" + + listener_env = _build_role_env( + stack=stack, + is_dialer=False, + addr_file=addr_file, + test_key=test_key, + debug=debug, + extra=overrides, + ) + dialer_env = _build_role_env( + stack=stack, + is_dialer=True, + addr_file=addr_file, + test_key=test_key, + debug=debug, + extra=overrides, + ) + + listener = _run_subprocess( + "listener", listener_env, timeout=None, log_dir=log_dir + ) + time.sleep(0.5) + dialer = _run_subprocess( + "dialer", dialer_env, timeout=timeout_secs, log_dir=log_dir + ) + + dialer_rc = 1 + dialer_stdout = "" + try: + assert dialer.stdout is not None + dialer_stdout, _ = dialer.communicate(timeout=timeout_secs) + dialer_rc = dialer.returncode or 0 + except subprocess.TimeoutExpired: + dialer.kill() + print( + f"[run_local_perf] dialer timed out after {timeout_secs}s", + file=sys.stderr, + ) + dialer_rc = 124 + finally: + listener.terminate() + try: + listener.wait(timeout=30) + except subprocess.TimeoutExpired: + listener.kill() + listener.wait(timeout=10) + + if dialer_stdout: + print(dialer_stdout, end="" if dialer_stdout.endswith("\n") else "\n") + + upload, download, latency = _parse_perf_stdout(dialer_stdout) + elapsed = time.monotonic() - started + result = TestResult( + stack=stack, + rc=dialer_rc, + elapsed_secs=elapsed, + upload=upload, + download=download, + latency=latency, + ) + line = f"[run_local_perf] {stack.label}: {result.status}" + if upload: + line += f" — upload {upload.format_median()}" + if download: + line += f", download {download.format_median()}" + if latency: + line += f", latency {latency.format_median()}" + line += f" ({_format_duration(elapsed)})" + print(line, file=sys.stderr) + return result + + +def _stacks_from_args(args: argparse.Namespace) -> list[Stack]: + if args.matrix: + return [Stack(t, s, m) for t, s, m in PYTHON_PERF_STACKS] + if args.stack_index is not None: + idx = args.stack_index + if idx < 0 or idx >= len(PYTHON_PERF_STACKS): + raise SystemExit(f"--stack-index must be 0..{len(PYTHON_PERF_STACKS) - 1}") + t, s, m = PYTHON_PERF_STACKS[idx] + return [_parse_stack(t, s, m)] + return [ + _parse_stack( + args.transport, + args.secure, + args.muxer, + ) + ] + + +def _check_imports() -> None: + py = _python_executable() + probe = subprocess.run( + [py, "-c", "import multiaddr, redis, trio, libp2p"], + capture_output=True, + text=True, + ) + if probe.returncode != 0: + print(probe.stderr or probe.stdout, file=sys.stderr) + raise SystemExit( + "Missing perf dependencies. From repo root:\n" + " uv pip install --python .venv/bin/python -e .\n" + " uv pip install --python .venv/bin/python redis cryptography" + ) + + +def main(argv: Sequence[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "-t", + "--transport", + default="tcp", + help="TRANSPORT (default: tcp). Use with -s/-m unless --matrix", + ) + parser.add_argument( + "-s", + "--secure", + default="noise", + help="SECURE_CHANNEL (default: noise)", + ) + parser.add_argument( + "-m", + "--muxer", + default="yamux", + help="MUXER (default: yamux)", + ) + parser.add_argument( + "--matrix", + action="store_true", + help="Run all 8 python-v0.x perf stacks (tcp/ws × noise/tls × yamux/mplex)", + ) + parser.add_argument( + "--stack-index", + type=int, + metavar="N", + help=f"Run PYTHON_PERF_STACKS[N] (0..{len(PYTHON_PERF_STACKS) - 1})", + ) + parser.add_argument( + "--list-stacks", + action="store_true", + help="Print matrix indices and exit", + ) + parser.add_argument( + "--quick", + action="store_true", + help="Smaller UPLOAD/DOWNLOAD bytes and fewer iterations (see QUICK_DEFAULTS)", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Set DEBUG=true (enables targeted libp2p loggers in perf_test)", + ) + parser.add_argument( + "--timeout", + type=int, + default=None, + metavar="SECS", + help="Subprocess timeout for dialer (default: TEST_TIMEOUT_SECS or 300)", + ) + parser.add_argument( + "--log-dir", + type=Path, + default=None, + help="Write listener/dialer stderr to this directory", + ) + parser.add_argument( + "--fail-fast", + action="store_true", + help="Stop --matrix on first failure", + ) + args = parser.parse_args(list(argv) if argv is not None else None) + + if args.list_stacks: + for i, (t, s, m) in enumerate(PYTHON_PERF_STACKS): + print(f"{i}: {t}+{s}+{m}") + return 0 + + _check_imports() + + if args.matrix and ( + args.transport != "tcp" or args.secure != "noise" or args.muxer != "yamux" + ): + print( + "Note: -t/-s/-m are ignored with --matrix", + file=sys.stderr, + ) + + stacks = _stacks_from_args(args) + timeout = args.timeout + if timeout is None: + raw = os.getenv("TEST_TIMEOUT_SECS") or os.getenv("PERF_TEST_TIMEOUT_SECS") + timeout = int(raw) if raw else 300 + + failures = 0 + results: list[TestResult] = [] + for stack in stacks: + print(f"\n=== {stack.label} ===", file=sys.stderr) + result = run_one_stack( + stack, + timeout_secs=timeout, + debug=args.debug, + quick=args.quick, + log_dir=args.log_dir, + extra_env={}, + ) + results.append(result) + if not result.passed: + failures += 1 + if args.fail_fast: + break + + sys.stdout.flush() + _print_summary(results) + return 1 if failures else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/core/stream_muxer/yamux/test_yamux_growto_hysteresis.py b/tests/core/stream_muxer/yamux/test_yamux_growto_hysteresis.py new file mode 100644 index 000000000..e5662a893 --- /dev/null +++ b/tests/core/stream_muxer/yamux/test_yamux_growto_hysteresis.py @@ -0,0 +1,191 @@ +""" +Unit tests for Go-like GrowTo hysteresis and pending recv release batching. +""" + +import os +from unittest.mock import AsyncMock, Mock + +import pytest + +from libp2p.stream_muxer.yamux.yamux import ( + DEFAULT_WINDOW_SIZE, + MAX_WINDOW_SIZE, + YamuxStream, +) + +PERF_CHUNK = 65500 # legacy interop perf read size +READ_CHUNK = 64 * 1024 # two reads cross half-window (131072) +HALF_WINDOW = DEFAULT_WINDOW_SIZE // 2 + +_PY_YAMUX_ENV_KEYS = ( + "PY_YAMUX_DISABLE_HYSTERESIS", + "PY_YAMUX_RELEASE_ON_READ", + "PY_YAMUX_ASSUME_RTT_MS", + "PY_YAMUX_BATCH_THRESHOLD_DIV", + "PY_YAMUX_DEBUG", +) + + +def _clear_py_yamux_env() -> None: + for key in _PY_YAMUX_ENV_KEYS: + os.environ.pop(key, None) + + +def _make_stream( + *, + recv_window: int = DEFAULT_WINDOW_SIZE, + target_recv_window: int = DEFAULT_WINDOW_SIZE, + buffered: int = 0, + rtt: float = 0.0, +) -> tuple[YamuxStream, AsyncMock]: + mock_conn = Mock() + mock_conn.stream_buffers = {1: bytearray(b"x" * buffered)} + mock_conn.rtt = Mock(return_value=rtt) + mock_conn._write_frame = AsyncMock() + stream = YamuxStream(1, mock_conn, is_initiator=True) + stream.recv_window = recv_window + stream.target_recv_window = target_recv_window + send_update = AsyncMock() + stream.send_window_update = send_update + return stream, send_update + + +@pytest.mark.trio +async def test_release_on_read_sends_credit_when_hysteresis_defers_grow() -> None: + """Single read below half-window sends at least per-read credit (default).""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_RELEASE_ON_READ"] = "1" + stream, send_update = _make_stream() + await stream._auto_tune_and_send_window_update(bytes_read=PERF_CHUNK) + send_update.assert_awaited_once() + call = send_update.await_args + assert call is not None + assert call.args[0] >= PERF_CHUNK + assert stream._pending_recv_release == 0 + + +@pytest.mark.trio +async def test_autotune_on_release_doubles_target() -> None: + """Branch C runs auto-tune when effective RTT is available.""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_RELEASE_ON_READ"] = "1" + stream, send_update = _make_stream(rtt=0.001) + stream.epoch_start = 0.0 + await stream._auto_tune_and_send_window_update(bytes_read=PERF_CHUNK) + send_update.assert_awaited_once() + call = send_update.await_args + assert call is not None + assert call.args[0] > PERF_CHUNK + assert stream.target_recv_window == 2 * DEFAULT_WINDOW_SIZE + + +@pytest.mark.trio +async def test_partial_slack_includes_autotune_extra() -> None: + """Partial slack branch reclaims delta and may add auto-tune increment.""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_RELEASE_ON_READ"] = "0" + slack = HALF_WINDOW // 2 + stream, send_update = _make_stream( + recv_window=DEFAULT_WINDOW_SIZE - slack - PERF_CHUNK, + buffered=0, + rtt=0.001, + ) + stream.epoch_start = 0.0 + await stream._auto_tune_and_send_window_update(bytes_read=PERF_CHUNK) + send_update.assert_awaited_once() + call = send_update.await_args + assert call is not None + assert call.args[0] >= slack + PERF_CHUNK + assert stream._window_update_partial_slack == 1 + + +@pytest.mark.trio +async def test_pending_flush_after_two_reads() -> None: + """Two 64KiB reads batch into one >= half-window WINDOW_UPDATE.""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_RELEASE_ON_READ"] = "0" + stream, send_update = _make_stream() + await stream._auto_tune_and_send_window_update(bytes_read=READ_CHUNK) + assert send_update.await_count == 0 + assert stream._pending_recv_release == READ_CHUNK + + await stream._auto_tune_and_send_window_update(bytes_read=READ_CHUNK) + send_update.assert_awaited_once() + call = send_update.await_args + assert call is not None + assert call.args[0] >= HALF_WINDOW + assert stream._window_update_pending_flush == 1 + + +@pytest.mark.trio +async def test_full_grow_when_delta_exceeds_threshold() -> None: + """Large gap to target triggers full GrowTo (branch A).""" + _clear_py_yamux_env() + stream, send_update = _make_stream( + recv_window=0, + target_recv_window=DEFAULT_WINDOW_SIZE, + buffered=0, + ) + await stream._auto_tune_and_send_window_update(bytes_read=0) + send_update.assert_awaited_once() + call = send_update.await_args + assert call is not None + assert call.args[0] >= HALF_WINDOW + assert stream._window_update_full_grow == 1 + assert stream._pending_recv_release == 0 + + +@pytest.mark.trio +async def test_disable_hysteresis_forces_full_grow() -> None: + """PY_YAMUX_DISABLE_HYSTERESIS sends large update on small positive delta.""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_DISABLE_HYSTERESIS"] = "1" + stream, send_update = _make_stream( + recv_window=DEFAULT_WINDOW_SIZE - PERF_CHUNK, + target_recv_window=DEFAULT_WINDOW_SIZE, + buffered=0, + ) + await stream._auto_tune_and_send_window_update(bytes_read=PERF_CHUNK) + send_update.assert_awaited_once() + call = send_update.await_args + assert call is not None + assert call.args[0] >= PERF_CHUNK + + +@pytest.mark.trio +async def test_assume_rtt_ms_enables_autotune_without_ping() -> None: + """PY_YAMUX_ASSUME_RTT_MS bootstraps auto-tune when conn.rtt() is zero.""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_ASSUME_RTT_MS"] = "1" + os.environ["PY_YAMUX_RELEASE_ON_READ"] = "1" + stream, send_update = _make_stream(rtt=0.0) + stream.epoch_start = 0.0 + await stream._auto_tune_and_send_window_update(bytes_read=PERF_CHUNK) + send_update.assert_awaited_once() + assert stream.target_recv_window > DEFAULT_WINDOW_SIZE + assert stream.target_recv_window <= MAX_WINDOW_SIZE + + +@pytest.mark.trio +async def test_default_assume_rtt_bootstrap_without_env() -> None: + """When PY_YAMUX_ASSUME_RTT_MS is unset, default 1ms bootstrap enables autotune.""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_RELEASE_ON_READ"] = "1" + stream, send_update = _make_stream(rtt=0.0) + stream.epoch_start = 0.0 + await stream._auto_tune_and_send_window_update(bytes_read=PERF_CHUNK) + send_update.assert_awaited_once() + assert stream.target_recv_window > DEFAULT_WINDOW_SIZE + + +@pytest.mark.trio +async def test_assume_rtt_zero_disables_autotune_bootstrap() -> None: + """Explicit PY_YAMUX_ASSUME_RTT_MS=0 disables bootstrap when ping RTT is zero.""" + _clear_py_yamux_env() + os.environ["PY_YAMUX_ASSUME_RTT_MS"] = "0" + os.environ["PY_YAMUX_RELEASE_ON_READ"] = "1" + stream, send_update = _make_stream(rtt=0.0) + stream.epoch_start = 0.0 + await stream._auto_tune_and_send_window_update(bytes_read=PERF_CHUNK) + send_update.assert_awaited_once() + assert stream.target_recv_window == DEFAULT_WINDOW_SIZE diff --git a/tests/interop/perf/__init__.py b/tests/interop/perf/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/interop/perf/test_perf_test_helpers.py b/tests/interop/perf/test_perf_test_helpers.py new file mode 100644 index 000000000..a07668fdf --- /dev/null +++ b/tests/interop/perf/test_perf_test_helpers.py @@ -0,0 +1,197 @@ +"""Unit tests for interop/perf/perf_test.py helper logic.""" + +from __future__ import annotations + +import importlib.util +from pathlib import Path +import sys + +import pytest +import multiaddr + +try: + ExceptionGroup # noqa: B018 +except NameError: + from exceptiongroup import ExceptionGroup # type: ignore[no-redef] + +_REPO_ROOT = Path(__file__).resolve().parents[3] +_PERF_TEST_PATH = _REPO_ROOT / "interop" / "perf" / "perf_test.py" + + +def _load_perf_test_module(): + spec = importlib.util.spec_from_file_location("interop_perf_test", _PERF_TEST_PATH) + assert spec is not None and spec.loader is not None + module = importlib.util.module_from_spec(spec) + sys.modules["interop_perf_test"] = module + spec.loader.exec_module(module) + return module + + +perf_test = _load_perf_test_module() + +_is_connection_closed_error = perf_test._is_connection_closed_error +_env_int = perf_test._env_int +_env_float = perf_test._env_float +PerfTest = perf_test.PerfTest + + +def _perf_test_env( + monkeypatch: pytest.MonkeyPatch, *, is_dialer: bool, tmp_path: Path +) -> PerfTest: + monkeypatch.setenv("TRANSPORT", "tcp") + monkeypatch.setenv("MUXER", "mplex") + monkeypatch.setenv("SECURE_CHANNEL", "tls") + monkeypatch.setenv("IS_DIALER", "true" if is_dialer else "false") + monkeypatch.setenv("TEST_KEY", "test-key") + monkeypatch.setenv("PERF_LOCAL_ADDR_FILE", str(tmp_path / "perf-local-addr")) + monkeypatch.delenv("REDIS_ADDR", raising=False) + return PerfTest() + + +def test_is_connection_closed_error_matching_phrases() -> None: + assert _is_connection_closed_error(RuntimeError("connection closed")) + assert _is_connection_closed_error(RuntimeError("TLS connection is closed")) + assert _is_connection_closed_error(RuntimeError("broken pipe")) + + +def test_is_connection_closed_error_unrelated() -> None: + assert not _is_connection_closed_error(RuntimeError("negotiation failed")) + assert not _is_connection_closed_error(None) + + +def test_is_connection_closed_error_exception_group() -> None: + eg = ExceptionGroup( + "shutdown", + [RuntimeError("connection closed"), RuntimeError("stream eof")], + ) + assert _is_connection_closed_error(eg) + + mixed = ExceptionGroup( + "shutdown", + [RuntimeError("connection closed"), RuntimeError("other failure")], + ) + assert not _is_connection_closed_error(mixed) + + +def test_is_connection_closed_error_cause_chain() -> None: + cause = RuntimeError("connection reset") + wrapper = RuntimeError("wrapper") + wrapper.__cause__ = cause + assert _is_connection_closed_error(wrapper) + + +def test_should_ignore_shutdown_error_dialer( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + test = _perf_test_env(monkeypatch, is_dialer=True, tmp_path=tmp_path) + exc = RuntimeError("connection closed") + + test._benchmarks_complete = False + assert not test._should_ignore_shutdown_error(exc) + + test._benchmarks_complete = True + assert test._should_ignore_shutdown_error(exc) + + +def test_should_ignore_shutdown_error_listener( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + test = _perf_test_env(monkeypatch, is_dialer=False, tmp_path=tmp_path) + exc = RuntimeError("connection closed") + + test._listener_served_peer = False + assert not test._should_ignore_shutdown_error(exc) + + test._listener_served_peer = True + assert test._should_ignore_shutdown_error(exc) + + +def test_should_ignore_shutdown_error_non_connection( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + test = _perf_test_env(monkeypatch, is_dialer=True, tmp_path=tmp_path) + test._benchmarks_complete = True + assert not test._should_ignore_shutdown_error(RuntimeError("timeout")) + + +def test_env_int_defaults_and_minimum(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("TEST_ENV_INT", raising=False) + assert _env_int("TEST_ENV_INT", 42) == 42 + monkeypatch.setenv("TEST_ENV_INT", "not-a-number") + assert _env_int("TEST_ENV_INT", 42) == 42 + monkeypatch.setenv("TEST_ENV_INT", "5") + assert _env_int("TEST_ENV_INT", 42, minimum=10) == 10 + + +def test_env_float_defaults_and_minimum(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.delenv("TEST_ENV_FLOAT", raising=False) + assert _env_float("TEST_ENV_FLOAT", 1.5) == 1.5 + monkeypatch.setenv("TEST_ENV_FLOAT", "bad") + assert _env_float("TEST_ENV_FLOAT", 1.5) == 1.5 + monkeypatch.setenv("TEST_ENV_FLOAT", "0.1") + assert _env_float("TEST_ENV_FLOAT", 1.5, minimum=1.0) == 1.0 + + +def test_listener_teardown_idle_secs_ws_mplex( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("TRANSPORT", "ws") + monkeypatch.setenv("MUXER", "mplex") + monkeypatch.setenv("SECURE_CHANNEL", "noise") + monkeypatch.setenv("IS_DIALER", "false") + monkeypatch.setenv("TEST_KEY", "test-key") + monkeypatch.setenv("PERF_LOCAL_ADDR_FILE", str(tmp_path / "perf-local-addr")) + monkeypatch.delenv("REDIS_ADDR", raising=False) + test = PerfTest() + assert test._listener_teardown_idle_secs() == 30.0 + + +def test_listener_teardown_idle_secs_tcp_yamux( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("TRANSPORT", "tcp") + monkeypatch.setenv("MUXER", "yamux") + monkeypatch.setenv("SECURE_CHANNEL", "noise") + monkeypatch.setenv("IS_DIALER", "false") + monkeypatch.setenv("TEST_KEY", "test-key") + monkeypatch.setenv("PERF_LOCAL_ADDR_FILE", str(tmp_path / "perf-local-addr")) + monkeypatch.delenv("REDIS_ADDR", raising=False) + test = PerfTest() + assert test._listener_teardown_idle_secs() == 1.5 + + +def test_without_loopback_listen_addrs_filters_loopback( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("TRANSPORT", "ws") + monkeypatch.setenv("MUXER", "mplex") + monkeypatch.setenv("SECURE_CHANNEL", "noise") + monkeypatch.setenv("IS_DIALER", "true") + monkeypatch.setenv("TEST_KEY", "test-key") + monkeypatch.setenv("PERF_LOCAL_ADDR_FILE", str(tmp_path / "perf-local-addr")) + monkeypatch.delenv("REDIS_ADDR", raising=False) + test = PerfTest() + addrs = [ + multiaddr.Multiaddr("/ip4/127.0.0.1/tcp/0/ws"), + multiaddr.Multiaddr("/ip4/192.168.1.1/tcp/0/ws"), + ] + filtered = test._without_loopback_listen_addrs(addrs) + assert len(filtered) == 1 + assert "192.168.1.1" in str(filtered[0]) + + +def test_connection_config_denies_loopback_and_disables_autoconnect( + monkeypatch: pytest.MonkeyPatch, tmp_path: Path +) -> None: + monkeypatch.setenv("TRANSPORT", "tcp") + monkeypatch.setenv("MUXER", "yamux") + monkeypatch.setenv("SECURE_CHANNEL", "noise") + monkeypatch.setenv("IS_DIALER", "true") + monkeypatch.setenv("TEST_KEY", "test-key") + monkeypatch.setenv("PERF_LOCAL_ADDR_FILE", str(tmp_path / "perf-local-addr")) + monkeypatch.delenv("REDIS_ADDR", raising=False) + cfg = PerfTest()._connection_config() + assert cfg.min_connections == 0 + assert cfg.low_watermark == 0 + assert "127.0.0.0/8" in cfg.deny_list diff --git a/tox.ini b/tox.ini index 929b0b7d9..8b532dfa4 100644 --- a/tox.ini +++ b/tox.ini @@ -45,6 +45,9 @@ commands= pre-commit install pre-commit run --all-files --show-diff-on-failure +[testenv:py{310,311,312,313}-interop] +deps = redis>=4.0.0 + [testenv:py{310,311,312,313}-wheel] deps= wheel