diff --git a/src/args.rs b/src/args.rs
index 0d0d163..107fd41 100644
--- a/src/args.rs
+++ b/src/args.rs
@@ -181,6 +181,10 @@ pub struct Args {
     #[clap(long, default_value_t = 0.1)]
     pub timing_threshold: f64,
 
+    /// Include stats of slow requests in the final report.
+    #[clap(long, default_value_t = false)]
+    pub measure_slow_requests: bool,
+
     /// Use UUIDs instead of sequential ids
     #[clap(long, default_value_t = false)]
     pub uuids: bool,
diff --git a/src/common.rs b/src/common.rs
index 3f19a0d..b2a697f 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -44,7 +44,7 @@
 const GEO_RADIUS_METERS_MAX: f64 = 50000.0;
 const BOOL_TRUE_RATIO: f64 = 0.7;
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Copy)]
 pub struct Timing {
     pub delay_millis: f64, // milliseconds
     pub value: f64,
diff --git a/src/main.rs b/src/main.rs
index ddd1ff3..1c2dc89 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -95,8 +95,12 @@ fn main() {
         .enable_all()
         .build();
 
-    runtime
+    let res = runtime
         .unwrap()
-        .block_on(run_benchmark(args, stopped))
-        .unwrap();
+        .block_on(run_benchmark(args, stopped.clone()));
+
+    if let Err(err) = res {
+        stopped.store(true, Ordering::SeqCst);
+        println!("\n=> Bfb exited early because of an error:\n{err:?}");
+    }
 }
diff --git a/src/processor.rs b/src/processor.rs
index e1bfa75..f4aceba 100644
--- a/src/processor.rs
+++ b/src/processor.rs
@@ -27,4 +27,6 @@ pub trait Processor {
     }
 
     fn get_batch_size(&self) -> usize;
+
+    fn slow_requests(&self) -> Vec<Timing>;
 }
diff --git a/src/scroll.rs b/src/scroll.rs
index 0e380b7..bb51472 100644
--- a/src/scroll.rs
+++ b/src/scroll.rs
@@ -1,4 +1,4 @@
-use std::sync::atomic::AtomicBool;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::{Arc, Mutex};
 
 use indicatif::ProgressBar;
@@ -13,12 +13,18 @@ pub struct ScrollProcessor {
     args: Args,
     stopped: Arc<AtomicBool>,
     clients: Vec<QdrantClient>,
-    pub start_timestamp_millis: f64,
+    start_timestamp_millis: f64,
     start_time: std::time::Instant,
-    pub server_timings: Mutex<Vec<Timing>>,
-    pub rps: Mutex<Vec<Timing>>,
-    pub full_timings: Mutex<Vec<Timing>>,
-    pub uuids: Vec<String>,
+    uuids: Vec<String>,
+    stats: Mutex<ScrollStats>,
+}
+
+#[derive(Default)]
+pub struct ScrollStats {
+    server_timings: Vec<Timing>,
+    rps: Vec<Timing>,
+    full_timings: Vec<Timing>,
+    slow_scroll: Vec<Timing>,
 }
 
 impl ScrollProcessor {
@@ -37,10 +43,8 @@ impl ScrollProcessor {
                 .unwrap()
                 .as_millis() as f64,
             start_time: std::time::Instant::now(),
-            server_timings: Mutex::new(Vec::new()),
-            rps: Mutex::new(Vec::new()),
-            full_timings: Mutex::new(Vec::new()),
             uuids,
+            stats: Mutex::new(ScrollStats::default()),
         }
     }
 
@@ -50,7 +54,7 @@
         args: &Args,
         progress_bar: &ProgressBar,
     ) -> Result<(), anyhow::Error> {
-        if self.stopped.load(std::sync::atomic::Ordering::Relaxed) {
+        if self.stopped.load(Ordering::Relaxed) {
             return Ok(());
         }
 
@@ -95,9 +99,13 @@
             value: elapsed,
         };
 
-        self.full_timings.lock().unwrap().push(full_timing);
+        let server_timing = Timing {
+            delay_millis: self.start_time.elapsed().as_millis() as f64,
+            value: res.time,
+        };
 
-        if res.time > self.args.timing_threshold {
+        let slow_request = res.time > self.args.timing_threshold;
+        if slow_request {
             progress_bar.println(format!("Slow scroll: {:?}", res.time));
         }
 
@@ -109,18 +117,22 @@
             ));
         }
 
-        let server_timing = Timing {
-            delay_millis: self.start_time.elapsed().as_millis() as f64,
-            value: res.time,
-        };
-
         let rps_timing = Timing {
             delay_millis: self.start_time.elapsed().as_millis() as f64,
             value: progress_bar.per_sec(),
         };
 
