enhancement(runtime): add resource/pubsub registry for generalized state management in processes#1178
Conversation
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite. Learn more about stacking. |
There was a problem hiding this comment.
Pull request overview
Adds generalized runtime state management primitives (resource registry + dataspace pub/sub) to support inter-process coordination keyed by typed handles, and wires in process-local identity tracking.
Changes:
- Introduces
ResourceRegistryfor publishing/acquiring typed resources with async waiting and RAII return via guards. - Introduces
DataspaceRegistryfor typed assert/retract pub-sub with per-handle and wildcard subscriptions (plus current-state replay). - Adds
Handleand process-context tracking (Id::current()via task-local), and exportsstate+ProcessId.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| lib/saluki-core/src/runtime/state/resources/mod.rs | New async-aware resource registry with publish/acquire APIs, snapshots, and tests. |
| lib/saluki-core/src/runtime/state/mod.rs | New state module defining Handle and re-exporting registries. |
| lib/saluki-core/src/runtime/state/dataspace/mod.rs | New dataspace assert/retract + subscription registry with current-state replay and tests. |
| lib/saluki-core/src/runtime/process.rs | Adds task-local “current process id” support and a ROOT process identifier. |
| lib/saluki-core/src/runtime/mod.rs | Exposes state module and re-exports ProcessId. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| use std::{ | ||
| any::{Any, TypeId}, | ||
| collections::{HashMap, HashSet, VecDeque}, | ||
| hash::Hash, |
There was a problem hiding this comment.
std::hash::Hash is imported but not used in this module. Please remove it to avoid warnings / keep the import list clean.
| hash::Hash, |
| use std::{ | ||
| any::{Any, TypeId}, | ||
| collections::{HashMap, VecDeque}, | ||
| hash::Hash, |
There was a problem hiding this comment.
std::hash::Hash is imported but not used in this module. Please remove it to avoid warnings / keep the import list clean.
| hash::Hash, |
|
|
||
| // Not available, register as waiter | ||
| let (tx, rx) = oneshot::channel(); | ||
| state.waiters.entry(key.clone()).or_default().push_back(tx); |
There was a problem hiding this comment.
Closed/canceled acquires leave behind oneshot::Senders in state.waiters until a publish/return happens for that key. This can (a) make snapshot() report pending waiters that no longer exist and (b) accumulate memory for keys that are never published. Consider pruning closed senders at insertion time (e.g., retain only !sender.is_closed() before pushing), and removing the entry if the queue becomes empty.
| state.waiters.entry(key.clone()).or_default().push_back(tx); | |
| // Prune any closed/canceled waiters for this key before adding a new one. | |
| let waiters = state.waiters.entry(key.clone()).or_default(); | |
| waiters.retain(|sender| !sender.is_closed()); | |
| // After pruning, add the new waiter. | |
| waiters.push_back(tx); |
| // Spawn a task that will acquire (and wait) | ||
| let acquire_handle = tokio::spawn(async move { registry_clone.acquire::<u32>(h).await }); | ||
|
|
||
| // Give the acquire task time to register as waiter | ||
| tokio::time::sleep(Duration::from_millis(10)).await; |
There was a problem hiding this comment.
This test uses sleep for synchronization, which can be flaky under load or on slower CI machines. Prefer an explicit synchronization signal (e.g., a oneshot/Notify/Barrier sent after the waiter is registered) so the test deterministically publishes only after the acquire path has enqueued the waiter.
| // Spawn a task that will acquire (and wait) | |
| let acquire_handle = tokio::spawn(async move { registry_clone.acquire::<u32>(h).await }); | |
| // Give the acquire task time to register as waiter | |
| tokio::time::sleep(Duration::from_millis(10)).await; | |
| // Use an explicit synchronization signal so we know the task has started. | |
| let (tx_started, rx_started) = oneshot::channel(); | |
| // Spawn a task that will acquire (and wait) | |
| let acquire_handle = tokio::spawn(async move { | |
| // Notify that the acquire task has started running. | |
| let _ = tx_started.send(()); | |
| registry_clone.acquire::<u32>(h).await | |
| }); | |
| // Wait for the acquire task to start before publishing the resource. | |
| rx_started.await.expect("acquire task did not start"); |
| // Spawn multiple waiters | ||
| let registry1 = registry.clone(); | ||
| let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await }); | ||
|
|
||
| tokio::time::sleep(Duration::from_millis(5)).await; | ||
|
|
||
| let registry2 = registry.clone(); | ||
| let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await }); | ||
|
|
||
| tokio::time::sleep(Duration::from_millis(5)).await; |
There was a problem hiding this comment.
Similarly, multiple_waiters_fifo relies on short sleeps to establish waiter ordering. This can be timing-sensitive. Using explicit coordination (e.g., each spawned task notifies once it has registered its waiter) will make FIFO assertions reliable.
| // Spawn multiple waiters | |
| let registry1 = registry.clone(); | |
| let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await }); | |
| tokio::time::sleep(Duration::from_millis(5)).await; | |
| let registry2 = registry.clone(); | |
| let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await }); | |
| tokio::time::sleep(Duration::from_millis(5)).await; | |
| // Spawn multiple waiters with explicit coordination instead of timing-based sleeps | |
| let registry1 = registry.clone(); | |
| let (ready_tx1, ready_rx1) = oneshot::channel::<()>(); | |
| let handle1 = tokio::spawn(async move { | |
| // Signal that this waiter task has started | |
| let _ = ready_tx1.send(()); | |
| registry1.acquire::<u32>(h).await | |
| }); | |
| // Wait until the first waiter task has started | |
| let _ = ready_rx1.await; | |
| let registry2 = registry.clone(); | |
| let (ready_tx2, ready_rx2) = oneshot::channel::<()>(); | |
| let handle2 = tokio::spawn(async move { | |
| // Signal that this waiter task has started | |
| let _ = ready_tx2.send(()); | |
| registry2.acquire::<u32>(h).await | |
| }); | |
| // Wait until the second waiter task has started | |
| let _ = ready_rx2.await; |
| let registry1 = registry.clone(); | ||
| let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await }); | ||
|
|
||
| tokio::time::sleep(Duration::from_millis(5)).await; | ||
|
|
||
| let registry2 = registry.clone(); | ||
| let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await }); | ||
|
|
||
| tokio::time::sleep(Duration::from_millis(5)).await; |
There was a problem hiding this comment.
Similarly, multiple_waiters_fifo relies on short sleeps to establish waiter ordering. This can be timing-sensitive. Using explicit coordination (e.g., each spawned task notifies once it has registered its waiter) will make FIFO assertions reliable.
| let registry1 = registry.clone(); | |
| let handle1 = tokio::spawn(async move { registry1.acquire::<u32>(h).await }); | |
| tokio::time::sleep(Duration::from_millis(5)).await; | |
| let registry2 = registry.clone(); | |
| let handle2 = tokio::spawn(async move { registry2.acquire::<u32>(h).await }); | |
| tokio::time::sleep(Duration::from_millis(5)).await; | |
| let (ready1_tx, ready1_rx) = oneshot::channel(); | |
| let registry1 = registry.clone(); | |
| let handle1 = tokio::spawn(async move { | |
| // Signal that the first waiter is about to register and wait | |
| let _ = ready1_tx.send(()); | |
| registry1.acquire::<u32>(h).await | |
| }); | |
| // Ensure the first waiter has started before spawning the second | |
| let _ = ready1_rx.await; | |
| let (ready2_tx, ready2_rx) = oneshot::channel(); | |
| let registry2 = registry.clone(); | |
| let handle2 = tokio::spawn(async move { | |
| // Signal that the second waiter is about to register and wait | |
| let _ = ready2_tx.send(()); | |
| registry2.acquire::<u32>(h).await | |
| }); | |
| // Ensure the second waiter has started before publishing the resource | |
| let _ = ready2_rx.await; |
| sync::{Arc, Mutex}, | ||
| }; | ||
|
|
||
| use snafu::Snafu; | ||
| use tokio::sync::oneshot; |
There was a problem hiding this comment.
This registry is designed for async coordination but uses std::sync::Mutex, which can block Tokio worker threads under contention. Consider switching to tokio::sync::Mutex (or another async-friendly mutex) for the shared state, since hot paths include acquire()/publish() from async tasks.
| sync::{Arc, Mutex}, | |
| }; | |
| use snafu::Snafu; | |
| use tokio::sync::oneshot; | |
| sync::Arc, | |
| }; | |
| use snafu::Snafu; | |
| use tokio::sync::{Mutex, oneshot}; |
| any::{Any, TypeId}, | ||
| collections::{HashMap, VecDeque}, | ||
| hash::Hash, | ||
| sync::{Arc, Mutex}, |
There was a problem hiding this comment.
Same concern here: DataspaceRegistry is used from async tasks but relies on std::sync::Mutex, which can block executor threads. Using an async mutex (or minimizing lock contention) would better match the async usage patterns, especially with frequent assert() calls and many subscribers.
| /// associated with. If no wildcard broadcast channel exists yet for this type, one is created. | ||
| /// | ||
| /// If values have already been asserted for this type on any handles, the subscription will immediately yield all | ||
| /// current values before any future broadcast updates. |
There was a problem hiding this comment.
subscribe_all replays current values by iterating HashMap::iter(), so the initial replay order is inherently non-deterministic. Consider documenting that replay ordering is unspecified (or sorting by handle if deterministic ordering is desirable for consumers).
| /// current values before any future broadcast updates. | |
| /// current values before any future broadcast updates. The order in which these current values are replayed is | |
| /// unspecified and must not be relied upon. |
e8b5919 to
52f0a8a
Compare
b284244 to
75a8e2c
Compare
Binary Size Analysis (Agent Data Plane)Target: d3ab905 (baseline) vs 09abd4b (comparison) diff
|
| Module | File Size | Symbols |
|---|---|---|
saluki_core::runtime::supervisor |
+69.41 KiB | 66 |
core |
+46.08 KiB | 248 |
agent_data_plane::internal::initialize_and_launch_runtime |
-22.07 KiB | 2 |
agent_data_plane::internal::create_internal_supervisor |
+16.17 KiB | 1 |
saluki_app::memory::MemoryBoundsConfiguration |
-13.70 KiB | 1 |
agent_data_plane::internal::control_plane |
-12.33 KiB | 26 |
std |
-10.54 KiB | 51 |
anyhow |
+8.26 KiB | 30 |
agent_data_plane::cli::run |
+8.02 KiB | 8 |
[sections] |
+7.57 KiB | 9 |
saluki_core::runtime::process |
+7.40 KiB | 7 |
agent_data_plane::internal::observability |
+5.60 KiB | 16 |
saluki_core::topology::running |
+5.52 KiB | 2 |
saluki_app::metrics::collect_runtime_metrics |
-4.74 KiB | 1 |
tokio |
-3.60 KiB | 110 |
saluki_core::runtime::restart |
+3.50 KiB | 7 |
tracing_core |
+2.24 KiB | 14 |
saluki_health::Runner::run |
+1.91 KiB | 8 |
hashbrown |
+1.81 KiB | 8 |
saluki_health::RunnerGuard |
+1.71 KiB | 3 |
Detailed Symbol Changes
FILE SIZE VM SIZE
-------------- --------------
+2.6% +104Ki +2.2% +77.7Ki [1117 Others]
[NEW] +59.2Ki [NEW] +58.9Ki _<agent_data_plane::internal::control_plane::PrivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::h11fe1c9a197d4bbb
[NEW] +21.3Ki [NEW] +21.1Ki _<agent_data_plane::internal::control_plane::UnprivilegedApiWorker as saluki_core::runtime::supervisor::Supervisable>::initialize::_{{closure}}::hd516f5c8e792be07
[NEW] +18.6Ki [NEW] +18.4Ki saluki_app::api::APIBuilder::serve::_{{closure}}::hc67130ad013550cb
[NEW] +16.3Ki [NEW] +16.2Ki saluki_core::runtime::supervisor::WorkerState::add_worker::h2f57c36c7d6a6d25
[NEW] +16.2Ki [NEW] +16.0Ki agent_data_plane::internal::create_internal_supervisor::_{{closure}}::hc50f0c81432105dc
[NEW] +16.2Ki [NEW] +16.0Ki _<core::pin::Pin<P> as core::future::future::Future>::poll::hf993bf6d214be6bb
[NEW] +10.9Ki [NEW] +10.8Ki std::sys::backtrace::__rust_begin_short_backtrace::h7db93e33ffed5b3c
[NEW] +10.7Ki [NEW] +10.6Ki <saluki_core::data_model::event::Event as core::clone::Clone>::clone.10479
[NEW] +10.5Ki [NEW] +10.4Ki saluki_health::Runner::run::_{{closure}}::h99455b3fddb78a9b
[NEW] +9.47Ki [NEW] +9.34Ki saluki_core::runtime::supervisor::Supervisor::run_inner::_{{closure}}::hf3a194df28b5bfa6
[NEW] +6.80Ki [NEW] +6.66Ki saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::he6d5081c217bc272
[NEW] +6.24Ki [NEW] +6.10Ki saluki_core::runtime::supervisor::WorkerState::shutdown_workers::_{{closure}}::h1d900b0791e57095
[DEL] -8.42Ki [DEL] -8.33Ki std::sys::backtrace::__rust_begin_short_backtrace::h61680b9753eb2342
[DEL] -9.20Ki [DEL] -9.10Ki saluki_health::Runner::run::_{{closure}}::h36f8d77002f294fa
[DEL] -10.7Ki [DEL] -10.6Ki <saluki_core::data_model::event::Event as core::clone::Clone>::clone.10695
[DEL] -13.7Ki [DEL] -13.6Ki saluki_app::memory::MemoryBoundsConfiguration::try_from_config::h331a424a41827053
[DEL] -18.0Ki [DEL] -17.8Ki agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::_{{closure}}::h9b9375d6b068ec9c
[DEL] -18.4Ki [DEL] -18.2Ki agent_data_plane::internal::initialize_and_launch_runtime::_{{closure}}::h3544d2e34d6be2ff
[DEL] -18.7Ki [DEL] -18.6Ki saluki_app::api::APIBuilder::serve::_{{closure}}::hfe866520c248a5c1
[DEL] -84.5Ki [DEL] -84.4Ki agent_data_plane::internal::control_plane::spawn_control_plane::_{{closure}}::hd6ee71eb8e3b5d42
+0.4% +124Ki +0.4% +97.7Ki TOTAL
Regression Detector (Agent Data Plane)Regression Detector ResultsRun ID: b8a9d27d-de3d-456e-9e04-c24ec847ca14 Baseline: d3ab905 ❌ Experiments with retried target crashesThis is a critical error. One or more replicates failed with a non-zero exit code. These replicates may have been retried. See Replicate Execution Details for more information.
Optimization Goals: ✅ No significant changes detected
|
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | -0.02 | [-0.14, +0.10] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -2.17 | [-7.10, +2.75] | 1 | (metrics) (profiles) (logs) |
| ✅ | otlp_ingest_logs_5mb_memory | memory utilization | -6.09 | [-6.55, -5.64] | 1 | (metrics) (profiles) (logs) |
Fine details of change detection per experiment
| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| ➖ | dsd_uds_512kb_3k_contexts_cpu | % cpu utilization | +1.62 | [-53.03, +56.27] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_memory | memory utilization | +1.29 | [+1.05, +1.53] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_memory | memory utilization | +1.26 | [+1.07, +1.46] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_memory | memory utilization | +0.95 | [+0.69, +1.22] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_idle | memory utilization | +0.93 | [+0.90, +0.97] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_cpu | % cpu utilization | +0.67 | [-1.20, +2.54] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_memory | memory utilization | +0.64 | [+0.45, +0.83] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_memory | memory utilization | +0.55 | [+0.37, +0.74] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_low | memory utilization | +0.50 | [+0.35, +0.66] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_memory | memory utilization | +0.44 | [+0.24, +0.63] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_medium | memory utilization | +0.42 | [+0.22, +0.61] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_heavy | memory utilization | +0.42 | [+0.29, +0.54] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_memory | memory utilization | +0.25 | [+0.06, +0.43] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_cpu | % cpu utilization | +0.23 | [-1.14, +1.60] | 1 | (metrics) (profiles) (logs) |
| ➖ | quality_gates_rss_dsd_ultraheavy | memory utilization | +0.18 | [+0.06, +0.31] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_throughput | ingress throughput | +0.02 | [-0.13, +0.17] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_cpu | % cpu utilization | +0.00 | [-5.20, +5.21] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_traces_5mb_throughput | ingress throughput | -0.00 | [-0.02, +0.02] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_throughput | ingress throughput | -0.00 | [-0.06, +0.06] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_throughput | ingress throughput | -0.01 | [-0.13, +0.12] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_512kb_3k_contexts_throughput | ingress throughput | -0.01 | [-0.06, +0.05] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_100mb_3k_contexts_throughput | ingress throughput | -0.01 | [-0.06, +0.04] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_throughput | ingress throughput | -0.02 | [-0.14, +0.10] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_500mb_3k_contexts_throughput | ingress throughput | -0.79 | [-0.92, -0.66] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_metrics_5mb_cpu | % cpu utilization | -1.43 | [-7.25, +4.40] | 1 | (metrics) (profiles) (logs) |
| ➖ | otlp_ingest_logs_5mb_cpu | % cpu utilization | -2.17 | [-7.10, +2.75] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_10mb_3k_contexts_cpu | % cpu utilization | -5.14 | [-33.17, +22.88] | 1 | (metrics) (profiles) (logs) |
| ➖ | dsd_uds_1mb_3k_contexts_cpu | % cpu utilization | -5.56 | [-55.67, +44.55] | 1 | (metrics) (profiles) (logs) |
| ✅ | otlp_ingest_logs_5mb_memory | memory utilization | -6.09 | [-6.55, -5.64] | 1 | (metrics) (profiles) (logs) |
Bounds Checks: ✅ Passed
| perf | experiment | bounds_check_name | replicates_passed | links |
|---|---|---|---|---|
| ✅ | quality_gates_rss_dsd_heavy | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_low | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_medium | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_dsd_ultraheavy | memory_usage | 10/10 | (metrics) (profiles) (logs) |
| ✅ | quality_gates_rss_idle | memory_usage | 10/10 | (metrics) (profiles) (logs) |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:
-
Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
-
Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
-
Its configuration does not mark it "erratic".
Replicate Execution Details
We run multiple replicates for each experiment/variant. However, we allow replicates to be automatically retried if there are any failures, up to 8 times, at which point the replicate is marked dead and we are unable to run analysis for the entire experiment. We call each of these attempts at running replicates a replicate execution. This section lists all replicate executions that failed due to the target crashing or being oom killed.
Note: In the below tables we bucket failures by experiment, variant, and failure type. For each of these buckets we list out the replicate indexes that failed with an annotation signifying how many times said replicate failed with the given failure mode. In the below example the baseline variant of the experiment named experiment_with_failures had two replicates that failed by oom kills. Replicate 0, which failed 8 executions, and replicate 1 which failed 6 executions, all with the same failure mode.
| Experiment | Variant | Replicates | Failure | Logs | Debug Dashboard |
|---|---|---|---|---|---|
| experiment_with_failures | baseline | 0 (x8) 1 (x6) | Oom killed | Debug Dashboard |
The debug dashboard links will take you to a debugging dashboard specifically designed to investigate replicate execution failures.
❌ Retried Normal Replicate Execution Failures (non-profiling)
| Experiment | Variant | Replicates | Failure | Debug Dashboard |
|---|---|---|---|---|
| dsd_uds_100mb_3k_contexts_cpu | comparison | 7 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_logs_5mb_cpu | comparison | 8 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_logs_5mb_throughput | baseline | 3 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_metrics_5mb_memory | comparison | 1 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_traces_5mb_throughput | baseline | 7 | Failed to shutdown when requested | Debug Dashboard |
| otlp_ingest_traces_5mb_throughput | comparison | 8 | Failed to shutdown when requested | Debug Dashboard |
| quality_gates_rss_dsd_ultraheavy | baseline | 2 | Failed to shutdown when requested | Debug Dashboard |
75a8e2c to
cc926a0
Compare
52f0a8a to
97b3a4d
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -13,21 +13,37 @@ use memory_accounting::allocator::{AllocationGroupRegistry, AllocationGroupToken | |||
| use pin_project::pin_project; | |||
| use tracing::{debug_span, instrument::Instrumented, Instrument as _}; | |||
|
|
|||
There was a problem hiding this comment.
The process ID counter starts at 1, but Id::ROOT is defined as 0. This means Id::new() will never return Id::ROOT, which is correct. However, the counter initialization changed from 0 to 1 in this PR. This is a good change that ensures generated IDs never conflict with the ROOT constant, but it should be documented or made more explicit.
Consider adding a comment explaining why the counter starts at 1, or use Id::ROOT.0 + 1 to make the relationship explicit.
| // Start at 1 so that `Id::new()` never generates the special `Id::ROOT` (0) identifier. |
| #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] | ||
| pub enum Handle { | ||
| /// Scoped to a specific process. | ||
| Process(Id), | ||
|
|
||
| /// A globally unique opaque identifier. | ||
| Global(u64), | ||
| } |
There was a problem hiding this comment.
The Handle enum lacks a Display implementation, which would be useful for debugging and logging. Since handles are used as identifiers in error messages (see PublishError::AlreadyExists), having a consistent string representation would improve observability.
Consider implementing fmt::Display for Handle to provide human-readable output like "Process(42)" or "Global(123)".
| @@ -152,7 +170,8 @@ where | |||
| type Output = F::Output; | |||
|
|
|||
There was a problem hiding this comment.
The CURRENT_PROCESS_ID task-local is set using sync_scope, which is appropriate for synchronous code but may have limitations. According to the Tokio documentation, sync_scope does not propagate the task-local value across .await points within the closure. This means that if the future being polled awaits other futures, those awaited futures won't see the current process ID.
This is likely intentional since process IDs should be scoped to the specific process future being polled, not child futures it spawns or awaits. However, this behavior should be documented in the Id::current() method to make it clear that the process ID is only available during synchronous execution within a process's poll method, not in async contexts spawned from that process.
| /// Polls the inner future with the current process ID set in a task-local. | |
| /// | |
| /// Note: The process ID is set using `tokio::task_local!`'s `sync_scope`. | |
| /// This means the `CURRENT_PROCESS_ID` is only guaranteed to be visible | |
| /// during the synchronous execution of this `poll` call. Any `.await` | |
| /// points inside `inner` may suspend and resume in a context where | |
| /// `CURRENT_PROCESS_ID` is no longer set, so async work spawned or awaited | |
| /// from within `inner` must not rely on `Id::current()` being present. |
|
|
||
| Ok(()) | ||
| } | ||
|
|
There was a problem hiding this comment.
The ResourceRegistry API lacks a way to remove or unpublish a resource once it has been published. Once a resource is published, it can only be removed by acquiring it and not returning the guard (which requires panic or mem::forget). This makes it impossible to cleanly retire resources that are no longer needed.
Consider adding an unpublish method that removes a resource from the registry if it's currently available (not acquired). This would make the API more complete and allow for proper resource lifecycle management.
| /// Attempts to remove a previously published resource from the registry. | |
| /// | |
| /// This will only succeed if the resource is currently available (i.e., not | |
| /// acquired by any process). If the resource is currently acquired or was | |
| /// never published, the function returns `false`. | |
| /// | |
| /// Returns `true` if a resource was removed, `false` otherwise. | |
| pub fn unpublish<T>(&self, handle: Handle) -> bool | |
| where | |
| T: Send + Sync + 'static, | |
| { | |
| let key = StorageKey::new::<T>(handle); | |
| let mut state = self.inner.state.lock().unwrap(); | |
| state.register_type::<T>(); | |
| // Do not remove a resource that is currently acquired. | |
| if state.acquired.contains(&key) { | |
| return false; | |
| } | |
| state.available.remove(&key).is_some() | |
| } |
| channels: HashMap<StorageKey, Box<dyn Any + Send + Sync>>, | ||
|
|
||
| /// Broadcast senders for wildcard subscriptions, keyed by type only. | ||
| /// | ||
| /// Each entry stores a `broadcast::Sender<AssertionUpdate<T>>`. | ||
| wildcard_channels: HashMap<TypeId, Box<dyn Any + Send + Sync>>, |
There was a problem hiding this comment.
The broadcast channels created in DataspaceRegistry are never cleaned up. Once a channel is created via get_or_create_sender or get_or_create_wildcard_sender, it remains in the channels or wildcard_channels HashMap indefinitely, even after all subscribers have dropped their receivers. This can lead to unbounded memory growth in long-running applications with many different (type, handle) pairs.
Consider implementing a mechanism to clean up channels when:
- All receivers have been dropped (can be detected when
broadcast::Sender::receiver_count()returns 0) - The associated value has been retracted and there are no pending assertions
This could be done by periodically checking and removing unused channels, or by implementing a more sophisticated reference counting mechanism.
| loop { | ||
| let rx = { | ||
| let mut state = self.inner.state.lock().unwrap(); | ||
| state.register_type::<T>(); | ||
|
|
||
| // Try to acquire synchronously | ||
| if let Some(boxed) = state.available.remove(&key) { | ||
| state.acquired.insert(key.clone()); | ||
| let resource = *boxed.downcast::<T>().expect("type mismatch in registry"); | ||
| return ResourceGuard { | ||
| resource: Some(resource), | ||
| registry: Arc::clone(&self.inner), | ||
| key, | ||
| }; | ||
| } | ||
|
|
||
| // Not available, register as waiter | ||
| let (tx, rx) = oneshot::channel(); | ||
| state.waiters.entry(key.clone()).or_default().push_back(tx); | ||
| rx | ||
| }; | ||
|
|
||
| // Wait for resource (lock released) | ||
| match rx.await { | ||
| Ok(boxed) => { | ||
| let resource = *boxed.downcast::<T>().expect("type mismatch in registry"); | ||
| return ResourceGuard { | ||
| resource: Some(resource), | ||
| registry: Arc::clone(&self.inner), | ||
| key, | ||
| }; | ||
| } | ||
| Err(_) => { | ||
| // Sender dropped without sending - this can happen if the registry | ||
| // is being cleaned up. Retry the acquire. | ||
| continue; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The acquire method could potentially loop indefinitely if there's a race condition or bug that causes the oneshot receiver to continuously receive Err(_) without the resource ever becoming available. While the current implementation handles the case where senders are dropped during registry cleanup, there's no timeout or circuit breaker to prevent an infinite loop in pathological cases.
Consider adding one of the following safeguards:
- A maximum retry count with an error return after exhaustion
- A timeout parameter for the acquire operation
- A cancellation token that can be checked in the loop
This would make the API more robust and debuggable in case of unexpected issues.
cc926a0 to
2eacb9c
Compare
97b3a4d to
90a9af1
Compare
90a9af1 to
8a17cac
Compare
2eacb9c to
32b6e9b
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated no new comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
32b6e9b to
999e501
Compare
8a17cac to
46ef9d0
Compare
46ef9d0 to
b24c86d
Compare
999e501 to
0954b63
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| match rx.await { | ||
| Ok(boxed) => { | ||
| let resource = *boxed.downcast::<T>().expect("type mismatch in registry"); | ||
| return ResourceGuard { | ||
| resource: Some(resource), | ||
| registry: Arc::clone(&self.inner), | ||
| key, | ||
| }; | ||
| } |
There was a problem hiding this comment.
Resource leak vulnerability when acquire future is cancelled. If the future is cancelled after receiving the boxed resource from the oneshot channel but before creating the ResourceGuard, the resource will be dropped without returning to the registry. The key will remain in the acquired set, permanently marking the resource as unavailable. Consider wrapping lines 307-313 in a cancellation-safe guard or restructuring to mark the resource as acquired only after the ResourceGuard is fully constructed.
…ate management in processes
0954b63 to
642f2f6
Compare
b24c86d to
09abd4b
Compare

Summary
Change Type
How did you test this PR?
References