diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index f8bcf5cec..2e5c23ae4 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -824,9 +824,11 @@ async fn handle_event_bus_event( Event::Telemetry(event) => { debug!("Telemetry event received: {:?}", event); match event.record { - TelemetryRecord::PlatformInitStart { .. } => { + TelemetryRecord::PlatformInitStart { + runtime_version, .. + } => { if let Err(e) = invocation_processor_handle - .on_platform_init_start(event.time) + .on_platform_init_start(event.time, runtime_version) .await { error!("Failed to send platform init start to processor: {}", e); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index a5282d99e..ccb9f497e 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -284,7 +284,10 @@ impl Processor { /// /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. - pub fn on_platform_init_start(&mut self, time: DateTime) { + pub fn on_platform_init_start(&mut self, time: DateTime, runtime_version: Option) { + if runtime_version.as_deref().map_or(false, |rv| rv.contains("DurableFunction")) { + self.enhanced_metrics.set_durable_function_tag(); + } let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) .expect("time went backwards") @@ -1714,6 +1717,105 @@ mod tests { ), } + #[tokio::test] + async fn test_on_platform_init_start_sets_durable_function_tag() { + let mut processor = setup(); + let time = Utc::now(); + + processor.on_platform_init_start(time, Some("python:3.14.DurableFunction.v6".to_string())); + + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + processor.enhanced_metrics.increment_invocation_metric(now); + + let ts = (now / 10) * 10; + let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok(); + let entry = processor + .enhanced_metrics + .aggr_handle + .get_entry_by_id( + crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(), + durable_tags, + ts, + ) + .await + .unwrap(); + assert!( + entry.is_some(), + "Expected durable_function:true tag on enhanced metric" + ); + } + + #[tokio::test] + async fn test_on_platform_init_start_no_durable_function_tag_for_regular_runtime() { + let mut processor = setup(); + let time = Utc::now(); + + processor.on_platform_init_start(time, Some("python:3.12.v10".to_string())); + + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + processor.enhanced_metrics.increment_invocation_metric(now); + + let ts = (now / 10) * 10; + let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok(); + let entry = processor + .enhanced_metrics + .aggr_handle + .get_entry_by_id( + crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(), + durable_tags, + ts, + ) + .await + .unwrap(); + assert!( + entry.is_none(), + "Expected no durable_function:true tag for regular runtime" + ); + } + + #[tokio::test] + async fn test_on_platform_init_start_no_durable_function_tag_when_runtime_version_is_none() { + let mut processor = setup(); + let time = Utc::now(); + + processor.on_platform_init_start(time, None); + + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + processor.enhanced_metrics.increment_invocation_metric(now); + + let ts = (now / 10) * 10; + let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok(); + let entry = processor + .enhanced_metrics + .aggr_handle + .get_entry_by_id( + crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(), + durable_tags, + ts, + ) + .await + .unwrap(); + assert!( + entry.is_none(), + "Expected no durable_function:true tag when runtime_version is None" + ); + } + #[tokio::test] async fn test_is_managed_instance_mode_returns_true() { use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE; diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 25ca0bff4..82457ec21 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -40,6 +40,7 @@ pub enum ProcessorCommand { }, PlatformInitStart { time: DateTime, + runtime_version: Option, }, PlatformInitReport { init_type: InitType, @@ -140,9 +141,13 @@ impl InvocationProcessorHandle { pub async fn on_platform_init_start( &self, time: DateTime, + runtime_version: Option, ) -> Result<(), mpsc::error::SendError> { self.sender - .send(ProcessorCommand::PlatformInitStart { time }) + .send(ProcessorCommand::PlatformInitStart { + time, + runtime_version, + }) .await } @@ -454,8 +459,11 @@ impl InvocationProcessorService { ProcessorCommand::InvokeEvent { request_id } => { self.processor.on_invoke_event(request_id); } - ProcessorCommand::PlatformInitStart { time } => { - self.processor.on_platform_init_start(time); + ProcessorCommand::PlatformInitStart { + time, + runtime_version, + } => { + self.processor.on_platform_init_start(time, runtime_version); } ProcessorCommand::PlatformInitReport { init_type, diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 4e41e4b76..2a1829b6c 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -61,6 +61,12 @@ impl Lambda { .insert(String::from("runtime"), runtime.to_string()); } + /// Sets the `durable_function:true` tag in `dynamic_value_tags` + pub fn set_durable_function_tag(&mut self) { + self.dynamic_value_tags + .insert(String::from("durable_function"), String::from("true")); + } + fn get_dynamic_value_tags(&self) -> Option { let vec_tags: Vec = self .dynamic_value_tags @@ -146,10 +152,11 @@ impl Lambda { if !self.config.enhanced_metrics { return; } + let tags = self.get_dynamic_value_tags(); let metric = Metric::new( metric_name.into(), MetricValue::distribution(1f64), - self.get_dynamic_value_tags(), + tags, Some(timestamp), ); if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { @@ -838,6 +845,33 @@ mod tests { } } + #[tokio::test] + async fn test_set_durable_function_tag() { + let (metrics_aggr, my_config) = setup(); + let mut lambda = Lambda::new(metrics_aggr.clone(), my_config); + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + + lambda.set_durable_function_tag(); + lambda.increment_invocation_metric(now); + + // Verify the metric was emitted with the durable_function:true tag + let ts = (now / 10) * 10; + let durable_tags = SortedTags::parse("durable_function:true").ok(); + let entry = metrics_aggr + .get_entry_by_id(constants::INVOCATIONS_METRIC.into(), durable_tags, ts) + .await + .unwrap(); + assert!( + entry.is_some(), + "Expected metric with durable_function:true tag" + ); + } + #[tokio::test] #[allow(clippy::float_cmp)] async fn test_increment_invocation_metric() {