Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,9 +824,11 @@ async fn handle_event_bus_event(
Event::Telemetry(event) => {
debug!("Telemetry event received: {:?}", event);
match event.record {
TelemetryRecord::PlatformInitStart { .. } => {
TelemetryRecord::PlatformInitStart {
runtime_version, ..
} => {
if let Err(e) = invocation_processor_handle
.on_platform_init_start(event.time)
.on_platform_init_start(event.time, runtime_version)
.await
{
error!("Failed to send platform init start to processor: {}", e);
Expand Down
107 changes: 106 additions & 1 deletion bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,13 @@ impl Processor {
///
/// This is used to create a cold start span, since this telemetry event does not
/// provide a `request_id`, we try to guess which invocation is the cold start.
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>) {
pub fn on_platform_init_start(&mut self, time: DateTime<Utc>, runtime_version: Option<String>) {
if let Some(ref rv) = runtime_version {
debug!("Runtime version: {}", rv);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this debug log required? It always appear at the top of the platform init start

if rv.contains("DurableFunction") {
self.enhanced_metrics.set_durable_function_tag();
}
}
let start_time: i64 = SystemTime::from(time)
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
Expand Down Expand Up @@ -1714,6 +1720,105 @@ mod tests {
),
}

#[tokio::test]
async fn test_on_platform_init_start_sets_durable_function_tag() {
let mut processor = setup();
let time = Utc::now();

processor.on_platform_init_start(time, Some("python:3.14.DurableFunction.v6".to_string()));

let now: i64 = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
processor.enhanced_metrics.increment_invocation_metric(now);

let ts = (now / 10) * 10;
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
let entry = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
durable_tags,
ts,
)
.await
.unwrap();
assert!(
entry.is_some(),
"Expected durable_function:true tag on enhanced metric"
);
}

#[tokio::test]
async fn test_on_platform_init_start_no_durable_function_tag_for_regular_runtime() {
let mut processor = setup();
let time = Utc::now();

processor.on_platform_init_start(time, Some("python:3.12.v10".to_string()));

let now: i64 = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
processor.enhanced_metrics.increment_invocation_metric(now);

let ts = (now / 10) * 10;
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
let entry = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
durable_tags,
ts,
)
.await
.unwrap();
assert!(
entry.is_none(),
"Expected no durable_function:true tag for regular runtime"
);
}

#[tokio::test]
async fn test_on_platform_init_start_no_durable_function_tag_when_runtime_version_is_none() {
let mut processor = setup();
let time = Utc::now();

processor.on_platform_init_start(time, None);

let now: i64 = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
processor.enhanced_metrics.increment_invocation_metric(now);

let ts = (now / 10) * 10;
let durable_tags = dogstatsd::metric::SortedTags::parse("durable_function:true").ok();
let entry = processor
.enhanced_metrics
.aggr_handle
.get_entry_by_id(
crate::metrics::enhanced::constants::INVOCATIONS_METRIC.into(),
durable_tags,
ts,
)
.await
.unwrap();
assert!(
entry.is_none(),
"Expected no durable_function:true tag when runtime_version is None"
);
}

#[tokio::test]
async fn test_is_managed_instance_mode_returns_true() {
use crate::config::aws::LAMBDA_MANAGED_INSTANCES_INIT_TYPE;
Expand Down
14 changes: 11 additions & 3 deletions bottlecap/src/lifecycle/invocation/processor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub enum ProcessorCommand {
},
PlatformInitStart {
time: DateTime<Utc>,
runtime_version: Option<String>,
},
PlatformInitReport {
init_type: InitType,
Expand Down Expand Up @@ -140,9 +141,13 @@ impl InvocationProcessorHandle {
pub async fn on_platform_init_start(
&self,
time: DateTime<Utc>,
runtime_version: Option<String>,
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
self.sender
.send(ProcessorCommand::PlatformInitStart { time })
.send(ProcessorCommand::PlatformInitStart {
time,
runtime_version,
})
.await
}

Expand Down Expand Up @@ -454,8 +459,11 @@ impl InvocationProcessorService {
ProcessorCommand::InvokeEvent { request_id } => {
self.processor.on_invoke_event(request_id);
}
ProcessorCommand::PlatformInitStart { time } => {
self.processor.on_platform_init_start(time);
ProcessorCommand::PlatformInitStart {
time,
runtime_version,
} => {
self.processor.on_platform_init_start(time, runtime_version);
}
ProcessorCommand::PlatformInitReport {
init_type,
Expand Down
36 changes: 35 additions & 1 deletion bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ impl Lambda {
.insert(String::from("runtime"), runtime.to_string());
}

/// Sets the `durable_function:true` tag in `dynamic_value_tags`
pub fn set_durable_function_tag(&mut self) {
self.dynamic_value_tags
.insert(String::from("durable_function"), String::from("true"));
}

fn get_dynamic_value_tags(&self) -> Option<SortedTags> {
let vec_tags: Vec<String> = self
.dynamic_value_tags
Expand Down Expand Up @@ -146,10 +152,11 @@ impl Lambda {
if !self.config.enhanced_metrics {
return;
}
let tags = self.get_dynamic_value_tags();
let metric = Metric::new(
metric_name.into(),
MetricValue::distribution(1f64),
self.get_dynamic_value_tags(),
tags,
Some(timestamp),
);
if let Err(e) = self.aggr_handle.insert_batch(vec![metric]) {
Expand Down Expand Up @@ -838,6 +845,33 @@ mod tests {
}
}

#[tokio::test]
async fn test_set_durable_function_tag() {
let (metrics_aggr, my_config) = setup();
let mut lambda = Lambda::new(metrics_aggr.clone(), my_config);
let now: i64 = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();

lambda.set_durable_function_tag();
lambda.increment_invocation_metric(now);

// Verify the metric was emitted with the durable_function:true tag
let ts = (now / 10) * 10;
let durable_tags = SortedTags::parse("durable_function:true").ok();
let entry = metrics_aggr
.get_entry_by_id(constants::INVOCATIONS_METRIC.into(), durable_tags, ts)
.await
.unwrap();
assert!(
entry.is_some(),
"Expected metric with durable_function:true tag"
);
}

#[tokio::test]
#[allow(clippy::float_cmp)]
async fn test_increment_invocation_metric() {
Expand Down
Loading