diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 79ed0ca..ec49d6b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: needs: test strategy: matrix: - target: [x86_64-linux, x86_64-macos, aarch64-linux] + target: [x86_64-linux, x86_64-macos, aarch64-linux, x86_64-windows] optimize: [Debug, ReleaseSafe] steps: diff --git a/README.md b/README.md index b35420d..cfa0b16 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,12 @@ # fast-cli -[![Zig](https://img.shields.io/badge/Zig-0.14.0+-orange.svg)](https://ziglang.org/) +[![Zig](https://img.shields.io/badge/Zig-0.15.2-orange?logo=zig)](https://ziglang.org/) [![CI](https://github.com/mikkelam/fast-cli/actions/workflows/ci.yml/badge.svg)](https://github.com/mikkelam/fast-cli/actions/workflows/ci.yml) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) A blazingly fast CLI tool for testing internet speed uses fast.com v2 api. Written in Zig for maximum performance. -⚡ **1.2 MB binary** • 🚀 **Zero runtime deps** • 📊 **Smart stability detection** +⚡ **1.2 MB binary** • 🚀 **Zero runtime deps** • 📊 **Adaptive stability-based stopping** ## Demo @@ -17,7 +17,7 @@ A blazingly fast CLI tool for testing internet speed uses fast.com v2 api. Writt - **Tiny binary**: Just 1.2 MB, no runtime dependencies - **Blazing fast**: Concurrent connections with adaptive chunk sizing - **Cross-platform**: Single binary for Linux, macOS -- **Smart stopping**: Uses Coefficient of Variation (CoV) algorithm for adaptive test duration +- **Smart stopping**: Uses a ramp + steady stability strategy and stops on stable speed or max duration ## Supported Platforms @@ -50,37 +50,54 @@ zig build -Doptimize=ReleaseSafe ## Usage ```console -❯ ./fast-cli --help -Estimate connection speed using fast.com -v0.0.1 - -Usage: fast-cli [options] - -Flags: - -u, --upload Check upload speed as well [Bool] (default: false) - -d, --duration Maximum test duration in seconds (uses Fast.com-style stability detection by default) [Int] (default: 30) - --https Use https when connecting to fast.com [Bool] (default: true) - -j, --json Output results in JSON format [Bool] (default: false) - -h, --help Shows the help for a command [Bool] (default: false) - -Use "fast-cli --help" for more information. +fast-cli - Estimate connection speed using fast.com + +USAGE: + fast-cli [OPTIONS] + +OPTIONS: + -h, --help Display this help and exit. + --https Use HTTPS when connecting to fast.com (default) + --no-https Use HTTP instead of HTTPS + -u, --upload Check upload speed as well + -j, --json Output results in JSON format + -d, --duration Maximum test duration in seconds (effective range: 7-30, default: 30) ``` ## Example Output ```console $ fast-cli --upload -🏓 25ms | ⬇️ Download: 113.7 Mbps | ⬆️ Upload: 62.1 Mbps +🏓 25ms | ⬇️ Download: 114 Mbps | ⬆️ Upload: 62 Mbps $ fast-cli -d 15 # Quick test with 15s max duration -🏓 22ms | ⬇️ Download: 105.0 Mbps +🏓 22ms | ⬇️ Download: 105 Mbps $ fast-cli -j # JSON output -{"download_mbps": 131.4, "ping_ms": 20.8} +{"download_mbps": 131, "ping_ms": 20.8, "upload_mbps": null, "error": null} ``` +## Stability Strategy + +The speed test uses a two-phase strategy: + +1. **Ramp phase**: increase active workers based on observed throughput. +2. **Steady phase**: lock worker count and estimate authoritative speed. + +The test stops when either: + +- speed is stable within a configured delta threshold over recent steady samples, or +- max duration is reached. + ## Development +Optional: use `mise` to install and run the project toolchain. + +```bash +mise install +mise exec -- zig build test +``` + ```bash # Debug build zig build diff --git a/src/cli/args.zig b/src/cli/args.zig index 81133d1..dad41ec 100644 --- a/src/cli/args.zig +++ b/src/cli/args.zig @@ -8,6 +8,14 @@ const BOLD = "\x1b[1m"; const YELLOW = "\x1b[33m"; const RESET = "\x1b[0m"; +pub const duration_default_seconds: u32 = 30; +pub const duration_min_seconds: u32 = 7; +pub const duration_max_seconds: u32 = 30; + +pub fn clampDurationSeconds(value: u32) u32 { + return @min(duration_max_seconds, @max(value, duration_min_seconds)); +} + pub const Args = struct { https: bool, upload: bool, @@ -25,15 +33,15 @@ pub const Args = struct { } }; -const params = clap.parseParamsComptime( +const params = clap.parseParamsComptime(std.fmt.comptimePrint( \\-h, --help Display this help and exit. \\ --https Use HTTPS when connecting to fast.com (default) \\ --no-https Use HTTP instead of HTTPS \\-u, --upload Check upload speed as well \\-j, --json Output results in JSON format - \\-d, --duration Maximum test duration in seconds (default: 30) + \\-d, --duration Maximum test duration in seconds (effective range: {d}-{d}, default: {d}) \\ -); +, .{ duration_min_seconds, duration_max_seconds, duration_default_seconds })); const parsers = .{ .usize = clap.parsers.int(u32, 10), @@ -56,7 +64,7 @@ pub fn parse(allocator: Allocator) !Args { .https = if (res.args.@"no-https" != 0) false else true, .upload = res.args.upload != 0, .json = res.args.json != 0, - .duration = res.args.duration orelse 30, + .duration = res.args.duration orelse duration_default_seconds, .help = res.args.help != 0, .allocator = allocator, .clap_result = res, diff --git a/src/cli/root.zig b/src/cli/root.zig index 76c2018..05b90a3 100644 --- a/src/cli/root.zig +++ b/src/cli/root.zig @@ -31,7 +31,7 @@ pub fn run(allocator: std.mem.Allocator) !void { var spinner = Spinner.init(allocator, .{}); defer spinner.deinit(); - var fast = Fast.init(std.heap.smp_allocator, args.https); + var fast = Fast.init(allocator, args.https); defer fast.deinit(); const urls = fast.get_urls(5) catch |err| { @@ -53,7 +53,7 @@ pub fn run(allocator: std.mem.Allocator) !void { } // Measure latency - var latency_tester = HttpLatencyTester.init(std.heap.smp_allocator); + var latency_tester = HttpLatencyTester.init(allocator); defer latency_tester.deinit(); const latency_ms = if (!args.json) blk: { @@ -70,25 +70,26 @@ pub fn run(allocator: std.mem.Allocator) !void { if (!args.json) { log.info("Measuring download speed...", .{}); - try spinner.start("Measuring download speed...", .{}); + try spinner.start("⬇️ 0 Mbps", .{}); } // Initialize speed tester - var speed_tester = HTTPSpeedTester.init(std.heap.smp_allocator); + var speed_tester = HTTPSpeedTester.init(allocator); defer speed_tester.deinit(); const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 4, - .max_duration_seconds = @as(u32, @intCast(@min(25, args.duration))), - .measurement_interval_ms = 750, - .sliding_window_size = 6, - .stability_threshold_cov = 0.15, - .stable_checks_required = 2, + .min_duration_seconds = Args.duration_min_seconds, + .max_duration_seconds = Args.clampDurationSeconds(args.duration), + .progress_frequency_ms = 150, + .moving_average_window_size = 5, + .stability_delta_percent = 2.0, + .min_stable_measurements = 6, + .connections_min = 1, + .max_bytes_in_flight = 78_643_200, }; const download_result = if (args.json) blk: { - break :blk speed_tester.measure_download_speed_stability(urls, criteria) catch |err| { - try spinner.fail("Download test failed: {}", .{err}); + break :blk speed_tester.measure_download_speed_stability(urls, criteria) catch { try outputJson(null, null, null, "Download test failed"); return; }; @@ -98,21 +99,18 @@ pub fn run(allocator: std.mem.Allocator) !void { try spinner.fail("Download test failed: {}", .{err}); return; }; - spinner.stop(); break :blk result; }; var upload_result: ?SpeedTestResult = null; if (args.upload) { if (!args.json) { - spinner.stop(); log.info("Measuring upload speed...", .{}); - try spinner.start("Measuring upload speed...", .{}); + try spinner.updateMessage("⬆️ 0 Mbps", .{}); } upload_result = if (args.json) blk: { - break :blk speed_tester.measure_upload_speed_stability(urls, criteria) catch |err| { - try spinner.fail("Upload test failed: {}", .{err}); + break :blk speed_tester.measure_upload_speed_stability(urls, criteria) catch { try outputJson(download_result.speed.value, latency_ms, null, "Upload test failed"); return; }; @@ -122,7 +120,6 @@ pub fn run(allocator: std.mem.Allocator) !void { try spinner.fail("Upload test failed: {}", .{err}); return; }; - spinner.stop(); break :blk result; }; } @@ -131,15 +128,15 @@ pub fn run(allocator: std.mem.Allocator) !void { if (!args.json) { if (latency_ms) |ping| { if (upload_result) |up| { - try spinner.succeed("🏓 {d:.0}ms | ⬇️ Download: {d:.1} {s} | ⬆️ Upload: {d:.1} {s}", .{ ping, download_result.speed.value, download_result.speed.unit.toString(), up.speed.value, up.speed.unit.toString() }); + try spinner.succeed("🏓 {d:.0}ms | ⬇️ Download: {d:.0} {s} | ⬆️ Upload: {d:.0} {s}", .{ ping, download_result.speed.value, download_result.speed.unit.toString(), up.speed.value, up.speed.unit.toString() }); } else { - try spinner.succeed("🏓 {d:.0}ms | ⬇️ Download: {d:.1} {s}", .{ ping, download_result.speed.value, download_result.speed.unit.toString() }); + try spinner.succeed("🏓 {d:.0}ms | ⬇️ Download: {d:.0} {s}", .{ ping, download_result.speed.value, download_result.speed.unit.toString() }); } } else { if (upload_result) |up| { - try spinner.succeed("⬇️ Download: {d:.1} {s} | ⬆️ Upload: {d:.1} {s}", .{ download_result.speed.value, download_result.speed.unit.toString(), up.speed.value, up.speed.unit.toString() }); + try spinner.succeed("⬇️ Download: {d:.0} {s} | ⬆️ Upload: {d:.0} {s}", .{ download_result.speed.value, download_result.speed.unit.toString(), up.speed.value, up.speed.unit.toString() }); } else { - try spinner.succeed("⬇️ Download: {d:.1} {s}", .{ download_result.speed.value, download_result.speed.unit.toString() }); + try spinner.succeed("⬇️ Download: {d:.0} {s}", .{ download_result.speed.value, download_result.speed.unit.toString() }); } } } else { @@ -149,11 +146,11 @@ pub fn run(allocator: std.mem.Allocator) !void { } fn updateSpinnerText(spinner: *Spinner, measurement: SpeedMeasurement) void { - spinner.updateMessage("⬇️ {d:.1} {s}", .{ measurement.value, measurement.unit.toString() }) catch {}; + spinner.updateMessage("⬇️ {d:.0} {s}", .{ measurement.value, measurement.unit.toString() }) catch {}; } fn updateUploadSpinnerText(spinner: *Spinner, measurement: SpeedMeasurement) void { - spinner.updateMessage("⬆️ {d:.1} {s}", .{ measurement.value, measurement.unit.toString() }) catch {}; + spinner.updateMessage("⬆️ {d:.0} {s}", .{ measurement.value, measurement.unit.toString() }) catch {}; } fn outputJson(download_mbps: ?f64, ping_ms: ?f64, upload_mbps: ?f64, error_message: ?[]const u8) !void { @@ -166,9 +163,9 @@ fn outputJson(download_mbps: ?f64, ping_ms: ?f64, upload_mbps: ?f64, error_messa var upload_buf: [32]u8 = undefined; var error_buf: [256]u8 = undefined; - const download_str = if (download_mbps) |d| try std.fmt.bufPrint(&download_buf, "{d:.1}", .{d}) else "null"; + const download_str = if (download_mbps) |d| try std.fmt.bufPrint(&download_buf, "{d:.0}", .{d}) else "null"; const ping_str = if (ping_ms) |p| try std.fmt.bufPrint(&ping_buf, "{d:.1}", .{p}) else "null"; - const upload_str = if (upload_mbps) |u| try std.fmt.bufPrint(&upload_buf, "{d:.1}", .{u}) else "null"; + const upload_str = if (upload_mbps) |u| try std.fmt.bufPrint(&upload_buf, "{d:.0}", .{u}) else "null"; const error_str = if (error_message) |e| try std.fmt.bufPrint(&error_buf, "\"{s}\"", .{e}) else "null"; try stdout.print("{{\"download_mbps\": {s}, \"ping_ms\": {s}, \"upload_mbps\": {s}, \"error\": {s}}}\n", .{ download_str, ping_str, upload_str, error_str }); diff --git a/src/lib/http_speed_tester_v2.zig b/src/lib/http_speed_tester_v2.zig index c13a975..4edbe2e 100644 --- a/src/lib/http_speed_tester_v2.zig +++ b/src/lib/http_speed_tester_v2.zig @@ -2,7 +2,6 @@ const std = @import("std"); const speed_worker = @import("workers/speed_worker.zig"); const BandwidthMeter = @import("bandwidth.zig").BandwidthMeter; const SpeedMeasurement = @import("bandwidth.zig").SpeedMeasurement; -const SpeedUnit = @import("bandwidth.zig").SpeedUnit; const WorkerManager = @import("workers/worker_manager.zig").WorkerManager; const measurement_strategy = @import("measurement_strategy.zig"); const DurationStrategy = measurement_strategy.DurationStrategy; @@ -14,35 +13,68 @@ const print = std.debug.print; pub const SpeedTestResult = struct { speed: SpeedMeasurement, - /// Convert bytes per second to optimal unit for display (in bits per second) pub fn fromBytesPerSecond(bytes_per_second: f64) SpeedTestResult { - // Convert bytes/s to bits/s - const speed_bits_per_sec = bytes_per_second * 8.0; - const abs_speed = @abs(speed_bits_per_sec); - - const speed_measurement = if (abs_speed >= 1_000_000_000) - SpeedMeasurement{ .value = speed_bits_per_sec / 1_000_000_000, .unit = .gbps } - else if (abs_speed >= 1_000_000) - SpeedMeasurement{ .value = speed_bits_per_sec / 1_000_000, .unit = .mbps } - else if (abs_speed >= 1_000) - SpeedMeasurement{ .value = speed_bits_per_sec / 1_000, .unit = .kbps } - else - SpeedMeasurement{ .value = speed_bits_per_sec, .unit = .bps }; + return fromBitsPerSecond(bytes_per_second * 8.0); + } + + pub fn fromBitsPerSecond(bits_per_second: f64) SpeedTestResult { + return SpeedTestResult{ .speed = speedMeasurementFromBitsPerSecond(bits_per_second) }; + } +}; + +pub const TraceSample = struct { + t_ms: u64, + total_bytes: u64, +}; + +pub const TraceCapture = struct { + samples: std.ArrayList(TraceSample), + stop_at_ms: u64 = 0, + stable: bool = false, + final_speed_bits_per_sec: f64 = 0, + allocator: std.mem.Allocator, + + pub fn init(allocator: std.mem.Allocator) TraceCapture { + return .{ + .samples = std.ArrayList(TraceSample).empty, + .allocator = allocator, + }; + } - return SpeedTestResult{ .speed = speed_measurement }; + pub fn deinit(self: *TraceCapture) void { + self.samples.deinit(self.allocator); + } + + pub fn speedMbps(self: TraceCapture) f64 { + return self.final_speed_bits_per_sec / 1_000_000; } }; +fn speedMeasurementFromBitsPerSecond(bits_per_second: f64) SpeedMeasurement { + const abs_speed = @abs(bits_per_second); + + if (abs_speed >= 1_000_000_000) { + return SpeedMeasurement{ .value = bits_per_second / 1_000_000_000, .unit = .gbps }; + } + if (abs_speed >= 1_000_000) { + return SpeedMeasurement{ .value = bits_per_second / 1_000_000, .unit = .mbps }; + } + if (abs_speed >= 1_000) { + return SpeedMeasurement{ .value = bits_per_second / 1_000, .unit = .kbps }; + } + return SpeedMeasurement{ .value = bits_per_second, .unit = .bps }; +} + pub const HTTPSpeedTester = struct { allocator: std.mem.Allocator, concurrent_connections: u32, progress_update_interval_ms: u32, pub fn init(allocator: std.mem.Allocator) HTTPSpeedTester { - return HTTPSpeedTester{ + return .{ .allocator = allocator, - .concurrent_connections = 8, // Default 8 concurrent connections - .progress_update_interval_ms = 100, // Default 100ms updates + .concurrent_connections = 8, + .progress_update_interval_ms = 100, }; } @@ -50,19 +82,16 @@ pub const HTTPSpeedTester = struct { _ = self; } - // Stability-based download with optional progress callback pub fn measure_download_speed_stability_duration(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void) !SpeedTestResult { var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria); defer strategy.deinit(); - return self.measureDownloadSpeedWithStability(urls, &strategy, ProgressType, progress_callback); + return self.measureDownloadSpeedWithStability(urls, &strategy, null, ProgressType, progress_callback); } - // Stability-based download without progress callback pub fn measure_download_speed_stability(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria) !SpeedTestResult { return self.measure_download_speed_stability_duration(urls, criteria, null, {}); } - // Stability-based upload with optional progress callback pub fn measure_upload_speed_stability_duration(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void) !SpeedTestResult { const upload_data = try self.allocator.alloc(u8, 4 * 1024 * 1024); defer self.allocator.free(upload_data); @@ -70,28 +99,22 @@ pub const HTTPSpeedTester = struct { var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria); defer strategy.deinit(); - return self.measureUploadSpeedWithStability(urls, &strategy, upload_data, ProgressType, progress_callback); + return self.measureUploadSpeedWithStability(urls, &strategy, upload_data, null, ProgressType, progress_callback); } - // Stability-based upload without progress callback pub fn measure_upload_speed_stability(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria) !SpeedTestResult { return self.measure_upload_speed_stability_duration(urls, criteria, null, {}); } - // Convenience helpers for cleaner API usage - - // Clean duration-based download with optional progress callback pub fn measure_download_speed_duration(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void) !SpeedTestResult { const strategy = measurement_strategy.createDurationStrategy(duration_seconds, self.progress_update_interval_ms); return self.measureDownloadSpeedWithDuration(urls, strategy, ProgressType, progress_callback); } - /// Simple download speed measurement without progress callback pub fn measureDownloadSpeed(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32) !SpeedTestResult { return self.measure_download_speed_duration(urls, duration_seconds, null, {}); } - // Clean duration-based upload with optional progress callback pub fn measure_upload_speed_duration(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void) !SpeedTestResult { const upload_data = try self.allocator.alloc(u8, 4 * 1024 * 1024); defer self.allocator.free(upload_data); @@ -101,22 +124,50 @@ pub const HTTPSpeedTester = struct { return self.measureUploadSpeedWithDuration(urls, strategy, upload_data, ProgressType, progress_callback); } - /// Simple upload speed measurement without progress callback pub fn measureUploadSpeed(self: *HTTPSpeedTester, urls: []const []const u8, duration_seconds: u32) !SpeedTestResult { return self.measure_upload_speed_duration(urls, duration_seconds, null, {}); } - /// Stability-based download speed measurement with progress callback (type inferred) pub fn measureDownloadSpeedWithStabilityProgress(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, progress_callback: anytype) !SpeedTestResult { return self.measure_download_speed_stability_duration(urls, criteria, @TypeOf(progress_callback), progress_callback); } - /// Stability-based upload speed measurement with progress callback (type inferred) pub fn measureUploadSpeedWithStabilityProgress(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, progress_callback: anytype) !SpeedTestResult { return self.measure_upload_speed_stability_duration(urls, criteria, @TypeOf(progress_callback), progress_callback); } - // Private implementation for duration-based download + pub fn measure_download_speed_stability_with_trace(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, trace_capture: *TraceCapture) !SpeedTestResult { + var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria); + defer strategy.deinit(); + return self.measureDownloadSpeedWithStability(urls, &strategy, trace_capture, null, {}); + } + + pub fn measure_upload_speed_stability_with_trace(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, trace_capture: *TraceCapture) !SpeedTestResult { + const upload_data = try self.allocator.alloc(u8, 4 * 1024 * 1024); + defer self.allocator.free(upload_data); + @memset(upload_data, 'A'); + + var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria); + defer strategy.deinit(); + return self.measureUploadSpeedWithStability(urls, &strategy, upload_data, trace_capture, null, {}); + } + + pub fn measureDownloadSpeedWithStabilityProgressTrace(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, trace_capture: *TraceCapture, progress_callback: anytype) !SpeedTestResult { + var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria); + defer strategy.deinit(); + return self.measureDownloadSpeedWithStability(urls, &strategy, trace_capture, @TypeOf(progress_callback), progress_callback); + } + + pub fn measureUploadSpeedWithStabilityProgressTrace(self: *HTTPSpeedTester, urls: []const []const u8, criteria: StabilityCriteria, trace_capture: *TraceCapture, progress_callback: anytype) !SpeedTestResult { + const upload_data = try self.allocator.alloc(u8, 4 * 1024 * 1024); + defer self.allocator.free(upload_data); + @memset(upload_data, 'A'); + + var strategy = measurement_strategy.createStabilityStrategy(self.allocator, criteria); + defer strategy.deinit(); + return self.measureUploadSpeedWithStability(urls, &strategy, upload_data, trace_capture, @TypeOf(progress_callback), progress_callback); + } + fn measureDownloadSpeedWithDuration( self: *HTTPSpeedTester, urls: []const []const u8, @@ -129,18 +180,15 @@ pub const HTTPSpeedTester = struct { var timer = try speed_worker.RealTimer.init(); var should_stop = std.atomic.Value(bool).init(false); - // Initialize bandwidth meter for progress tracking var bandwidth_meter = BandwidthMeter.init(); if (has_progress) { try bandwidth_meter.start(); } - // Setup worker manager const num_workers = @min(urls.len, self.concurrent_connections); var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers); defer worker_manager.deinit(); - // Setup download workers const workers = try worker_manager.setupDownloadWorkers( urls, self.concurrent_connections, @@ -149,10 +197,8 @@ pub const HTTPSpeedTester = struct { ); defer worker_manager.cleanupWorkers(workers); - // Start workers try worker_manager.startDownloadWorkers(workers); - // Main measurement loop while (strategy.shouldContinue(timer.timer_interface().read())) { std.Thread.sleep(strategy.getSleepInterval()); @@ -164,10 +210,8 @@ pub const HTTPSpeedTester = struct { } } - // Stop and wait for workers worker_manager.stopAndJoinWorkers(); - // Calculate results const totals = worker_manager.calculateDownloadTotals(workers); if (totals.errors > 0) { print("Download completed with {} errors\n", .{totals.errors}); @@ -181,7 +225,6 @@ pub const HTTPSpeedTester = struct { return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec); } - // Private implementation for duration-based upload fn measureUploadSpeedWithDuration( self: *HTTPSpeedTester, urls: []const []const u8, @@ -195,18 +238,15 @@ pub const HTTPSpeedTester = struct { var timer = try speed_worker.RealTimer.init(); var should_stop = std.atomic.Value(bool).init(false); - // Initialize bandwidth meter for progress tracking var bandwidth_meter = BandwidthMeter.init(); if (has_progress) { try bandwidth_meter.start(); } - // Setup worker manager const num_workers = @min(urls.len, self.concurrent_connections); var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers); defer worker_manager.deinit(); - // Setup upload workers const workers = try worker_manager.setupUploadWorkers( urls, self.concurrent_connections, @@ -216,10 +256,8 @@ pub const HTTPSpeedTester = struct { ); defer worker_manager.cleanupWorkers(workers); - // Start workers try worker_manager.startUploadWorkers(workers); - // Main measurement loop while (strategy.shouldContinue(timer.timer_interface().read())) { std.Thread.sleep(strategy.getSleepInterval()); @@ -231,10 +269,8 @@ pub const HTTPSpeedTester = struct { } } - // Stop and wait for workers worker_manager.stopAndJoinWorkers(); - // Calculate results const totals = worker_manager.calculateUploadTotals(workers); if (totals.errors > 0) { print("Upload completed with {} errors\n", .{totals.errors}); @@ -248,149 +284,206 @@ pub const HTTPSpeedTester = struct { return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec); } - // Private implementation for stability-based download + fn effectiveMaxWorkers(self: *HTTPSpeedTester, urls_len: usize, criteria: StabilityCriteria) usize { + if (urls_len == 0) return 0; + const criteria_max: usize = @intCast(@max(criteria.connections_max, 1)); + const configured_max: usize = @intCast(@max(self.concurrent_connections, 1)); + return @min(urls_len, @min(criteria_max, configured_max)); + } + + fn initialActiveWorkers(max_workers: usize, criteria: StabilityCriteria) u32 { + if (max_workers == 0) return 0; + const min_connections = @max(criteria.connections_min, 1); + return @intCast(@min(max_workers, min_connections)); + } + + fn applyConnectionRamp(active_worker_count: *std.atomic.Value(u32), desired: u32, max_workers: usize) void { + if (max_workers == 0) return; + const max_workers_u32: u32 = @intCast(max_workers); + const bounded = @max(1, @min(desired, max_workers_u32)); + const current = active_worker_count.load(.monotonic); + if (bounded > current) { + active_worker_count.store(bounded, .monotonic); + } + } + fn measureDownloadSpeedWithStability( self: *HTTPSpeedTester, urls: []const []const u8, strategy: *StabilityStrategy, + trace_capture: ?*TraceCapture, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void, ) !SpeedTestResult { const has_progress = ProgressType != null; + if (urls.len == 0) return SpeedTestResult.fromBitsPerSecond(0); + var timer = try speed_worker.RealTimer.init(); var should_stop = std.atomic.Value(bool).init(false); + var last_emitted_progress_speed_bits_per_sec: ?f64 = null; - // Initialize bandwidth meter for progress tracking - var bandwidth_meter = BandwidthMeter.init(); - if (has_progress) { - try bandwidth_meter.start(); - } + const max_workers = self.effectiveMaxWorkers(urls.len, strategy.criteria); + var active_worker_count = std.atomic.Value(u32).init(initialActiveWorkers(max_workers, strategy.criteria)); - // Setup worker manager - const num_workers = @min(urls.len, self.concurrent_connections); - var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers); + var worker_manager = try WorkerManager.init(self.allocator, &should_stop, max_workers); defer worker_manager.deinit(); - // Setup download workers - const workers = try worker_manager.setupDownloadWorkers( + const workers = try worker_manager.setupDownloadWorkersWithControl( urls, - self.concurrent_connections, + max_workers, timer.timer_interface(), strategy.max_duration_ns, + &active_worker_count, + strategy.criteria.max_bytes_in_flight, ); defer worker_manager.cleanupWorkers(workers); - // Start workers try worker_manager.startDownloadWorkers(workers); - // Main measurement loop while (strategy.shouldContinue(timer.timer_interface().read())) { std.Thread.sleep(strategy.getSleepInterval()); + const current_time_ns = timer.timer_interface().read(); const current_bytes = worker_manager.getCurrentDownloadBytes(workers); + const decision = try strategy.handleProgress(current_time_ns, current_bytes); - if (has_progress) { - bandwidth_meter.update_total(current_bytes); - const measurement = bandwidth_meter.bandwidthWithUnits(); - progress_callback.call(measurement); + applyConnectionRamp(&active_worker_count, decision.desired_connections, max_workers); + + if (trace_capture) |trace| { + if (decision.sampled) { + try trace.samples.append(trace.allocator, .{ + .t_ms = current_time_ns / std.time.ns_per_ms, + .total_bytes = current_bytes, + }); + } } - const should_stop_early = try strategy.handleProgress( - timer.timer_interface().read(), - current_bytes, - ); + if (has_progress) { + progress_callback.call(speedMeasurementFromBitsPerSecond(decision.display_speed_bits_per_sec)); + last_emitted_progress_speed_bits_per_sec = decision.display_speed_bits_per_sec; + } - if (should_stop_early) break; + if (decision.should_stop) { + if (trace_capture) |trace| { + trace.stop_at_ms = current_time_ns / std.time.ns_per_ms; + trace.stable = current_time_ns < strategy.max_duration_ns; + } + break; + } } - // Stop and wait for workers worker_manager.stopAndJoinWorkers(); - // Calculate results const totals = worker_manager.calculateDownloadTotals(workers); if (totals.errors > 0) { print("Download completed with {} errors\n", .{totals.errors}); } const actual_duration_ns = timer.timer_interface().read(); - const actual_duration_s = @as(f64, @floatFromInt(actual_duration_ns)) / std.time.ns_per_s; + const speed_bits_per_sec = if (has_progress and last_emitted_progress_speed_bits_per_sec != null) + last_emitted_progress_speed_bits_per_sec.? + else + strategy.finalSpeedBitsPerSecond(totals.bytes, actual_duration_ns); - if (actual_duration_s == 0) return SpeedTestResult.fromBytesPerSecond(0); - const speed_bytes_per_sec = @as(f64, @floatFromInt(totals.bytes)) / actual_duration_s; - return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec); + if (trace_capture) |trace| { + if (trace.stop_at_ms == 0) { + trace.stop_at_ms = actual_duration_ns / std.time.ns_per_ms; + trace.stable = false; + } + trace.final_speed_bits_per_sec = speed_bits_per_sec; + } + + return SpeedTestResult.fromBitsPerSecond(speed_bits_per_sec); } - // Private implementation for stability-based upload fn measureUploadSpeedWithStability( self: *HTTPSpeedTester, urls: []const []const u8, strategy: *StabilityStrategy, upload_data: []const u8, + trace_capture: ?*TraceCapture, comptime ProgressType: ?type, progress_callback: if (ProgressType) |T| T else void, ) !SpeedTestResult { const has_progress = ProgressType != null; + if (urls.len == 0) return SpeedTestResult.fromBitsPerSecond(0); + var timer = try speed_worker.RealTimer.init(); var should_stop = std.atomic.Value(bool).init(false); + var last_emitted_progress_speed_bits_per_sec: ?f64 = null; - // Initialize bandwidth meter for progress tracking - var bandwidth_meter = BandwidthMeter.init(); - if (has_progress) { - try bandwidth_meter.start(); - } + const max_workers = self.effectiveMaxWorkers(urls.len, strategy.criteria); + var active_worker_count = std.atomic.Value(u32).init(initialActiveWorkers(max_workers, strategy.criteria)); - // Setup worker manager - const num_workers = @min(urls.len, self.concurrent_connections); - var worker_manager = try WorkerManager.init(self.allocator, &should_stop, num_workers); + var worker_manager = try WorkerManager.init(self.allocator, &should_stop, max_workers); defer worker_manager.deinit(); - // Setup upload workers - const workers = try worker_manager.setupUploadWorkers( + const workers = try worker_manager.setupUploadWorkersWithControl( urls, - self.concurrent_connections, + max_workers, timer.timer_interface(), strategy.max_duration_ns, upload_data, + &active_worker_count, + strategy.criteria.max_bytes_in_flight, ); defer worker_manager.cleanupWorkers(workers); - // Start workers try worker_manager.startUploadWorkers(workers); - // Main measurement loop while (strategy.shouldContinue(timer.timer_interface().read())) { std.Thread.sleep(strategy.getSleepInterval()); + const current_time_ns = timer.timer_interface().read(); const current_bytes = worker_manager.getCurrentUploadBytes(workers); + const decision = try strategy.handleProgress(current_time_ns, current_bytes); - if (has_progress) { - bandwidth_meter.update_total(current_bytes); - const measurement = bandwidth_meter.bandwidthWithUnits(); - progress_callback.call(measurement); + applyConnectionRamp(&active_worker_count, decision.desired_connections, max_workers); + + if (trace_capture) |trace| { + if (decision.sampled) { + try trace.samples.append(trace.allocator, .{ + .t_ms = current_time_ns / std.time.ns_per_ms, + .total_bytes = current_bytes, + }); + } } - const should_stop_early = try strategy.handleProgress( - timer.timer_interface().read(), - current_bytes, - ); + if (has_progress) { + progress_callback.call(speedMeasurementFromBitsPerSecond(decision.display_speed_bits_per_sec)); + last_emitted_progress_speed_bits_per_sec = decision.display_speed_bits_per_sec; + } - if (should_stop_early) break; + if (decision.should_stop) { + if (trace_capture) |trace| { + trace.stop_at_ms = current_time_ns / std.time.ns_per_ms; + trace.stable = current_time_ns < strategy.max_duration_ns; + } + break; + } } - // Stop and wait for workers worker_manager.stopAndJoinWorkers(); - // Calculate results const totals = worker_manager.calculateUploadTotals(workers); if (totals.errors > 0) { print("Upload completed with {} errors\n", .{totals.errors}); } const actual_duration_ns = timer.timer_interface().read(); - const actual_duration_s = @as(f64, @floatFromInt(actual_duration_ns)) / std.time.ns_per_s; + const speed_bits_per_sec = if (has_progress and last_emitted_progress_speed_bits_per_sec != null) + last_emitted_progress_speed_bits_per_sec.? + else + strategy.finalSpeedBitsPerSecond(totals.bytes, actual_duration_ns); - if (actual_duration_s == 0) return SpeedTestResult.fromBytesPerSecond(0); - const speed_bytes_per_sec = @as(f64, @floatFromInt(totals.bytes)) / actual_duration_s; - return SpeedTestResult.fromBytesPerSecond(speed_bytes_per_sec); + if (trace_capture) |trace| { + if (trace.stop_at_ms == 0) { + trace.stop_at_ms = actual_duration_ns / std.time.ns_per_ms; + trace.stable = false; + } + trace.final_speed_bits_per_sec = speed_bits_per_sec; + } + + return SpeedTestResult.fromBitsPerSecond(speed_bits_per_sec); } }; diff --git a/src/lib/measurement_strategy.zig b/src/lib/measurement_strategy.zig index 6b743c9..d0604a6 100644 --- a/src/lib/measurement_strategy.zig +++ b/src/lib/measurement_strategy.zig @@ -1,12 +1,39 @@ const std = @import("std"); pub const StabilityCriteria = struct { - ramp_up_duration_seconds: u32 = 4, - max_duration_seconds: u32 = 25, - measurement_interval_ms: u64 = 750, - sliding_window_size: u32 = 6, - stability_threshold_cov: f64 = 0.15, - stable_checks_required: u32 = 2, + min_duration_seconds: u32 = 7, + max_duration_seconds: u32 = 30, + progress_frequency_ms: u64 = 150, + moving_average_window_size: u32 = 5, + stability_delta_percent: f64 = 2.0, + min_stable_measurements: u32 = 6, + connections_min: u32 = 1, + connections_max: u32 = 8, + max_bytes_in_flight: u64 = 78_643_200, +}; + +pub const ProgressSnapshot = struct { + bytes: u64, + time_ms: u64, +}; + +pub const ProgressMeasurement = struct { + speed_bits_per_sec: f64, +}; + +pub const EstimationPhase = enum { + ramp, + steady, +}; + +pub const StabilityDecision = struct { + should_stop: bool, + desired_connections: u32, + phase: EstimationPhase, + display_speed_bits_per_sec: f64, + authoritative_speed_bits_per_sec: f64, + speed_bits_per_sec: f64, + sampled: bool = false, }; pub const DurationStrategy = struct { @@ -22,128 +49,352 @@ pub const DurationStrategy = struct { } }; +const StableMovingAverage = struct { + window_size: usize, + snapshot_reset_threshold: usize = 5, + start_index: usize = 0, + current_speed_bytes_per_ms: f64 = 0, + fixed_start_index: bool = false, + last_len: usize = 0, + bytes_sum: u64 = 0, + time_sum_ms: u64 = 0, + + fn init(window_size: usize) StableMovingAverage { + return .{ .window_size = @max(window_size, 1) }; + } + + fn reset(self: *StableMovingAverage) void { + self.start_index = 0; + self.current_speed_bytes_per_ms = 0; + self.fixed_start_index = false; + self.last_len = 0; + self.bytes_sum = 0; + self.time_sum_ms = 0; + } + + fn compute(self: *StableMovingAverage, snapshots: []const ProgressSnapshot) f64 { + if (snapshots.len < self.snapshot_reset_threshold or snapshots.len < self.last_len) { + self.reset(); + } + self.last_len = snapshots.len; + + if (!self.fixed_start_index and snapshots.len > 0) { + var sum_bytes: u64 = 0; + var sum_time_ms: u64 = 0; + + const end = if (snapshots.len > self.window_size) snapshots.len - self.window_size else 0; + var i = snapshots.len; + while (i > end) { + i -= 1; + sum_bytes += snapshots[i].bytes; + sum_time_ms += snapshots[i].time_ms; + } + + const candidate_speed = if (sum_time_ms > 0) + @as(f64, @floatFromInt(sum_bytes)) / @as(f64, @floatFromInt(sum_time_ms)) + else + 0; + + if (candidate_speed >= self.current_speed_bytes_per_ms) { + self.start_index = snapshots.len; + self.current_speed_bytes_per_ms = candidate_speed; + self.bytes_sum = sum_bytes; + self.time_sum_ms = sum_time_ms; + } else { + self.fixed_start_index = true; + } + } + + var j = self.start_index; + while (j < snapshots.len) : (j += 1) { + self.bytes_sum += snapshots[j].bytes; + self.time_sum_ms += snapshots[j].time_ms; + } + self.start_index = snapshots.len; + + if (self.time_sum_ms == 0) return 0; + return 1000.0 * @as(f64, @floatFromInt(self.bytes_sum)) * 8.0 / @as(f64, @floatFromInt(self.time_sum_ms)); + } +}; + +const StableDeltaStopper = struct { + min_duration_ns: u64, + max_duration_ns: u64, + stability_delta_percent: f64, + min_stable_measurements: usize, + + fn init(criteria: StabilityCriteria) StableDeltaStopper { + return .{ + .min_duration_ns = @as(u64, criteria.min_duration_seconds) * std.time.ns_per_s, + .max_duration_ns = @as(u64, criteria.max_duration_seconds) * std.time.ns_per_s, + .stability_delta_percent = criteria.stability_delta_percent, + .min_stable_measurements = @max(criteria.min_stable_measurements, 1), + }; + } + + fn shouldStop(self: *const StableDeltaStopper, test_time_ns: u64, measurements: []const ProgressMeasurement, current_speed_bits_per_sec: f64) bool { + if (test_time_ns >= self.max_duration_ns) return true; + if (test_time_ns < self.min_duration_ns) return false; + if (measurements.len < self.min_stable_measurements) return false; + + const half_window = (self.min_stable_measurements + 1) / 2; + const max_index = lastWindowMaxIndex(measurements, half_window); + if (measurements.len - max_index < half_window) return false; + + const start = measurements.len - self.min_stable_measurements; + const delta = maxDeltaPercent(current_speed_bits_per_sec, measurements[start..]); + return delta <= self.stability_delta_percent; + } +}; + +fn lastWindowMaxIndex(measurements: []const ProgressMeasurement, window_size: usize) usize { + if (measurements.len == 0) return 0; + + const start = if (measurements.len > window_size) measurements.len - window_size else 0; + var max_index = start; + var max_speed: f64 = 0; + + var i = measurements.len; + while (i > start) { + i -= 1; + const speed = measurements[i].speed_bits_per_sec; + if (speed >= max_speed) { + max_speed = speed; + max_index = i; + } + } + + return max_index; +} + +fn maxDeltaPercent(reference_speed: f64, measurements: []const ProgressMeasurement) f64 { + if (measurements.len == 0) return 100; + if (reference_speed <= 0) return 100; + + var max_delta: f64 = 0; + for (measurements) |measurement| { + const delta = 100.0 * @abs(measurement.speed_bits_per_sec - reference_speed) / reference_speed; + if (delta > max_delta) max_delta = delta; + } + return max_delta; +} + +fn desiredConnections(criteria: StabilityCriteria, speed_bits_per_sec: f64) u32 { + const max_connections = @max(criteria.connections_max, criteria.connections_min); + + const desired = if (speed_bits_per_sec >= 50_000_000) + max_connections + else if (speed_bits_per_sec >= 10_000_000) + @min(max_connections, 5) + else if (speed_bits_per_sec >= 1_000_000) + @min(max_connections, 3) + else if (speed_bits_per_sec >= 500_000) + @min(max_connections, 2) + else + criteria.connections_min; + + return @max(desired, criteria.connections_min); +} + pub const StabilityStrategy = struct { criteria: StabilityCriteria, - ramp_up_duration_ns: u64, + min_duration_ns: u64, max_duration_ns: u64, + ramp_min_duration_ns: u64, measurement_interval_ns: u64, - speed_measurements: std.ArrayList(f64), // Sliding window of recent speeds - last_sample_time: u64 = 0, + snapshots: std.ArrayList(ProgressSnapshot), + progress_measurements: std.ArrayList(ProgressMeasurement), + moving_average: StableMovingAverage, + stopper: StableDeltaStopper, + last_sample_time_ns: u64 = 0, last_total_bytes: u64 = 0, - consecutive_stable_checks: u32 = 0, + phase: EstimationPhase = .ramp, + provisional_speed_bits_per_sec: f64 = 0, + authoritative_speed_bits_per_sec: f64 = 0, + steady_total_bytes: u64 = 0, + steady_total_time_ns: u64 = 0, + ramp_max_desired_connections: u32 = 1, + ramp_ticks_without_increase: u32 = 0, + locked_connections: u32 = 1, allocator: std.mem.Allocator, pub fn init(allocator: std.mem.Allocator, criteria: StabilityCriteria) StabilityStrategy { - return StabilityStrategy{ + const min_duration_ns = @as(u64, criteria.min_duration_seconds) * std.time.ns_per_s; + const max_duration_ns = @as(u64, criteria.max_duration_seconds) * std.time.ns_per_s; + const ramp_min_duration_ns = @min(2 * std.time.ns_per_s, min_duration_ns); + const min_connections = @max(criteria.connections_min, 1); + + return .{ .criteria = criteria, - .ramp_up_duration_ns = @as(u64, criteria.ramp_up_duration_seconds) * std.time.ns_per_s, - .max_duration_ns = @as(u64, criteria.max_duration_seconds) * std.time.ns_per_s, - .measurement_interval_ns = criteria.measurement_interval_ms * std.time.ns_per_ms, - .speed_measurements = std.ArrayList(f64).empty, + .min_duration_ns = min_duration_ns, + .max_duration_ns = max_duration_ns, + .ramp_min_duration_ns = @min(ramp_min_duration_ns, max_duration_ns), + .measurement_interval_ns = criteria.progress_frequency_ms * std.time.ns_per_ms, + .snapshots = std.ArrayList(ProgressSnapshot).empty, + .progress_measurements = std.ArrayList(ProgressMeasurement).empty, + .moving_average = StableMovingAverage.init(criteria.moving_average_window_size), + .stopper = StableDeltaStopper.init(criteria), + .ramp_max_desired_connections = min_connections, + .locked_connections = min_connections, .allocator = allocator, }; } pub fn deinit(self: *StabilityStrategy) void { - self.speed_measurements.deinit(self.allocator); + self.snapshots.deinit(self.allocator); + self.progress_measurements.deinit(self.allocator); } - pub fn shouldContinue(self: StabilityStrategy, current_time: u64) bool { - return current_time < self.max_duration_ns; + pub fn shouldContinue(self: StabilityStrategy, current_time_ns: u64) bool { + return current_time_ns < self.max_duration_ns; } pub fn getSleepInterval(self: StabilityStrategy) u64 { - return self.measurement_interval_ns / 3; // Sample more frequently than measurement interval + return self.measurement_interval_ns; } - pub fn shouldSample(self: *StabilityStrategy, current_time: u64) bool { - return current_time - self.last_sample_time >= self.measurement_interval_ns; + pub fn getCurrentSpeedBitsPerSec(self: StabilityStrategy) f64 { + return if (self.phase == .steady and self.authoritative_speed_bits_per_sec > 0) + self.authoritative_speed_bits_per_sec + else + self.provisional_speed_bits_per_sec; } - pub fn addSample(self: *StabilityStrategy, current_time: u64, current_total_bytes: u64) !bool { - // Skip first sample to calculate speed - if (self.last_sample_time > 0) { - const bytes_diff = current_total_bytes - self.last_total_bytes; - const time_diff_ns = current_time - self.last_sample_time; - const time_diff_s = @as(f64, @floatFromInt(time_diff_ns)) / std.time.ns_per_s; + fn updateRampState(self: *StabilityStrategy, desired_connections: u32) void { + if (desired_connections > self.ramp_max_desired_connections) { + self.ramp_max_desired_connections = desired_connections; + self.ramp_ticks_without_increase = 0; + } else { + self.ramp_ticks_without_increase += 1; + } + } - const interval_speed = @as(f64, @floatFromInt(bytes_diff)) / time_diff_s; + fn shouldEnterSteadyPhase(self: *const StabilityStrategy, current_time_ns: u64) bool { + const ramp_lock_ticks_required: u32 = 3; + if (self.ramp_ticks_without_increase < ramp_lock_ticks_required) return false; - // Phase 1: Ramp-up - collect measurements but don't check stability - if (current_time < self.ramp_up_duration_ns) { - try self.speed_measurements.append(self.allocator, interval_speed); + const max_connections = @max(self.criteria.connections_max, self.criteria.connections_min); + const ramp_reached_max = self.ramp_max_desired_connections >= max_connections; + const ramp_time_elapsed = current_time_ns >= self.ramp_min_duration_ns; + return ramp_reached_max or ramp_time_elapsed; + } - // Keep sliding window size - if (self.speed_measurements.items.len > self.criteria.sliding_window_size) { - _ = self.speed_measurements.orderedRemove(0); - } - } else { - // Phase 2: Stabilization - check CoV for stability - try self.speed_measurements.append(self.allocator, interval_speed); - - // Maintain sliding window - if (self.speed_measurements.items.len > self.criteria.sliding_window_size) { - _ = self.speed_measurements.orderedRemove(0); - } - - // Check stability if we have enough measurements - if (self.speed_measurements.items.len >= self.criteria.sliding_window_size) { - const cov = calculateCoV(self.speed_measurements.items); - - if (cov <= self.criteria.stability_threshold_cov) { - self.consecutive_stable_checks += 1; - if (self.consecutive_stable_checks >= self.criteria.stable_checks_required) { - return true; // Stable, can stop - } - } else { - self.consecutive_stable_checks = 0; // Reset counter - } - } - } - } + fn updateSteadyEstimator(self: *StabilityStrategy, bytes_diff: u64, elapsed_ns: u64) void { + self.steady_total_bytes += bytes_diff; + self.steady_total_time_ns += elapsed_ns; + if (self.steady_total_time_ns == 0) return; - self.last_sample_time = current_time; - self.last_total_bytes = current_total_bytes; - return false; // Not stable yet + const duration_s = @as(f64, @floatFromInt(self.steady_total_time_ns)) / std.time.ns_per_s; + if (duration_s == 0) return; + self.authoritative_speed_bits_per_sec = @as(f64, @floatFromInt(self.steady_total_bytes)) * 8.0 / duration_s; } - pub fn handleProgress(self: *StabilityStrategy, current_time: u64, current_bytes: u64) !bool { - if (self.shouldSample(current_time)) { - return try self.addSample(current_time, current_bytes); + pub fn handleProgress(self: *StabilityStrategy, current_time_ns: u64, current_total_bytes: u64) !StabilityDecision { + const initial_display_speed = self.getCurrentSpeedBitsPerSec(); + const initial_desired = if (self.phase == .steady) self.locked_connections else @max(self.criteria.connections_min, 1); + var decision = StabilityDecision{ + .should_stop = false, + .desired_connections = initial_desired, + .phase = self.phase, + .display_speed_bits_per_sec = initial_display_speed, + .authoritative_speed_bits_per_sec = initial_display_speed, + .speed_bits_per_sec = initial_display_speed, + .sampled = false, + }; + + if (self.last_sample_time_ns == 0) { + self.last_sample_time_ns = current_time_ns; + self.last_total_bytes = current_total_bytes; + decision.desired_connections = self.criteria.connections_min; + return decision; } - return false; - } -}; -/// Calculate Coefficient of Variation (standard deviation / mean) for stability detection -fn calculateCoV(speeds: []const f64) f64 { - if (speeds.len < 2) return 1.0; // Not enough data, assume unstable + if (current_time_ns <= self.last_sample_time_ns) { + return decision; + } - // Calculate mean - var sum: f64 = 0; - for (speeds) |speed| { - sum += speed; - } - const mean = sum / @as(f64, @floatFromInt(speeds.len)); + const elapsed_ns = current_time_ns - self.last_sample_time_ns; + if (elapsed_ns < self.measurement_interval_ns) { + return decision; + } + + const bytes_diff = if (current_total_bytes >= self.last_total_bytes) + current_total_bytes - self.last_total_bytes + else + 0; + + const time_ms = @max(elapsed_ns / std.time.ns_per_ms, 1); - if (mean == 0) return 1.0; // Avoid division by zero + try self.snapshots.append(self.allocator, .{ + .bytes = bytes_diff, + .time_ms = time_ms, + }); - // Calculate variance - var variance: f64 = 0; - for (speeds) |speed| { - const diff = speed - mean; - variance += diff * diff; + self.provisional_speed_bits_per_sec = self.moving_average.compute(self.snapshots.items); + const provisional_desired = desiredConnections(self.criteria, self.provisional_speed_bits_per_sec); + + if (self.phase == .ramp) { + self.updateRampState(provisional_desired); + if (self.shouldEnterSteadyPhase(current_time_ns)) { + self.phase = .steady; + self.locked_connections = self.ramp_max_desired_connections; + self.steady_total_bytes = 0; + self.steady_total_time_ns = 0; + self.authoritative_speed_bits_per_sec = 0; + } + } + + if (self.phase == .steady) { + self.updateSteadyEstimator(bytes_diff, elapsed_ns); + try self.progress_measurements.append(self.allocator, .{ + .speed_bits_per_sec = self.authoritative_speed_bits_per_sec, + }); + } + + const display_speed = if (self.phase == .steady and self.authoritative_speed_bits_per_sec > 0) + self.authoritative_speed_bits_per_sec + else + self.provisional_speed_bits_per_sec; + const authoritative_speed = if (self.authoritative_speed_bits_per_sec > 0) + self.authoritative_speed_bits_per_sec + else + self.provisional_speed_bits_per_sec; + const desired_connections = if (self.phase == .steady) self.locked_connections else provisional_desired; + + decision.phase = self.phase; + decision.display_speed_bits_per_sec = display_speed; + decision.authoritative_speed_bits_per_sec = authoritative_speed; + decision.speed_bits_per_sec = display_speed; + decision.desired_connections = desired_connections; + decision.should_stop = self.stopper.shouldStop( + current_time_ns, + self.progress_measurements.items, + authoritative_speed, + ); + decision.sampled = true; + + self.last_sample_time_ns = current_time_ns; + self.last_total_bytes = current_total_bytes; + + return decision; } - variance = variance / @as(f64, @floatFromInt(speeds.len)); - // Calculate CoV (coefficient of variation) - const std_dev = @sqrt(variance); - return std_dev / mean; -} + pub fn finalSpeedBitsPerSecond(self: *const StabilityStrategy, total_bytes: u64, duration_ns: u64) f64 { + if (self.authoritative_speed_bits_per_sec > 0) { + return self.authoritative_speed_bits_per_sec; + } + if (self.provisional_speed_bits_per_sec > 0) { + return self.provisional_speed_bits_per_sec; + } + if (duration_ns == 0) return 0; + const duration_s = @as(f64, @floatFromInt(duration_ns)) / std.time.ns_per_s; + if (duration_s == 0) return 0; + return @as(f64, @floatFromInt(total_bytes)) * 8.0 / duration_s; + } +}; -// Clean helper functions pub fn createDurationStrategy(duration_seconds: u32, progress_update_interval_ms: u64) DurationStrategy { - return DurationStrategy{ + return .{ .target_duration_ns = @as(u64, duration_seconds) * std.time.ns_per_s, .progress_update_interval_ms = progress_update_interval_ms, }; diff --git a/src/lib/spinner/spinner.zig b/src/lib/spinner/spinner.zig index 64d04ed..819d0fc 100644 --- a/src/lib/spinner/spinner.zig +++ b/src/lib/spinner/spinner.zig @@ -25,7 +25,8 @@ const RED = "\x1b[31m"; const RESET = "\x1b[0m"; allocator: Allocator, -message: []u8 = &.{}, +message_buf: [256]u8 = undefined, +message_len: usize = 0, writer_buffer: [4096]u8, writer: WriterType, thread: ?Thread = null, @@ -37,7 +38,8 @@ pub fn init(allocator: Allocator, options: Options) Spinner { var spinner: Spinner = undefined; spinner.allocator = allocator; spinner.refresh_rate_ms = options.refresh_rate_ms; - spinner.message = &.{}; + spinner.message_buf = [_]u8{0} ** 256; + spinner.message_len = 0; spinner.thread = null; spinner.mutex = .{}; spinner.should_stop = std.atomic.Value(bool).init(true); @@ -54,12 +56,6 @@ pub fn init(allocator: Allocator, options: Options) Spinner { pub fn deinit(self: *Spinner) void { self.stop(); - self.mutex.lock(); - defer self.mutex.unlock(); - if (self.message.len > 0) { - self.allocator.free(self.message); - self.message = &.{}; - } } pub fn start(self: *Spinner, comptime fmt: []const u8, args: anytype) !void { @@ -68,10 +64,7 @@ pub fn start(self: *Spinner, comptime fmt: []const u8, args: anytype) !void { self.mutex.lock(); defer self.mutex.unlock(); - if (self.message.len > 0) { - self.allocator.free(self.message); - } - self.message = try std.fmt.allocPrint(self.allocator, fmt, args); + setMessage(self, fmt, args); switch (self.writer) { .file => |*w| { @@ -110,11 +103,7 @@ pub fn stop(self: *Spinner) void { pub fn updateMessage(self: *Spinner, comptime fmt: []const u8, args: anytype) !void { self.mutex.lock(); defer self.mutex.unlock(); - - if (self.message.len > 0) { - self.allocator.free(self.message); - } - self.message = try std.fmt.allocPrint(self.allocator, fmt, args); + setMessage(self, fmt, args); } pub fn succeed(self: *Spinner, comptime fmt: []const u8, args: anytype) !void { @@ -127,12 +116,10 @@ pub fn succeed(self: *Spinner, comptime fmt: []const u8, args: anytype) !void { switch (self.writer) { .file => |*w| { - w.interface.writeAll(SHOW_CURSOR) catch {}; try w.interface.print(GREEN ++ "✔" ++ RESET ++ " {s}\n", .{msg}); try w.interface.flush(); }, .test_writer => |*w| { - w.writeAll(SHOW_CURSOR) catch {}; try w.print(GREEN ++ "✔" ++ RESET ++ " {s}\n", .{msg}); }, } @@ -148,12 +135,10 @@ pub fn fail(self: *Spinner, comptime fmt: []const u8, args: anytype) !void { switch (self.writer) { .file => |*w| { - w.interface.writeAll(SHOW_CURSOR) catch {}; try w.interface.print(RED ++ "✖" ++ RESET ++ " {s}\n", .{msg}); try w.interface.flush(); }, .test_writer => |*w| { - w.writeAll(SHOW_CURSOR) catch {}; try w.print(RED ++ "✖" ++ RESET ++ " {s}\n", .{msg}); }, } @@ -164,7 +149,7 @@ fn spinLoop(self: *Spinner) void { while (!self.should_stop.load(.acquire)) { self.mutex.lock(); - const msg = self.message; + const msg = self.message_buf[0..self.message_len]; switch (self.writer) { .file => |*w| { w.interface.print(CLEAR_LINE ++ "{s} {s}", .{ frames[frame_idx], msg }) catch {}; @@ -181,6 +166,26 @@ fn spinLoop(self: *Spinner) void { } } +fn setMessage(self: *Spinner, comptime fmt: []const u8, args: anytype) void { + const msg = std.fmt.bufPrint(&self.message_buf, fmt, args) catch |err| switch (err) { + error.NoSpaceLeft => { + var tmp: [1024]u8 = undefined; + const long_msg = std.fmt.bufPrint(&tmp, fmt, args) catch { + const truncated = "[message too long]"; + @memcpy(self.message_buf[0..truncated.len], truncated); + self.message_len = truncated.len; + return; + }; + const len = @min(long_msg.len, self.message_buf.len); + @memcpy(self.message_buf[0..len], long_msg[0..len]); + self.message_len = len; + return; + }, + }; + + self.message_len = msg.len; +} + test "spinner outputs hide cursor on start" { const testing = std.testing; diff --git a/src/lib/tests/fixtures/download_bursty.json b/src/lib/tests/fixtures/download_bursty.json new file mode 100644 index 0000000..1d759d7 --- /dev/null +++ b/src/lib/tests/fixtures/download_bursty.json @@ -0,0 +1,45 @@ +{ + "name": "download_bursty_then_stable", + "direction": "download", + "criteria": { + "min_duration_seconds": 2, + "max_duration_seconds": 8, + "progress_frequency_ms": 150, + "moving_average_window_size": 5, + "stability_delta_percent": 2.0, + "min_stable_measurements": 6, + "connections_min": 1, + "connections_max": 8, + "max_bytes_in_flight": 78643200 + }, + "samples": [ + { "t_ms": 0, "total_bytes": 0 }, + { "t_ms": 150, "total_bytes": 750000 }, + { "t_ms": 300, "total_bytes": 4500000 }, + { "t_ms": 450, "total_bytes": 6000000 }, + { "t_ms": 600, "total_bytes": 9000000 }, + { "t_ms": 750, "total_bytes": 11250000 }, + { "t_ms": 900, "total_bytes": 13500000 }, + { "t_ms": 1050, "total_bytes": 15750000 }, + { "t_ms": 1200, "total_bytes": 18000000 }, + { "t_ms": 1350, "total_bytes": 20250000 }, + { "t_ms": 1500, "total_bytes": 22500000 }, + { "t_ms": 1650, "total_bytes": 24750000 }, + { "t_ms": 1800, "total_bytes": 27000000 }, + { "t_ms": 1950, "total_bytes": 29250000 }, + { "t_ms": 2100, "total_bytes": 31500000 }, + { "t_ms": 2250, "total_bytes": 33750000 }, + { "t_ms": 2400, "total_bytes": 36000000 }, + { "t_ms": 2550, "total_bytes": 38250000 }, + { "t_ms": 2700, "total_bytes": 40500000 }, + { "t_ms": 2850, "total_bytes": 42750000 }, + { "t_ms": 3000, "total_bytes": 45000000 }, + { "t_ms": 3150, "total_bytes": 47250000 } + ], + "expected": { + "stop_at_ms": 2100, + "speed_mbps": 120.0, + "speed_tolerance_mbps": 6.0, + "stable": true + } +} diff --git a/src/lib/tests/fixtures/download_steady.json b/src/lib/tests/fixtures/download_steady.json new file mode 100644 index 0000000..9c8139a --- /dev/null +++ b/src/lib/tests/fixtures/download_steady.json @@ -0,0 +1,40 @@ +{ + "name": "download_steady_120mbps", + "direction": "download", + "criteria": { + "min_duration_seconds": 2, + "max_duration_seconds": 8, + "progress_frequency_ms": 150, + "moving_average_window_size": 5, + "stability_delta_percent": 2.0, + "min_stable_measurements": 6, + "connections_min": 1, + "connections_max": 8, + "max_bytes_in_flight": 78643200 + }, + "samples": [ + { "t_ms": 0, "total_bytes": 0 }, + { "t_ms": 150, "total_bytes": 2250000 }, + { "t_ms": 300, "total_bytes": 4500000 }, + { "t_ms": 450, "total_bytes": 6750000 }, + { "t_ms": 600, "total_bytes": 9000000 }, + { "t_ms": 750, "total_bytes": 11250000 }, + { "t_ms": 900, "total_bytes": 13500000 }, + { "t_ms": 1050, "total_bytes": 15750000 }, + { "t_ms": 1200, "total_bytes": 18000000 }, + { "t_ms": 1350, "total_bytes": 20250000 }, + { "t_ms": 1500, "total_bytes": 22500000 }, + { "t_ms": 1650, "total_bytes": 24750000 }, + { "t_ms": 1800, "total_bytes": 27000000 }, + { "t_ms": 1950, "total_bytes": 29250000 }, + { "t_ms": 2100, "total_bytes": 31500000 }, + { "t_ms": 2250, "total_bytes": 33750000 }, + { "t_ms": 2400, "total_bytes": 36000000 } + ], + "expected": { + "stop_at_ms": 2100, + "speed_mbps": 120.0, + "speed_tolerance_mbps": 3.0, + "stable": true + } +} diff --git a/src/lib/tests/fixtures/fast_trace.json b/src/lib/tests/fixtures/fast_trace.json new file mode 100644 index 0000000..921c577 --- /dev/null +++ b/src/lib/tests/fixtures/fast_trace.json @@ -0,0 +1,431 @@ +{ + "criteria": { + "min_duration_seconds": 7, + "max_duration_seconds": 30, + "progress_frequency_ms": 150, + "moving_average_window_size": 5, + "stability_delta_percent": 2, + "min_stable_measurements": 6, + "connections_min": 1, + "connections_max": 8, + "max_bytes_in_flight": 78643200 + }, + "download": { + "stop_at_ms": 8750, + "stable": true, + "speed_mbps": 136.89258038470672, + "samples": [ + { + "t_ms": 309, + "total_bytes": 458752 + }, + { + "t_ms": 463, + "total_bytes": 2031616 + }, + { + "t_ms": 618, + "total_bytes": 2818048 + }, + { + "t_ms": 772, + "total_bytes": 3866624 + }, + { + "t_ms": 927, + "total_bytes": 8060928 + }, + { + "t_ms": 1079, + "total_bytes": 11206656 + }, + { + "t_ms": 1234, + "total_bytes": 11206656 + }, + { + "t_ms": 1388, + "total_bytes": 11206656 + }, + { + "t_ms": 1543, + "total_bytes": 14352384 + }, + { + "t_ms": 1697, + "total_bytes": 18546688 + }, + { + "t_ms": 1852, + "total_bytes": 20643840 + }, + { + "t_ms": 2006, + "total_bytes": 23789568 + }, + { + "t_ms": 2156, + "total_bytes": 25886720 + }, + { + "t_ms": 2311, + "total_bytes": 30081024 + }, + { + "t_ms": 2463, + "total_bytes": 32178176 + }, + { + "t_ms": 2614, + "total_bytes": 36372480 + }, + { + "t_ms": 2769, + "total_bytes": 38469632 + }, + { + "t_ms": 2924, + "total_bytes": 39518208 + }, + { + "t_ms": 3075, + "total_bytes": 44761088 + }, + { + "t_ms": 3230, + "total_bytes": 44761088 + }, + { + "t_ms": 3383, + "total_bytes": 50003968 + }, + { + "t_ms": 3538, + "total_bytes": 50003968 + }, + { + "t_ms": 3693, + "total_bytes": 55246848 + }, + { + "t_ms": 3845, + "total_bytes": 56295424 + }, + { + "t_ms": 3996, + "total_bytes": 58392576 + }, + { + "t_ms": 4151, + "total_bytes": 63635456 + }, + { + "t_ms": 4305, + "total_bytes": 63635456 + }, + { + "t_ms": 4457, + "total_bytes": 68878336 + }, + { + "t_ms": 4609, + "total_bytes": 70975488 + }, + { + "t_ms": 4764, + "total_bytes": 74121216 + }, + { + "t_ms": 4916, + "total_bytes": 75169792 + }, + { + "t_ms": 5070, + "total_bytes": 77266944 + }, + { + "t_ms": 5225, + "total_bytes": 83558400 + }, + { + "t_ms": 5378, + "total_bytes": 84672512 + }, + { + "t_ms": 5531, + "total_bytes": 86769664 + }, + { + "t_ms": 5686, + "total_bytes": 89915392 + }, + { + "t_ms": 5837, + "total_bytes": 90963968 + }, + { + "t_ms": 5990, + "total_bytes": 90963968 + }, + { + "t_ms": 6145, + "total_bytes": 99352576 + }, + { + "t_ms": 6298, + "total_bytes": 101449728 + }, + { + "t_ms": 6453, + "total_bytes": 101449728 + }, + { + "t_ms": 6608, + "total_bytes": 101449728 + }, + { + "t_ms": 6760, + "total_bytes": 111935488 + }, + { + "t_ms": 6913, + "total_bytes": 111935488 + }, + { + "t_ms": 7065, + "total_bytes": 113049600 + }, + { + "t_ms": 7215, + "total_bytes": 113049600 + }, + { + "t_ms": 7369, + "total_bytes": 121438208 + }, + { + "t_ms": 7522, + "total_bytes": 121438208 + }, + { + "t_ms": 7677, + "total_bytes": 122552320 + }, + { + "t_ms": 7831, + "total_bytes": 124649472 + }, + { + "t_ms": 7986, + "total_bytes": 133038080 + }, + { + "t_ms": 8137, + "total_bytes": 133038080 + }, + { + "t_ms": 8288, + "total_bytes": 136183808 + }, + { + "t_ms": 8440, + "total_bytes": 139329536 + }, + { + "t_ms": 8595, + "total_bytes": 139329536 + }, + { + "t_ms": 8750, + "total_bytes": 144572416 + } + ] + }, + "upload": { + "stop_at_ms": 7071, + "stable": true, + "speed_mbps": 63.436221974876766, + "samples": [ + { + "t_ms": 310, + "total_bytes": 129024 + }, + { + "t_ms": 460, + "total_bytes": 526336 + }, + { + "t_ms": 615, + "total_bytes": 2156544 + }, + { + "t_ms": 770, + "total_bytes": 2291712 + }, + { + "t_ms": 925, + "total_bytes": 4589568 + }, + { + "t_ms": 1075, + "total_bytes": 5072896 + }, + { + "t_ms": 1227, + "total_bytes": 5892096 + }, + { + "t_ms": 1382, + "total_bytes": 7530496 + }, + { + "t_ms": 1535, + "total_bytes": 8120320 + }, + { + "t_ms": 1690, + "total_bytes": 8120320 + }, + { + "t_ms": 1843, + "total_bytes": 8218624 + }, + { + "t_ms": 1998, + "total_bytes": 8382464 + }, + { + "t_ms": 2153, + "total_bytes": 10741760 + }, + { + "t_ms": 2308, + "total_bytes": 12052480 + }, + { + "t_ms": 2463, + "total_bytes": 13101056 + }, + { + "t_ms": 2618, + "total_bytes": 15722496 + }, + { + "t_ms": 2773, + "total_bytes": 16771072 + }, + { + "t_ms": 2924, + "total_bytes": 17295360 + }, + { + "t_ms": 3077, + "total_bytes": 18343936 + }, + { + "t_ms": 3232, + "total_bytes": 19916800 + }, + { + "t_ms": 3386, + "total_bytes": 20965376 + }, + { + "t_ms": 3539, + "total_bytes": 23062528 + }, + { + "t_ms": 3694, + "total_bytes": 23062528 + }, + { + "t_ms": 3848, + "total_bytes": 24897536 + }, + { + "t_ms": 4001, + "total_bytes": 25421824 + }, + { + "t_ms": 4156, + "total_bytes": 28305408 + }, + { + "t_ms": 4309, + "total_bytes": 29091840 + }, + { + "t_ms": 4462, + "total_bytes": 30664704 + }, + { + "t_ms": 4617, + "total_bytes": 32237568 + }, + { + "t_ms": 4772, + "total_bytes": 33548288 + }, + { + "t_ms": 4924, + "total_bytes": 35121152 + }, + { + "t_ms": 5075, + "total_bytes": 36431872 + }, + { + "t_ms": 5230, + "total_bytes": 37480448 + }, + { + "t_ms": 5383, + "total_bytes": 38266880 + }, + { + "t_ms": 5535, + "total_bytes": 39577600 + }, + { + "t_ms": 5689, + "total_bytes": 40626176 + }, + { + "t_ms": 5843, + "total_bytes": 41412608 + }, + { + "t_ms": 5997, + "total_bytes": 42985472 + }, + { + "t_ms": 6153, + "total_bytes": 43771904 + }, + { + "t_ms": 6303, + "total_bytes": 45869056 + }, + { + "t_ms": 6457, + "total_bytes": 47179776 + }, + { + "t_ms": 6612, + "total_bytes": 47966208 + }, + { + "t_ms": 6765, + "total_bytes": 49801216 + }, + { + "t_ms": 6916, + "total_bytes": 50063360 + }, + { + "t_ms": 7071, + "total_bytes": 52160512 + } + ] + } +} diff --git a/src/lib/tests/fixtures/upload_steady.json b/src/lib/tests/fixtures/upload_steady.json new file mode 100644 index 0000000..89c61f3 --- /dev/null +++ b/src/lib/tests/fixtures/upload_steady.json @@ -0,0 +1,40 @@ +{ + "name": "upload_steady_60mbps", + "direction": "upload", + "criteria": { + "min_duration_seconds": 2, + "max_duration_seconds": 8, + "progress_frequency_ms": 150, + "moving_average_window_size": 5, + "stability_delta_percent": 2.0, + "min_stable_measurements": 6, + "connections_min": 1, + "connections_max": 8, + "max_bytes_in_flight": 78643200 + }, + "samples": [ + { "t_ms": 0, "total_bytes": 0 }, + { "t_ms": 150, "total_bytes": 1125000 }, + { "t_ms": 300, "total_bytes": 2250000 }, + { "t_ms": 450, "total_bytes": 3375000 }, + { "t_ms": 600, "total_bytes": 4500000 }, + { "t_ms": 750, "total_bytes": 5625000 }, + { "t_ms": 900, "total_bytes": 6750000 }, + { "t_ms": 1050, "total_bytes": 7875000 }, + { "t_ms": 1200, "total_bytes": 9000000 }, + { "t_ms": 1350, "total_bytes": 10125000 }, + { "t_ms": 1500, "total_bytes": 11250000 }, + { "t_ms": 1650, "total_bytes": 12375000 }, + { "t_ms": 1800, "total_bytes": 13500000 }, + { "t_ms": 1950, "total_bytes": 14625000 }, + { "t_ms": 2100, "total_bytes": 15750000 }, + { "t_ms": 2250, "total_bytes": 16875000 }, + { "t_ms": 2400, "total_bytes": 18000000 } + ], + "expected": { + "stop_at_ms": 2100, + "speed_mbps": 60.0, + "speed_tolerance_mbps": 2.0, + "stable": true + } +} diff --git a/src/lib/tests/fixtures/upload_unstable.json b/src/lib/tests/fixtures/upload_unstable.json new file mode 100644 index 0000000..3d5686e --- /dev/null +++ b/src/lib/tests/fixtures/upload_unstable.json @@ -0,0 +1,44 @@ +{ + "name": "upload_unstable_hits_max_duration", + "direction": "upload", + "criteria": { + "min_duration_seconds": 2, + "max_duration_seconds": 3, + "progress_frequency_ms": 150, + "moving_average_window_size": 3, + "stability_delta_percent": 1.5, + "min_stable_measurements": 6, + "connections_min": 1, + "connections_max": 8, + "max_bytes_in_flight": 78643200 + }, + "samples": [ + { "t_ms": 0, "total_bytes": 0 }, + { "t_ms": 150, "total_bytes": 750000 }, + { "t_ms": 300, "total_bytes": 3000000 }, + { "t_ms": 450, "total_bytes": 3750000 }, + { "t_ms": 600, "total_bytes": 6000000 }, + { "t_ms": 750, "total_bytes": 6750000 }, + { "t_ms": 900, "total_bytes": 9000000 }, + { "t_ms": 1050, "total_bytes": 9750000 }, + { "t_ms": 1200, "total_bytes": 12000000 }, + { "t_ms": 1350, "total_bytes": 12750000 }, + { "t_ms": 1500, "total_bytes": 15000000 }, + { "t_ms": 1650, "total_bytes": 15750000 }, + { "t_ms": 1800, "total_bytes": 18000000 }, + { "t_ms": 1950, "total_bytes": 18750000 }, + { "t_ms": 2100, "total_bytes": 21000000 }, + { "t_ms": 2250, "total_bytes": 21750000 }, + { "t_ms": 2400, "total_bytes": 24000000 }, + { "t_ms": 2550, "total_bytes": 24750000 }, + { "t_ms": 2700, "total_bytes": 27000000 }, + { "t_ms": 2850, "total_bytes": 27750000 }, + { "t_ms": 3000, "total_bytes": 30000000 } + ], + "expected": { + "stop_at_ms": 3000, + "speed_mbps": 80.0, + "speed_tolerance_mbps": 25.0, + "stable": false + } +} diff --git a/src/lib/tests/measurement_strategy_test.zig b/src/lib/tests/measurement_strategy_test.zig index 738e4ee..2567831 100644 --- a/src/lib/tests/measurement_strategy_test.zig +++ b/src/lib/tests/measurement_strategy_test.zig @@ -1,9 +1,20 @@ const std = @import("std"); const testing = std.testing; const measurement_strategy = @import("../measurement_strategy.zig"); -const MeasurementStrategy = measurement_strategy.MeasurementStrategy; const StabilityCriteria = measurement_strategy.StabilityCriteria; -const BandwidthMeter = @import("../bandwidth.zig").BandwidthMeter; +const EstimationPhase = measurement_strategy.EstimationPhase; + +fn defaultDecision() measurement_strategy.StabilityDecision { + return .{ + .should_stop = false, + .desired_connections = 1, + .phase = .ramp, + .display_speed_bits_per_sec = 0, + .authoritative_speed_bits_per_sec = 0, + .speed_bits_per_sec = 0, + .sampled = false, + }; +} test "createDurationStrategy" { const strategy = measurement_strategy.createDurationStrategy(10, 100); @@ -13,325 +24,146 @@ test "createDurationStrategy" { } test "DurationStrategy shouldContinue" { - const strategy = measurement_strategy.createDurationStrategy(1, 100); // 1 second - - // Should continue before target duration - try testing.expect(strategy.shouldContinue(500 * std.time.ns_per_ms)); // 0.5 seconds + const strategy = measurement_strategy.createDurationStrategy(1, 100); - // Should not continue after target duration - try testing.expect(!strategy.shouldContinue(2 * std.time.ns_per_s)); // 2 seconds + try testing.expect(strategy.shouldContinue(500 * std.time.ns_per_ms)); + try testing.expect(!strategy.shouldContinue(2 * std.time.ns_per_s)); } -test "Strategy getSleepInterval" { - // Duration strategy should use progress update interval - const duration_strategy = measurement_strategy.createDurationStrategy(10, 250); - try testing.expect(duration_strategy.getSleepInterval() == 250 * std.time.ns_per_ms); -} - -// Fast.com-style stability tests - -test "StabilityCriteria default values" { +test "StabilityCriteria defaults" { const criteria = StabilityCriteria{}; - try testing.expect(criteria.ramp_up_duration_seconds == 4); - try testing.expect(criteria.max_duration_seconds == 25); - try testing.expect(criteria.measurement_interval_ms == 750); - try testing.expect(criteria.sliding_window_size == 6); - try testing.expect(criteria.stability_threshold_cov == 0.15); - try testing.expect(criteria.stable_checks_required == 2); -} - -test "createStabilityStrategy" { - const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 5, - .max_duration_seconds = 20, - .measurement_interval_ms = 500, - .sliding_window_size = 8, - .stability_threshold_cov = 0.12, - .stable_checks_required = 3, - }; - - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); - - try testing.expect(strategy.criteria.ramp_up_duration_seconds == 5); - try testing.expect(strategy.criteria.max_duration_seconds == 20); - try testing.expect(strategy.criteria.measurement_interval_ms == 500); - try testing.expect(strategy.criteria.sliding_window_size == 8); - try testing.expect(strategy.criteria.stability_threshold_cov == 0.12); - try testing.expect(strategy.criteria.stable_checks_required == 3); - try testing.expect(strategy.ramp_up_duration_ns == 5 * std.time.ns_per_s); - try testing.expect(strategy.max_duration_ns == 20 * std.time.ns_per_s); -} - -test "StabilityStrategy shouldContinue" { - const criteria = StabilityCriteria{ - .max_duration_seconds = 20, - }; - - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); - - // Should continue before max duration - try testing.expect(strategy.shouldContinue(15 * std.time.ns_per_s)); - - // Should not continue after max duration - try testing.expect(!strategy.shouldContinue(25 * std.time.ns_per_s)); -} - -test "StabilityStrategy getSleepInterval" { - const criteria = StabilityCriteria{}; - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); - - // Should be measurement_interval / 3 = 750ms / 3 = 250ms - try testing.expect(strategy.getSleepInterval() == 250 * std.time.ns_per_ms); -} - -test "StabilityStrategy shouldSample timing" { - const criteria = StabilityCriteria{}; - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); - - // First call should not sample (last_sample_time is 0) - try testing.expect(!strategy.shouldSample(0)); - - // Should not sample if less than 1 second has passed - strategy.last_sample_time = 500 * std.time.ns_per_ms; // 0.5 seconds - try testing.expect(!strategy.shouldSample(800 * std.time.ns_per_ms)); // 0.8 seconds - - // Should sample if 1 second or more has passed - try testing.expect(strategy.shouldSample(1600 * std.time.ns_per_ms)); // 1.6 seconds -} - -test "StabilityStrategy addSample basic functionality" { - const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 1, // Short for testing - .sliding_window_size = 3, - .stability_threshold_cov = 0.5, // High threshold to avoid early stability - .stable_checks_required = 2, - }; - - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); - - // First sample should be skipped - const is_stable1 = try strategy.addSample(1 * std.time.ns_per_s, 1000); - try testing.expect(!is_stable1); - try testing.expect(strategy.speed_measurements.items.len == 0); - - // Second sample should be added - const is_stable2 = try strategy.addSample(2 * std.time.ns_per_s, 2000); - try testing.expect(!is_stable2); // Not stable yet, need more measurements for CoV - try testing.expect(strategy.speed_measurements.items.len == 1); - - // Third sample should be added - const is_stable3 = try strategy.addSample(3 * std.time.ns_per_s, 3000); - try testing.expect(!is_stable3); // Still need more measurements - try testing.expect(strategy.speed_measurements.items.len == 2); - - // Fourth sample should trigger stability check (we have 3 measurements now) - _ = try strategy.addSample(4 * std.time.ns_per_s, 4000); - try testing.expect(strategy.speed_measurements.items.len == 3); + try testing.expectEqual(@as(u32, 7), criteria.min_duration_seconds); + try testing.expectEqual(@as(u32, 30), criteria.max_duration_seconds); + try testing.expectEqual(@as(u64, 150), criteria.progress_frequency_ms); + try testing.expectEqual(@as(u32, 5), criteria.moving_average_window_size); + try testing.expectEqual(@as(u32, 6), criteria.min_stable_measurements); + try testing.expectEqual(@as(u32, 1), criteria.connections_min); + try testing.expectEqual(@as(u32, 8), criteria.connections_max); } -test "StabilityStrategy requires ramp up duration" { +test "StabilityStrategy transitions to steady phase and locks estimator" { const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 10, - .sliding_window_size = 2, - .stability_threshold_cov = 0.01, // Low threshold for easy stability - .stable_checks_required = 1, + .min_duration_seconds = 2, + .max_duration_seconds = 10, + .progress_frequency_ms = 100, + .moving_average_window_size = 5, + .stability_delta_percent = 2.0, + .min_stable_measurements = 100, + .connections_min = 1, + .connections_max = 8, }; var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); defer strategy.deinit(); - // Add samples before ramp up duration - should not be stable - _ = try strategy.addSample(1 * std.time.ns_per_s, 1000); - _ = try strategy.addSample(2 * std.time.ns_per_s, 2000); - const is_stable_early = try strategy.addSample(3 * std.time.ns_per_s, 3000); - try testing.expect(!is_stable_early); // Should not be stable before ramp up duration - - // Add sample after ramp up duration - might be stable - _ = try strategy.addSample(11 * std.time.ns_per_s, 11000); - // Result depends on CoV calculation, but should not crash -} - -test "StabilityStrategy handleProgress integration" { - const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 2, - .sliding_window_size = 2, - .stability_threshold_cov = 0.1, - .measurement_interval_ms = 500, - }; - - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); + _ = try strategy.handleProgress(0, 0); - // Should not trigger sampling immediately - const should_stop1 = try strategy.handleProgress(500 * std.time.ns_per_ms, 500); - try testing.expect(!should_stop1); + var total_bytes: u64 = 0; + var saw_ramp = false; + var saw_steady = false; - // Should not trigger sampling if less than 1 second elapsed - const should_stop2 = try strategy.handleProgress(800 * std.time.ns_per_ms, 800); - try testing.expect(!should_stop2); + for (0..120) |i| { + total_bytes += 1_500_000; + const t_ms = @as(u64, @intCast(i + 1)) * 100; + const decision = try strategy.handleProgress(t_ms * std.time.ns_per_ms, total_bytes); + if (!decision.sampled) continue; - // Should trigger sampling after measurement interval (750ms) - _ = try strategy.handleProgress(750 * std.time.ns_per_ms, 750); - try testing.expect(strategy.speed_measurements.items.len == 0); // First sample skipped + if (decision.phase == .ramp and decision.display_speed_bits_per_sec > 0) { + saw_ramp = true; + } + if (decision.phase == .steady) { + saw_steady = true; + try testing.expectEqual(decision.display_speed_bits_per_sec, decision.authoritative_speed_bits_per_sec); + try testing.expectEqual(decision.desired_connections, strategy.locked_connections); + break; + } + } - // Should add second sample - _ = try strategy.handleProgress(1500 * std.time.ns_per_ms, 1500); - try testing.expect(strategy.speed_measurements.items.len == 1); + try testing.expect(saw_ramp); + try testing.expect(saw_steady); } -test "CoV stability detection algorithm" { +test "StabilityStrategy steady estimator uses time-weighted average" { const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 1, // Short for testing - .sliding_window_size = 4, - .stability_threshold_cov = 0.05, // 5% CoV threshold - .stable_checks_required = 1, + .min_duration_seconds = 0, + .max_duration_seconds = 30, + .progress_frequency_ms = 50, + .moving_average_window_size = 5, + .stability_delta_percent = 0.1, + .min_stable_measurements = 100, + .connections_min = 1, + .connections_max = 1, }; var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); defer strategy.deinit(); - // Add stable samples after ramp up period - _ = try strategy.addSample(1 * std.time.ns_per_s, 1000); // Skip first - _ = try strategy.addSample(2 * std.time.ns_per_s, 2000); // 1000 bytes/s (after ramp up) - _ = try strategy.addSample(3 * std.time.ns_per_s, 3000); // 1000 bytes/s - _ = try strategy.addSample(4 * std.time.ns_per_s, 4000); // 1000 bytes/s + _ = try strategy.handleProgress(0, 0); - // This should be stable since CoV should be very low - const is_stable = try strategy.addSample(5 * std.time.ns_per_s, 5000); // 1000 bytes/s + var total_bytes: u64 = 0; + var now_ms: u64 = 0; + var decision = defaultDecision(); - // Should be stable with consistent speeds - try testing.expect(is_stable); -} + const warmup_intervals_ms = [_]u64{ 100, 100, 100, 100 }; + for (warmup_intervals_ms) |dt_ms| { + now_ms += dt_ms; + total_bytes += 500_000; + decision = try strategy.handleProgress(now_ms * std.time.ns_per_ms, total_bytes); + } -test "CoV stability detection - unstable case" { - const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 1, // Short for testing - .sliding_window_size = 3, - .stability_threshold_cov = 0.02, // Strict 2% CoV threshold - .stable_checks_required = 1, - }; - - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); + while (decision.phase != .steady) { + now_ms += 100; + total_bytes += 500_000; + decision = try strategy.handleProgress(now_ms * std.time.ns_per_ms, total_bytes); + } - // Add samples that should NOT be stable (high variance) - _ = try strategy.addSample(1 * std.time.ns_per_s, 1000); // Skip first - _ = try strategy.addSample(2 * std.time.ns_per_s, 2000); // 1000 bytes/s (after ramp up) - _ = try strategy.addSample(3 * std.time.ns_per_s, 3500); // 1500 bytes/s (high variance) + now_ms += 100; + total_bytes += 1_000_000; + decision = try strategy.handleProgress(now_ms * std.time.ns_per_ms, total_bytes); - // This should NOT be stable due to high CoV - const is_stable = try strategy.addSample(4 * std.time.ns_per_s, 4000); // 500 bytes/s (high variance) + now_ms += 300; + total_bytes += 1_000_000; + decision = try strategy.handleProgress(now_ms * std.time.ns_per_ms, total_bytes); - // Should not be stable with inconsistent speeds - try testing.expect(!is_stable); + const expected_bps = (@as(f64, 2_000_000) * 8.0) / 0.4; + try testing.expectApproxEqAbs(expected_bps, decision.authoritative_speed_bits_per_sec, 0.001); } -test "CoV stability handles variable speeds correctly" { +test "StabilityStrategy final speed equals last displayed speed after steady samples" { const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 1, - .sliding_window_size = 6, - .stability_threshold_cov = 0.05, - .stable_checks_required = 2, + .min_duration_seconds = 0, + .max_duration_seconds = 10, + .progress_frequency_ms = 100, + .moving_average_window_size = 5, + .stability_delta_percent = 2.0, + .min_stable_measurements = 100, + .connections_min = 1, + .connections_max = 3, }; var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); defer strategy.deinit(); - // Add samples with a peak in the middle, then lower speeds - _ = try strategy.addSample(1 * std.time.ns_per_s, 1000); // Skip first - _ = try strategy.addSample(2 * std.time.ns_per_s, 2000); // 1000 bytes/s (after ramp up) - _ = try strategy.addSample(3 * std.time.ns_per_s, 4000); // 2000 bytes/s (peak creates high CoV) - _ = try strategy.addSample(4 * std.time.ns_per_s, 5000); // 1000 bytes/s - _ = try strategy.addSample(5 * std.time.ns_per_s, 6000); // 1000 bytes/s - - // Should not be stable yet due to high CoV from the peak - const is_stable = try strategy.addSample(6 * std.time.ns_per_s, 7000); // 1000 bytes/s - - // CoV should still be too high due to the peak in the sliding window - try testing.expect(!is_stable); - - // Test should not crash and should have collected measurements - try testing.expect(strategy.speed_measurements.items.len > 0); -} - -test "CoV stability detection realistic scenario" { - const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 5, - .max_duration_seconds = 20, - .stability_threshold_cov = 0.15, // 15% CoV threshold - .sliding_window_size = 6, - .stable_checks_required = 2, - }; - - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); - - // Simulate realistic speed test progression: ramp up, then stabilize - _ = try strategy.addSample(1 * std.time.ns_per_s, 1000); // Skip first - _ = try strategy.addSample(2 * std.time.ns_per_s, 3000); // 2000 bytes/s (ramp up) - _ = try strategy.addSample(3 * std.time.ns_per_s, 6000); // 3000 bytes/s (still ramping) - - // Before min duration - should not be stable regardless of measurements - const stable_before_min = try strategy.addSample(4 * std.time.ns_per_s, 10000); // 4000 bytes/s (peak) - try testing.expect(!stable_before_min); - - // After min duration with stable measurements - _ = try strategy.addSample(6 * std.time.ns_per_s, 16000); // 4000 bytes/s (stable) - _ = try strategy.addSample(7 * std.time.ns_per_s, 20000); // 4000 bytes/s (stable) - _ = try strategy.addSample(8 * std.time.ns_per_s, 24000); // 4000 bytes/s (stable) - const stable_after_min = try strategy.addSample(9 * std.time.ns_per_s, 28000); // 4000 bytes/s (stable) - - // Should be able to detect stability after minimum duration with consistent speeds - try testing.expect(stable_after_min or strategy.speed_measurements.items.len >= 6); -} - -test "CoV timing intervals specification" { - const criteria = StabilityCriteria{}; - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); - - // Should be measurement_interval / 3 = 750ms / 3 = 250ms - try testing.expect(strategy.getSleepInterval() == 250 * std.time.ns_per_ms); - - // Should enforce measurement interval sampling (750ms by default) - try testing.expect(!strategy.shouldSample(0)); - strategy.last_sample_time = 500 * std.time.ns_per_ms; - try testing.expect(!strategy.shouldSample(1000 * std.time.ns_per_ms)); - try testing.expect(strategy.shouldSample(1250 * std.time.ns_per_ms)); -} - -test "CoV algorithm handles edge cases correctly" { - const criteria = StabilityCriteria{ - .ramp_up_duration_seconds = 1, - .sliding_window_size = 3, - .stability_threshold_cov = 0.05, - .stable_checks_required = 1, - }; - - var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, criteria); - defer strategy.deinit(); + _ = try strategy.handleProgress(0, 0); - // Test very small speed changes (edge case for percentage calculation) - _ = try strategy.addSample(1 * std.time.ns_per_s, 1000); // Skip first - _ = try strategy.addSample(2 * std.time.ns_per_s, 1001); // 1 byte/s - _ = try strategy.addSample(3 * std.time.ns_per_s, 1002); // 1 byte/s - const stable_small = try strategy.addSample(4 * std.time.ns_per_s, 1003); // 1 byte/s + var total_bytes: u64 = 0; + var last_time_ms: u64 = 0; + var last_display_bps: f64 = 0; + var last_phase: EstimationPhase = .ramp; - // Should handle small speeds without division errors - _ = stable_small; // May or may not be stable, but shouldn't crash + for (0..80) |i| { + total_bytes += 800_000; + last_time_ms = @as(u64, @intCast(i + 1)) * 100; + const decision = try strategy.handleProgress(last_time_ms * std.time.ns_per_ms, total_bytes); + if (!decision.sampled) continue; + last_display_bps = decision.display_speed_bits_per_sec; + last_phase = decision.phase; + } - // Test zero speed edge case - strategy.speed_measurements.clearRetainingCapacity(); - strategy.last_sample_time = 0; - _ = try strategy.addSample(1 * std.time.ns_per_s, 1000); // Skip first - const stable_zero = try strategy.addSample(2 * std.time.ns_per_s, 1000); // 0 bytes/s + try testing.expect(last_display_bps > 0); + try testing.expectEqual(EstimationPhase.steady, last_phase); - // Zero speed should not be considered stable - try testing.expect(!stable_zero); + const final_bps = strategy.finalSpeedBitsPerSecond(total_bytes, last_time_ms * std.time.ns_per_ms); + try testing.expectEqual(last_display_bps, final_bps); } diff --git a/src/lib/tests/parity_fixture_test.zig b/src/lib/tests/parity_fixture_test.zig new file mode 100644 index 0000000..c20774a --- /dev/null +++ b/src/lib/tests/parity_fixture_test.zig @@ -0,0 +1,84 @@ +const std = @import("std"); +const testing = std.testing; +const measurement_strategy = @import("../measurement_strategy.zig"); + +const Fixture = struct { + name: []const u8, + direction: []const u8, + criteria: measurement_strategy.StabilityCriteria, + samples: []Sample, + expected: Expected, + + const Sample = struct { + t_ms: u64, + total_bytes: u64, + }; + + const Expected = struct { + stop_at_ms: u64, + speed_mbps: f64, + speed_tolerance_mbps: f64, + stable: bool, + }; +}; + +fn absDiffU64(a: u64, b: u64) u64 { + return if (a > b) a - b else b - a; +} + +fn runFixture(comptime path: []const u8) !void { + const data = @embedFile(path); + var parsed = try std.json.parseFromSlice(Fixture, testing.allocator, data, .{}); + defer parsed.deinit(); + + const fixture = parsed.value; + + var strategy = measurement_strategy.createStabilityStrategy(testing.allocator, fixture.criteria); + defer strategy.deinit(); + + var stop_at_ms = fixture.samples[fixture.samples.len - 1].t_ms; + var stopped = false; + + for (fixture.samples) |sample| { + const decision = try strategy.handleProgress(sample.t_ms * std.time.ns_per_ms, sample.total_bytes); + if (decision.should_stop) { + stopped = true; + stop_at_ms = sample.t_ms; + break; + } + } + + const max_duration_ms = @as(u64, fixture.criteria.max_duration_seconds) * 1000; + const stable_detected = stopped and stop_at_ms < max_duration_ms; + try testing.expectEqual(fixture.expected.stable, stable_detected); + + if (stable_detected) { + const stop_tolerance_ms = fixture.criteria.progress_frequency_ms; + try testing.expect(absDiffU64(stop_at_ms, fixture.expected.stop_at_ms) <= stop_tolerance_ms); + } else { + try testing.expect(stop_at_ms >= fixture.expected.stop_at_ms); + } + + const duration_ns = stop_at_ms * std.time.ns_per_ms; + const total_bytes = fixture.samples[fixture.samples.len - 1].total_bytes; + const speed_bits_per_sec = strategy.finalSpeedBitsPerSecond(total_bytes, duration_ns); + const speed_mbps = speed_bits_per_sec / 1_000_000; + + try testing.expect(@abs(speed_mbps - fixture.expected.speed_mbps) <= fixture.expected.speed_tolerance_mbps); +} + +test "fixture replay: steady download" { + try runFixture("fixtures/download_steady.json"); +} + +test "fixture replay: bursty download" { + try runFixture("fixtures/download_bursty.json"); +} + +test "fixture replay: steady upload" { + try runFixture("fixtures/upload_steady.json"); +} + +test "fixture replay: unstable upload" { + try runFixture("fixtures/upload_unstable.json"); +} diff --git a/src/lib/workers/speed_worker.zig b/src/lib/workers/speed_worker.zig index 6c0a524..1fd7047 100644 --- a/src/lib/workers/speed_worker.zig +++ b/src/lib/workers/speed_worker.zig @@ -38,10 +38,13 @@ pub const Header = struct { pub const FetchResponse = struct { status: http.Status, body: []const u8, + byte_count: usize, allocator: std.mem.Allocator, pub fn deinit(self: *FetchResponse) void { - self.allocator.free(self.body); + if (self.body.len > 0) { + self.allocator.free(self.body); + } } }; @@ -72,9 +75,11 @@ pub const DownloadWorker = struct { config: WorkerConfig, bytes_downloaded: std.atomic.Value(u64), should_stop: *std.atomic.Value(bool), + active_worker_count: ?*std.atomic.Value(u32), http_client: HttpClient, timer: Timer, target_duration_ns: u64, + max_bytes_in_flight: u64, allocator: std.mem.Allocator, error_count: std.atomic.Value(u32), // Dynamic chunk sizing @@ -90,18 +95,22 @@ pub const DownloadWorker = struct { pub fn init( config: WorkerConfig, should_stop: *std.atomic.Value(bool), + active_worker_count: ?*std.atomic.Value(u32), http_client: HttpClient, timer: Timer, target_duration_ns: u64, + max_bytes_in_flight: u64, allocator: std.mem.Allocator, ) Self { return Self{ .config = config, .bytes_downloaded = std.atomic.Value(u64).init(0), .should_stop = should_stop, + .active_worker_count = active_worker_count, .http_client = http_client, .timer = timer, .target_duration_ns = target_duration_ns, + .max_bytes_in_flight = max_bytes_in_flight, .allocator = allocator, .error_count = std.atomic.Value(u32).init(0), .current_chunk_size = MIN_CHUNK_SIZE, @@ -122,6 +131,11 @@ pub const DownloadWorker = struct { var retry_count: u32 = 0; while (!self.should_stop.load(.monotonic)) { + if (!self.isActiveWorker()) { + std.Thread.sleep(std.time.ns_per_ms * 5); + continue; + } + // Check if we've exceeded the target duration if (self.timer.read() >= self.target_duration_ns) { self.should_stop.store(true, .monotonic); @@ -134,7 +148,7 @@ pub const DownloadWorker = struct { } // Use dynamic chunk size - const chunk_size = self.current_chunk_size; + const chunk_size = @min(self.current_chunk_size, self.chunkSizeCapFromInFlight()); const range_end = @min(range_start + chunk_size - 1, MAX_FILE_SIZE - 1); // Convert speedtest URL to range URL @@ -188,10 +202,10 @@ pub const DownloadWorker = struct { } // Update total bytes downloaded - _ = self.bytes_downloaded.fetchAdd(response.body.len, .monotonic); + _ = self.bytes_downloaded.fetchAdd(response.byte_count, .monotonic); // Dynamically adjust chunk size based on performance - self.adjustChunkSize(request_duration_ns, response.body.len); + self.adjustChunkSize(request_duration_ns, response.byte_count); range_start += chunk_size; // Small delay between requests @@ -224,6 +238,27 @@ pub const DownloadWorker = struct { _ = bytes_downloaded; // Suppress unused parameter warning } + fn isActiveWorker(self: *Self) bool { + if (self.active_worker_count) |count| { + return self.config.worker_id < count.load(.monotonic); + } + return true; + } + + fn chunkSizeCapFromInFlight(self: *Self) u32 { + if (self.active_worker_count == null or self.max_bytes_in_flight == 0) { + return self.max_chunk_size; + } + + const active_count = @max(self.active_worker_count.?.load(.monotonic), 1); + const per_worker_cap = self.max_bytes_in_flight / active_count; + const bounded = @min( + @as(u64, self.max_chunk_size), + @max(@as(u64, self.min_chunk_size), per_worker_cap), + ); + return @intCast(bounded); + } + pub fn getBytesDownloaded(self: *const Self) u64 { return self.bytes_downloaded.load(.monotonic); } @@ -238,10 +273,12 @@ pub const UploadWorker = struct { config: WorkerConfig, bytes_uploaded: std.atomic.Value(u64), should_stop: *std.atomic.Value(bool), + active_worker_count: ?*std.atomic.Value(u32), http_client: HttpClient, timer: Timer, target_duration_ns: u64, upload_data: []const u8, + max_bytes_in_flight: u64, allocator: std.mem.Allocator, error_count: std.atomic.Value(u32), // Dynamic upload sizing @@ -256,20 +293,24 @@ pub const UploadWorker = struct { pub fn init( config: WorkerConfig, should_stop: *std.atomic.Value(bool), + active_worker_count: ?*std.atomic.Value(u32), http_client: HttpClient, timer: Timer, target_duration_ns: u64, upload_data: []const u8, + max_bytes_in_flight: u64, allocator: std.mem.Allocator, ) Self { return Self{ .config = config, .bytes_uploaded = std.atomic.Value(u64).init(0), .should_stop = should_stop, + .active_worker_count = active_worker_count, .http_client = http_client, .timer = timer, .target_duration_ns = target_duration_ns, .upload_data = upload_data, + .max_bytes_in_flight = max_bytes_in_flight, .allocator = allocator, .error_count = std.atomic.Value(u32).init(0), .current_upload_size = MIN_UPLOAD_SIZE, @@ -289,6 +330,11 @@ pub const UploadWorker = struct { var retry_count: u32 = 0; while (!self.should_stop.load(.monotonic)) { + if (!self.isActiveWorker()) { + std.Thread.sleep(std.time.ns_per_ms * 5); + continue; + } + // Check if we've exceeded the target duration if (self.timer.read() >= self.target_duration_ns) { self.should_stop.store(true, .monotonic); @@ -296,7 +342,10 @@ pub const UploadWorker = struct { } // Use dynamic upload size - const upload_size = @min(self.current_upload_size, self.upload_data.len); + const upload_size = @min( + @min(self.current_upload_size, self.upload_data.len), + self.uploadSizeCapFromInFlight(), + ); const upload_chunk = self.upload_data[0..upload_size]; const start_time = self.timer.read(); @@ -368,6 +417,27 @@ pub const UploadWorker = struct { _ = bytes_uploaded; // Suppress unused parameter warning } + fn isActiveWorker(self: *Self) bool { + if (self.active_worker_count) |count| { + return self.config.worker_id < count.load(.monotonic); + } + return true; + } + + fn uploadSizeCapFromInFlight(self: *Self) u32 { + if (self.active_worker_count == null or self.max_bytes_in_flight == 0) { + return self.max_upload_size; + } + + const active_count = @max(self.active_worker_count.?.load(.monotonic), 1); + const per_worker_cap = self.max_bytes_in_flight / active_count; + const bounded = @min( + @as(u64, self.max_upload_size), + @max(@as(u64, self.min_upload_size), per_worker_cap), + ); + return @intCast(bounded); + } + pub fn getBytesUploaded(self: *const Self) u64 { return self.bytes_uploaded.load(.monotonic); } @@ -404,21 +474,22 @@ pub const RealHttpClient = struct { fn fetch(ptr: *anyopaque, request: FetchRequest) !FetchResponse { const self: *Self = @ptrCast(@alignCast(ptr)); - var response_body = std.Io.Writer.Allocating.init(self.allocator); - errdefer response_body.deinit(); + var writer_buffer: [1024]u8 = undefined; + var discarding = std.Io.Writer.Discarding.init(&writer_buffer); const fetch_options = http.Client.FetchOptions{ .method = request.method, .location = .{ .url = request.url }, .payload = if (request.payload) |p| p else null, - .response_writer = &response_body.writer, + .response_writer = &discarding.writer, }; const result = try self.client.fetch(fetch_options); return FetchResponse{ .status = result.status, - .body = try response_body.toOwnedSlice(), + .body = &.{}, + .byte_count = @intCast(discarding.fullCount()), .allocator = self.allocator, }; } @@ -485,6 +556,7 @@ pub const MockHttpClient = struct { try self.responses.append(self.allocator, FetchResponse{ .status = status, .body = body_copy, + .byte_count = body_copy.len, .allocator = self.allocator, }); } @@ -524,6 +596,7 @@ pub const MockHttpClient = struct { return FetchResponse{ .status = response.status, .body = body_copy, + .byte_count = body_copy.len, .allocator = self.allocator, }; } @@ -586,9 +659,10 @@ test "DownloadWorker basic functionality" { var mock_timer = MockTimer.init(); var should_stop = std.atomic.Value(bool).init(false); + var active_workers = std.atomic.Value(u32).init(1); const config = WorkerConfig{ - .worker_id = 1, + .worker_id = 0, .url = "https://example.com/test", .chunk_size = 1024, .delay_between_requests_ms = 0, @@ -597,9 +671,11 @@ test "DownloadWorker basic functionality" { var worker = DownloadWorker.init( config, &should_stop, + &active_workers, mock_client.httpClient(), mock_timer.timer_interface(), std.time.ns_per_s * 2, // 2 second target + 0, allocator, ); @@ -633,9 +709,10 @@ test "DownloadWorker handles errors gracefully" { var mock_timer = MockTimer.init(); var should_stop = std.atomic.Value(bool).init(false); + var active_workers = std.atomic.Value(u32).init(1); const config = WorkerConfig{ - .worker_id = 1, + .worker_id = 0, .url = "https://example.com/test", .max_retries = 2, }; @@ -643,9 +720,11 @@ test "DownloadWorker handles errors gracefully" { var worker = DownloadWorker.init( config, &should_stop, + &active_workers, mock_client.httpClient(), mock_timer.timer_interface(), std.time.ns_per_s, // 1 second target + 0, allocator, ); diff --git a/src/lib/workers/worker_manager.zig b/src/lib/workers/worker_manager.zig index 4755925..d3f18bc 100644 --- a/src/lib/workers/worker_manager.zig +++ b/src/lib/workers/worker_manager.zig @@ -65,6 +65,25 @@ pub const WorkerManager = struct { concurrent_connections: usize, timer_interface: speed_worker.Timer, target_duration: u64, + ) ![]speed_worker.DownloadWorker { + return self.setupDownloadWorkersWithControl( + urls, + concurrent_connections, + timer_interface, + target_duration, + null, + 0, + ); + } + + pub fn setupDownloadWorkersWithControl( + self: *Self, + urls: []const []const u8, + concurrent_connections: usize, + timer_interface: speed_worker.Timer, + target_duration: u64, + active_worker_count: ?*std.atomic.Value(u32), + max_bytes_in_flight: u64, ) ![]speed_worker.DownloadWorker { const num_workers = calculateWorkerCount(urls, concurrent_connections); std.debug.assert(num_workers == self.http_clients.len); @@ -80,9 +99,11 @@ pub const WorkerManager = struct { worker.* = speed_worker.DownloadWorker.init( config, self.should_stop, + active_worker_count, self.http_clients[i].httpClient(), timer_interface, target_duration, + max_bytes_in_flight, self.allocator, ); } @@ -98,6 +119,27 @@ pub const WorkerManager = struct { timer_interface: speed_worker.Timer, target_duration: u64, upload_data: []const u8, + ) ![]speed_worker.UploadWorker { + return self.setupUploadWorkersWithControl( + urls, + concurrent_connections, + timer_interface, + target_duration, + upload_data, + null, + 0, + ); + } + + pub fn setupUploadWorkersWithControl( + self: *Self, + urls: []const []const u8, + concurrent_connections: usize, + timer_interface: speed_worker.Timer, + target_duration: u64, + upload_data: []const u8, + active_worker_count: ?*std.atomic.Value(u32), + max_bytes_in_flight: u64, ) ![]speed_worker.UploadWorker { const num_workers = calculateWorkerCount(urls, concurrent_connections); std.debug.assert(num_workers == self.http_clients.len); @@ -113,10 +155,12 @@ pub const WorkerManager = struct { worker.* = speed_worker.UploadWorker.init( config, self.should_stop, + active_worker_count, self.http_clients[i].httpClient(), timer_interface, target_duration, upload_data, + max_bytes_in_flight, self.allocator, ); } diff --git a/src/main.zig b/src/main.zig index e0fdabf..137bf66 100644 --- a/src/main.zig +++ b/src/main.zig @@ -10,12 +10,10 @@ pub const std_options: std.Options = .{ pub fn main() !void { var dbg = std.heap.DebugAllocator(.{}).init; - const allocator = switch (@import("builtin").mode) { .Debug => dbg.allocator(), .ReleaseFast, .ReleaseSafe, .ReleaseSmall => std.heap.smp_allocator, }; - defer if (@import("builtin").mode == .Debug) std.debug.assert(dbg.deinit() == .ok); try cli.run(allocator); diff --git a/src/test.zig b/src/test.zig index e546dd0..be1732c 100644 --- a/src/test.zig +++ b/src/test.zig @@ -5,9 +5,11 @@ test "all" { _ = @import("lib/fast.zig"); _ = @import("lib/bandwidth.zig"); _ = @import("lib/http_latency_tester.zig"); + _ = @import("lib/spinner/spinner.zig"); _ = @import("lib/workers/speed_worker.zig"); // Dedicated test modules _ = @import("lib/tests/measurement_strategy_test.zig"); + _ = @import("lib/tests/parity_fixture_test.zig"); _ = @import("lib/tests/worker_manager_test.zig"); }