From 0537973c11a158c23fe96c39aaa27e7b613217a8 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 13 Feb 2026 15:12:03 -0500 Subject: [PATCH 1/8] Set up Cgroup, CpuStats, and CpuMetricsCollector structs, and cgroup file reading and cpu stats calculation functions --- Cargo.lock | 20 +- crates/datadog-serverless-compat/src/main.rs | 27 +- crates/datadog-trace-agent/Cargo.toml | 2 + crates/datadog-trace-agent/src/lib.rs | 1 + .../src/metrics_collector.rs | 245 ++++++++++++++++++ 5 files changed, 292 insertions(+), 3 deletions(-) create mode 100644 crates/datadog-trace-agent/src/metrics_collector.rs diff --git a/Cargo.lock b/Cargo.lock index 9985655d..f155151e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,6 +426,7 @@ dependencies = [ "async-trait", "bytes", "datadog-fips", + "dogstatsd", "duplicate", "http-body-util", "hyper", @@ -435,6 +436,7 @@ dependencies = [ "libdd-trace-obfuscation", "libdd-trace-protobuf", "libdd-trace-utils", + "num_cpus", "reqwest", "rmp-serde", "serde", @@ -853,6 +855,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -1492,6 +1500,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -1745,7 +1763,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools 
0.14.0", "proc-macro2", "quote", "syn 2.0.114", diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index adc106e7..1a1922a5 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -18,7 +18,8 @@ use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, - config, env_verifier, mini_agent, proxy_flusher, stats_flusher, stats_processor, + config, env_verifier, metrics_collector, mini_agent, proxy_flusher, stats_flusher, + stats_processor, trace_flusher::{self, TraceFlusher}, trace_processor, }; @@ -38,6 +39,7 @@ use dogstatsd::{ use dogstatsd::metric::{SortedTags, EMPTY_TAGS}; use tokio_util::sync::CancellationToken; +const CPU_METRICS_COLLECTION_INTERVAL: u64 = 3; const DOGSTATSD_FLUSH_INTERVAL: u64 = 10; const DOGSTATSD_TIMEOUT_DURATION: Duration = Duration::from_secs(5); const DEFAULT_DOGSTATSD_PORT: u16 = 8125; @@ -103,6 +105,10 @@ pub async fn main() { .ok() .and_then(|val| parse_metric_namespace(&val)); + let dd_enhanced_metrics = env::var("DD_ENHANCED_METRICS") + .map(|val| val.to_lowercase() != "false") + .unwrap_or(true); + let https_proxy = env::var("DD_PROXY_HTTPS") .or_else(|_| env::var("HTTPS_PROXY")) .ok(); @@ -169,7 +175,7 @@ pub async fn main() { } }); - let (metrics_flusher, _aggregator_handle) = if dd_use_dogstatsd { + let (metrics_flusher, aggregator_handle) = if dd_use_dogstatsd { debug!("Starting dogstatsd"); let (_, metrics_flusher, aggregator_handle) = start_dogstatsd( dd_dogstatsd_port, @@ -192,6 +198,23 @@ pub async fn main() { (None, None) }; + // If DD_ENHANCED_METRICS is true, start the CPU metrics collector + // Use the existing aggregator handle + // TODO: See if this works in Google Cloud Functions Gen 1. If not, only enable this for Azure Functions. 
+ if dd_enhanced_metrics { + if let Some(ref handle) = aggregator_handle { + let cpu_collector_handle = handle.clone(); + tokio::spawn(async move { + let mut cpu_collector = metrics_collector::CpuMetricsCollector::new( + cpu_collector_handle, + None, + -1, + CPU_METRICS_COLLECTION_INTERVAL, + ); + }); + } + } + let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); flush_interval.tick().await; // discard first tick, which is instantaneous diff --git a/crates/datadog-trace-agent/Cargo.toml b/crates/datadog-trace-agent/Cargo.toml index 8a608642..e554fe15 100644 --- a/crates/datadog-trace-agent/Cargo.toml +++ b/crates/datadog-trace-agent/Cargo.toml @@ -29,6 +29,8 @@ libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = datadog-fips = { path = "../datadog-fips" } reqwest = { version = "0.12.23", features = ["json", "http2"], default-features = false } bytes = "1.10.1" +dogstatsd = { path = "../dogstatsd", default-features = true } +num_cpus = "1.16" [dev-dependencies] rmp-serde = "1.1.1" diff --git a/crates/datadog-trace-agent/src/lib.rs b/crates/datadog-trace-agent/src/lib.rs index a87bf56b..5ff530ae 100644 --- a/crates/datadog-trace-agent/src/lib.rs +++ b/crates/datadog-trace-agent/src/lib.rs @@ -11,6 +11,7 @@ pub mod aggregator; pub mod config; pub mod env_verifier; pub mod http_utils; +pub mod metrics_collector; pub mod mini_agent; pub mod proxy_flusher; pub mod stats_flusher; diff --git a/crates/datadog-trace-agent/src/metrics_collector.rs b/crates/datadog-trace-agent/src/metrics_collector.rs new file mode 100644 index 00000000..9474da86 --- /dev/null +++ b/crates/datadog-trace-agent/src/metrics_collector.rs @@ -0,0 +1,245 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions +//! +//! This module provides functionality to read raw CPU statistics from cgroup v1 files, +//! 
compute the CPU usage and limit, and submit them as distribution metrics to Datadog. +//! +//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). + +use dogstatsd::aggregator_service::AggregatorHandle; +use dogstatsd::metric::SortedTags; +use num_cpus; +use std::fs; +use std::io; +use tracing::debug; + +const CGROUP_CPU_USAGE_PATH: &str = "/sys/fs/cgroup/cpu/cpuacct.usage"; // Reports the total CPU time, in nanoseconds, consumed by all tasks in this cgroup +const CGROUP_CPUSET_CPUS_PATH: &str = "/sys/fs/cgroup/cpuset/cpuset.cpus"; // Specifies the CPUs that tasks in this cgroup are permitted to access +const CGROUP_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; // Specifies a period of time, in microseconds, for how regularly a cgroup's access to CPU resources should be reallocated +const CGROUP_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; // Specifies the total amount of time, in microseconds, for which all tasks in a cgroup can run during one period + +const CPU_USAGE_METRIC: &str = "azure.functions.cpu.usage"; +const CPU_LIMIT_METRIC: &str = "azure.functions.cpu.limit"; + +/// Statistics from cgroup v1 files, normalized to nanoseconds +struct CgroupStats { + total: Option, // Total CPU usage (from cpuacct.usage) in nanoseconds + cpu_count: Option, // Number of accessible logical CPUs (from cpuset.cpus) + scheduler_period: Option, // CFS scheduler period (from cpu.cfs_period_us) in nanoseconds + scheduler_quota: Option, // CFS scheduler quota (from cpu.cfs_quota_us) in nanoseconds +} + +/// Computed CPU total and limit metrics +struct CpuStats { + pub total: f64, // Total CPU usage in nanoseconds + pub limit: Option, // CPU limit in nanoseconds + pub defaulted_limit: bool, // Whether CPU limit was defaulted to host CPU count +} + +fn read_cpu_stats() -> Option { + let cgroup_stats = read_cgroup_stats(); + build_cpu_stats(&cgroup_stats) +} + +/// Builds CPU stats - rate and limit +fn 
build_cpu_stats(cgroup_stats: &CgroupStats) -> Option { + let total = cgroup_stats.total?; + + let (limit_pct, defaulted) = compute_cpu_limit_pct(cgroup_stats); + + Some(CpuStats { + total: total as f64, + limit: Some(limit_pct), + defaulted_limit: defaulted, + }) +} + +/// Reads raw CPU statistics from cgroup v1 files and converts to nanoseconds +fn read_cgroup_stats() -> CgroupStats { + let total = fs::read_to_string(CGROUP_CPU_USAGE_PATH) + .ok() + .and_then(|contents| contents.trim().parse::().ok()); + if total.is_none() { + debug!("Could not read CPU usage from {CGROUP_CPU_USAGE_PATH}"); + } + + let cpu_count = read_cpu_count_from_file(CGROUP_CPUSET_CPUS_PATH).ok(); + if cpu_count.is_none() { + debug!("Could not read CPU count from {CGROUP_CPUSET_CPUS_PATH}"); + } + + let scheduler_period = fs::read_to_string(CGROUP_CPU_PERIOD_PATH) + .ok() + .and_then(|contents| contents.trim().parse::().map(|v| v * 1000).ok()); // Convert from microseconds to nanoseconds + if scheduler_period.is_none() { + debug!("Could not read scheduler period from {CGROUP_CPU_PERIOD_PATH}"); + } + + let scheduler_quota = fs::read_to_string(CGROUP_CPU_QUOTA_PATH) + .ok() + .and_then(|contents| { + contents.trim().parse::().ok().and_then(|quota| { + // Convert from microseconds to nanoseconds + if quota == -1 { + debug!("CFS scheduler quota is -1, setting to None"); + None + } else { + Some((quota * 1000) as u64) + } + }) + }); + if scheduler_quota.is_none() { + debug!("Could not read scheduler quota from {CGROUP_CPU_QUOTA_PATH}"); + } + + CgroupStats { + total, + cpu_count, + scheduler_period, + scheduler_quota, + } +} + +/// Reads CPU count from cpuset.cpus +/// +/// The cpuset.cpus file contains a comma-separated list, with dashes to represent ranges of CPUs, +/// e.g., "0-2,16" represents CPUs 0, 1, 2, and 16 +/// This function returns the count of CPUs, in this case 4. 
+fn read_cpu_count_from_file(path: &str) -> Result { + let contents = fs::read_to_string(path)?; + let cpuset_str = contents.trim(); + if cpuset_str.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("File {path} is empty"), + )); + } + debug!("Contents of {path}: {cpuset_str}"); + + let mut cpu_count: u64 = 0; + + for part in cpuset_str.split(',') { + let range: Vec<&str> = part.split('-').collect(); + if range.len() == 2 { + // Range like "0-3" + debug!("Range: {range:?}"); + let start: u64 = range[0].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + let end: u64 = range[1].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + cpu_count += end - start + 1; + } else { + // Single CPU like "2" + debug!("Single CPU: {part}"); + cpu_count += 1; + } + } + + debug!("Total CPU count: {cpu_count}"); + Ok(cpu_count) +} + +/// Computes the CPU limit percentage +fn compute_cpu_limit_pct(cgroup_stats: &CgroupStats) -> (f64, bool) { + match compute_cgroup_cpu_limit_pct(cgroup_stats) { + Some(limit) => (limit, false), + None => { + let host_cpu_count = num_cpus::get() as f64; + debug!( + "No CPU limit found, defaulting to host CPU count: {} CPUs", + host_cpu_count + ); + (host_cpu_count * 100.0, true) + } + } +} + +/// Computes the CPU limit percentage from cgroup statistics +/// Limit is computed using min(CPUSet, CFS CPU Quota) +fn compute_cgroup_cpu_limit_pct(cgroup_stats: &CgroupStats) -> Option { + let mut limit_pct = None; + + if let Some(cpu_count) = cgroup_stats.cpu_count { + let host_cpu_count = num_cpus::get() as u64; + if cpu_count != host_cpu_count { + let cpuset_limit_pct = cpu_count as f64 * 100.0; + limit_pct = Some(cpuset_limit_pct); + debug!( + "CPU limit from cpuset: {} CPUs ({}%)", + cpu_count, cpuset_limit_pct + ); + } + } + + if let 
(Some(scheduler_quota), Some(scheduler_period)) = + (cgroup_stats.scheduler_quota, cgroup_stats.scheduler_period) + { + let quota_limit_pct = 100.0 * (scheduler_quota as f64 / scheduler_period as f64); + match limit_pct { + None => { + limit_pct = Some(quota_limit_pct); + debug!( + "limit_pct is None, setting CPU limit from cfs quota: {}%", + quota_limit_pct + ); + } + Some(current_limit_pct) if quota_limit_pct < current_limit_pct => { + limit_pct = Some(quota_limit_pct); + debug!("CPU limit from cfs quota is less than current limit, setting CPU limit from cfs quota: {}%", quota_limit_pct); + } + _ => { + debug!("Keeping cpuset limit: {:?}%", limit_pct); + } + } + } + limit_pct +} + +pub struct CpuMetricsCollector { + aggregator: AggregatorHandle, + tags: Option, + last_usage_ns: i64, + collection_interval_secs: u64, +} + +impl CpuMetricsCollector { + /// Creates a new CpuMetricsCollector + /// + /// # Arguments + /// + /// * `aggregator` - The aggregator handle to submit metrics to + /// * `tags` - Optional tags to attach to all metrics + /// * `last_usage_ns` - The last usage time in nanoseconds + /// * `collection_interval_secs` - The interval in seconds to collect the metrics + pub fn new( + aggregator: AggregatorHandle, + tags: Option, + last_usage_ns: i64, + collection_interval_secs: u64, + ) -> Self { + Self { + aggregator, + tags, + last_usage_ns, + collection_interval_secs, + } + } + + pub fn collect_and_submit(&self) { + if let Some(cpu_stats) = read_cpu_stats() { + // Submit metrics + } else { + debug!("Skipping CPU metrics collection - could not find data to generate CPU usage and limit enhanced metrics"); + } + } +} From ac65d2b1084186aaa1f5da92206bface7e19420c Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 13 Feb 2026 15:36:29 -0500 Subject: [PATCH 2/8] Add cpu collector into loop with dogstatsd --- crates/datadog-serverless-compat/src/main.rs | 46 ++++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git 
a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 1a1922a5..ebbce410 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -201,29 +201,39 @@ pub async fn main() { // If DD_ENHANCED_METRICS is true, start the CPU metrics collector // Use the existing aggregator handle // TODO: See if this works in Google Cloud Functions Gen 1. If not, only enable this for Azure Functions. - if dd_enhanced_metrics { - if let Some(ref handle) = aggregator_handle { - let cpu_collector_handle = handle.clone(); - tokio::spawn(async move { - let mut cpu_collector = metrics_collector::CpuMetricsCollector::new( - cpu_collector_handle, - None, - -1, - CPU_METRICS_COLLECTION_INTERVAL, - ); - }); - } - } + let cpu_collector = if dd_enhanced_metrics { + aggregator_handle.as_ref().map(|handle| { + metrics_collector::CpuMetricsCollector::new( + handle.clone(), + None, + -1, + CPU_METRICS_COLLECTION_INTERVAL, + ) + }) + } else { + info!("Enhanced metrics disabled"); + None + }; let mut flush_interval = interval(Duration::from_secs(DOGSTATSD_FLUSH_INTERVAL)); + let mut cpu_collection_interval = + interval(Duration::from_secs(CPU_METRICS_COLLECTION_INTERVAL)); flush_interval.tick().await; // discard first tick, which is instantaneous + cpu_collection_interval.tick().await; loop { - flush_interval.tick().await; - - if let Some(metrics_flusher) = metrics_flusher.as_ref() { - debug!("Flushing dogstatsd metrics"); - metrics_flusher.flush().await; + tokio::select! 
{ + _ = flush_interval.tick() => { + if let Some(metrics_flusher) = metrics_flusher.as_ref() { + debug!("Flushing dogstatsd metrics"); + metrics_flusher.flush().await; + } + } + _ = cpu_collection_interval.tick() => { + if let Some(cpu_collector) = cpu_collector.as_ref() { + cpu_collector.collect_and_submit(); + } + } } } } From 0c9fcd06feda4aed2b743b3231054fb9b14ca867 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Fri, 13 Feb 2026 15:40:22 -0500 Subject: [PATCH 3/8] Fix license --- LICENSE-3rdparty.csv | 2 ++ 1 file changed, 2 insertions(+) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 6b221098..804a26ad 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -69,6 +69,7 @@ headers,https://github.com/hyperium/headers,MIT,Sean McArthur heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,The heck Authors heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,Without Boats +hermit-abi,https://github.com/hermit-os/hermit-rs,MIT OR Apache-2.0,Stefan Lankes hex,https://github.com/KokaKiwi/rust-hex,MIT OR Apache-2.0,KokaKiwi home,https://github.com/rust-lang/cargo,MIT OR Apache-2.0,Brian Anderson http,https://github.com/hyperium/http,MIT OR Apache-2.0,"Alex Crichton , Carl Lerche , Sean McArthur " @@ -119,6 +120,7 @@ multimap,https://github.com/havarnov/multimap,MIT OR Apache-2.0,Håvar Nøvik , Josh Triplett , The Nushell Project Developers" num-traits,https://github.com/rust-num/num-traits,MIT OR Apache-2.0,The Rust Project Developers +num_cpus,https://github.com/seanmonstar/num_cpus,MIT OR Apache-2.0,Sean McArthur once_cell,https://github.com/matklad/once_cell,MIT OR Apache-2.0,Aleksey Kladov openssl-probe,https://github.com/rustls/openssl-probe,MIT OR Apache-2.0,Alex Crichton ordered-float,https://github.com/reem/rust-ordered-float,MIT,"Jonathan Reem , Matt Brubeck " From b37e19c8dd8c07fe5ec979dc92338477b7193ff7 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Mon, 23 Feb 2026 11:52:00 -0500 Subject: [PATCH 4/8] Move 
metrics_collector into its own crate --- Cargo.lock | 10 ++++++++++ crates/datadog-metrics-collector/Cargo.toml | 11 +++++++++++ .../src/cpu.rs} | 11 +++++++++++ crates/datadog-metrics-collector/src/lib.rs | 10 ++++++++++ crates/datadog-serverless-compat/Cargo.toml | 1 + crates/datadog-serverless-compat/src/main.rs | 12 ++++-------- crates/datadog-trace-agent/src/lib.rs | 1 - 7 files changed, 47 insertions(+), 9 deletions(-) create mode 100644 crates/datadog-metrics-collector/Cargo.toml rename crates/{datadog-trace-agent/src/metrics_collector.rs => datadog-metrics-collector/src/cpu.rs} (95%) create mode 100644 crates/datadog-metrics-collector/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index f155151e..81fef7a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -390,6 +390,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "datadog-metrics-collector" +version = "0.1.0" +dependencies = [ + "dogstatsd", + "num_cpus", + "tracing", +] + [[package]] name = "datadog-protos" version = "0.1.0" @@ -408,6 +417,7 @@ dependencies = [ name = "datadog-serverless-compat" version = "0.1.0" dependencies = [ + "datadog-metrics-collector", "datadog-trace-agent", "dogstatsd", "libdd-trace-utils", diff --git a/crates/datadog-metrics-collector/Cargo.toml b/crates/datadog-metrics-collector/Cargo.toml new file mode 100644 index 00000000..4061bd13 --- /dev/null +++ b/crates/datadog-metrics-collector/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "datadog-metrics-collector" +version = "0.1.0" +edition.workspace = true +license.workspace = true +description = "Collector to read, compute, and submit enhanced metrics in Serverless environments" + +[dependencies] +dogstatsd = { path = "../dogstatsd", default-features = true } +num_cpus = "1.16" +tracing = { version = "0.1", default-features = false } diff --git a/crates/datadog-trace-agent/src/metrics_collector.rs b/crates/datadog-metrics-collector/src/cpu.rs similarity index 95% rename from crates/datadog-trace-agent/src/metrics_collector.rs 
rename to crates/datadog-metrics-collector/src/cpu.rs index 9474da86..1450e8d9 100644 --- a/crates/datadog-trace-agent/src/metrics_collector.rs +++ b/crates/datadog-metrics-collector/src/cpu.rs @@ -238,6 +238,17 @@ impl CpuMetricsCollector { pub fn collect_and_submit(&self) { if let Some(cpu_stats) = read_cpu_stats() { // Submit metrics + debug!("Collected cpu stats!"); + debug!("CPU usage: {}", cpu_stats.total); + if let Some(limit) = cpu_stats.limit { + debug!( + "CPU limit: {}%, defaulted: {}", + limit, cpu_stats.defaulted_limit + ); + } else { + debug!("CPU limit: None, defaulted: {}", cpu_stats.defaulted_limit); + } + debug!("Submitting CPU metrics!"); } else { debug!("Skipping CPU metrics collection - could not find data to generate CPU usage and limit enhanced metrics"); } diff --git a/crates/datadog-metrics-collector/src/lib.rs b/crates/datadog-metrics-collector/src/lib.rs new file mode 100644 index 00000000..dc6c5f0f --- /dev/null +++ b/crates/datadog-metrics-collector/src/lib.rs @@ -0,0 +1,10 @@ +// Copyright 2023-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +#![cfg_attr(not(test), deny(clippy::panic))] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] +#![cfg_attr(not(test), deny(clippy::expect_used))] +#![cfg_attr(not(test), deny(clippy::todo))] +#![cfg_attr(not(test), deny(clippy::unimplemented))] + +pub mod cpu; diff --git a/crates/datadog-serverless-compat/Cargo.toml b/crates/datadog-serverless-compat/Cargo.toml index b4f96b42..02378c9d 100644 --- a/crates/datadog-serverless-compat/Cargo.toml +++ b/crates/datadog-serverless-compat/Cargo.toml @@ -7,6 +7,7 @@ description = "Binary to run trace-agent and dogstatsd servers in Serverless env [dependencies] datadog-trace-agent = { path = "../datadog-trace-agent" } +datadog-metrics-collector = { path = "../datadog-metrics-collector" } libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "d52ee90209cb12a28bdda0114535c1a985a29d95" } dogstatsd = { path = "../dogstatsd", default-features = true } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index ebbce410..e5240f4a 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -18,12 +18,13 @@ use zstd::zstd_safe::CompressionLevel; use datadog_trace_agent::{ aggregator::TraceAggregator, - config, env_verifier, metrics_collector, mini_agent, proxy_flusher, stats_flusher, - stats_processor, + config, env_verifier, mini_agent, proxy_flusher, stats_flusher, stats_processor, trace_flusher::{self, TraceFlusher}, trace_processor, }; +use datadog_metrics_collector::cpu::CpuMetricsCollector; + use libdd_trace_utils::{config_utils::read_cloud_env, trace_utils::EnvironmentType}; use dogstatsd::{ @@ -203,12 +204,7 @@ pub async fn main() { // TODO: See if this works in Google Cloud Functions Gen 1. If not, only enable this for Azure Functions. 
let cpu_collector = if dd_enhanced_metrics { aggregator_handle.as_ref().map(|handle| { - metrics_collector::CpuMetricsCollector::new( - handle.clone(), - None, - -1, - CPU_METRICS_COLLECTION_INTERVAL, - ) + CpuMetricsCollector::new(handle.clone(), None, -1, CPU_METRICS_COLLECTION_INTERVAL) }) } else { info!("Enhanced metrics disabled"); diff --git a/crates/datadog-trace-agent/src/lib.rs b/crates/datadog-trace-agent/src/lib.rs index 5ff530ae..a87bf56b 100644 --- a/crates/datadog-trace-agent/src/lib.rs +++ b/crates/datadog-trace-agent/src/lib.rs @@ -11,7 +11,6 @@ pub mod aggregator; pub mod config; pub mod env_verifier; pub mod http_utils; -pub mod metrics_collector; pub mod mini_agent; pub mod proxy_flusher; pub mod stats_flusher; From 7ffa45e977abaa678ee593adeaf70800e001fab0 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Mon, 23 Feb 2026 17:34:02 -0500 Subject: [PATCH 5/8] Submit cpu usage and limit metrics and fix units --- crates/datadog-metrics-collector/src/cpu.rs | 106 ++++++++++++------- crates/datadog-serverless-compat/src/main.rs | 8 +- 2 files changed, 72 insertions(+), 42 deletions(-) diff --git a/crates/datadog-metrics-collector/src/cpu.rs b/crates/datadog-metrics-collector/src/cpu.rs index 1450e8d9..ed541f11 100644 --- a/crates/datadog-metrics-collector/src/cpu.rs +++ b/crates/datadog-metrics-collector/src/cpu.rs @@ -9,23 +9,23 @@ //! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). 
use dogstatsd::aggregator_service::AggregatorHandle; -use dogstatsd::metric::SortedTags; +use dogstatsd::metric::{Metric, MetricValue, SortedTags}; use num_cpus; use std::fs; use std::io; -use tracing::debug; +use tracing::{debug, error}; const CGROUP_CPU_USAGE_PATH: &str = "/sys/fs/cgroup/cpu/cpuacct.usage"; // Reports the total CPU time, in nanoseconds, consumed by all tasks in this cgroup const CGROUP_CPUSET_CPUS_PATH: &str = "/sys/fs/cgroup/cpuset/cpuset.cpus"; // Specifies the CPUs that tasks in this cgroup are permitted to access const CGROUP_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; // Specifies a period of time, in microseconds, for how regularly a cgroup's access to CPU resources should be reallocated const CGROUP_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; // Specifies the total amount of time, in microseconds, for which all tasks in a cgroup can run during one period -const CPU_USAGE_METRIC: &str = "azure.functions.cpu.usage"; -const CPU_LIMIT_METRIC: &str = "azure.functions.cpu.limit"; +const CPU_USAGE_METRIC: &str = "azure.functions.enhanced.test.cpu.usage"; +const CPU_LIMIT_METRIC: &str = "azure.functions.enhanced.test.cpu.limit"; /// Statistics from cgroup v1 files, normalized to nanoseconds struct CgroupStats { - total: Option, // Total CPU usage (from cpuacct.usage) in nanoseconds + total: Option, // Cumulative CPU usage (from cpuacct.usage) in nanoseconds cpu_count: Option, // Number of accessible logical CPUs (from cpuset.cpus) scheduler_period: Option, // CFS scheduler period (from cpu.cfs_period_us) in nanoseconds scheduler_quota: Option, // CFS scheduler quota (from cpu.cfs_quota_us) in nanoseconds @@ -33,8 +33,8 @@ struct CgroupStats { /// Computed CPU total and limit metrics struct CpuStats { - pub total: f64, // Total CPU usage in nanoseconds - pub limit: Option, // CPU limit in nanoseconds + pub total: f64, // Cumulative CPU usage in nanoseconds + pub limit: Option, // CPU limit in nanocores pub 
defaulted_limit: bool, // Whether CPU limit was defaulted to host CPU count } @@ -47,7 +47,7 @@ fn read_cpu_stats() -> Option { fn build_cpu_stats(cgroup_stats: &CgroupStats) -> Option { let total = cgroup_stats.total?; - let (limit_pct, defaulted) = compute_cpu_limit_pct(cgroup_stats); + let (limit_pct, defaulted) = compute_cpu_limit_nc(cgroup_stats); Some(CpuStats { total: total as f64, @@ -149,9 +149,9 @@ fn read_cpu_count_from_file(path: &str) -> Result { Ok(cpu_count) } -/// Computes the CPU limit percentage -fn compute_cpu_limit_pct(cgroup_stats: &CgroupStats) -> (f64, bool) { - match compute_cgroup_cpu_limit_pct(cgroup_stats) { +/// Computes the CPU limit in nanocores, with fallback to host CPU count +fn compute_cpu_limit_nc(cgroup_stats: &CgroupStats) -> (f64, bool) { + match compute_cgroup_cpu_limit_nc(cgroup_stats) { Some(limit) => (limit, false), None => { let host_cpu_count = num_cpus::get() as f64; @@ -159,24 +159,24 @@ fn compute_cpu_limit_pct(cgroup_stats: &CgroupStats) -> (f64, bool) { "No CPU limit found, defaulting to host CPU count: {} CPUs", host_cpu_count ); - (host_cpu_count * 100.0, true) + (host_cpu_count * 1000000000.0, true) // Convert to nanocores } } } -/// Computes the CPU limit percentage from cgroup statistics +/// Computes the CPU limit in nanocores from cgroup statistics /// Limit is computed using min(CPUSet, CFS CPU Quota) -fn compute_cgroup_cpu_limit_pct(cgroup_stats: &CgroupStats) -> Option { - let mut limit_pct = None; +fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option { + let mut limit_nc = None; if let Some(cpu_count) = cgroup_stats.cpu_count { let host_cpu_count = num_cpus::get() as u64; if cpu_count != host_cpu_count { - let cpuset_limit_pct = cpu_count as f64 * 100.0; - limit_pct = Some(cpuset_limit_pct); + let cpuset_limit_nc = cpu_count as f64 * 1000000000.0; // Convert to nanocores + limit_nc = Some(cpuset_limit_nc); debug!( - "CPU limit from cpuset: {} CPUs ({}%)", - cpu_count, cpuset_limit_pct + 
"CPU limit from cpuset: {} CPUs ({} nanocores)", + cpu_count, cpuset_limit_nc ); } } @@ -184,31 +184,31 @@ fn compute_cgroup_cpu_limit_pct(cgroup_stats: &CgroupStats) -> Option { if let (Some(scheduler_quota), Some(scheduler_period)) = (cgroup_stats.scheduler_quota, cgroup_stats.scheduler_period) { - let quota_limit_pct = 100.0 * (scheduler_quota as f64 / scheduler_period as f64); - match limit_pct { + let quota_limit_nc = 1000000000.0 * (scheduler_quota as f64 / scheduler_period as f64); + match limit_nc { None => { - limit_pct = Some(quota_limit_pct); + limit_nc = Some(quota_limit_nc); debug!( - "limit_pct is None, setting CPU limit from cfs quota: {}%", - quota_limit_pct + "limit_pct is None, setting CPU limit from cfs quota: {} nanocores", + quota_limit_nc ); } - Some(current_limit_pct) if quota_limit_pct < current_limit_pct => { - limit_pct = Some(quota_limit_pct); - debug!("CPU limit from cfs quota is less than current limit, setting CPU limit from cfs quota: {}%", quota_limit_pct); + Some(current_limit_nc) if quota_limit_nc < current_limit_nc => { + limit_nc = Some(quota_limit_nc); + debug!("CPU limit from cfs quota is less than current limit, setting CPU limit from cfs quota: {} nanocores", quota_limit_nc); } _ => { - debug!("Keeping cpuset limit: {:?}%", limit_pct); + debug!("Keeping cpuset limit: {:?} nanocores", limit_nc); } } } - limit_pct + limit_nc } pub struct CpuMetricsCollector { aggregator: AggregatorHandle, tags: Option, - last_usage_ns: i64, + last_usage_ns: f64, collection_interval_secs: u64, } @@ -224,7 +224,7 @@ impl CpuMetricsCollector { pub fn new( aggregator: AggregatorHandle, tags: Option, - last_usage_ns: i64, + last_usage_ns: f64, collection_interval_secs: u64, ) -> Self { Self { @@ -235,18 +235,48 @@ impl CpuMetricsCollector { } } - pub fn collect_and_submit(&self) { + pub fn collect_and_submit(&mut self) { if let Some(cpu_stats) = read_cpu_stats() { // Submit metrics debug!("Collected cpu stats!"); + let current_usage_ns = 
cpu_stats.total; debug!("CPU usage: {}", cpu_stats.total); + + // Skip first collection + if self.last_usage_ns == -1.0 { + debug!("First CPU collection, skipping rate computation"); + self.last_usage_ns = current_usage_ns; + return; + } + + let delta_ns = current_usage_ns - self.last_usage_ns as f64; + self.last_usage_ns = current_usage_ns; + + // Divide nanoseconds delta by collection interval to get usage rate in nanocores + let usage_rate_nc = delta_ns / self.collection_interval_secs as f64; + debug!("Usage rate: {} nanocores/s", usage_rate_nc); + + let now = std::time::UNIX_EPOCH.elapsed() + .map(|d| d.as_secs()) + .unwrap_or(0) + .try_into() + .unwrap_or(0); + + let usage_metric = Metric::new(CPU_USAGE_METRIC.into(), MetricValue::distribution(usage_rate_nc), self.tags.clone(), Some(now)); + + if let Err(e) = self.aggregator.insert_batch(vec![usage_metric]) { + error!("Failed to insert CPU usage metric: {}", e); + } + if let Some(limit) = cpu_stats.limit { - debug!( - "CPU limit: {}%, defaulted: {}", - limit, cpu_stats.defaulted_limit - ); - } else { - debug!("CPU limit: None, defaulted: {}", cpu_stats.defaulted_limit); + debug!("CPU limit: {}", limit); + if cpu_stats.defaulted_limit { + debug!("CPU limit defaulted to host CPU count"); + } + let limit_metric = Metric::new(CPU_LIMIT_METRIC.into(), MetricValue::distribution(limit), self.tags.clone(), Some(now)); + if let Err(e) = self.aggregator.insert_batch(vec![limit_metric]) { + error!("Failed to insert CPU limit metric: {}", e); + } } debug!("Submitting CPU metrics!"); } else { diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index e5240f4a..ad1c28aa 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -202,9 +202,9 @@ pub async fn main() { // If DD_ENHANCED_METRICS is true, start the CPU metrics collector // Use the existing aggregator handle // TODO: See if this works in Google Cloud Functions 
Gen 1. If not, only enable this for Azure Functions. - let cpu_collector = if dd_enhanced_metrics { + let mut cpu_collector = if dd_enhanced_metrics { aggregator_handle.as_ref().map(|handle| { - CpuMetricsCollector::new(handle.clone(), None, -1, CPU_METRICS_COLLECTION_INTERVAL) + CpuMetricsCollector::new(handle.clone(), None, -1.0, CPU_METRICS_COLLECTION_INTERVAL) }) } else { info!("Enhanced metrics disabled"); @@ -226,8 +226,8 @@ pub async fn main() { } } _ = cpu_collection_interval.tick() => { - if let Some(cpu_collector) = cpu_collector.as_ref() { - cpu_collector.collect_and_submit(); + if let Some(ref mut collector) = cpu_collector { + collector.collect_and_submit(); } } } From 370f6521caba0cbde976a458d77c6e0181d473b8 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Wed, 25 Feb 2026 13:08:20 -0500 Subject: [PATCH 6/8] Test more precise time interval, add instance ID as a tag --- crates/datadog-metrics-collector/src/cpu.rs | 56 +++++++++++++++----- crates/datadog-serverless-compat/src/main.rs | 16 +++++- 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/crates/datadog-metrics-collector/src/cpu.rs b/crates/datadog-metrics-collector/src/cpu.rs index ed541f11..2d23c891 100644 --- a/crates/datadog-metrics-collector/src/cpu.rs +++ b/crates/datadog-metrics-collector/src/cpu.rs @@ -21,6 +21,7 @@ const CGROUP_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; // const CGROUP_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; // Specifies the total amount of time, in microseconds, for which all tasks in a cgroup can run during one period const CPU_USAGE_METRIC: &str = "azure.functions.enhanced.test.cpu.usage"; +const CPU_USAGE_PRECISE_METRIC: &str = "azure.functions.enhanced.test.cpu.usage.precise"; const CPU_LIMIT_METRIC: &str = "azure.functions.enhanced.test.cpu.limit"; /// Statistics from cgroup v1 files, normalized to nanoseconds @@ -47,11 +48,11 @@ fn read_cpu_stats() -> Option { fn build_cpu_stats(cgroup_stats: &CgroupStats) 
-> Option { let total = cgroup_stats.total?; - let (limit_pct, defaulted) = compute_cpu_limit_nc(cgroup_stats); + let (limit_nc, defaulted) = compute_cpu_limit_nc(cgroup_stats); Some(CpuStats { total: total as f64, - limit: Some(limit_pct), + limit: Some(limit_nc), defaulted_limit: defaulted, }) } @@ -170,8 +171,10 @@ fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option { let mut limit_nc = None; if let Some(cpu_count) = cgroup_stats.cpu_count { + debug!("CPU count from cpuset: {cpu_count}"); let host_cpu_count = num_cpus::get() as u64; if cpu_count != host_cpu_count { + debug!("CPU count from cpuset is not equal to host CPU count"); let cpuset_limit_nc = cpu_count as f64 * 1000000000.0; // Convert to nanocores limit_nc = Some(cpuset_limit_nc); debug!( @@ -189,7 +192,7 @@ fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option { None => { limit_nc = Some(quota_limit_nc); debug!( - "limit_pct is None, setting CPU limit from cfs quota: {} nanocores", + "limit_nc is None, setting CPU limit from cfs quota: {} nanocores", quota_limit_nc ); } @@ -208,8 +211,9 @@ fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option { pub struct CpuMetricsCollector { aggregator: AggregatorHandle, tags: Option, - last_usage_ns: f64, collection_interval_secs: u64, + last_usage_ns: f64, + last_collection_time: std::time::Instant, } impl CpuMetricsCollector { @@ -219,19 +223,18 @@ impl CpuMetricsCollector { /// /// * `aggregator` - The aggregator handle to submit metrics to /// * `tags` - Optional tags to attach to all metrics - /// * `last_usage_ns` - The last usage time in nanoseconds /// * `collection_interval_secs` - The interval in seconds to collect the metrics pub fn new( aggregator: AggregatorHandle, tags: Option, - last_usage_ns: f64, collection_interval_secs: u64, ) -> Self { Self { aggregator, tags, - last_usage_ns, collection_interval_secs, + last_usage_ns: -1.0, + last_collection_time: std::time::Instant::now(), } } @@ -241,28 +244,52 
@@ impl CpuMetricsCollector { debug!("Collected cpu stats!"); let current_usage_ns = cpu_stats.total; debug!("CPU usage: {}", cpu_stats.total); - + let now_instant = std::time::Instant::now(); + // Skip first collection if self.last_usage_ns == -1.0 { debug!("First CPU collection, skipping rate computation"); self.last_usage_ns = current_usage_ns; + self.last_collection_time = now_instant; return; } - let delta_ns = current_usage_ns - self.last_usage_ns as f64; + let delta_ns = current_usage_ns - self.last_usage_ns; self.last_usage_ns = current_usage_ns; + let elapsed_secs = self.last_collection_time.elapsed().as_secs_f64(); + debug!("Elapsed time: {} seconds", elapsed_secs); + self.last_collection_time = now_instant; // Divide nanoseconds delta by collection interval to get usage rate in nanocores let usage_rate_nc = delta_ns / self.collection_interval_secs as f64; debug!("Usage rate: {} nanocores/s", usage_rate_nc); + let precise_usage_rate_nc = delta_ns / elapsed_secs; + debug!("Precise usage rate: {} nanocores/s", precise_usage_rate_nc); - let now = std::time::UNIX_EPOCH.elapsed() + let now = std::time::UNIX_EPOCH + .elapsed() .map(|d| d.as_secs()) .unwrap_or(0) .try_into() .unwrap_or(0); - let usage_metric = Metric::new(CPU_USAGE_METRIC.into(), MetricValue::distribution(usage_rate_nc), self.tags.clone(), Some(now)); + let precise_metric = Metric::new( + CPU_USAGE_PRECISE_METRIC.into(), + MetricValue::distribution(precise_usage_rate_nc), + self.tags.clone(), + Some(now), + ); + + if let Err(e) = self.aggregator.insert_batch(vec![precise_metric]) { + error!("Failed to insert CPU usage precise metric: {}", e); + } + + let usage_metric = Metric::new( + CPU_USAGE_METRIC.into(), + MetricValue::distribution(usage_rate_nc), + self.tags.clone(), + Some(now), + ); if let Err(e) = self.aggregator.insert_batch(vec![usage_metric]) { error!("Failed to insert CPU usage metric: {}", e); @@ -273,7 +300,12 @@ impl CpuMetricsCollector { if cpu_stats.defaulted_limit { debug!("CPU 
limit defaulted to host CPU count"); } - let limit_metric = Metric::new(CPU_LIMIT_METRIC.into(), MetricValue::distribution(limit), self.tags.clone(), Some(now)); + let limit_metric = Metric::new( + CPU_LIMIT_METRIC.into(), + MetricValue::distribution(limit), + self.tags.clone(), + Some(now), + ); if let Err(e) = self.aggregator.insert_batch(vec![limit_metric]) { error!("Failed to insert CPU limit metric: {}", e); } diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index ad1c28aa..066f186d 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -52,7 +52,7 @@ pub async fn main() { .map(|val| val.to_lowercase()) .unwrap_or("info".to_string()); - let (_, env_type) = match read_cloud_env() { + let (app_name, env_type) = match read_cloud_env() { Some(value) => value, None => { error!("Unable to identify environment. Shutting down Mini Agent."); @@ -204,7 +204,19 @@ pub async fn main() { // TODO: See if this works in Google Cloud Functions Gen 1. If not, only enable this for Azure Functions. 
let mut cpu_collector = if dd_enhanced_metrics { aggregator_handle.as_ref().map(|handle| { - CpuMetricsCollector::new(handle.clone(), None, -1.0, CPU_METRICS_COLLECTION_INTERVAL) + // Elastic Premium and Premium plans use WEBSITE_INSTANCE_ID to identify the instance + // Flex Consumption and Consumption plans use WEBSITE_POD_NAME or CONTAINER_NAME + let instance_id = env::var("WEBSITE_INSTANCE_ID") + .or_else(|_| env::var("WEBSITE_POD_NAME")) + .or_else(|_| env::var("CONTAINER_NAME")) + .ok(); + debug!("Instance ID: {:?}", instance_id); + let mut tag_str = format!("functionname:{}", app_name); + if let Some(id) = instance_id { + tag_str.push_str(&format!(",instance_id:{}", id)); + } + let tags = SortedTags::parse(&tag_str).ok(); + CpuMetricsCollector::new(handle.clone(), tags, CPU_METRICS_COLLECTION_INTERVAL) }) } else { info!("Enhanced metrics disabled"); From dbcc82f05e798c2a164d3d9bf2977aa6db9f8236 Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Wed, 25 Feb 2026 19:35:58 -0500 Subject: [PATCH 7/8] Categorize metrics with azure.functions prefix as enhanced metrics --- crates/dogstatsd/src/origin.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index d0c0952d..ceb5ad61 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -18,6 +18,7 @@ const AZURE_FUNCTIONS_TAG_VALUE: &str = "azurefunction"; const DATADOG_PREFIX: &str = "datadog."; const AWS_LAMBDA_PREFIX: &str = "aws.lambda"; const GOOGLE_CLOUD_RUN_PREFIX: &str = "gcp.run"; +const AZURE_FUNCTIONS_PREFIX: &str = "azure.functions"; const JVM_PREFIX: &str = "jvm."; const RUNTIME_PREFIX: &str = "runtime."; @@ -87,7 +88,7 @@ impl Metric { || metric_name.starts_with(RUNTIME_PREFIX) { OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX { + } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX || 
metric_prefix == AZURE_FUNCTIONS_PREFIX { OriginService::ServerlessEnhanced } else { OriginService::ServerlessCustom From 61df2ac4fa0993ff96f96c27ecfca Mon Sep 17 00:00:00 2001 From: Kathie Huang Date: Thu, 26 Feb 2026 13:59:24 -0500 Subject: [PATCH 8/8] Refactor to make CpuMetricsCollector, CpuStats, and metrics submission OS-agnostic, create separate modules for Windows and Linux for reading raw CPU data --- crates/datadog-metrics-collector/src/cpu.rs | 199 ++--------------- crates/datadog-metrics-collector/src/lib.rs | 4 + crates/datadog-metrics-collector/src/linux.rs | 201 ++++++++++++++++++ .../datadog-metrics-collector/src/windows.rs | 14 ++ crates/dogstatsd/src/origin.rs | 20 +- 5 files changed, 242 insertions(+), 196 deletions(-) create mode 100644 crates/datadog-metrics-collector/src/linux.rs create mode 100644 crates/datadog-metrics-collector/src/windows.rs diff --git a/crates/datadog-metrics-collector/src/cpu.rs b/crates/datadog-metrics-collector/src/cpu.rs index 2d23c891..d31ef1c6 100644 --- a/crates/datadog-metrics-collector/src/cpu.rs +++ b/crates/datadog-metrics-collector/src/cpu.rs @@ -3,212 +3,32 @@ //! CPU metrics collector for Azure Functions //! -//! This module provides functionality to read raw CPU statistics from cgroup v1 files, -//! compute the CPU usage and limit, and submit them as distribution metrics to Datadog. +//! This module provides OS-agnostic CPU metrics collection, computing CPU usage +//! and limit and submitting them as distribution metrics to Datadog. //! //! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores).
use dogstatsd::aggregator_service::AggregatorHandle; use dogstatsd::metric::{Metric, MetricValue, SortedTags}; -use num_cpus; -use std::fs; -use std::io; use tracing::{debug, error}; -const CGROUP_CPU_USAGE_PATH: &str = "/sys/fs/cgroup/cpu/cpuacct.usage"; // Reports the total CPU time, in nanoseconds, consumed by all tasks in this cgroup -const CGROUP_CPUSET_CPUS_PATH: &str = "/sys/fs/cgroup/cpuset/cpuset.cpus"; // Specifies the CPUs that tasks in this cgroup are permitted to access -const CGROUP_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; // Specifies a period of time, in microseconds, for how regularly a cgroup's access to CPU resources should be reallocated -const CGROUP_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; // Specifies the total amount of time, in microseconds, for which all tasks in a cgroup can run during one period - const CPU_USAGE_METRIC: &str = "azure.functions.enhanced.test.cpu.usage"; const CPU_USAGE_PRECISE_METRIC: &str = "azure.functions.enhanced.test.cpu.usage.precise"; const CPU_LIMIT_METRIC: &str = "azure.functions.enhanced.test.cpu.limit"; -/// Statistics from cgroup v1 files, normalized to nanoseconds -struct CgroupStats { - total: Option, // Cumulative CPU usage (from cpuacct.usage) in nanoseconds - cpu_count: Option, // Number of accessible logical CPUs (from cpuset.cpus) - scheduler_period: Option, // CFS scheduler period (from cpu.cfs_period_us) in nanoseconds - scheduler_quota: Option, // CFS scheduler quota (from cpu.cfs_quota_us) in nanoseconds -} - /// Computed CPU total and limit metrics -struct CpuStats { +pub struct CpuStats { pub total: f64, // Cumulative CPU usage in nanoseconds pub limit: Option, // CPU limit in nanocores pub defaulted_limit: bool, // Whether CPU limit was defaulted to host CPU count } -fn read_cpu_stats() -> Option { - let cgroup_stats = read_cgroup_stats(); - build_cpu_stats(&cgroup_stats) -} - -/// Builds CPU stats - rate and limit -fn build_cpu_stats(cgroup_stats: 
&CgroupStats) -> Option { - let total = cgroup_stats.total?; - - let (limit_nc, defaulted) = compute_cpu_limit_nc(cgroup_stats); - - Some(CpuStats { - total: total as f64, - limit: Some(limit_nc), - defaulted_limit: defaulted, - }) -} - -/// Reads raw CPU statistics from cgroup v1 files and converts to nanoseconds -fn read_cgroup_stats() -> CgroupStats { - let total = fs::read_to_string(CGROUP_CPU_USAGE_PATH) - .ok() - .and_then(|contents| contents.trim().parse::().ok()); - if total.is_none() { - debug!("Could not read CPU usage from {CGROUP_CPU_USAGE_PATH}"); - } - - let cpu_count = read_cpu_count_from_file(CGROUP_CPUSET_CPUS_PATH).ok(); - if cpu_count.is_none() { - debug!("Could not read CPU count from {CGROUP_CPUSET_CPUS_PATH}"); - } - - let scheduler_period = fs::read_to_string(CGROUP_CPU_PERIOD_PATH) - .ok() - .and_then(|contents| contents.trim().parse::().map(|v| v * 1000).ok()); // Convert from microseconds to nanoseconds - if scheduler_period.is_none() { - debug!("Could not read scheduler period from {CGROUP_CPU_PERIOD_PATH}"); - } - - let scheduler_quota = fs::read_to_string(CGROUP_CPU_QUOTA_PATH) - .ok() - .and_then(|contents| { - contents.trim().parse::().ok().and_then(|quota| { - // Convert from microseconds to nanoseconds - if quota == -1 { - debug!("CFS scheduler quota is -1, setting to None"); - None - } else { - Some((quota * 1000) as u64) - } - }) - }); - if scheduler_quota.is_none() { - debug!("Could not read scheduler quota from {CGROUP_CPU_QUOTA_PATH}"); - } - - CgroupStats { - total, - cpu_count, - scheduler_period, - scheduler_quota, - } -} - -/// Reads CPU count from cpuset.cpus -/// -/// The cpuset.cpus file contains a comma-separated list, with dashes to represent ranges of CPUs, -/// e.g., "0-2,16" represents CPUs 0, 1, 2, and 16 -/// This function returns the count of CPUs, in this case 4. 
-fn read_cpu_count_from_file(path: &str) -> Result { - let contents = fs::read_to_string(path)?; - let cpuset_str = contents.trim(); - if cpuset_str.is_empty() { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - format!("File {path} is empty"), - )); - } - debug!("Contents of {path}: {cpuset_str}"); - - let mut cpu_count: u64 = 0; - - for part in cpuset_str.split(',') { - let range: Vec<&str> = part.split('-').collect(); - if range.len() == 2 { - // Range like "0-3" - debug!("Range: {range:?}"); - let start: u64 = range[0].parse().map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("Failed to parse u64 from range {range:?}: {e}"), - ) - })?; - let end: u64 = range[1].parse().map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("Failed to parse u64 from range {range:?}: {e}"), - ) - })?; - cpu_count += end - start + 1; - } else { - // Single CPU like "2" - debug!("Single CPU: {part}"); - cpu_count += 1; - } - } - - debug!("Total CPU count: {cpu_count}"); - Ok(cpu_count) -} - -/// Computes the CPU limit in nanocores, with fallback to host CPU count -fn compute_cpu_limit_nc(cgroup_stats: &CgroupStats) -> (f64, bool) { - match compute_cgroup_cpu_limit_nc(cgroup_stats) { - Some(limit) => (limit, false), - None => { - let host_cpu_count = num_cpus::get() as f64; - debug!( - "No CPU limit found, defaulting to host CPU count: {} CPUs", - host_cpu_count - ); - (host_cpu_count * 1000000000.0, true) // Convert to nanocores - } - } -} - -/// Computes the CPU limit in nanocores from cgroup statistics -/// Limit is computed using min(CPUSet, CFS CPU Quota) -fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option { - let mut limit_nc = None; - - if let Some(cpu_count) = cgroup_stats.cpu_count { - debug!("CPU count from cpuset: {cpu_count}"); - let host_cpu_count = num_cpus::get() as u64; - if cpu_count != host_cpu_count { - debug!("CPU count from cpuset is not equal to host CPU count"); - let cpuset_limit_nc = 
cpu_count as f64 * 1000000000.0; // Convert to nanocores - limit_nc = Some(cpuset_limit_nc); - debug!( - "CPU limit from cpuset: {} CPUs ({} nanocores)", - cpu_count, cpuset_limit_nc - ); - } - } - - if let (Some(scheduler_quota), Some(scheduler_period)) = - (cgroup_stats.scheduler_quota, cgroup_stats.scheduler_period) - { - let quota_limit_nc = 1000000000.0 * (scheduler_quota as f64 / scheduler_period as f64); - match limit_nc { - None => { - limit_nc = Some(quota_limit_nc); - debug!( - "limit_nc is None, setting CPU limit from cfs quota: {} nanocores", - quota_limit_nc - ); - } - Some(current_limit_nc) if quota_limit_nc < current_limit_nc => { - limit_nc = Some(quota_limit_nc); - debug!("CPU limit from cfs quota is less than current limit, setting CPU limit from cfs quota: {} nanocores", quota_limit_nc); - } - _ => { - debug!("Keeping cpuset limit: {:?} nanocores", limit_nc); - } - } - } - limit_nc +pub trait CpuStatsReader { + fn read(&self) -> Option; } pub struct CpuMetricsCollector { + reader: Box, aggregator: AggregatorHandle, tags: Option, collection_interval_secs: u64, @@ -229,7 +49,12 @@ impl CpuMetricsCollector { tags: Option, collection_interval_secs: u64, ) -> Self { + #[cfg(target_os = "windows")] + let reader: Box = Box::new(crate::windows::WindowsCpuStatsReader); + #[cfg(not(target_os = "windows"))] + let reader: Box = Box::new(crate::linux::LinuxCpuStatsReader); Self { + reader, aggregator, tags, collection_interval_secs, @@ -239,7 +64,7 @@ impl CpuMetricsCollector { } pub fn collect_and_submit(&mut self) { - if let Some(cpu_stats) = read_cpu_stats() { + if let Some(cpu_stats) = self.reader.read() { // Submit metrics debug!("Collected cpu stats!"); let current_usage_ns = cpu_stats.total; diff --git a/crates/datadog-metrics-collector/src/lib.rs b/crates/datadog-metrics-collector/src/lib.rs index dc6c5f0f..f600d65c 100644 --- a/crates/datadog-metrics-collector/src/lib.rs +++ b/crates/datadog-metrics-collector/src/lib.rs @@ -8,3 +8,7 @@ 
#![cfg_attr(not(test), deny(clippy::unimplemented))] pub mod cpu; +#[cfg(not(target_os = "windows"))] +pub mod linux; +#[cfg(target_os = "windows")] +pub mod windows; diff --git a/crates/datadog-metrics-collector/src/linux.rs b/crates/datadog-metrics-collector/src/linux.rs new file mode 100644 index 00000000..87220fc3 --- /dev/null +++ b/crates/datadog-metrics-collector/src/linux.rs @@ -0,0 +1,201 @@ +// Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! CPU metrics collector for Azure Functions +//! +//! This module provides functionality to read raw CPU statistics from cgroup v1 files +//! and compute the CPU usage and limit in Linux environments. +//! +//! All CPU metrics are reported in nanocores (1 core = 1,000,000,000 nanocores). + +use crate::cpu::{CpuStats, CpuStatsReader}; +use num_cpus; +use std::fs; +use std::io; +use tracing::debug; + +const CGROUP_CPU_USAGE_PATH: &str = "/sys/fs/cgroup/cpu/cpuacct.usage"; // Reports the total CPU time, in nanoseconds, consumed by all tasks in this cgroup +const CGROUP_CPUSET_CPUS_PATH: &str = "/sys/fs/cgroup/cpuset/cpuset.cpus"; // Specifies the CPUs that tasks in this cgroup are permitted to access +const CGROUP_CPU_PERIOD_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_period_us"; // Specifies a period of time, in microseconds, for how regularly a cgroup's access to CPU resources should be reallocated +const CGROUP_CPU_QUOTA_PATH: &str = "/sys/fs/cgroup/cpu/cpu.cfs_quota_us"; // Specifies the total amount of time, in microseconds, for which all tasks in a cgroup can run during one period + +/// Statistics from cgroup v1 files, normalized to nanoseconds +struct CgroupStats { + total: Option, // Cumulative CPU usage (from cpuacct.usage) in nanoseconds + cpu_count: Option, // Number of accessible logical CPUs (from cpuset.cpus) + scheduler_period: Option, // CFS scheduler period (from cpu.cfs_period_us) in nanoseconds + scheduler_quota: Option, // CFS scheduler quota 
(from cpu.cfs_quota_us) in nanoseconds +} + +pub struct LinuxCpuStatsReader; + +impl CpuStatsReader for LinuxCpuStatsReader { + fn read(&self) -> Option { + let cgroup_stats = read_cgroup_stats(); + build_cpu_stats(&cgroup_stats) + } +} + +/// Builds CPU stats - rate and limit +fn build_cpu_stats(cgroup_stats: &CgroupStats) -> Option { + let total = cgroup_stats.total?; + + let (limit_nc, defaulted) = compute_cpu_limit_nc(cgroup_stats); + + Some(CpuStats { + total: total as f64, + limit: Some(limit_nc), + defaulted_limit: defaulted, + }) +} + +/// Reads raw CPU statistics from cgroup v1 files and converts to nanoseconds +fn read_cgroup_stats() -> CgroupStats { + let total = fs::read_to_string(CGROUP_CPU_USAGE_PATH) + .ok() + .and_then(|contents| contents.trim().parse::().ok()); + if total.is_none() { + debug!("Could not read CPU usage from {CGROUP_CPU_USAGE_PATH}"); + } + + let cpu_count = read_cpu_count_from_file(CGROUP_CPUSET_CPUS_PATH).ok(); + if cpu_count.is_none() { + debug!("Could not read CPU count from {CGROUP_CPUSET_CPUS_PATH}"); + } + + let scheduler_period = fs::read_to_string(CGROUP_CPU_PERIOD_PATH) + .ok() + .and_then(|contents| contents.trim().parse::().map(|v| v * 1000).ok()); // Convert from microseconds to nanoseconds + if scheduler_period.is_none() { + debug!("Could not read scheduler period from {CGROUP_CPU_PERIOD_PATH}"); + } + + let scheduler_quota = fs::read_to_string(CGROUP_CPU_QUOTA_PATH) + .ok() + .and_then(|contents| { + contents.trim().parse::().ok().and_then(|quota| { + // Convert from microseconds to nanoseconds + if quota == -1 { + debug!("CFS scheduler quota is -1, setting to None"); + None + } else { + Some((quota * 1000) as u64) + } + }) + }); + if scheduler_quota.is_none() { + debug!("Could not read scheduler quota from {CGROUP_CPU_QUOTA_PATH}"); + } + + CgroupStats { + total, + cpu_count, + scheduler_period, + scheduler_quota, + } +} + +/// Reads CPU count from cpuset.cpus +/// +/// The cpuset.cpus file contains a comma-separated 
list, with dashes to represent ranges of CPUs, +/// e.g., "0-2,16" represents CPUs 0, 1, 2, and 16 +/// This function returns the count of CPUs, in this case 4. +fn read_cpu_count_from_file(path: &str) -> Result { + let contents = fs::read_to_string(path)?; + let cpuset_str = contents.trim(); + if cpuset_str.is_empty() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("File {path} is empty"), + )); + } + debug!("Contents of {path}: {cpuset_str}"); + + let mut cpu_count: u64 = 0; + + for part in cpuset_str.split(',') { + let range: Vec<&str> = part.split('-').collect(); + if range.len() == 2 { + // Range like "0-3" + debug!("Range: {range:?}"); + let start: u64 = range[0].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + let end: u64 = range[1].parse().map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Failed to parse u64 from range {range:?}: {e}"), + ) + })?; + cpu_count += end - start + 1; + } else { + // Single CPU like "2" + debug!("Single CPU: {part}"); + cpu_count += 1; + } + } + + debug!("Total CPU count: {cpu_count}"); + Ok(cpu_count) +} + +/// Computes the CPU limit in nanocores, with fallback to host CPU count +fn compute_cpu_limit_nc(cgroup_stats: &CgroupStats) -> (f64, bool) { + match compute_cgroup_cpu_limit_nc(cgroup_stats) { + Some(limit) => (limit, false), + None => { + let host_cpu_count = num_cpus::get() as f64; + debug!( + "No CPU limit found, defaulting to host CPU count: {} CPUs", + host_cpu_count + ); + (host_cpu_count * 1000000000.0, true) // Convert to nanocores + } + } +} + +/// Computes the CPU limit in nanocores from cgroup statistics +/// Limit is computed using min(CPUSet, CFS CPU Quota) +fn compute_cgroup_cpu_limit_nc(cgroup_stats: &CgroupStats) -> Option { + let mut limit_nc = None; + + if let Some(cpu_count) = cgroup_stats.cpu_count { + debug!("CPU count from cpuset: {cpu_count}"); + let 
host_cpu_count = num_cpus::get() as u64; + if cpu_count != host_cpu_count { + debug!("CPU count from cpuset is not equal to host CPU count"); + let cpuset_limit_nc = cpu_count as f64 * 1000000000.0; // Convert to nanocores + limit_nc = Some(cpuset_limit_nc); + debug!( + "CPU limit from cpuset: {} CPUs ({} nanocores)", + cpu_count, cpuset_limit_nc + ); + } + } + + if let (Some(scheduler_quota), Some(scheduler_period)) = + (cgroup_stats.scheduler_quota, cgroup_stats.scheduler_period) + { + let quota_limit_nc = 1000000000.0 * (scheduler_quota as f64 / scheduler_period as f64); + match limit_nc { + None => { + limit_nc = Some(quota_limit_nc); + debug!( + "limit_nc is None, setting CPU limit from cfs quota: {} nanocores", + quota_limit_nc + ); + } + Some(current_limit_nc) if quota_limit_nc < current_limit_nc => { + limit_nc = Some(quota_limit_nc); + debug!("CPU limit from cfs quota is less than current limit, setting CPU limit from cfs quota: {} nanocores", quota_limit_nc); + } + _ => { + debug!("Keeping cpuset limit: {:?} nanocores", limit_nc); + } + } + } + limit_nc +} diff --git a/crates/datadog-metrics-collector/src/windows.rs b/crates/datadog-metrics-collector/src/windows.rs new file mode 100644 index 00000000..1be055ab --- /dev/null +++ b/crates/datadog-metrics-collector/src/windows.rs @@ -0,0 +1,14 @@ +// Copyright 2023-Present Datadog, Inc. 
https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::cpu::{CpuStats, CpuStatsReader}; +use tracing::debug; + +pub struct WindowsCpuStatsReader; + +impl CpuStatsReader for WindowsCpuStatsReader { + fn read(&self) -> Option { + debug!("Reading CPU stats from Windows"); + None + } +} diff --git a/crates/dogstatsd/src/origin.rs b/crates/dogstatsd/src/origin.rs index ceb5ad61..818766d5 100644 --- a/crates/dogstatsd/src/origin.rs +++ b/crates/dogstatsd/src/origin.rs @@ -84,15 +84,17 @@ impl Metric { .join("."); // Determine the service based on metric prefix first - let service = if metric_name.starts_with(JVM_PREFIX) - || metric_name.starts_with(RUNTIME_PREFIX) - { - OriginService::ServerlessRuntime - } else if metric_prefix == AWS_LAMBDA_PREFIX || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX || metric_prefix == AZURE_FUNCTIONS_PREFIX { - OriginService::ServerlessEnhanced - } else { - OriginService::ServerlessCustom - }; + let service = + if metric_name.starts_with(JVM_PREFIX) || metric_name.starts_with(RUNTIME_PREFIX) { + OriginService::ServerlessRuntime + } else if metric_prefix == AWS_LAMBDA_PREFIX + || metric_prefix == GOOGLE_CLOUD_RUN_PREFIX + || metric_prefix == AZURE_FUNCTIONS_PREFIX + { + OriginService::ServerlessEnhanced + } else { + OriginService::ServerlessCustom + }; // Then determine the category based on tags let category = if has_tag_value(&tags, AWS_LAMBDA_TAG_KEY, "") {