From f9a1c011dd6466df955bb913616e922a4f9ced49 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Fri, 20 Feb 2026 14:49:37 -0500 Subject: [PATCH 1/6] Add durable_function:true tag to enhanced metrics for Durable Function runtimes When the PlatformInitStart payload contains a runtime_version field with "DurableFunction" (e.g. "python:3.14.DurableFunction.v6"), set the tag durable_function:true on all enhanced metrics generated by the extension. Co-Authored-By: Claude Sonnet 4.6 --- bottlecap/src/bin/bottlecap/main.rs | 7 +- .../src/lifecycle/invocation/processor.rs | 110 +++++++++++++++++- .../lifecycle/invocation/processor_service.rs | 14 ++- bottlecap/src/metrics/enhanced/lambda.rs | 34 ++++++ 4 files changed, 159 insertions(+), 6 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index f8bcf5cec..a4b9076f1 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -824,9 +824,12 @@ async fn handle_event_bus_event( Event::Telemetry(event) => { debug!("Telemetry event received: {:?}", event); match event.record { - TelemetryRecord::PlatformInitStart { .. } => { + TelemetryRecord::PlatformInitStart { + runtime_version, + .. + } => { if let Err(e) = invocation_processor_handle - .on_platform_init_start(event.time) + .on_platform_init_start(event.time, runtime_version) .await { error!("Failed to send platform init start to processor: {}", e); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index a5282d99e..36ebf5ccf 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -284,7 +284,13 @@ impl Processor { /// /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. 
- pub fn on_platform_init_start(&mut self, time: DateTime) { + pub fn on_platform_init_start(&mut self, time: DateTime, runtime_version: Option) { + if runtime_version + .as_deref() + .is_some_and(|rv| rv.contains("DurableFunction")) + { + self.enhanced_metrics.set_durable_function_tag(); + } let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) .expect("time went backwards") @@ -1714,6 +1720,108 @@ mod tests { ), } + #[tokio::test] + async fn test_on_platform_init_start_sets_durable_function_tag() { + let mut processor = setup(); + let time = Utc::now(); + + processor.on_platform_init_start( + time, + Some("python:3.14.DurableFunction.v6".to_string()), + ); + + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + processor.enhanced_metrics.increment_invocation_metric(now); + + let ts = (now / 10) * 10; + let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok(); + let entry = processor + .enhanced_metrics + .aggr_handle + .get_entry_by_id( + crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(), + durable_tags, + ts, + ) + .await + .unwrap(); + assert!( + entry.is_some(), + "Expected durable_function:true tag on enhanced metric" + ); + } + + #[tokio::test] + async fn test_on_platform_init_start_no_durable_function_tag_for_regular_runtime() { + let mut processor = setup(); + let time = Utc::now(); + + processor.on_platform_init_start(time, Some("python:3.12.v10".to_string())); + + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + processor.enhanced_metrics.increment_invocation_metric(now); + + let ts = (now / 10) * 10; + let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok(); + let entry = processor + .enhanced_metrics + .aggr_handle + .get_entry_by_id( + 
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(), + durable_tags, + ts, + ) + .await + .unwrap(); + assert!( + entry.is_none(), + "Expected no durable_function:true tag for regular runtime" + ); + } + + #[tokio::test] + async fn test_on_platform_init_start_no_durable_function_tag_when_runtime_version_is_none() { + let mut processor = setup(); + let time = Utc::now(); + + processor.on_platform_init_start(time, None); + + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + processor.enhanced_metrics.increment_invocation_metric(now); + + let ts = (now / 10) * 10; + let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok(); + let entry = processor + .enhanced_metrics + .aggr_handle + .get_entry_by_id( + crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(), + durable_tags, + ts, + ) + .await + .unwrap(); + assert!( + entry.is_none(), + "Expected no durable_function:true tag when runtime_version is None" + ); + } + #[tokio::test] async fn test_is_managed_instance_mode_returns_true() { use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE; diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index 25ca0bff4..82457ec21 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -40,6 +40,7 @@ pub enum ProcessorCommand { }, PlatformInitStart { time: DateTime, + runtime_version: Option, }, PlatformInitReport { init_type: InitType, @@ -140,9 +141,13 @@ impl InvocationProcessorHandle { pub async fn on_platform_init_start( &self, time: DateTime, + runtime_version: Option, ) -> Result<(), mpsc::error::SendError> { self.sender - .send(ProcessorCommand::PlatformInitStart { time }) + .send(ProcessorCommand::PlatformInitStart { + time, + runtime_version, + }) .await } @@ -454,8 
+459,11 @@ impl InvocationProcessorService { ProcessorCommand::InvokeEvent { request_id } => { self.processor.on_invoke_event(request_id); } - ProcessorCommand::PlatformInitStart { time } => { - self.processor.on_platform_init_start(time); + ProcessorCommand::PlatformInitStart { + time, + runtime_version, + } => { + self.processor.on_platform_init_start(time, runtime_version); } ProcessorCommand::PlatformInitReport { init_type, diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 4e41e4b76..ec5c155d1 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -61,6 +61,12 @@ impl Lambda { .insert(String::from("runtime"), runtime.to_string()); } + /// Sets the `durable_function:true` tag in `dynamic_value_tags` + pub fn set_durable_function_tag(&mut self) { + self.dynamic_value_tags + .insert(String::from("durable_function"), String::from("true")); + } + fn get_dynamic_value_tags(&self) -> Option { let vec_tags: Vec = self .dynamic_value_tags @@ -838,6 +844,34 @@ mod tests { } } + #[tokio::test] + async fn test_set_durable_function_tag() { + let (metrics_aggr, my_config) = setup(); + let mut lambda = Lambda::new(metrics_aggr.clone(), my_config); + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + + lambda.set_durable_function_tag(); + lambda.increment_invocation_metric(now); + + // Verify the metric was emitted with the durable_function:true tag + let ts = (now / 10) * 10; + let durable_tags = SortedTags::parse("durable_function:true").ok(); + let entry = metrics_aggr + .get_entry_by_id( + constants::INVOCATIONS_METRIC.into(), + durable_tags, + ts, + ) + .await + .unwrap(); + assert!(entry.is_some(), "Expected metric with durable_function:true tag"); + } + #[tokio::test] #[allow(clippy::float_cmp)] async fn test_increment_invocation_metric() { From 
32bca7c068a830dab5a65574f1a3c244773b3ad2 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Mon, 23 Feb 2026 08:39:34 -0500 Subject: [PATCH 2/6] Add durable function tag for enhanced metrics --- bottlecap/src/flushing/service.rs | 16 ++++++++++++++++ bottlecap/src/lifecycle/invocation/processor.rs | 7 +++++++ bottlecap/src/metrics/enhanced/lambda.rs | 17 ++++++++++++++++- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs index 973a72cfc..f5e3d1864 100644 --- a/bottlecap/src/flushing/service.rs +++ b/bottlecap/src/flushing/service.rs @@ -98,6 +98,22 @@ impl FlushingService { let series = flush_response.series; let sketches = flush_response.distributions; + debug!( + "FLUSHING_SERVICE | flushing {} series batch(es) and {} sketch batch(es)", + series.len(), + sketches.len() + ); + for (i, sketch_payload) in sketches.iter().enumerate() { + for sketch in &sketch_payload.sketches { + debug!( + "FLUSHING_SERVICE | sketch batch[{}]: metric='{}' tags={:?}", + i, + sketch.metric, + sketch.tags + ); + } + } + for (idx, flusher) in self.metrics_flushers.iter().enumerate() { let flusher = flusher.clone(); let series_clone = series.clone(); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 36ebf5ccf..c8cf5b39b 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -285,11 +285,18 @@ impl Processor { /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. 
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>, runtime_version: Option<String>) { + debug!( + "PlatformInitStart received: runtime_version={:?}", + runtime_version + ); if runtime_version .as_deref() .is_some_and(|rv| rv.contains("DurableFunction")) { + debug!("DurableFunction detected in runtime_version, setting durable_function:true tag on enhanced metrics"); self.enhanced_metrics.set_durable_function_tag(); + } else { + debug!("runtime_version does not contain 'DurableFunction', skipping durable_function tag"); } let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index ec5c155d1..3eaa08099 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -63,8 +63,13 @@ impl Lambda { /// Sets the `durable_function:true` tag in `dynamic_value_tags` pub fn set_durable_function_tag(&mut self) { + debug!("Enhanced metrics: inserting durable_function:true into dynamic_value_tags"); self.dynamic_value_tags .insert(String::from("durable_function"), String::from("true")); + debug!( + "Enhanced metrics: dynamic_value_tags after set_durable_function_tag: {:?}", + self.dynamic_value_tags + ); } fn get_dynamic_value_tags(&self) -> Option<SortedTags> { @@ -76,6 +81,11 @@ impl Lambda { let string_tags = vec_tags.join(","); + debug!( + "Enhanced metrics: get_dynamic_value_tags returning tag string: {:?}", + string_tags + ); + SortedTags::parse(&string_tags).ok() } @@ -152,10 +162,15 @@ impl Lambda { if !self.config.enhanced_metrics { return; } + debug!( + "Enhanced metrics: creating metric '{}' with dynamic_value_tags: {:?}", + metric_name, self.dynamic_value_tags + ); + let tags = self.get_dynamic_value_tags(); let metric = Metric::new( metric_name.into(), MetricValue::distribution(1f64), - self.get_dynamic_value_tags(), + tags, Some(timestamp), ); if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) { From 
109f5fafeb98d5bf070210eafb0786610ee4a95e Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 25 Feb 2026 17:36:43 -0500 Subject: [PATCH 3/6] Remove debug log --- bottlecap/src/lifecycle/invocation/processor.rs | 7 ------- bottlecap/src/metrics/enhanced/lambda.rs | 10 ---------- 2 files changed, 17 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index c8cf5b39b..36ebf5ccf 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -285,18 +285,11 @@ impl Processor { /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. pub fn on_platform_init_start(&mut self, time: DateTime, runtime_version: Option) { - debug!( - "PlatformInitStart received: runtime_version={:?}", - runtime_version - ); if runtime_version .as_deref() .is_some_and(|rv| rv.contains("DurableFunction")) { - debug!("DurableFunction detected in runtime_version, setting durable_function:true tag on enhanced metrics"); self.enhanced_metrics.set_durable_function_tag(); - } else { - debug!("runtime_version does not contain 'DurableFunction', skipping durable_function tag"); } let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 3eaa08099..e40e208e7 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -63,13 +63,8 @@ impl Lambda { /// Sets the `durable_function:true` tag in `dynamic_value_tags` pub fn set_durable_function_tag(&mut self) { - debug!("Enhanced metrics: inserting durable_function:true into dynamic_value_tags"); self.dynamic_value_tags .insert(String::from("durable_function"), String::from("true")); - debug!( - "Enhanced metrics: dynamic_value_tags after set_durable_function_tag: {:?}", - 
self.dynamic_value_tags - ); } fn get_dynamic_value_tags(&self) -> Option { @@ -81,11 +76,6 @@ impl Lambda { let string_tags = vec_tags.join(","); - debug!( - "Enhanced metrics: get_dynamic_value_tags returning tag string: {:?}", - string_tags - ); - SortedTags::parse(&string_tags).ok() } From ace03169f3984d45c418d31b5d4453078c0190e3 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 25 Feb 2026 17:37:00 -0500 Subject: [PATCH 4/6] fmt --- bottlecap/src/bin/bottlecap/main.rs | 3 +-- bottlecap/src/flushing/service.rs | 4 +--- bottlecap/src/lifecycle/invocation/processor.rs | 5 +---- bottlecap/src/metrics/enhanced/lambda.rs | 11 +++++------ 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/bottlecap/src/bin/bottlecap/main.rs b/bottlecap/src/bin/bottlecap/main.rs index a4b9076f1..2e5c23ae4 100644 --- a/bottlecap/src/bin/bottlecap/main.rs +++ b/bottlecap/src/bin/bottlecap/main.rs @@ -825,8 +825,7 @@ async fn handle_event_bus_event( debug!("Telemetry event received: {:?}", event); match event.record { TelemetryRecord::PlatformInitStart { - runtime_version, - .. + runtime_version, .. 
} => { if let Err(e) = invocation_processor_handle .on_platform_init_start(event.time, runtime_version) diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs index f5e3d1864..1f0f2d946 100644 --- a/bottlecap/src/flushing/service.rs +++ b/bottlecap/src/flushing/service.rs @@ -107,9 +107,7 @@ impl FlushingService { for sketch in &sketch_payload.sketches { debug!( "FLUSHING_SERVICE | sketch batch[{}]: metric='{}' tags={:?}", - i, - sketch.metric, - sketch.tags + i, sketch.metric, sketch.tags ); } } diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 36ebf5ccf..e369e5238 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -1725,10 +1725,7 @@ mod tests { let mut processor = setup(); let time = Utc::now(); - processor.on_platform_init_start( - time, - Some("python:3.14.DurableFunction.v6".to_string()), - ); + processor.on_platform_init_start(time, Some("python:3.14.DurableFunction.v6".to_string())); let now: i64 = std::time::UNIX_EPOCH .elapsed() diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index e40e208e7..679e8e0a0 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -867,14 +867,13 @@ mod tests { let ts = (now / 10) * 10; let durable_tags = SortedTags::parse("durable_function:true").ok(); let entry = metrics_aggr - .get_entry_by_id( - constants::INVOCATIONS_METRIC.into(), - durable_tags, - ts, - ) + .get_entry_by_id(constants::INVOCATIONS_METRIC.into(), durable_tags, ts) .await .unwrap(); - assert!(entry.is_some(), "Expected metric with durable_function:true tag"); + assert!( + entry.is_some(), + "Expected metric with durable_function:true tag" + ); } #[tokio::test] From 6c0e76d4aafa0c8fa400b43c4fa639d116afdc72 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Wed, 25 Feb 2026 17:45:12 -0500 Subject: [PATCH 5/6] 
Log runtime version --- bottlecap/src/flushing/service.rs | 14 -------------- bottlecap/src/lifecycle/invocation/processor.rs | 10 +++++----- bottlecap/src/metrics/enhanced/lambda.rs | 4 ---- 3 files changed, 5 insertions(+), 23 deletions(-) diff --git a/bottlecap/src/flushing/service.rs b/bottlecap/src/flushing/service.rs index 1f0f2d946..973a72cfc 100644 --- a/bottlecap/src/flushing/service.rs +++ b/bottlecap/src/flushing/service.rs @@ -98,20 +98,6 @@ impl FlushingService { let series = flush_response.series; let sketches = flush_response.distributions; - debug!( - "FLUSHING_SERVICE | flushing {} series batch(es) and {} sketch batch(es)", - series.len(), - sketches.len() - ); - for (i, sketch_payload) in sketches.iter().enumerate() { - for sketch in &sketch_payload.sketches { - debug!( - "FLUSHING_SERVICE | sketch batch[{}]: metric='{}' tags={:?}", - i, sketch.metric, sketch.tags - ); - } - } - for (idx, flusher) in self.metrics_flushers.iter().enumerate() { let flusher = flusher.clone(); let series_clone = series.clone(); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index e369e5238..321ed9fbc 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -285,11 +285,11 @@ impl Processor { /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. 
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>, runtime_version: Option<String>) { - if runtime_version - .as_deref() - .is_some_and(|rv| rv.contains("DurableFunction")) - { - self.enhanced_metrics.set_durable_function_tag(); + if let Some(ref rv) = runtime_version { + debug!("Runtime version: {}", rv); + if rv.contains("DurableFunction") { + self.enhanced_metrics.set_durable_function_tag(); + } } let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH) diff --git a/bottlecap/src/metrics/enhanced/lambda.rs b/bottlecap/src/metrics/enhanced/lambda.rs index 679e8e0a0..2a1829b6c 100644 --- a/bottlecap/src/metrics/enhanced/lambda.rs +++ b/bottlecap/src/metrics/enhanced/lambda.rs @@ -152,10 +152,6 @@ impl Lambda { if !self.config.enhanced_metrics { return; } - debug!( - "Enhanced metrics: creating metric '{}' with dynamic_value_tags: {:?}", - metric_name, self.dynamic_value_tags - ); let tags = self.get_dynamic_value_tags(); let metric = Metric::new( metric_name.into(), From 3c50d85711d890e88324b37fc5038c70bfab65e5 Mon Sep 17 00:00:00 2001 From: Yiming Luo Date: Thu, 26 Feb 2026 15:58:24 -0500 Subject: [PATCH 6/6] Remove debug log --- bottlecap/src/lifecycle/invocation/processor.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index 321ed9fbc..ccb9f497e 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -285,11 +285,8 @@ impl Processor { /// This is used to create a cold start span, since this telemetry event does not /// provide a `request_id`, we try to guess which invocation is the cold start. 
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>, runtime_version: Option<String>) { - if let Some(ref rv) = runtime_version { - debug!("Runtime version: {}", rv); - if rv.contains("DurableFunction") { - self.enhanced_metrics.set_durable_function_tag(); - } + if runtime_version.as_deref().map_or(false, |rv| rv.contains("DurableFunction")) { + self.enhanced_metrics.set_durable_function_tag(); } let start_time: i64 = SystemTime::from(time) .duration_since(UNIX_EPOCH)