Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 91 additions & 31 deletions speedtest_cloudflare_cli/core/speedtest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import contextlib
import functools
import re
Expand All @@ -20,19 +21,19 @@
from speedtest_cloudflare_cli.models import metadata, result

CHUNK_SIZE = 1024 * 1024
PING_HOST = "1.1.1.1"
CLOUDFLARE_HOST = "speed.cloudflare.com"
PING_COUNT = 3
PING_TIMEOUT = 3

# Adaptive mode constants
PROBE_SIZE_MB = 5 # Size for preliminary probe test
PROBE_TIMEOUT_SECONDS = 2.0 # Max seconds for probe test
TARGET_TEST_DURATION = 7.5 # Target duration for main test in seconds
PROBE_SIZE_MB = 15 # Size for preliminary probe test (larger for accurate estimation)
PROBE_TIMEOUT_SECONDS = 3.0 # Max seconds for probe test
TARGET_TEST_DURATION = 10.0 # Target duration for main test in seconds
MIN_TEST_SIZE_MB = 1 # Minimum test size
MAX_TEST_SIZE_MB = 200 # Maximum test size
MAX_TEST_SIZE_MB = 500 # Maximum test size for high-speed connections
MIN_REALISTIC_SPEED = 0.1 # Minimum realistic speed in Mbps
MAX_REALISTIC_SPEED = 10000 # Maximum realistic speed in Mbps
PARALLEL_CONNECTIONS = 8 # Number of parallel connections for upload


@functools.cache
Expand All @@ -41,6 +42,12 @@ def client() -> httpx.Client:
return httpx.Client(headers=headers, timeout=None) # noqa: S113


def new_client() -> httpx.Client:
    """Build a fresh, uncached HTTP client for a parallel upload worker.

    Unlike the module-level cached ``client()``, every call returns a new
    instance, so each concurrent connection owns its own transport instead
    of contending for a shared pool.
    """
    return httpx.Client(
        headers={"Connection": "Keep-Alive", "Referer": f"https://{CLOUDFLARE_HOST}/"},
        timeout=None,  # noqa: S113
    )


@contextlib.contextmanager
def track_progress(silent: bool = False) -> Generator[Progress]:
with Progress(
Expand Down Expand Up @@ -69,7 +76,7 @@ def _fallback_ping() -> float | str:
# Try system ping
try:
out = subprocess.check_output( # noqa: S603
["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), PING_HOST], # noqa: S607
["ping", "-c", str(PING_COUNT), "-W", str(PING_TIMEOUT), CLOUDFLARE_HOST], # noqa: S607
stderr=subprocess.DEVNULL,
text=True,
)
Expand All @@ -82,7 +89,7 @@ def _fallback_ping() -> float | str:
# Fallback to TCP latency
try:
start = time.perf_counter()
with socket.create_connection((PING_HOST, 443), PING_TIMEOUT):
with socket.create_connection((CLOUDFLARE_HOST, 443), PING_TIMEOUT):
return (time.perf_counter() - start) * 1000
except OSError:
return "N/A"
Expand Down Expand Up @@ -121,8 +128,9 @@ def _download(
progress.update(task, description="Downloading... 🚀", advance=len(chunk))

@functools.cached_property
def upload_data_blocks(self) -> bytes:
return b"0" * self.upload_size
def upload_chunk(self) -> bytes:
"""Single reusable chunk for uploads - avoids repeated allocations."""
return b"0" * CHUNK_SIZE

def _http_latency(self, **kwargs):
start = time.perf_counter()
Expand All @@ -131,7 +139,7 @@ def _http_latency(self, **kwargs):

def ping(self) -> None:
try:
self.latency = ping3.ping(PING_HOST, unit="ms", timeout=PING_TIMEOUT)
self.latency = ping3.ping(CLOUDFLARE_HOST, unit="ms", timeout=PING_TIMEOUT)
except (ping3.errors.PingError, PermissionError):
self.latency = _fallback_ping()

Expand All @@ -142,24 +150,86 @@ def _upload(
deadline: float | None = None,
) -> None:
"""Upload data in streaming chunks to keep the HTTP connection alive and update progress."""
chunk = self.upload_chunk # Reuse same chunk - no allocation per iteration

def data_stream():
offset = 0
while offset < self.upload_size:
bytes_sent = 0
while bytes_sent < self.upload_size:
if deadline is not None and time.perf_counter() > deadline:
break # Timeout reached, stop uploading
chunk = self.upload_data_blocks[offset : offset + CHUNK_SIZE]
offset += len(chunk)
remaining = self.upload_size - bytes_sent
current_chunk = chunk if remaining >= CHUNK_SIZE else chunk[:remaining]
bytes_sent += len(current_chunk)
if progress and task is not None:
progress.update(task, description="Uploading... 🚀", advance=len(chunk))
yield chunk
progress.update(task, description="Uploading... 🚀", advance=len(current_chunk))
yield current_chunk

# httpx will read the iterator lazily and stream the request body
with client().stream("POST", f"{self.url}/__up", data=data_stream()) as _response:
# No need to consume the response body; the context manager ensures the
# request completes and the connection is released.
pass

def _parallel_upload_worker(
    self,
    upload_size: int,
    progress: Progress | None,
    task: TaskID | None,
    deadline: float | None,
    lock: threading.Lock,
) -> int:
    """Upload up to ``upload_size`` bytes over one dedicated connection.

    Runs in a worker thread spawned by ``_parallel_upload``. A private
    ``httpx.Client`` is created so workers do not share the cached global
    client's connection pool.

    Args:
        upload_size: Number of bytes this worker should stream.
        progress: Shared progress bar, or ``None`` when running silently.
        task: Progress task to advance, or ``None``.
        deadline: Absolute ``time.perf_counter()`` value after which the
            stream ends early, or ``None`` for no time limit.
        lock: Serializes progress updates across worker threads.

    Returns:
        Number of bytes handed to the HTTP stream (may be less than
        ``upload_size`` if the deadline was reached).
    """
    chunk = self.upload_chunk  # Single reusable buffer; sliced only for the tail.
    bytes_uploaded = 0

    def data_stream():
        nonlocal bytes_uploaded
        while bytes_uploaded < upload_size:
            if deadline is not None and time.perf_counter() > deadline:
                break  # Time budget exhausted - terminate the request body early.
            remaining = upload_size - bytes_uploaded
            current_chunk = chunk if remaining >= CHUNK_SIZE else chunk[:remaining]
            bytes_uploaded += len(current_chunk)
            if progress and task is not None:
                with lock:  # Progress object is shared by all workers.
                    progress.update(task, description="Uploading... 🚀", advance=len(current_chunk))
            yield current_chunk

    # Context manager guarantees the client is closed even if streaming raises
    # (replaces the previous manual try/finally close with the idiomatic form).
    with new_client() as http_client:
        with http_client.stream("POST", f"{self.url}/__up", data=data_stream()) as _response:
            # Response body is irrelevant; exiting the context completes the
            # request and releases the connection.
            pass

    return bytes_uploaded

def _parallel_upload(
    self,
    progress: Progress | None = None,
    task: TaskID | None = None,
    deadline: float | None = None,
) -> None:
    """Upload ``self.upload_size`` bytes using multiple parallel connections.

    Fans the total payload out across ``PARALLEL_CONNECTIONS`` worker
    threads, each streaming its share on its own HTTP connection.

    Bug fix: the previous ``total_size // PARALLEL_CONNECTIONS`` split
    silently dropped the division remainder, so the workers collectively
    uploaded fewer bytes than ``self.upload_size`` and the progress bar
    could never reach 100%. The remainder is now assigned to the first
    worker so the per-worker sizes sum exactly to the total.

    Args:
        progress: Shared progress bar, or ``None`` when running silently.
        task: Progress task to advance, or ``None``.
        deadline: Absolute ``time.perf_counter()`` cutoff passed through to
            the workers, or ``None`` for no time limit.

    Raises:
        Exception: any error raised inside a worker is re-raised here
            (matching the serial ``_upload`` path, which propagates HTTP
            errors) instead of being silently discarded.
    """
    total_size = self.upload_size
    base_size, remainder = divmod(total_size, PARALLEL_CONNECTIONS)
    # First worker absorbs the remainder; sizes now sum to total_size.
    sizes = [base_size + remainder] + [base_size] * (PARALLEL_CONNECTIONS - 1)
    lock = threading.Lock()

    with concurrent.futures.ThreadPoolExecutor(max_workers=PARALLEL_CONNECTIONS) as executor:
        futures = [
            executor.submit(
                self._parallel_upload_worker,
                size,
                progress,
                task,
                deadline,
                lock,
            )
            for size in sizes
        ]
        # Surface worker failures instead of swallowing them:
        # concurrent.futures.wait() alone never re-raises worker exceptions.
        for future in concurrent.futures.as_completed(futures):
            future.result()

def _compute_network_speed(self, progress: Progress, size_to_process: int, func: Callable) -> result.Result:
jitter = 0
times_to_process = []
Expand Down Expand Up @@ -223,13 +293,9 @@ def upload_speed(self, silent: bool, adaptive: bool = False, default_size_mb: in
adaptive_size = self._calculate_adaptive_size(probe_speed, "upload", default_size_mb)
self.upload_size = adaptive_size

# Clear cached upload_data_blocks since upload_size changed
if "upload_data_blocks" in self.__dict__:
del self.__dict__["upload_data_blocks"]

with track_progress(silent=silent) as progress:
upload_result = self._compute_network_speed(
progress=progress, size_to_process=self.upload_size, func=self._upload
progress=progress, size_to_process=self.upload_size, func=self._parallel_upload
)

return upload_result
Expand Down Expand Up @@ -258,9 +324,6 @@ def _run_probe_test(self, test_type: str, silent: bool = True) -> float | None:
self.download_size = probe_size
else:
self.upload_size = probe_size
# Clear cached upload_data_blocks since upload_size changed
if "upload_data_blocks" in self.__dict__:
del self.__dict__["upload_data_blocks"]

start_time = time.perf_counter()

Expand Down Expand Up @@ -293,9 +356,6 @@ def _run_probe_test(self, test_type: str, silent: bool = True) -> float | None:
self.download_size = original_size
else:
self.upload_size = original_size
# Clear cached upload_data_blocks again since upload_size changed
if "upload_data_blocks" in self.__dict__:
del self.__dict__["upload_data_blocks"]

def _calculate_adaptive_size(self, probe_speed: float | None, test_type: str, default_size_mb: int) -> int:
"""
Expand All @@ -321,13 +381,13 @@ def _calculate_adaptive_size(self, probe_speed: float | None, test_type: str, de
speed_MBps = probe_speed / 8

# Calculate ideal size PER ATTEMPT
# Use longer duration per attempt for better accuracy (3 seconds per attempt)
# Use longer duration per attempt for better accuracy (5 seconds per attempt)
# This ensures each attempt is substantial enough for accurate measurement
duration_per_attempt = 3.0 # seconds per attempt
duration_per_attempt = 5.0 # seconds per attempt
ideal_size_mb = speed_MBps * duration_per_attempt

# Apply boundaries (minimum 10MB per attempt for accuracy)
final_size_mb = max(10, min(ideal_size_mb, MAX_TEST_SIZE_MB))
# Apply boundaries (minimum 50MB per attempt for accuracy on fast connections)
final_size_mb = max(50, min(ideal_size_mb, MAX_TEST_SIZE_MB))

# Round to nearest integer and convert to bytes
return int(round(final_size_mb)) * CHUNK_SIZE
4 changes: 2 additions & 2 deletions speedtest_cloudflare_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def safe_value(result: result.Result | None, attr: str) -> str:
@click.option("--download", "-d", is_flag=True, help="Run download test")
@click.option("--download_size", "-ds", type=int, default=DOWNLOAD_SIZE, help="Download size in MB")
@click.option("--upload_size", "-us", type=int, default=UPLOAD_SIZE, help="Upload size in MB")
@click.option("--attempts", "-a", type=int, default=5, help="Number of attempts")
@click.option("--timeout", "-t", type=float, default=10.0, help="Timeout per test in seconds (default: 10)")
@click.option("--attempts", "-a", type=int, default=3, help="Number of attempts")
@click.option("--timeout", "-t", type=float, default=15.0, help="Timeout per test in seconds (default: 15)")
@click.option("--json", is_flag=True, help="Output results in JSON format")
@click.option("--silent", is_flag=True, help="Run in silent mode")
@click.option("--json-output", type=click.Path(writable=True), default=None, help="Save JSON results to file")
Expand Down
8 changes: 5 additions & 3 deletions tests/core/test_speedtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ def test_wait(my_speedtest_object, mocker: MockerFixture):
# )


def test_data_blocks(my_speedtest_object):
data_blocks = my_speedtest_object.upload_data_blocks
assert data_blocks == b"0" * my_speedtest_object.upload_size
def test_upload_chunk(my_speedtest_object):
from speedtest_cloudflare_cli.core.speedtest import CHUNK_SIZE

upload_chunk = my_speedtest_object.upload_chunk
assert upload_chunk == b"0" * CHUNK_SIZE


# @unittest.mock.patch("time.perf_counter")
Expand Down