
enhancement(runtime): add resource/pubsub registry for generalized state management in processes#1178

Draft
tobz wants to merge 5 commits into tobz/control-plane-tls-early-init from tobz/runtime-system-state-mgmt-primitives

Conversation


@tobz tobz commented Feb 9, 2026

Summary

Change Type

  • Bug fix
  • New feature
  • Non-functional (chore, refactoring, docs)
  • Performance

How did you test this PR?

References


Copilot AI left a comment


Pull request overview

Adds generalized runtime state management primitives (resource registry + dataspace pub/sub) to support inter-process coordination keyed by typed handles, and wires in process-local identity tracking.

Changes:

  • Introduces ResourceRegistry for publishing/acquiring typed resources with async waiting and RAII return via guards.
  • Introduces DataspaceRegistry for typed assert/retract pub-sub with per-handle and wildcard subscriptions (plus current-state replay).
  • Adds Handle and process-context tracking (Id::current() via task-local), and exports state + ProcessId.
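The "RAII return via guards" idea above can be sketched in miniature. The `Registry` and `Guard` types below are illustrative stand-ins, not the PR's actual `ResourceRegistry`/`ResourceGuard`: acquiring pops a resource from shared storage, and dropping the guard pushes it back.

```rust
use std::sync::{Arc, Mutex};

// Minimal sketch of RAII return: not the PR's types, just the pattern.
struct Registry {
    available: Mutex<Vec<u32>>,
}

struct Guard {
    resource: Option<u32>,
    registry: Arc<Registry>,
}

impl Guard {
    fn get(&self) -> u32 {
        self.resource.expect("resource present until drop")
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Returning the resource on drop is what makes the guard RAII-style:
        // callers cannot forget to give the resource back.
        if let Some(r) = self.resource.take() {
            self.registry.available.lock().unwrap().push(r);
        }
    }
}

fn acquire(registry: &Arc<Registry>) -> Option<Guard> {
    let resource = registry.available.lock().unwrap().pop()?;
    Some(Guard { resource: Some(resource), registry: Arc::clone(registry) })
}
```

The real registry adds typed handles and async waiting on top of this shape, but the guard's `Drop` is the piece that makes returns automatic.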

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.

| File | Description |
| --- | --- |
| lib/saluki-core/src/runtime/state/resources/mod.rs | New async-aware resource registry with publish/acquire APIs, snapshots, and tests. |
| lib/saluki-core/src/runtime/state/mod.rs | New state module defining Handle and re-exporting registries. |
| lib/saluki-core/src/runtime/state/dataspace/mod.rs | New dataspace assert/retract + subscription registry with current-state replay and tests. |
| lib/saluki-core/src/runtime/process.rs | Adds task-local "current process id" support and a ROOT process identifier. |
| lib/saluki-core/src/runtime/mod.rs | Exposes state module and re-exports ProcessId. |


```rust
use std::{
    any::{Any, TypeId},
    collections::{HashMap, HashSet, VecDeque},
    hash::Hash,
```

Copilot AI Feb 9, 2026


std::hash::Hash is imported but not used in this module. Please remove it to avoid warnings / keep the import list clean.

Suggested change:

```diff
-    hash::Hash,
```

```rust
use std::{
    any::{Any, TypeId},
    collections::{HashMap, VecDeque},
    hash::Hash,
```

Copilot AI Feb 9, 2026


std::hash::Hash is imported but not used in this module. Please remove it to avoid warnings / keep the import list clean.

Suggested change:

```diff
-    hash::Hash,
```


```rust
// Not available, register as waiter
let (tx, rx) = oneshot::channel();
state.waiters.entry(key.clone()).or_default().push_back(tx);
```

Copilot AI Feb 9, 2026


Closed/canceled acquires leave behind oneshot::Senders in state.waiters until a publish/return happens for that key. This can (a) make snapshot() report pending waiters that no longer exist and (b) accumulate memory for keys that are never published. Consider pruning closed senders at insertion time (e.g., retain only !sender.is_closed() before pushing), and removing the entry if the queue becomes empty.

Suggested change:

```diff
-state.waiters.entry(key.clone()).or_default().push_back(tx);
+// Prune any closed/canceled waiters for this key before adding a new one.
+let waiters = state.waiters.entry(key.clone()).or_default();
+waiters.retain(|sender| !sender.is_closed());
+// After pruning, add the new waiter.
+waiters.push_back(tx);
```

Comment on lines +458 to +462
```rust
// Spawn a task that will acquire (and wait)
let acquire_handle = tokio::spawn(async move { registry_clone.acquire::<u32>(h).await });

// Give the acquire task time to register as waiter
tokio::time::sleep(Duration::from_millis(10)).await;
```

Copilot AI Feb 9, 2026


This test uses sleep for synchronization, which can be flaky under load or on slower CI machines. Prefer an explicit synchronization signal (e.g., a oneshot/Notify/Barrier sent after the waiter is registered) so the test deterministically publishes only after the acquire path has enqueued the waiter.

Suggested change:

```diff
-// Spawn a task that will acquire (and wait)
-let acquire_handle = tokio::spawn(async move { registry_clone.acquire::<u32>(h).await });
-
-// Give the acquire task time to register as waiter
-tokio::time::sleep(Duration::from_millis(10)).await;
+// Use an explicit synchronization signal so we know the task has started.
+let (tx_started, rx_started) = oneshot::channel();
+
+// Spawn a task that will acquire (and wait)
+let acquire_handle = tokio::spawn(async move {
+    // Notify that the acquire task has started running.
+    let _ = tx_started.send(());
+    registry_clone.acquire::<u32>(h).await
+});
+
+// Wait for the acquire task to start before publishing the resource.
+rx_started.await.expect("acquire task did not start");
```

Comment on lines +529 to +538
```rust
// Spawn multiple waiters
let registry1 = registry.clone();
let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;

let registry2 = registry.clone();
let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;
```

Copilot AI Feb 9, 2026


Similarly, multiple_waiters_fifo relies on short sleeps to establish waiter ordering. This can be timing-sensitive. Using explicit coordination (e.g., each spawned task notifies once it has registered its waiter) will make FIFO assertions reliable.

Suggested change:

```diff
-// Spawn multiple waiters
-let registry1 = registry.clone();
-let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });
-
-tokio::time::sleep(Duration::from_millis(5)).await;
-
-let registry2 = registry.clone();
-let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });
-
-tokio::time::sleep(Duration::from_millis(5)).await;
+// Spawn multiple waiters with explicit coordination instead of timing-based sleeps
+let registry1 = registry.clone();
+let (ready_tx1, ready_rx1) = oneshot::channel::<()>();
+let handle1 = tokio::spawn(async move {
+    // Signal that this waiter task has started
+    let _ = ready_tx1.send(());
+    registry1.acquire::<u32>(h).await
+});
+
+// Wait until the first waiter task has started
+let _ = ready_rx1.await;
+
+let registry2 = registry.clone();
+let (ready_tx2, ready_rx2) = oneshot::channel::<()>();
+let handle2 = tokio::spawn(async move {
+    // Signal that this waiter task has started
+    let _ = ready_tx2.send(());
+    registry2.acquire::<u32>(h).await
+});
+
+// Wait until the second waiter task has started
+let _ = ready_rx2.await;
```

Comment on lines +530 to +538
```rust
let registry1 = registry.clone();
let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;

let registry2 = registry.clone();
let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });

tokio::time::sleep(Duration::from_millis(5)).await;
```

Copilot AI Feb 9, 2026


Similarly, multiple_waiters_fifo relies on short sleeps to establish waiter ordering. This can be timing-sensitive. Using explicit coordination (e.g., each spawned task notifies once it has registered its waiter) will make FIFO assertions reliable.

Suggested change:

```diff
-let registry1 = registry.clone();
-let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await });
-
-tokio::time::sleep(Duration::from_millis(5)).await;
-
-let registry2 = registry.clone();
-let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await });
-
-tokio::time::sleep(Duration::from_millis(5)).await;
+let (ready1_tx, ready1_rx) = oneshot::channel();
+let registry1 = registry.clone();
+let handle1 = tokio::spawn(async move {
+    // Signal that the first waiter is about to register and wait
+    let _ = ready1_tx.send(());
+    registry1.acquire::<u32>(h).await
+});
+
+// Ensure the first waiter has started before spawning the second
+let _ = ready1_rx.await;
+
+let (ready2_tx, ready2_rx) = oneshot::channel();
+let registry2 = registry.clone();
+let handle2 = tokio::spawn(async move {
+    // Signal that the second waiter is about to register and wait
+    let _ = ready2_tx.send(());
+    registry2.acquire::<u32>(h).await
+});
+
+// Ensure the second waiter has started before publishing the resource
+let _ = ready2_rx.await;
```

Comment on lines +41 to +45
```rust
    sync::{Arc, Mutex},
};

use snafu::Snafu;
use tokio::sync::oneshot;
```

Copilot AI Feb 9, 2026


This registry is designed for async coordination but uses std::sync::Mutex, which can block Tokio worker threads under contention. Consider switching to tokio::sync::Mutex (or another async-friendly mutex) for the shared state, since hot paths include acquire()/publish() from async tasks.

Suggested change:

```diff
-    sync::{Arc, Mutex},
+    sync::Arc,
 };

 use snafu::Snafu;
-use tokio::sync::oneshot;
+use tokio::sync::{Mutex, oneshot};
```

```rust
    any::{Any, TypeId},
    collections::{HashMap, VecDeque},
    hash::Hash,
    sync::{Arc, Mutex},
```

Copilot AI Feb 9, 2026


Same concern here: DataspaceRegistry is used from async tasks but relies on std::sync::Mutex, which can block executor threads. Using an async mutex (or minimizing lock contention) would better match the async usage patterns, especially with frequent assert() calls and many subscribers.

```rust
/// associated with. If no wildcard broadcast channel exists yet for this type, one is created.
///
/// If values have already been asserted for this type on any handles, the subscription will immediately yield all
/// current values before any future broadcast updates.
```
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscribe_all replays current values by iterating HashMap::iter(), so the initial replay order is inherently non-deterministic. Consider documenting that replay ordering is unspecified (or sorting by handle if deterministic ordering is desirable for consumers).

Suggested change:

```diff
-/// current values before any future broadcast updates.
+/// current values before any future broadcast updates. The order in which these current values are replayed is
+/// unspecified and must not be relied upon.
```

@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from e8b5919 to 52f0a8a on February 11, 2026 16:24
@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from b284244 to 75a8e2c on February 11, 2026 16:24

pr-commenter bot commented Feb 11, 2026

Binary Size Analysis (Agent Data Plane)

Target: d3ab905 (baseline) vs 09abd4b (comparison) diff
Analysis Type: Stripped binaries (debug symbols excluded)
Baseline Size: 27.39 MiB
Comparison Size: 27.51 MiB
Size Change: +124.83 KiB (+0.45%)
Pass/Fail Threshold: +5%
Result: PASSED ✅

Changes by Module

| Module | File Size | Symbols |
| --- | --- | --- |
| saluki_core::runtime::supervisor | +69.41 KiB | 66 |
| core | +46.08 KiB | 248 |
| agent_data_plane::internal::initialize_and_launch_runtime | -22.07 KiB | 2 |
| agent_data_plane::internal::create_internal_supervisor | +16.17 KiB | 1 |
| saluki_app::memory::MemoryBoundsConfiguration | -13.70 KiB | 1 |
| agent_data_plane::internal::control_plane | -12.33 KiB | 26 |
| std | -10.54 KiB | 51 |
| anyhow | +8.26 KiB | 30 |
| agent_data_plane::cli::run | +8.02 KiB | 8 |
| [sections] | +7.57 KiB | 9 |
| saluki_core::runtime::process | +7.40 KiB | 7 |
| agent_data_plane::internal::observability | +5.60 KiB | 16 |
| saluki_core::topology::running | +5.52 KiB | 2 |
| saluki_app::metrics::collect_runtime_metrics | -4.74 KiB | 1 |
| tokio | -3.60 KiB | 110 |
| saluki_core::runtime::restart | +3.50 KiB | 7 |
| tracing_core | +2.24 KiB | 14 |
| saluki_health::Runner::run | +1.91 KiB | 8 |
| hashbrown | +1.81 KiB | 8 |
| saluki_health::RunnerGuard | +1.71 KiB | 3 |

Detailed Symbol Changes

```
    FILE SIZE        VM SIZE    
 --------------  -------------- 
  +2.6%  +104Ki  +2.2% +77.7Ki    [1117 Others]
  [NEW] +59.2Ki  [NEW] +58.9Ki    _<agent_data_plane::internal::control_plane::PrivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::h11fe1c9a197d4bbb
  [NEW] +21.3Ki  [NEW] +21.1Ki    _<agent_data_plane::internal::control_plane::UnprivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::hd516f5c8e792be07
  [NEW] +18.6Ki  [NEW] +18.4Ki    saluki_app::api::APIBuilder::serve::_{{closure}}::hc67130ad013550cb
  [NEW] +16.3Ki  [NEW] +16.2Ki    saluki_core::runtime::supervisor::WorkerState::add_worker::h2f57c36c7d6a6d25
  [NEW] +16.2Ki  [NEW] +16.0Ki    agent_data_plane::internal::create_internal_supervisor::_{{closure}}::hc50f0c81432105dc
  [NEW] +16.2Ki  [NEW] +16.0Ki    _<core::pin::Pin<P> as core::future::future::Future>::poll::hf993bf6d214be6bb
  [NEW] +10.9Ki  [NEW] +10.8Ki    std::sys::backtrace::__rust_begin_short_backtrace::h7db93e33ffed5b3c
  [NEW] +10.7Ki  [NEW] +10.6Ki    <saluki_core::data_model::event::Event as core::clone::Clone>::clone.10479
  [NEW] +10.5Ki  [NEW] +10.4Ki    saluki_health::Runner::run::_{{closure}}::h99455b3fddb78a9b
  [NEW] +9.47Ki  [NEW] +9.34Ki    saluki_core::runtime::supervisor::Supervisor::run_inner::_{{closure}}::hf3a194df28b5bfa6
  [NEW] +6.80Ki  [NEW] +6.66Ki    saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::he6d5081c217bc272
  [NEW] +6.24Ki  [NEW] +6.10Ki    saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::h1d900b0791e57095
  [DEL] -8.42Ki  [DEL] -8.33Ki    std::sys::backtrace::__rust_begin_short_backtrace::h61680b9753eb2342
  [DEL] -9.20Ki  [DEL] -9.10Ki    saluki_health::Runner::run::_{{closure}}::h36f8d77002f294fa
  [DEL] -10.7Ki  [DEL] -10.6Ki    <saluki_core::data_model::event::Event as core::clone::Clone>::clone.10695
  [DEL] -13.7Ki  [DEL] -13.6Ki    saluki_app::memory::MemoryBoundsConfiguration::try_from_config::h331a424a41827053
  [DEL] -18.0Ki  [DEL] -17.8Ki    agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::_{{closure}}::h9b9375d6b068ec9c
  [DEL] -18.4Ki  [DEL] -18.2Ki    agent_data_plane::internal::initialize_and_launch_runtime::_{{closure}}::h3544d2e34d6be2ff
  [DEL] -18.7Ki  [DEL] -18.6Ki    saluki_app::api::APIBuilder::serve::_{{closure}}::hfe866520c248a5c1
  [DEL] -84.5Ki  [DEL] -84.4Ki    agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::hd6ee71eb8e3b5d42
  +0.4%  +124Ki  +0.4% +97.7Ki    TOTAL
```


pr-commenter bot commented Feb 11, 2026

Regression Detector (Agent Data Plane)

Regression Detector Results

Run ID: b8a9d27d-de3d-456e-9e04-c24ec847ca14

Baseline: d3ab905
Comparison: 09abd4b
Diff

❌ Experiments with retried target crashes

This is a critical error. One or more replicates failed with a non-zero exit code. These replicates may have been retried. See Replicate Execution Details for more information.

  • otlp_ingest_logs_5mb_throughput
  • quality_gates_rss_dsd_ultraheavy
  • otlp_ingest_logs_5mb_cpu
  • otlp_ingest_traces_5mb_throughput
  • otlp_ingest_metrics_5mb_memory
  • dsd_uds_100mb_3k_contexts_cpu

Optimization Goals: ✅ No significant changes detected

Experiments ignored for regressions

Regressions in experiments with settings containing erratic: true are ignored.

perf experiment goal Δ mean % Δ mean % CI trials links
otlp_ingest_logs_5mb_throughput ingress throughput -0.02 [-0.14, +0.10] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_cpu % cpu utilization -2.17 [-7.10, +2.75] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_memory memory utilization -6.09 [-6.55, -5.64] 1 (metrics) (profiles) (logs)

Fine details of change detection per experiment

perf experiment goal Δ mean % Δ mean % CI trials links
dsd_uds_512kb_3k_contexts_cpu % cpu utilization +1.62 [-53.03, +56.27] 1 (metrics) (profiles) (logs)
otlp_ingest_metrics_5mb_memory memory utilization +1.29 [+1.05, +1.53] 1 (metrics) (profiles) (logs)
dsd_uds_10mb_3k_contexts_memory memory utilization +1.26 [+1.07, +1.46] 1 (metrics) (profiles) (logs)
otlp_ingest_traces_5mb_memory memory utilization +0.95 [+0.69, +1.22] 1 (metrics) (profiles) (logs)
quality_gates_rss_idle memory utilization +0.93 [+0.90, +0.97] 1 (metrics) (profiles) (logs)
otlp_ingest_traces_5mb_cpu % cpu utilization +0.67 [-1.20, +2.54] 1 (metrics) (profiles) (logs)
dsd_uds_1mb_3k_contexts_memory memory utilization +0.64 [+0.45, +0.83] 1 (metrics) (profiles) (logs)
dsd_uds_500mb_3k_contexts_memory memory utilization +0.55 [+0.37, +0.74] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_low memory utilization +0.50 [+0.35, +0.66] 1 (metrics) (profiles) (logs)
dsd_uds_100mb_3k_contexts_memory memory utilization +0.44 [+0.24, +0.63] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_medium memory utilization +0.42 [+0.22, +0.61] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_heavy memory utilization +0.42 [+0.29, +0.54] 1 (metrics) (profiles) (logs)
dsd_uds_512kb_3k_contexts_memory memory utilization +0.25 [+0.06, +0.43] 1 (metrics) (profiles) (logs)
dsd_uds_500mb_3k_contexts_cpu % cpu utilization +0.23 [-1.14, +1.60] 1 (metrics) (profiles) (logs)
quality_gates_rss_dsd_ultraheavy memory utilization +0.18 [+0.06, +0.31] 1 (metrics) (profiles) (logs)
dsd_uds_10mb_3k_contexts_throughput ingress throughput +0.02 [-0.13, +0.17] 1 (metrics) (profiles) (logs)
dsd_uds_100mb_3k_contexts_cpu % cpu utilization +0.00 [-5.20, +5.21] 1 (metrics) (profiles) (logs)
otlp_ingest_traces_5mb_throughput ingress throughput -0.00 [-0.02, +0.02] 1 (metrics) (profiles) (logs)
dsd_uds_1mb_3k_contexts_throughput ingress throughput -0.00 [-0.06, +0.06] 1 (metrics) (profiles) (logs)
otlp_ingest_metrics_5mb_throughput ingress throughput -0.01 [-0.13, +0.12] 1 (metrics) (profiles) (logs)
dsd_uds_512kb_3k_contexts_throughput ingress throughput -0.01 [-0.06, +0.05] 1 (metrics) (profiles) (logs)
dsd_uds_100mb_3k_contexts_throughput ingress throughput -0.01 [-0.06, +0.04] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_throughput ingress throughput -0.02 [-0.14, +0.10] 1 (metrics) (profiles) (logs)
dsd_uds_500mb_3k_contexts_throughput ingress throughput -0.79 [-0.92, -0.66] 1 (metrics) (profiles) (logs)
otlp_ingest_metrics_5mb_cpu % cpu utilization -1.43 [-7.25, +4.40] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_cpu % cpu utilization -2.17 [-7.10, +2.75] 1 (metrics) (profiles) (logs)
dsd_uds_10mb_3k_contexts_cpu % cpu utilization -5.14 [-33.17, +22.88] 1 (metrics) (profiles) (logs)
dsd_uds_1mb_3k_contexts_cpu % cpu utilization -5.56 [-55.67, +44.55] 1 (metrics) (profiles) (logs)
otlp_ingest_logs_5mb_memory memory utilization -6.09 [-6.55, -5.64] 1 (metrics) (profiles) (logs)

Bounds Checks: ✅ Passed

perf experiment bounds_check_name replicates_passed links
quality_gates_rss_dsd_heavy memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_dsd_low memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_dsd_medium memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_dsd_ultraheavy memory_usage 10/10 (metrics) (profiles) (logs)
quality_gates_rss_idle memory_usage 10/10 (metrics) (profiles) (logs)

Explanation

Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%

Performance changes are noted in the perf column of each table:

  • ✅ = significantly better comparison variant performance
  • ❌ = significantly worse comparison variant performance
  • ➖ = no significant change in performance

A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".

For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:

  1. Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.

  2. Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.

  3. Its configuration does not mark it "erratic".

Replicate Execution Details

We run multiple replicates for each experiment/variant. However, we allow replicates to be automatically retried if there are any failures, up to 8 times, at which point the replicate is marked dead and we are unable to run analysis for the entire experiment. We call each of these attempts at running replicates a replicate execution. This section lists all replicate executions that failed due to the target crashing or being oom killed.

Note: In the below tables we bucket failures by experiment, variant, and failure type. For each of these buckets we list out the replicate indexes that failed with an annotation signifying how many times said replicate failed with the given failure mode. In the below example the baseline variant of the experiment named experiment_with_failures had two replicates that failed by oom kills. Replicate 0, which failed 8 executions, and replicate 1 which failed 6 executions, all with the same failure mode.

Experiment Variant Replicates Failure Logs Debug Dashboard
experiment_with_failures baseline 0 (x8) 1 (x6) Oom killed Debug Dashboard

The debug dashboard links will take you to a debugging dashboard specifically designed to investigate replicate execution failures.

❌ Retried Normal Replicate Execution Failures (non-profiling)

| Experiment | Variant | Replicates | Failure | Debug Dashboard |
| --- | --- | --- | --- | --- |
| dsd_uds_100mb_3k_contexts_cpu | comparison | 7 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_logs_5mb_cpu | comparison | 8 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_logs_5mb_throughput | baseline | 3 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_metrics_5mb_memory | comparison | 1 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_traces_5mb_throughput | baseline | 7 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_traces_5mb_throughput | comparison | 8 | Failed to shutdown when requested | Debug Dashboard |
| quality_gates_rss_dsd_ultraheavy | baseline | 2 | Failed to shutdown when requested | Debug Dashboard |

Copilot AI review requested due to automatic review settings February 20, 2026 04:26
@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from 75a8e2c to cc926a0 on February 20, 2026 04:26
@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from 52f0a8a to 97b3a4d on February 20, 2026 04:27

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.



```diff
@@ -13,21 +13,37 @@ use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken
 use pin_project::pin_project;
 use tracing::{debug_span, instrument::Instrumented, Instrument as _};
```


Copilot AI Feb 20, 2026


The process ID counter starts at 1, but Id::ROOT is defined as 0. This means Id::new() will never return Id::ROOT, which is correct. However, the counter initialization changed from 0 to 1 in this PR. This is a good change that ensures generated IDs never conflict with the ROOT constant, but it should be documented or made more explicit.

Consider adding a comment explaining why the counter starts at 1, or use Id::ROOT.0 + 1 to make the relationship explicit.

Suggested change:

```diff
+// Start at 1 so that `Id::new()` never generates the special `Id::ROOT` (0) identifier.
```
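The suggested "make the relationship explicit" alternative can be illustrated in isolation. The `ROOT` constant and `NEXT_ID` static below are hypothetical stand-ins for the PR's actual items, shown only to demonstrate seeding the counter at `ROOT + 1`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical stand-in for `Id::ROOT`.
const ROOT: u64 = 0;

// Seeding the counter at ROOT + 1 encodes the "generated IDs never collide
// with ROOT" invariant directly, instead of a bare literal `1`.
static NEXT_ID: AtomicU64 = AtomicU64::new(ROOT + 1);

fn next_id() -> u64 {
    NEXT_ID.fetch_add(1, Ordering::Relaxed)
}
```

Either spelling compiles to the same initializer; the `ROOT + 1` form simply documents why the counter does not start at zero.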

Comment on lines +28 to +35
```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Handle {
    /// Scoped to a specific process.
    Process(Id),

    /// A globally unique opaque identifier.
    Global(u64),
}
```

Copilot AI Feb 20, 2026


The Handle enum lacks a Display implementation, which would be useful for debugging and logging. Since handles are used as identifiers in error messages (see PublishError::AlreadyExists), having a consistent string representation would improve observability.

Consider implementing fmt::Display for Handle to provide human-readable output like "Process(42)" or "Global(123)".
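A minimal sketch of such an impl, assuming a `u64`-backed `Id` (the `Id(u64)` newtype here is a stand-in, not the runtime's actual definition):

```rust
use std::fmt;

// Stand-in for the runtime's process `Id`; assumed here to wrap a u64.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Id(u64);

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Handle {
    Process(Id),
    Global(u64),
}

impl fmt::Display for Handle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Render the variant name plus its inner value for log/error output.
        match self {
            Handle::Process(id) => write!(f, "Process({})", id.0),
            Handle::Global(value) => write!(f, "Global({})", value),
        }
    }
}
```

With `Display` in place, `to_string()` and `format!("{handle}")` both yield the same stable representation for error messages.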

```diff
@@ -152,7 +170,8 @@ where
     type Output = F::Output;
```


Copilot AI Feb 20, 2026


The CURRENT_PROCESS_ID task-local is set using sync_scope, which is appropriate for synchronous code but may have limitations. According to the Tokio documentation, sync_scope does not propagate the task-local value across .await points within the closure. This means that if the future being polled awaits other futures, those awaited futures won't see the current process ID.

This is likely intentional since process IDs should be scoped to the specific process future being polled, not child futures it spawns or awaits. However, this behavior should be documented in the Id::current() method to make it clear that the process ID is only available during synchronous execution within a process's poll method, not in async contexts spawned from that process.

Suggested change:

```diff
+/// Polls the inner future with the current process ID set in a task-local.
+///
+/// Note: The process ID is set using `tokio::task_local!`'s `sync_scope`.
+/// This means the `CURRENT_PROCESS_ID` is only guaranteed to be visible
+/// during the synchronous execution of this `poll` call. Any `.await`
+/// points inside `inner` may suspend and resume in a context where
+/// `CURRENT_PROCESS_ID` is no longer set, so async work spawned or awaited
+/// from within `inner` must not rely on `Id::current()` being present.
```


```rust
    Ok(())
}
```


Copilot AI Feb 20, 2026


The ResourceRegistry API lacks a way to remove or unpublish a resource once it has been published. Once a resource is published, it can only be removed by acquiring it and not returning the guard (which requires panic or mem::forget). This makes it impossible to cleanly retire resources that are no longer needed.

Consider adding an unpublish method that removes a resource from the registry if it's currently available (not acquired). This would make the API more complete and allow for proper resource lifecycle management.

Suggested change:

```diff
+/// Attempts to remove a previously published resource from the registry.
+///
+/// This will only succeed if the resource is currently available (i.e., not
+/// acquired by any process). If the resource is currently acquired or was
+/// never published, the function returns `false`.
+///
+/// Returns `true` if a resource was removed, `false` otherwise.
+pub fn unpublish<T>(&self, handle: Handle) -> bool
+where
+    T: Send + Sync + 'static,
+{
+    let key = StorageKey::new::<T>(handle);
+    let mut state = self.inner.state.lock().unwrap();
+    state.register_type::<T>();
+
+    // Do not remove a resource that is currently acquired.
+    if state.acquired.contains(&key) {
+        return false;
+    }
+
+    state.available.remove(&key).is_some()
+}
```

Comment on lines +80 to +85
```rust
channels: HashMap<StorageKey, Box<dyn Any + Send + Sync>>,

/// Broadcast senders for wildcard subscriptions, keyed by type only.
///
/// Each entry stores a `broadcast::Sender<AssertionUpdate<T>>`.
wildcard_channels: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
```

Copilot AI Feb 20, 2026


The broadcast channels created in DataspaceRegistry are never cleaned up. Once a channel is created via get_or_create_sender or get_or_create_wildcard_sender, it remains in the channels or wildcard_channels HashMap indefinitely, even after all subscribers have dropped their receivers. This can lead to unbounded memory growth in long-running applications with many different (type, handle) pairs.

Consider implementing a mechanism to clean up channels when:

  1. All receivers have been dropped (can be detected when broadcast::Sender::receiver_count() returns 0)
  2. The associated value has been retracted and there are no pending assertions

This could be done by periodically checking and removing unused channels, or by implementing a more sophisticated reference counting mechanism.
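The pruning pattern behind option 1 can be shown with a std-only analog. This is a sketch, not the registry's code: `Weak::upgrade()` plays the role of the "are there live subscribers?" check that the real fix would make via `broadcast::Sender::receiver_count() == 0`, and the `Channels` type and its keys are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Weak};

// Analog sketch: entries hold Weak references so that "all subscribers
// dropped" is observable, mirroring a receiver_count() == 0 check.
struct Channels {
    inner: HashMap<&'static str, Weak<Vec<u8>>>,
}

impl Channels {
    fn prune_dead(&mut self) {
        // Drop entries whose subscribers have all gone away.
        self.inner.retain(|_, chan| chan.upgrade().is_some());
    }
}
```

In the real registry this `retain` would run opportunistically (e.g., inside `assert()` or `subscribe()`) so no background task is needed.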

Comment on lines +283 to +321
```rust
loop {
    let rx = {
        let mut state = self.inner.state.lock().unwrap();
        state.register_type::<T>();

        // Try to acquire synchronously
        if let Some(boxed) = state.available.remove(&key) {
            state.acquired.insert(key.clone());
            let resource = *boxed.downcast::<T>().expect("type mismatch in registry");
            return ResourceGuard {
                resource: Some(resource),
                registry: Arc::clone(&self.inner),
                key,
            };
        }

        // Not available, register as waiter
        let (tx, rx) = oneshot::channel();
        state.waiters.entry(key.clone()).or_default().push_back(tx);
        rx
    };

    // Wait for resource (lock released)
    match rx.await {
        Ok(boxed) => {
            let resource = *boxed.downcast::<T>().expect("type mismatch in registry");
            return ResourceGuard {
                resource: Some(resource),
                registry: Arc::clone(&self.inner),
                key,
            };
        }
        Err(_) => {
            // Sender dropped without sending - this can happen if the registry
            // is being cleaned up. Retry the acquire.
            continue;
        }
    }
}
```

Copilot AI Feb 20, 2026


The acquire method could potentially loop indefinitely if there's a race condition or bug that causes the oneshot receiver to continuously receive Err(_) without the resource ever becoming available. While the current implementation handles the case where senders are dropped during registry cleanup, there's no timeout or circuit breaker to prevent an infinite loop in pathological cases.

Consider adding one of the following safeguards:

  1. A maximum retry count with an error return after exhaustion
  2. A timeout parameter for the acquire operation
  3. A cancellation token that can be checked in the loop

This would make the API more robust and debuggable in case of unexpected issues.
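Safeguard 1 can be sketched independently of the registry. The function and error names below are hypothetical, and the async wait is abstracted into a `try_once` closure so only the bounded-retry shape is shown:

```rust
// Hypothetical sketch of safeguard (1): cap the number of retry iterations.
// `try_once` stands in for one pass of the acquire loop (lock, check, wait).
const MAX_ACQUIRE_RETRIES: usize = 8;

#[derive(Debug, PartialEq)]
enum AcquireError {
    RetriesExhausted,
}

fn acquire_bounded<T>(mut try_once: impl FnMut() -> Option<T>) -> Result<T, AcquireError> {
    for _ in 0..MAX_ACQUIRE_RETRIES {
        if let Some(resource) = try_once() {
            return Ok(resource);
        }
    }
    // Surface an error instead of spinning forever in pathological cases.
    Err(AcquireError::RetriesExhausted)
}
```

Options 2 and 3 compose the same way: a `tokio::time::timeout` or cancellation-token check wrapping each iteration instead of a fixed count.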

@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from cc926a0 to 2eacb9c on February 20, 2026 04:39
@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from 97b3a4d to 90a9af1 on February 20, 2026 04:39
Copilot AI review requested due to automatic review settings February 20, 2026 05:20
@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from 90a9af1 to 8a17cac on February 20, 2026 05:20
@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from 2eacb9c to 32b6e9b on February 20, 2026 05:20

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.



@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from 32b6e9b to 999e501 on February 21, 2026 19:52
@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from 8a17cac to 46ef9d0 on February 21, 2026 19:52
Copilot AI review requested due to automatic review settings February 25, 2026 03:25
@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from 46ef9d0 to b24c86d on February 25, 2026 03:25
@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from 999e501 to 0954b63 on February 25, 2026 03:25

Copilot AI left a comment


Pull request overview

Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.



Comment on lines +306 to +314
```rust
match rx.await {
    Ok(boxed) => {
        let resource = *boxed.downcast::<T>().expect("type mismatch in registry");
        return ResourceGuard {
            resource: Some(resource),
            registry: Arc::clone(&self.inner),
            key,
        };
    }
```

Copilot AI Feb 25, 2026


Resource leak vulnerability when acquire future is cancelled. If the future is cancelled after receiving the boxed resource from the oneshot channel but before creating the ResourceGuard, the resource will be dropped without returning to the registry. The key will remain in the acquired set, permanently marking the resource as unavailable. Consider wrapping lines 307-313 in a cancellation-safe guard or restructuring to mark the resource as acquired only after the ResourceGuard is fully constructed.

@tobz tobz force-pushed the tobz/control-plane-tls-early-init branch from 0954b63 to 642f2f6 on February 26, 2026 15:20
@tobz tobz force-pushed the tobz/runtime-system-state-mgmt-primitives branch from b24c86d to 09abd4b on February 26, 2026 15:20

Labels

area/core Core functionality, event model, etc.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants