Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/dogstatsd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ datadog-fips = { path = "../datadog-fips", default-features = false }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }

[dev-dependencies]
http = "1"
mockito = { version = "1.5.0", default-features = false }
proptest = "1.4.0"
tracing-test = { version = "0.2.5", default-features = false }
Expand Down
104 changes: 104 additions & 0 deletions crates/dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,4 +757,108 @@ pub mod tests {
assert_eq!(deserialized.sketches().len(), 10);
assert_eq!(deserialized, distribution);
}

#[test]
#[allow(clippy::float_cmp)]
fn count_aggregation_sums_values() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap();

let m1 = parse("hits:3|c|#env:prod|T1656581409").expect("parse failed");
let m2 = parse("hits:7|c|#env:prod|T1656581409").expect("parse failed");
let m3 = parse("hits:5|c|#env:prod|T1656581409").expect("parse failed");

aggregator.insert(m1).unwrap();
aggregator.insert(m2).unwrap();
aggregator.insert(m3).unwrap();

// Same name + tags + timestamp bucket = one context, values summed
assert_eq!(aggregator.map.len(), 1);

let entry = aggregator
.get_entry_by_id(
"hits".into(),
&Some(SortedTags::parse("env:prod").unwrap()),
1656581400,
)
.unwrap();
assert_eq!(entry.value.get_value().unwrap(), 15.0);
}

#[test]
#[allow(clippy::float_cmp)]
fn gauge_aggregation_last_wins() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap();

let m1 = parse("cpu:30.0|g|#host:a|T1656581409").expect("parse failed");
let m2 = parse("cpu:55.0|g|#host:a|T1656581409").expect("parse failed");
let m3 = parse("cpu:42.0|g|#host:a|T1656581409").expect("parse failed");

aggregator.insert(m1).unwrap();
aggregator.insert(m2).unwrap();
aggregator.insert(m3).unwrap();

// Same context, gauge = last value wins
assert_eq!(aggregator.map.len(), 1);

let entry = aggregator
.get_entry_by_id(
"cpu".into(),
&Some(SortedTags::parse("host:a").unwrap()),
1656581400,
)
.unwrap();
assert_eq!(entry.value.get_value().unwrap(), 42.0);
}

#[test]
fn distribution_aggregation_merges_sketches() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap();

let m1 = parse("latency:10.0|d|#svc:web|T1656581409").expect("parse failed");
let m2 = parse("latency:20.0|d|#svc:web|T1656581409").expect("parse failed");
let m3 = parse("latency:30.0|d|#svc:web|T1656581409").expect("parse failed");

aggregator.insert(m1).unwrap();
aggregator.insert(m2).unwrap();
aggregator.insert(m3).unwrap();

// Same context, distributions merge into one sketch
assert_eq!(aggregator.map.len(), 1);

let entry = aggregator
.get_entry_by_id(
"latency".into(),
&Some(SortedTags::parse("svc:web").unwrap()),
1656581400,
)
.unwrap();
let sketch = entry.value.get_sketch().unwrap();
assert!((sketch.min().unwrap() - 10.0).abs() < PRECISION);
assert!((sketch.max().unwrap() - 30.0).abs() < PRECISION);
assert!((sketch.sum().unwrap() - 60.0).abs() < PRECISION);
assert_eq!(sketch.count(), 3);
}

#[test]
#[allow(clippy::float_cmp)]
fn mixed_metric_types_stay_separate() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 10).unwrap();

// Same name but different types and tags keep different contexts
let count = parse("req:1|c|#route:a|T1656581409").expect("parse failed");
let gauge = parse("req:5|g|#route:b|T1656581409").expect("parse failed");
let dist = parse("req:100|d|#route:c|T1656581409").expect("parse failed");

aggregator.insert(count).unwrap();
aggregator.insert(gauge).unwrap();
aggregator.insert(dist).unwrap();

assert_eq!(aggregator.map.len(), 3);

let series = aggregator.to_series();
assert_eq!(series.len(), 2); // count + gauge

let protos = aggregator.distributions_to_protobuf();
assert_eq!(protos.sketches().len(), 1); // distribution
}
}
16 changes: 16 additions & 0 deletions crates/dogstatsd/src/dogstatsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,22 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
.starts_with("custom.namespace.my.metric"));
}

#[tokio::test]
async fn test_dogstatsd_filter_lambda_enhanced_invocations() {
let response = setup_and_consume_dogstatsd(
"aws.lambda.enhanced.invocations:1|c\ncustom.metric:5|c\n",
None,
)
.await;

// aws.lambda.enhanced.invocations should be filtered out
assert_eq!(response.series.len(), 1);
assert_eq!(response.series[0].series.len(), 1);
assert!(response.series[0].series[0]
.metric
.starts_with("custom.metric"));
}

#[tokio::test]
async fn test_create_udp_socket_default_so_rcvbuf() {
let socket = super::create_udp_socket("127.0.0.1:0", None).await.unwrap();
Expand Down
138 changes: 138 additions & 0 deletions crates/dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,142 @@ mod tests {
// Should attempt to flush and return Some with failed metrics (since we're not mocking the API)
assert!(result.is_some());
}

// ---- should_try_next_batch tests ----

/// Helper to build a minimal reqwest::Response with a given status code.
async fn mock_response(status: u16) -> Response {
http::Response::builder()
.status(status)
.body("")
.unwrap()
.into()
}

#[tokio::test]
async fn test_should_try_next_batch_accepted() {
let resp = Ok(mock_response(202).await);
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(continue_shipping, "202 should continue to next batch");
assert!(!should_retry, "202 should not retry");
}

#[tokio::test]
async fn test_should_try_next_batch_ok_non_accepted() {
// 200 OK is unexpected for the intake API (it expects 202)
let resp = Ok(mock_response(200).await);
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
!continue_shipping,
"unexpected 200 is not 4xx, should not continue"
);
assert!(
should_retry,
"unexpected 200 is not 4xx, should retry (treated as transient)"
);
}

#[tokio::test]
async fn test_should_try_next_batch_400_permanent() {
let resp = Ok(mock_response(400).await);
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
continue_shipping,
"4xx permanent error should continue to next batch"
);
assert!(!should_retry, "4xx permanent error should not retry");
}

#[tokio::test]
async fn test_should_try_next_batch_403_permanent() {
let resp = Ok(mock_response(403).await);
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
continue_shipping,
"403 permanent error should continue to next batch"
);
assert!(!should_retry, "403 permanent error should not retry");
}

#[tokio::test]
async fn test_should_try_next_batch_500_temporary() {
let resp = Ok(mock_response(500).await);
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
!continue_shipping,
"5xx temporary error should not continue"
);
assert!(should_retry, "5xx temporary error should retry");
}

#[tokio::test]
async fn test_should_try_next_batch_503_temporary() {
let resp = Ok(mock_response(503).await);
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
!continue_shipping,
"503 temporary error should not continue"
);
assert!(should_retry, "503 temporary error should retry");
}

#[tokio::test]
async fn test_should_try_next_batch_payload_error() {
let resp = Err(ShippingError::Payload("bad data".to_string()));
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
continue_shipping,
"payload error should continue to next batch (data is malformed)"
);
assert!(
!should_retry,
"payload error should not retry (data won't change)"
);
}

#[tokio::test]
async fn test_should_try_next_batch_destination_error_temporary() {
let resp = Err(ShippingError::Destination(
Some(StatusCode::INTERNAL_SERVER_ERROR),
"server down".to_string(),
));
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
!continue_shipping,
"5xx destination error should not continue"
);
assert!(should_retry, "5xx destination error should retry");
}

#[tokio::test]
async fn test_should_try_next_batch_destination_error_permanent() {
let resp = Err(ShippingError::Destination(
Some(StatusCode::FORBIDDEN),
"bad key".to_string(),
));
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
!continue_shipping,
"4xx destination error should not continue"
);
assert!(!should_retry, "4xx destination error should not retry");
}

#[tokio::test]
async fn test_should_try_next_batch_destination_error_no_status() {
// No status code (e.g., timeout / connection refused)
let resp = Err(ShippingError::Destination(
None,
"connection refused".to_string(),
));
let (continue_shipping, should_retry) = should_try_next_batch(resp).await;
assert!(
!continue_shipping,
"no-status destination error should not continue"
);
assert!(
should_retry,
"no-status destination error should retry (transient)"
);
}
}
Loading