diff --git a/Cargo.lock b/Cargo.lock index 6617fdf..d223f48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -508,6 +508,7 @@ dependencies = [ "derive_more", "fnv", "hashbrown 0.15.5", + "http", "mockito", "proptest", "protobuf", diff --git a/crates/dogstatsd/Cargo.toml b/crates/dogstatsd/Cargo.toml index e938879..1fcdbe7 100644 --- a/crates/dogstatsd/Cargo.toml +++ b/crates/dogstatsd/Cargo.toml @@ -29,6 +29,7 @@ datadog-fips = { path = "../datadog-fips", default-features = false } rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] } [dev-dependencies] +http = "1" mockito = { version = "1.5.0", default-features = false } proptest = "1.4.0" tracing-test = { version = "0.2.5", default-features = false } diff --git a/crates/dogstatsd/src/aggregator.rs b/crates/dogstatsd/src/aggregator.rs index a453eda..15799e5 100644 --- a/crates/dogstatsd/src/aggregator.rs +++ b/crates/dogstatsd/src/aggregator.rs @@ -757,4 +757,108 @@ pub mod tests { assert_eq!(deserialized.sketches().len(), 10); assert_eq!(deserialized, distribution); } + + #[test] + #[allow(clippy::float_cmp)] + fn count_aggregation_sums_values() { + let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap(); + + let m1 = parse("hits:3|c|#env:prod|T1656581409").expect("parse failed"); + let m2 = parse("hits:7|c|#env:prod|T1656581409").expect("parse failed"); + let m3 = parse("hits:5|c|#env:prod|T1656581409").expect("parse failed"); + + aggregator.insert(m1).unwrap(); + aggregator.insert(m2).unwrap(); + aggregator.insert(m3).unwrap(); + + // Same name + tags + timestamp bucket = one context, values summed + assert_eq!(aggregator.map.len(), 1); + + let entry = aggregator + .get_entry_by_id( + "hits".into(), + &Some(SortedTags::parse("env:prod").unwrap()), + 1656581400, + ) + .unwrap(); + assert_eq!(entry.value.get_value().unwrap(), 15.0); + } + + #[test] + #[allow(clippy::float_cmp)] + fn gauge_aggregation_last_wins() { + let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap(); + + let m1 = parse("cpu:30.0|g|#host:a|T1656581409").expect("parse failed"); + let m2 = parse("cpu:55.0|g|#host:a|T1656581409").expect("parse failed"); + let m3 = parse("cpu:42.0|g|#host:a|T1656581409").expect("parse failed"); + + aggregator.insert(m1).unwrap(); + aggregator.insert(m2).unwrap(); + aggregator.insert(m3).unwrap(); + + // Same context, gauge = last value wins + assert_eq!(aggregator.map.len(), 1); + + let entry = aggregator + .get_entry_by_id( + "cpu".into(), + &Some(SortedTags::parse("host:a").unwrap()), + 1656581400, + ) + .unwrap(); + assert_eq!(entry.value.get_value().unwrap(), 42.0); + } + + #[test] + fn distribution_aggregation_merges_sketches() { + let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap(); + + let m1 = parse("latency:10.0|d|#svc:web|T1656581409").expect("parse failed"); + let m2 = parse("latency:20.0|d|#svc:web|T1656581409").expect("parse failed"); + let m3 = parse("latency:30.0|d|#svc:web|T1656581409").expect("parse failed"); + + aggregator.insert(m1).unwrap(); + aggregator.insert(m2).unwrap(); + aggregator.insert(m3).unwrap(); + + // Same context, distributions merge into one sketch + assert_eq!(aggregator.map.len(), 1); + + let entry = aggregator + .get_entry_by_id( + "latency".into(), + &Some(SortedTags::parse("svc:web").unwrap()), + 1656581400, + ) + .unwrap(); + let sketch = entry.value.get_sketch().unwrap(); + assert!((sketch.min().unwrap() - 10.0).abs() < PRECISION); + assert!((sketch.max().unwrap() - 30.0).abs() < PRECISION); + assert!((sketch.sum().unwrap() - 60.0).abs() < PRECISION); + assert_eq!(sketch.count(), 3); + } + + #[test] + #[allow(clippy::float_cmp)] + fn mixed_metric_types_stay_separate() { + let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap(); + + // Same name but different types and tags keep different contexts + let count = parse("req:1|c|#route:a|T1656581409").expect("parse failed"); + let gauge = parse("req:5|g|#route:b|T1656581409").expect("parse failed"); + let dist = parse("req:100|d|#route:c|T1656581409").expect("parse failed"); + + aggregator.insert(count).unwrap(); + aggregator.insert(gauge).unwrap(); + aggregator.insert(dist).unwrap(); + + assert_eq!(aggregator.map.len(), 3); + + let series = aggregator.to_series(); + assert_eq!(series.len(), 2); // count + gauge + + let protos = aggregator.distributions_to_protobuf(); + assert_eq!(protos.sketches().len(), 1); // distribution + } } diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index e2ab2a0..5a6f1b2 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -817,6 +817,22 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d .starts_with("custom.namespace.my.metric")); } + #[tokio::test] + async fn test_dogstatsd_filter_lambda_enhanced_invocations() { + let response = setup_and_consume_dogstatsd( + "aws.lambda.enhanced.invocations:1|c\ncustom.metric:5|c\n", + None, + ) + .await; + + // aws.lambda.enhanced.invocations should be filtered out + assert_eq!(response.series.len(), 1); + assert_eq!(response.series[0].series.len(), 1); + assert!(response.series[0].series[0] + .metric + .starts_with("custom.metric")); + } + #[tokio::test] async fn test_create_udp_socket_default_so_rcvbuf() { let socket = super::create_udp_socket("127.0.0.1:0", None).await.unwrap(); diff --git a/crates/dogstatsd/src/flusher.rs b/crates/dogstatsd/src/flusher.rs index e4fbe4b..c8a4d70 100644 --- a/crates/dogstatsd/src/flusher.rs +++ b/crates/dogstatsd/src/flusher.rs @@ -401,4 +401,142 @@ mod tests { // Should attempt to flush and return Some with failed metrics (since we're not mocking the API) assert!(result.is_some()); } + + // ---- should_try_next_batch tests ---- + + /// Helper to build a minimal reqwest::Response with a given status code. + async fn mock_response(status: u16) -> Response { + http::Response::builder() + .status(status) + .body("") + .unwrap() + .into() + } + + #[tokio::test] + async fn test_should_try_next_batch_accepted() { + let resp = Ok(mock_response(202).await); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!(continue_shipping, "202 should continue to next batch"); + assert!(!should_retry, "202 should not retry"); + } + + #[tokio::test] + async fn test_should_try_next_batch_ok_non_accepted() { + // 200 OK is unexpected for the intake API (it expects 202) + let resp = Ok(mock_response(200).await); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + !continue_shipping, + "unexpected 200 is not 4xx, should not continue" + ); + assert!( + should_retry, + "unexpected 200 is not 4xx, should retry (treated as transient)" + ); + } + + #[tokio::test] + async fn test_should_try_next_batch_400_permanent() { + let resp = Ok(mock_response(400).await); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + continue_shipping, + "4xx permanent error should continue to next batch" + ); + assert!(!should_retry, "4xx permanent error should not retry"); + } + + #[tokio::test] + async fn test_should_try_next_batch_403_permanent() { + let resp = Ok(mock_response(403).await); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + continue_shipping, + "403 permanent error should continue to next batch" + ); + assert!(!should_retry, "403 permanent error should not retry"); + } + + #[tokio::test] + async fn test_should_try_next_batch_500_temporary() { + let resp = Ok(mock_response(500).await); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + !continue_shipping, + "5xx temporary error should not continue" + ); + assert!(should_retry, "5xx temporary error should retry"); + } + + #[tokio::test] + async fn test_should_try_next_batch_503_temporary() { + let resp = Ok(mock_response(503).await); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + !continue_shipping, + "503 temporary error should not continue" + ); + assert!(should_retry, "503 temporary error should retry"); + } + + #[tokio::test] + async fn test_should_try_next_batch_payload_error() { + let resp = Err(ShippingError::Payload("bad data".to_string())); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + continue_shipping, + "payload error should continue to next batch (data is malformed)" + ); + assert!( + !should_retry, + "payload error should not retry (data won't change)" + ); + } + + #[tokio::test] + async fn test_should_try_next_batch_destination_error_temporary() { + let resp = Err(ShippingError::Destination( + Some(StatusCode::INTERNAL_SERVER_ERROR), + "server down".to_string(), + )); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + !continue_shipping, + "5xx destination error should not continue" + ); + assert!(should_retry, "5xx destination error should retry"); + } + + #[tokio::test] + async fn test_should_try_next_batch_destination_error_permanent() { + let resp = Err(ShippingError::Destination( + Some(StatusCode::FORBIDDEN), + "bad key".to_string(), + )); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + !continue_shipping, + "4xx destination error should not continue" + ); + assert!(!should_retry, "4xx destination error should not retry"); + } + + #[tokio::test] + async fn test_should_try_next_batch_destination_error_no_status() { + // No status code (e.g., timeout / connection refused) + let resp = Err(ShippingError::Destination( + None, + "connection refused".to_string(), + )); + let (continue_shipping, should_retry) = should_try_next_batch(resp).await; + assert!( + !continue_shipping, + "no-status destination error should not continue" + ); + assert!( + should_retry, + "no-status destination error should retry (transient)" + ); + } }