diff --git a/crates/core/src/db/mod.rs b/crates/core/src/db/mod.rs index 07fb6cf140f..45777fbd612 100644 --- a/crates/core/src/db/mod.rs +++ b/crates/core/src/db/mod.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use enum_map::EnumMap; use spacetimedb_schema::reducer_name::ReducerName; use tokio::sync::mpsc; +use tokio::time::MissedTickBehavior; use crate::subscription::ExecutionCounters; use spacetimedb_datastore::execution_context::WorkloadType; @@ -79,37 +80,63 @@ impl MetricsRecorderQueue { } } +fn record_metrics( + MetricsMessage { + reducer, + metrics_for_writer, + metrics_for_reader, + tx_data, + counters, + }: MetricsMessage, +) { + if let Some(tx_metrics) = metrics_for_writer { + tx_metrics.report( + // If row updates are present, + // they will always belong to the writer transaction. + tx_data.as_deref(), + reducer.as_ref(), + |wl| &counters[wl], + ); + } + if let Some(tx_metrics) = metrics_for_reader { + tx_metrics.report( + // If row updates are present, + // they will never belong to the reader transaction. + // Passing row updates here will most likely panic. + None, + reducer.as_ref(), + |wl| &counters[wl], + ); + } +} + +/// The metrics recorder is a side channel that the main database thread forwards metrics to. +/// While we want to avoid unnecessary compute on the critical path, communicating with other +/// threads is not free, and for this case in particular waking a parked task is not free. +/// +/// Previously, each tx would send its metrics to the recorder task. As soon as the recorder +/// task `recv`d a message, it would update the counters and gauges, and immediately wait for +/// the next tx's message. This meant that the tx would need to be more expensive than the +/// recording of its metrics in order for the recorder task not to be parked on `recv` when +/// the tx would `send` its metrics. This would obviously never be the case, and so each `send` +/// would incur the overhead of waking the task. +/// +/// To mitigate this, we now record metrics, for potentially many transactions, periodically +/// every 5ms. +const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5); + /// Spawns a task for recording transaction metrics. /// Returns the handle for pushing metrics to the recorder. pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) { let (tx, mut rx) = mpsc::unbounded_channel(); let abort_handle = tokio::spawn(async move { - while let Some(MetricsMessage { - reducer, - metrics_for_writer, - metrics_for_reader, - tx_data, - counters, - }) = rx.recv().await - { - if let Some(tx_metrics) = metrics_for_writer { - tx_metrics.report( - // If row updates are present, - // they will always belong to the writer transaction. - tx_data.as_deref(), - reducer.as_ref(), - |wl| &counters[wl], - ); - } - if let Some(tx_metrics) = metrics_for_reader { - tx_metrics.report( - // If row updates are present, - // they will never belong to the reader transaction. - // Passing row updates here will most likely panic. - None, - reducer.as_ref(), - |wl| &counters[wl], - ); + let mut interval = tokio::time::interval(TX_METRICS_RECORDING_INTERVAL); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + interval.tick().await; + while let Ok(metrics) = rx.try_recv() { + record_metrics(metrics); } } })