Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 53 additions & 26 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::Arc;
use enum_map::EnumMap;
use spacetimedb_schema::reducer_name::ReducerName;
use tokio::sync::mpsc;
use tokio::time::MissedTickBehavior;

use crate::subscription::ExecutionCounters;
use spacetimedb_datastore::execution_context::WorkloadType;
Expand Down Expand Up @@ -79,37 +80,63 @@ impl MetricsRecorderQueue {
}
}

/// Applies one transaction's metrics to the Prometheus counters.
///
/// A transaction may carry metrics for a writer half, a reader half,
/// or both; each half present is reported independently against the
/// per-workload execution counters.
fn record_metrics(msg: MetricsMessage) {
    let MetricsMessage {
        reducer,
        metrics_for_writer,
        metrics_for_reader,
        tx_data,
        counters,
    } = msg;

    // Row updates, when present, always belong to the writer transaction.
    // They must never be attributed to the reader transaction —
    // doing so would most likely panic — so the reader half gets `None`.
    let halves = [
        (metrics_for_writer, tx_data.as_deref()),
        (metrics_for_reader, None),
    ];
    for (half, row_updates) in halves {
        if let Some(tx_metrics) = half {
            tx_metrics.report(row_updates, reducer.as_ref(), |wl| &counters[wl]);
        }
    }
}

/// The metrics recorder is a side channel that the main database thread forwards metrics to.
/// While we want to avoid unnecessary compute on the critical path, communicating with other
/// threads is not free, and for this case in particular waking a parked task is not free.
///
/// Previously, each tx would send its metrics to the recorder task. As soon as the recorder
/// task `recv`d a message, it would update the counters and gauges, and immediately wait for
/// the next tx's message. This meant that the tx would need to be more expensive than the
/// recording of its metrics in order for the recorder task not to be parked on `recv` when
/// the tx would `send` its metrics. This would obviously never be the case, and so each `send`
/// would incur the overhead of waking the task.
///
/// To mitigate this, we now record metrics, for potentially many transactions, periodically
/// every 5ms.
///
/// The trade-off is freshness: reported metrics may lag the transactions that
/// produced them by up to this interval, which is acceptable for monitoring.
const TX_METRICS_RECORDING_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5);

/// Spawns a task for recording transaction metrics.
/// Returns the handle for pushing metrics to the recorder.
pub fn spawn_tx_metrics_recorder() -> (MetricsRecorderQueue, tokio::task::AbortHandle) {
let (tx, mut rx) = mpsc::unbounded_channel();
let abort_handle = tokio::spawn(async move {
while let Some(MetricsMessage {
reducer,
metrics_for_writer,
metrics_for_reader,
tx_data,
counters,
}) = rx.recv().await
{
if let Some(tx_metrics) = metrics_for_writer {
tx_metrics.report(
// If row updates are present,
// they will always belong to the writer transaction.
tx_data.as_deref(),
reducer.as_ref(),
|wl| &counters[wl],
);
}
if let Some(tx_metrics) = metrics_for_reader {
tx_metrics.report(
// If row updates are present,
// they will never belong to the reader transaction.
// Passing row updates here will most likely panic.
None,
reducer.as_ref(),
|wl| &counters[wl],
);
let mut interval = tokio::time::interval(TX_METRICS_RECORDING_INTERVAL);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
interval.tick().await;
while let Ok(metrics) = rx.try_recv() {
record_metrics(metrics);
}
}
})
Expand Down
Loading