Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ pub struct Args {
#[clap(long, default_value_t = 0.1)]
pub timing_threshold: f64,

/// Include stats of slow requests in the final report.
#[clap(long, default_value_t = false)]
pub measure_slow_requests: bool,

/// Use UUIDs instead of sequential ids
#[clap(long, default_value_t = false)]
pub uuids: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const GEO_RADIUS_METERS_MAX: f64 = 50000.0;

const BOOL_TRUE_RATIO: f64 = 0.7;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Copy)]
pub struct Timing {
pub delay_millis: f64, // milliseconds
pub value: f64,
Expand Down
10 changes: 7 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ fn main() {
.enable_all()
.build();

runtime
let res = runtime
.unwrap()
.block_on(run_benchmark(args, stopped))
.unwrap();
.block_on(run_benchmark(args, stopped.clone()));

if let Err(err) = res {
stopped.store(true, Ordering::SeqCst);
println!("\n=> Bfb exited early because of an error:\n{err:?}");
}
}
2 changes: 2 additions & 0 deletions src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ pub trait Processor {
}

fn get_batch_size(&self) -> usize;

/// Returns the timings recorded for slow requests; these feed the
/// `--- Slow requests ---` section of the final report.
fn slow_requests(&self) -> Vec<Timing>;
}
62 changes: 39 additions & 23 deletions src/scroll.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use indicatif::ProgressBar;
Expand All @@ -13,12 +13,18 @@ pub struct ScrollProcessor {
args: Args,
stopped: Arc<AtomicBool>,
clients: Vec<Qdrant>,
pub start_timestamp_millis: f64,
start_timestamp_millis: f64,
start_time: std::time::Instant,
pub server_timings: Mutex<Vec<Timing>>,
pub rps: Mutex<Vec<Timing>>,
pub full_timings: Mutex<Vec<Timing>>,
pub uuids: Vec<String>,
uuids: Vec<String>,
stats: Mutex<ScrollStats>,
}

/// Measurements collected while running scroll requests.
///
/// All vectors are appended to together under a single lock after each
/// request, so one request contributes one entry to each of the first three
/// vectors.
#[derive(Default)]
pub struct ScrollStats {
// Server-reported time of each request (`res.time`).
server_timings: Vec<Timing>,
// Progress-bar throughput (`per_sec()`) sampled after each request.
rps: Vec<Timing>,
// Elapsed time measured for each request.
full_timings: Vec<Timing>,
// Server timings of the requests whose `res.time` exceeded `timing_threshold`.
slow_scroll: Vec<Timing>,
}

impl ScrollProcessor {
Expand All @@ -37,10 +43,8 @@ impl ScrollProcessor {
.unwrap()
.as_millis() as f64,
start_time: std::time::Instant::now(),
server_timings: Mutex::new(Vec::new()),
rps: Mutex::new(Vec::new()),
full_timings: Mutex::new(Vec::new()),
uuids,
stats: Mutex::new(ScrollStats::default()),
}
}

