From f63c0cd9c0c979bed58dae392790b059054c2076 Mon Sep 17 00:00:00 2001 From: takitsu21 Date: Fri, 30 Jan 2026 20:36:21 +0100 Subject: [PATCH] Improve upload accuracy and fix latency --- speedtest_cloudflare_cli/core/speedtest.py | 122 +++++++++++++++------ speedtest_cloudflare_cli/main.py | 4 +- tests/core/test_speedtest.py | 8 +- 3 files changed, 98 insertions(+), 36 deletions(-) diff --git a/speedtest_cloudflare_cli/core/speedtest.py b/speedtest_cloudflare_cli/core/speedtest.py index 07c41ce..81d7c39 100644 --- a/speedtest_cloudflare_cli/core/speedtest.py +++ b/speedtest_cloudflare_cli/core/speedtest.py @@ -1,3 +1,4 @@ +import concurrent.futures import contextlib import functools import re @@ -20,19 +21,19 @@ from speedtest_cloudflare_cli.models import metadata, result CHUNK_SIZE = 1024 * 1024 -PING_HOST = "1.1.1.1" CLOUDFLARE_HOST = "speed.cloudflare.com" PING_COUNT = 3 PING_TIMEOUT = 3 # Adaptive mode constants -PROBE_SIZE_MB = 5 # Size for preliminary probe test -PROBE_TIMEOUT_SECONDS = 2.0 # Max seconds for probe test -TARGET_TEST_DURATION = 7.5 # Target duration for main test in seconds +PROBE_SIZE_MB = 15 # Size for preliminary probe test (larger for accurate estimation) +PROBE_TIMEOUT_SECONDS = 3.0 # Max seconds for probe test +TARGET_TEST_DURATION = 10.0 # Target duration for main test in seconds MIN_TEST_SIZE_MB = 1 # Minimum test size -MAX_TEST_SIZE_MB = 200 # Maximum test size +MAX_TEST_SIZE_MB = 500 # Maximum test size for high-speed connections MIN_REALISTIC_SPEED = 0.1 # Minimum realistic speed in Mbps MAX_REALISTIC_SPEED = 10000 # Maximum realistic speed in Mbps +PARALLEL_CONNECTIONS = 8 # Number of parallel connections for upload @functools.cache @@ -41,6 +42,12 @@ def client() -> httpx.Client: return httpx.Client(headers=headers, timeout=None) # noqa: S113 +def new_client() -> httpx.Client: + """Create a new HTTP client for parallel connections.""" + headers = {"Connection": "Keep-Alive", "Referer": f"https://{CLOUDFLARE_HOST}/"} + return httpx.Client(headers=headers, timeout=None) # noqa: S113 + + @contextlib.contextmanager def track_progress(silent: bool = False) -> Generator[Progress]: with Progress( @@ -69,7 +76,7 @@ def _fallback_ping() -> float | str: # Try system ping try: out = subprocess.check_output( # noqa: S603 - ["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), PING_HOST], # noqa: S607 + ["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), CLOUDFLARE_HOST], # noqa: S607 stderr=subprocess.DEVNULL, text=True, ) @@ -82,7 +89,7 @@ def _fallback_ping() -> float | str: # Fallback to TCP latency try: start = time.perf_counter() - with socket.create_connection((PING_HOST, 443), PING_TIMEOUT): + with socket.create_connection((CLOUDFLARE_HOST, 443), PING_TIMEOUT): return (time.perf_counter() - start) * 1000 except OSError: return "N/A" @@ -121,8 +128,9 @@ def _download( progress.update(task, description="Downloading... 🚀", advance=len(chunk)) @functools.cached_property - def upload_data_blocks(self) -> bytes: - return b"0" * self.upload_size + def upload_chunk(self) -> bytes: + """Single reusable chunk for uploads - avoids repeated allocations.""" + return b"0" * CHUNK_SIZE def _http_latency(self, **kwargs): start = time.perf_counter() @@ -131,7 +139,7 @@ def _http_latency(self, **kwargs): def ping(self) -> None: try: - self.latency = ping3.ping(PING_HOST, unit="ms", timeout=PING_TIMEOUT) + self.latency = ping3.ping(CLOUDFLARE_HOST, unit="ms", timeout=PING_TIMEOUT) except (ping3.errors.PingError, PermissionError): self.latency = _fallback_ping() @@ -142,17 +150,19 @@ def _upload( deadline: float | None = None, ) -> None: """Upload data in streaming chunks to keep the HTTP connection alive and update progress.""" + chunk = self.upload_chunk # Reuse same chunk - no allocation per iteration def data_stream(): - offset = 0 - while offset < self.upload_size: + bytes_sent = 0 + while bytes_sent < self.upload_size: if deadline is not None and time.perf_counter() > deadline: break # Timeout reached, stop uploading - chunk = self.upload_data_blocks[offset : offset + CHUNK_SIZE] - offset += len(chunk) + remaining = self.upload_size - bytes_sent + current_chunk = chunk if remaining >= CHUNK_SIZE else chunk[:remaining] + bytes_sent += len(current_chunk) if progress and task is not None: - progress.update(task, description="Uploading... 🚀", advance=len(chunk)) - yield chunk + progress.update(task, description="Uploading... 🚀", advance=len(current_chunk)) + yield current_chunk # httpx will read the iterator lazily and stream the request body with client().stream("POST", f"{self.url}/__up", data=data_stream()) as _response: @@ -160,6 +170,66 @@ def data_stream(): # request completes and the connection is released. pass + def _parallel_upload_worker( + self, + upload_size: int, + progress: Progress | None, + task: TaskID | None, + deadline: float | None, + lock: threading.Lock, + ) -> int: + """Worker function for parallel upload. Returns bytes uploaded.""" + chunk = self.upload_chunk + http_client = new_client() + bytes_uploaded = 0 + + try: + + def data_stream(): + nonlocal bytes_uploaded + while bytes_uploaded < upload_size: + if deadline is not None and time.perf_counter() > deadline: + break + remaining = upload_size - bytes_uploaded + current_chunk = chunk if remaining >= CHUNK_SIZE else chunk[:remaining] + bytes_uploaded += len(current_chunk) + if progress and task is not None: + with lock: + progress.update(task, description="Uploading... 🚀", advance=len(current_chunk)) + yield current_chunk + + with http_client.stream("POST", f"{self.url}/__up", data=data_stream()) as _response: + pass + finally: + http_client.close() + + return bytes_uploaded + + def _parallel_upload( + self, + progress: Progress | None = None, + task: TaskID | None = None, + deadline: float | None = None, + ) -> None: + """Upload data using multiple parallel connections to maximize bandwidth.""" + total_size = self.upload_size + size_per_connection = total_size // PARALLEL_CONNECTIONS + lock = threading.Lock() + + with concurrent.futures.ThreadPoolExecutor(max_workers=PARALLEL_CONNECTIONS) as executor: + futures = [ + executor.submit( + self._parallel_upload_worker, + size_per_connection, + progress, + task, + deadline, + lock, + ) + for _ in range(PARALLEL_CONNECTIONS) + ] + concurrent.futures.wait(futures) + def _compute_network_speed(self, progress: Progress, size_to_process: int, func: Callable) -> result.Result: jitter = 0 times_to_process = [] @@ -223,13 +293,9 @@ def upload_speed(self, silent: bool, adaptive: bool = False, default_size_mb: in adaptive_size = self._calculate_adaptive_size(probe_speed, "upload", default_size_mb) self.upload_size = adaptive_size - # Clear cached upload_data_blocks since upload_size changed - if "upload_data_blocks" in self.__dict__: - del self.__dict__["upload_data_blocks"] - with track_progress(silent=silent) as progress: upload_result = self._compute_network_speed( - progress=progress, size_to_process=self.upload_size, func=self._upload + progress=progress, size_to_process=self.upload_size, func=self._parallel_upload ) return upload_result @@ -258,9 +324,6 @@ def _run_probe_test(self, test_type: str, silent: bool = True) -> float | None: self.download_size = probe_size else: self.upload_size = probe_size - # Clear cached upload_data_blocks since upload_size changed - if "upload_data_blocks" in self.__dict__: - del self.__dict__["upload_data_blocks"] start_time = time.perf_counter() @@ -293,9 +356,6 @@ def _run_probe_test(self, test_type: str, silent: bool = True) -> float | None: self.download_size = original_size else: self.upload_size = original_size - # Clear cached upload_data_blocks again since upload_size changed - if "upload_data_blocks" in self.__dict__: - del self.__dict__["upload_data_blocks"] def _calculate_adaptive_size(self, probe_speed: float | None, test_type: str, default_size_mb: int) -> int: """ @@ -321,13 +381,13 @@ def _calculate_adaptive_size(self, probe_speed: float | None, test_type: str, de speed_MBps = probe_speed / 8 # Calculate ideal size PER ATTEMPT - # Use longer duration per attempt for better accuracy (3 seconds per attempt) + # Use longer duration per attempt for better accuracy (5 seconds per attempt) # This ensures each attempt is substantial enough for accurate measurement - duration_per_attempt = 3.0 # seconds per attempt + duration_per_attempt = 5.0 # seconds per attempt ideal_size_mb = speed_MBps * duration_per_attempt - # Apply boundaries (minimum 10MB per attempt for accuracy) - final_size_mb = max(10, min(ideal_size_mb, MAX_TEST_SIZE_MB)) + # Apply boundaries (minimum 50MB per attempt for accuracy on fast connections) + final_size_mb = max(50, min(ideal_size_mb, MAX_TEST_SIZE_MB)) # Round to nearest integer and convert to bytes return int(round(final_size_mb)) * CHUNK_SIZE diff --git a/speedtest_cloudflare_cli/main.py b/speedtest_cloudflare_cli/main.py index 556b742..1de310e 100644 --- a/speedtest_cloudflare_cli/main.py +++ b/speedtest_cloudflare_cli/main.py @@ -75,8 +75,8 @@ def safe_value(result: result.Result | None, attr: str) -> str: @click.option("--download", "-d", is_flag=True, help="Run download test") @click.option("--download_size", "-ds", type=int, default=DOWNLOAD_SIZE, help="Download size in MB") @click.option("--upload_size", "-us", type=int, default=UPLOAD_SIZE, help="Upload size in MB") -@click.option("--attempts", "-a", type=int, default=5, help="Number of attempts") -@click.option("--timeout", "-t", type=float, default=10.0, help="Timeout per test in seconds (default: 10)") +@click.option("--attempts", "-a", type=int, default=3, help="Number of attempts") +@click.option("--timeout", "-t", type=float, default=15.0, help="Timeout per test in seconds (default: 15)") @click.option("--json", is_flag=True, help="Output results in JSON format") @click.option("--silent", is_flag=True, help="Run in silent mode") @click.option("--json-output", type=click.Path(writable=True), default=None, help="Save JSON results to file") diff --git a/tests/core/test_speedtest.py b/tests/core/test_speedtest.py index 122f85b..1981532 100644 --- a/tests/core/test_speedtest.py +++ b/tests/core/test_speedtest.py @@ -58,9 +58,11 @@ def test_wait(my_speedtest_object, mocker: MockerFixture): # ) -def test_data_blocks(my_speedtest_object): - data_blocks = my_speedtest_object.upload_data_blocks - assert data_blocks == b"0" * my_speedtest_object.upload_size +def test_upload_chunk(my_speedtest_object): + from speedtest_cloudflare_cli.core.speedtest import CHUNK_SIZE + + upload_chunk = my_speedtest_object.upload_chunk + assert upload_chunk == b"0" * CHUNK_SIZE # @unittest.mock.patch("time.perf_counter")