-        self.server_timings.lock().unwrap().push(server_timing);
-        self.rps.lock().unwrap().push(rps_timing);
+        // Update stats all at once
+        {
+            let mut stats = self.stats.lock().unwrap();
+            stats.full_timings.push(full_timing);
+            stats.server_timings.push(server_timing);
+            stats.rps.push(rps_timing);
+
+            if slow_request {
+                stats.slow_scroll.push(server_timing);
+            }
+        }
 
         if let Some(delay_millis) = self.args.delay {
             tokio::time::sleep(std::time::Duration::from_millis(delay_millis as u64)).await;
@@ -145,23 +157,27 @@
     }
 
     fn server_timings(&self) -> Vec<Timing> {
-        self.server_timings.lock().unwrap().clone()
+        self.stats.lock().unwrap().server_timings.clone()
     }
 
     fn qps(&self) -> Vec<Timing> {
-        self.rps.lock().unwrap().clone()
+        self.stats.lock().unwrap().rps.clone()
     }
 
     fn rps(&self) -> Vec<Timing> {
         // for requests without batching, qps = rps
-        self.rps.lock().unwrap().clone()
+        self.stats.lock().unwrap().rps.clone()
     }
 
     fn full_timings(&self) -> Vec<Timing> {
-        self.full_timings.lock().unwrap().clone()
+        self.stats.lock().unwrap().full_timings.clone()
     }
 
     fn get_batch_size(&self) -> usize {
         1 // No batching for scroll.
     }
+
+    fn slow_requests(&self) -> Vec<Timing> {
+        self.stats.lock().unwrap().slow_scroll.clone()
+    }
 }
diff --git a/src/search.rs b/src/search.rs
index b6d33be..d02610b 100644
--- a/src/search.rs
+++ b/src/search.rs
@@ -24,16 +24,17 @@ struct SearchStats {
     rps: Vec<Timing>,
     full_timings: Vec<Timing>,
     precisions: Vec<f64>,
+    slow_request_timings: Vec<Timing>,
 }
 
 pub struct SearchProcessor {
     args: Args,
     stopped: Arc<AtomicBool>,
     clients: Vec<QdrantClient>,
-    pub start_timestamp_millis: f64,
+    start_timestamp_millis: f64,
     start_time: std::time::Instant,
     stats: Mutex<SearchStats>,
-    pub uuids: Vec<String>,
+    uuids: Vec<String>,
 }
 
 impl SearchProcessor {
@@ -248,7 +249,8 @@
             value: elapsed,
         };
 
-        if res_batch.time > self.args.timing_threshold {
+        let slow_request = res_batch.time > self.args.timing_threshold;
+        if slow_request {
             progress_bar.println(format!("Slow search: {:?}", res_batch.time));
         }
 
@@ -274,6 +276,9 @@
             stats_guard.qps.push(qps_timing);
             stats_guard.rps.push(rps_timing);
             stats_guard.full_timings.push(full_timing);
+            if slow_request {
+                stats_guard.slow_request_timings.push(server_timing);
+            }
         }
 
         if let Some(delay_millis) = self.args.delay {
@@ -400,4 +405,8 @@
     fn get_batch_size(&self) -> usize {
         self.args.search_batch_size
     }
+
+    fn slow_requests(&self) -> Vec<Timing> {
+        self.stats.lock().unwrap().slow_request_timings.clone()
+    }
 }
diff --git a/src/stats.rs b/src/stats.rs
index f0172c2..5d3053d 100644
--- a/src/stats.rs
+++ b/src/stats.rs
@@ -78,7 +78,7 @@ pub async fn process(
     progress_bar.set_draw_target(draw_target);
 
     // Use RPS mode if --rps is set, otherwise use parallel mode
-    if let Some(target_rps) = args.rps {
+    let res = if let Some(target_rps) = args.rps {
         process_with_rps(
             args,
             stopped.clone(),
@@ -88,7 +88,7 @@
             batch_size,
             target_rps,
         )
-        .await?;
+        .await
     } else {
         process_with_parallel(
             args,
@@ -98,8 +98,8 @@
             batch_count,
             batch_size,
         )
-        .await?;
-    }
+        .await
+    };
 
     if stopped.load(Ordering::Relaxed) {
         progress_bar.abandon();
@@ -108,6 +108,12 @@
     }
 
     let mut full_timings = processor.full_timings();
+
+    // Don't print stats if we didn't execute at least one request.
+    if res.is_err() && full_timings.is_empty() {
+        return Err(res.err().unwrap());
+    }
+
     println!("--- Request timings ---");
     print_stats(args, &mut full_timings, "request time", true);
     let mut server_timings = processor.server_timings();
@@ -133,10 +139,18 @@
         println!("Median precision@10: {p50_time}");
     }
 
-    if args.json.is_some() {
+    // Include details of slow requests.
+    if args.measure_slow_requests {
+        println!("--- Slow requests ---");
+        let mut slow_requests = processor.slow_requests();
+        println!("Total: {}", slow_requests.len());
+        print_stats(args, &mut slow_requests, "slow request time", true);
+    }
+
+    if let Some(json_path) = &args.json {
         println!("--- Writing results to json file ---");
         write_to_json(
-            args.json.as_ref().unwrap(),
+            json_path,
             SearcherResults {
                 server_timings: server_timings.iter().map(|x| x.value).collect(),
                 rps: rps.iter().map(|x| x.value).collect(),
@@ -165,6 +179,9 @@
         )?;
     }
 
+    // Don't ignore the error if occurred.
+    res?;
+
     Ok(())
 }