diff --git a/sds/src/lib.rs b/sds/src/lib.rs index 4527fe2b..72dc2a67 100644 --- a/sds/src/lib.rs +++ b/sds/src/lib.rs @@ -45,8 +45,8 @@ pub use scanner::shared_pool::{SharedPool, SharedPoolGuard}; pub use scanner::suppression::Suppressions; pub use scanner::{ CompiledRule, MatchEmitter, Precedence, RootCompiledRule, RootRuleConfig, RuleResult, - RuleStatus, ScanOptionBuilder, Scanner, ScannerBuilder, SharedData, StringMatch, - StringMatchesCtx, + RuleStatus, ScanMetrics, ScanOptionBuilder, ScanResult, Scanner, ScannerBuilder, SharedData, + StringMatch, StringMatchesCtx, config::RuleConfig, error::{CreateScannerError, ScannerError}, regex_rule::config::{ diff --git a/sds/src/scanner/metrics.rs b/sds/src/scanner/metrics.rs index bd450a8f..66658f82 100644 --- a/sds/src/scanner/metrics.rs +++ b/sds/src/scanner/metrics.rs @@ -1,5 +1,5 @@ use crate::Labels; -use metrics::{Counter, counter}; +use metrics::{Counter, Histogram, counter, histogram}; pub struct RuleMetrics { pub false_positive_excluded_attributes: Counter, @@ -21,15 +21,17 @@ pub struct ScannerMetrics { pub duration_ns: Counter, pub match_count: Counter, pub suppressed_match_count: Counter, + pub cpu_duration: Histogram, } impl ScannerMetrics { - pub fn new(labels: &Labels) -> Self { + pub fn new(labels: &Labels, highcard_labels: &Labels) -> Self { ScannerMetrics { num_scanned_events: counter!("scanned_events", labels.clone()), duration_ns: counter!("scanning.duration", labels.clone()), match_count: counter!("scanning.match_count", labels.clone()), suppressed_match_count: counter!("scanning.suppressed_match_count", labels.clone()), + cpu_duration: histogram!("scanning.cpu_duration", highcard_labels.clone()), } } } diff --git a/sds/src/scanner/mod.rs b/sds/src/scanner/mod.rs index ddb45f86..3ad018f8 100644 --- a/sds/src/scanner/mod.rs +++ b/sds/src/scanner/mod.rs @@ -257,14 +257,17 @@ impl StringMatchesCtx<'_> { // The future is spawned onto the tokio runtime immediately so it starts running // in the background let fut = TOKIO_RUNTIME.spawn(async move { + let start = Instant::now(); let mut ctx = AsyncStringMatchesCtx { rule_matches: vec![], }; (func)(&mut ctx).await?; + let io_duration = start.elapsed(); Ok(AsyncRuleInfo { rule_index, rule_matches: ctx.rule_matches, + io_duration, }) }); @@ -299,6 +302,20 @@ pub struct PendingRuleJob { pub struct AsyncRuleInfo { rule_index: usize, rule_matches: Vec, + io_duration: Duration, +} + +#[derive(Debug, Clone)] +pub struct ScanMetrics { + pub total_duration: Duration, + pub io_duration: Duration, + pub num_async_rules: usize, +} + +#[derive(Debug)] +pub struct ScanResult { + pub matches: Vec, + pub metrics: ScanMetrics, } /// A rule result that cannot be async @@ -443,6 +460,7 @@ pub struct Scanner { scanner_features: ScannerFeatures, metrics: ScannerMetrics, labels: Labels, + pub highcard_labels: Labels, match_validators_per_type: AHashMap>, per_scanner_data: SharedData, async_scan_timeout: Duration, @@ -470,16 +488,13 @@ impl Scanner { event: &mut E, options: ScanOptions, ) -> Result, ScannerError> { - block_on(self.internal_scan_with_metrics(event, options)) + block_on(self.internal_scan_with_metrics(event, options)).map(|result| result.matches) } // This function scans the given event with the rules configured in the scanner. // The event parameter is a mutable reference to the event that should be scanned (implemented the Event trait). - // The return value is a list of RuleMatch objects, which contain information about the matches that were found. - pub async fn scan_async( - &self, - event: &mut E, - ) -> Result, ScannerError> { + // The return value is a ScanResult containing matches and timing metrics. + pub async fn scan_async(&self, event: &mut E) -> Result { self.scan_async_with_options(event, ScanOptions::default()) .await } @@ -488,7 +503,7 @@ impl Scanner { &self, event: &mut E, options: ScanOptions, - ) -> Result, ScannerError> { + ) -> Result { let fut = self.internal_scan_with_metrics(event, options); // The sleep from the timeout requires being in a tokio context @@ -520,18 +535,34 @@ impl Scanner { &self, event: &mut E, options: ScanOptions, - ) -> Result, ScannerError> { + ) -> Result { let start = Instant::now(); let result = self.internal_scan(event, options).await; - match &result { - Ok(rule_matches) => { - self.record_metrics(rule_matches, start); + match result { + Ok((rule_matches, io_duration, num_async_rules)) => { + self.record_metrics(&rule_matches, start); + let total_duration = start.elapsed(); + + // Calculate CPU duration by subtracting I/O wait time from total duration + let cpu_duration = total_duration.saturating_sub(io_duration); + + // Record CPU duration histogram in nanoseconds + self.metrics.cpu_duration.record(cpu_duration.as_nanos() as f64); + + Ok(ScanResult { + matches: rule_matches, + metrics: ScanMetrics { + total_duration, + io_duration, + num_async_rules, + }, + }) } - Err(_) => { + Err(e) => { self.record_metrics(&[], start); + Err(e) } } - result } fn process_rule_matches( @@ -611,7 +642,7 @@ impl Scanner { &self, event: &mut E, options: ScanOptions, - ) -> Result, ScannerError> { + ) -> Result<(Vec, Duration, usize), ScannerError> { // If validation is requested, we need to collect match content even if the scanner // wasn't originally configured to return matches let need_match_content = self.scanner_features.return_matches || options.validate_matches; @@ -639,8 +670,12 @@ impl Scanner { // The async jobs were already spawned on the tokio runtime, so the // results just need to be collected + let num_async_jobs = async_jobs.len(); + let mut total_io_duration = Duration::ZERO; + for job in async_jobs { let rule_info = job.fut.await.unwrap()?; + total_io_duration += rule_info.io_duration; rule_matches.push_async_matches( &job.path, rule_info @@ -664,7 +699,7 @@ impl Scanner { self.validate_matches(&mut output_rule_matches); } - Ok(output_rule_matches) + Ok((output_rule_matches, total_io_duration, num_async_jobs)) } pub fn suppress_matches( @@ -926,6 +961,7 @@ impl Drop for Scanner { pub struct ScannerBuilder<'a> { rules: &'a [RootRuleConfig>], labels: Labels, + highcard_labels: Labels, scanner_features: ScannerFeatures, async_scan_timeout: Duration, } @@ -935,6 +971,7 @@ impl ScannerBuilder<'_> { ScannerBuilder { rules, labels: Labels::empty(), + highcard_labels: Labels::empty(), scanner_features: ScannerFeatures::default(), async_scan_timeout: Duration::from_secs(60 * 5), } @@ -945,6 +982,11 @@ impl ScannerBuilder<'_> { self } + pub fn highcard_labels(mut self, labels: Labels) -> Self { + self.highcard_labels = labels; + self + } + pub fn with_async_scan_timeout(mut self, duration: Duration) -> Self { self.async_scan_timeout = duration; self @@ -1065,9 +1107,10 @@ impl ScannerBuilder<'_> { rules: compiled_rules, scoped_ruleset, scanner_features: self.scanner_features, - metrics: ScannerMetrics::new(&self.labels), + metrics: ScannerMetrics::new(&self.labels, &self.highcard_labels), match_validators_per_type, labels: self.labels, + highcard_labels: self.highcard_labels, per_scanner_data, async_scan_timeout: self.async_scan_timeout, }) diff --git a/sds/src/scanner/test/async_rule.rs b/sds/src/scanner/test/async_rule.rs index cca7d491..cec1b98e 100644 --- a/sds/src/scanner/test/async_rule.rs +++ b/sds/src/scanner/test/async_rule.rs @@ -65,8 +65,8 @@ async fn run_async_rule() { // async scan let mut input = "this is a secret with random data".to_owned(); - let matched_rules = scanner.scan_async(&mut input).await.unwrap(); - assert_eq!(matched_rules.len(), 1); + let result = scanner.scan_async(&mut input).await.unwrap(); + assert_eq!(result.matches.len(), 1); assert_eq!(input, "this is a [REDACTED] with random data"); } @@ -107,8 +107,8 @@ fn async_scan_outside_of_tokio() { let fut = async move { let mut input = "this is a secret with random data".to_owned(); - let matched_rules = scanner.scan_async(&mut input).await.unwrap(); - assert_eq!(matched_rules.len(), 1); + let result = scanner.scan_async(&mut input).await.unwrap(); + assert_eq!(result.matches.len(), 1); assert_eq!(input, "this is a [REDACTED] with random data"); };