Expand All @@ -50,7 +54,7 @@ impl ScrollProcessor {
args: &Args,
progress_bar: &ProgressBar,
) -> Result<(), anyhow::Error> {
if self.stopped.load(std::sync::atomic::Ordering::Relaxed) {
if self.stopped.load(Ordering::Relaxed) {
return Ok(());
}

Expand Down Expand Up @@ -95,9 +99,13 @@ impl ScrollProcessor {
value: elapsed,
};

self.full_timings.lock().unwrap().push(full_timing);
let server_timing = Timing {
delay_millis: self.start_time.elapsed().as_millis() as f64,
value: res.time,
};

if res.time > self.args.timing_threshold {
let slow_request = res.time > self.args.timing_threshold;
if slow_request {
progress_bar.println(format!("Slow scroll: {:?}", res.time));
}

Expand All @@ -109,18 +117,22 @@ impl ScrollProcessor {
));
}

let server_timing = Timing {
delay_millis: self.start_time.elapsed().as_millis() as f64,
value: res.time,
};

let rps_timing = Timing {
delay_millis: self.start_time.elapsed().as_millis() as f64,
value: progress_bar.per_sec(),
};

self.server_timings.lock().unwrap().push(server_timing);
self.rps.lock().unwrap().push(rps_timing);
// Update stats all at once
{
let mut stats = self.stats.lock().unwrap();
stats.full_timings.push(full_timing);
stats.server_timings.push(server_timing);
stats.rps.push(rps_timing);

if slow_request {
stats.slow_scroll.push(server_timing);
}
}

if let Some(delay_millis) = self.args.delay {
tokio::time::sleep(std::time::Duration::from_millis(delay_millis as u64)).await;
Expand All @@ -145,23 +157,27 @@ impl Processor for ScrollProcessor {
}

fn server_timings(&self) -> Vec<Timing> {
self.server_timings.lock().unwrap().clone()
self.stats.lock().unwrap().server_timings.clone()
}

fn qps(&self) -> Vec<Timing> {
self.rps.lock().unwrap().clone()
self.stats.lock().unwrap().rps.clone()
}

fn rps(&self) -> Vec<Timing> {
// for requests without batching, qps = rps
self.rps.lock().unwrap().clone()
self.stats.lock().unwrap().rps.clone()
}

fn full_timings(&self) -> Vec<Timing> {
self.full_timings.lock().unwrap().clone()
self.stats.lock().unwrap().full_timings.clone()
}

/// Returns the number of requests per batch, which is always one for scroll.
fn get_batch_size(&self) -> usize {
1 // No batching for scroll.
}

/// Returns a copy of the server timings recorded for slow scroll requests.
///
/// Panics if the stats mutex has been poisoned.
fn slow_requests(&self) -> Vec<Timing> {
    let stats = self.stats.lock().unwrap();
    stats.slow_scroll.clone()
}
}
15 changes: 12 additions & 3 deletions src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ struct SearchStats {
rps: Vec<Timing>,
full_timings: Vec<Timing>,
precisions: Vec<f32>,
slow_request_timings: Vec<Timing>,
}

pub struct SearchProcessor {
args: Args,
stopped: Arc<AtomicBool>,
clients: Vec<Qdrant>,
pub start_timestamp_millis: f64,
start_timestamp_millis: f64,
start_time: std::time::Instant,
stats: Mutex<SearchStats>,
pub uuids: Vec<String>,
uuids: Vec<String>,
}

impl SearchProcessor {
Expand Down Expand Up @@ -248,7 +249,8 @@ impl SearchProcessor {
value: elapsed,
};

if res_batch.time > self.args.timing_threshold {
let slow_request = res_batch.time > self.args.timing_threshold;
if slow_request {
progress_bar.println(format!("Slow search: {:?}", res_batch.time));
}

Expand All @@ -274,6 +276,9 @@ impl SearchProcessor {
stats_guard.qps.push(qps_timing);
stats_guard.rps.push(rps_timing);
stats_guard.full_timings.push(full_timing);
if slow_request {
stats_guard.slow_request_timings.push(server_timing);
}
}

if let Some(delay_millis) = self.args.delay {
Expand Down Expand Up @@ -400,4 +405,8 @@ impl Processor for SearchProcessor {
/// Returns the search batch size taken from the stored arguments.
fn get_batch_size(&self) -> usize {
self.args.search_batch_size
}

/// Returns a copy of the server timings recorded for slow search requests.
///
/// Panics if the stats mutex has been poisoned.
fn slow_requests(&self) -> Vec<Timing> {
    let guard = self.stats.lock().unwrap();
    guard.slow_request_timings.clone()
}
}
29 changes: 23 additions & 6 deletions src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub async fn process<P: Processor + Sync>(
progress_bar.set_draw_target(draw_target);

// Use RPS mode if --rps is set, otherwise use parallel mode
if let Some(target_rps) = args.rps {
let res = if let Some(target_rps) = args.rps {
process_with_rps(
args,
stopped.clone(),
Expand All @@ -88,7 +88,7 @@ pub async fn process<P: Processor + Sync>(
batch_size,
target_rps,
)
.await?;
.await
} else {
process_with_parallel(
args,
Expand All @@ -98,8 +98,8 @@ pub async fn process<P: Processor + Sync>(
batch_count,
batch_size,
)
.await?;
}
.await
};

if stopped.load(Ordering::Relaxed) {
progress_bar.abandon();
Expand All @@ -108,6 +108,12 @@ pub async fn process<P: Processor + Sync>(
}

let mut full_timings = processor.full_timings();

// Don't print stats if we didn't execute at least one request.
if res.is_err() && full_timings.is_empty() {
return Err(res.err().unwrap());
}

println!("--- Request timings ---");
print_stats(args, &mut full_timings, "request time", true);
let mut server_timings = processor.server_timings();
Expand All @@ -133,10 +139,18 @@ pub async fn process<P: Processor + Sync>(
println!("Median precision@10: {p50_time}");
}

if args.json.is_some() {
// Include details of slow requests.
if args.measure_slow_requests {
println!("--- Slow requests ---");
let mut slow_requests = processor.slow_requests();
println!("Total: {}", slow_requests.len());
print_stats(args, &mut slow_requests, "slow request time", true);
}

if let Some(json_path) = &args.json {
println!("--- Writing results to json file ---");
write_to_json(
args.json.as_ref().unwrap(),
json_path,
SearcherResults {
server_timings: server_timings.iter().map(|x| x.value).collect(),
rps: rps.iter().map(|x| x.value).collect(),
Expand Down Expand Up @@ -165,6 +179,9 @@ pub async fn process<P: Processor + Sync>(
)?;
}

// Don't swallow the error if one occurred.
res?;

Ok(())
}

Expand Down