feat: implement StreamManager for gRPC subscription management #985
Conversation
Amp-Thread-ID: https://ampcode.com/threads/T-019c330f-5ffe-720e-a1d5-c702b78de081 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c330f-e8f6-72c0-9e0e-93bbc1f1b9c2 Co-authored-by: Amp <amp@ampcode.com>
…scriptions Amp-Thread-ID: https://ampcode.com/threads/T-019c3702-6f33-74eb-89d9-fad10aac35ed Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c3830-1c06-7406-831c-fe2af2fd8314 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c3838-288d-732e-bcf0-bafe984b350b Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c419c-5b98-774b-b9d6-40733b964154 Co-authored-by: Amp <amp@ampcode.com>
* master: chore: add subscription activation and per-program metrics (#929)
Amp-Thread-ID: https://ampcode.com/threads/T-019c5080-01ee-71fa-aa34-97264f3a0900 Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019c6553-680d-75f8-9fae-9b898cdaba19 Co-authored-by: Amp <amp@ampcode.com>
* master: hotfix: integration tests (#983) hotfix: integration tests (#982) chore: revert offset change (#981) fix: stop setting loaded accounts data size limit (#980) Fix: flaky ledger reset integration test (#977) Feat: tui interface for the validator (#972) fix: slack ready-for-review reviewer display (#978) fix: SetLoadedAccountsDataSize (#976) feat: add call_handler_v2 support for ScheduleIntentBundle actions (#946) feat: implement ephemeral accounts (#915) fix: prevent overrides on subscription updates (#970) fix: ignore error on delete in fat truncation. Case when the disk is … (#924)
Actionable comments posted: 6
ProgramSubscribe { pubkey, response } => {
    let commitment = self.commitment;
    let laser_client_config = self.laser_client_config.clone();
    self.add_program_sub(pubkey, commitment, laser_client_config);
    let _ = response.send(Ok(())).inspect_err(|_| {
        warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
    });
    let result = self
        .stream_manager
        .add_program_subscription(pubkey, &self.commitment)
        .await;
    if let Err(e) = result {
        let _ = response.send(Err(e)).inspect_err(|_| {
            warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
        });
    } else {
        let _ = response.send(Ok(())).inspect_err(|_| {
            warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
        });
    };
    false
🧹 Nitpick | 🔵 Trivial
Consider simplifying the ProgramSubscribe response path.
The if let Err / else branches both send the result and log the same warning on failure. This can be collapsed.
Proposed simplification
ProgramSubscribe { pubkey, response } => {
- let result = self
- .stream_manager
- .add_program_subscription(pubkey, &self.commitment)
- .await;
- if let Err(e) = result {
- let _ = response.send(Err(e)).inspect_err(|_| {
- warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
- });
- } else {
- let _ = response.send(Ok(())).inspect_err(|_| {
- warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
- });
- };
+ let result = self
+ .stream_manager
+ .add_program_subscription(pubkey, &self.commitment)
+ .await;
+ let _ = response.send(result).inspect_err(|_| {
+ warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
+ });
false
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs`
around lines 294 - 308, The two branches in the ProgramSubscribe handler
duplicate the same response send and warning logic; replace the if/else with a
single send of the operation result: after calling
self.stream_manager.add_program_subscription(pubkey, &self.commitment).await
store it in result and call response.send(result) once, attaching the same
.inspect_err closure that logs client_id and program_id (pubkey) on send
failure; remove the duplicated if let Err / else branches around response.send
while keeping the rest of the handler (including returning false).
async fn add_sub(
    &mut self,
    pubkey: Pubkey,
    sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
) {
    let inserted = {
        // Fast path: check with read lock first
        let already_subscribed = {
            let subs = self.subscriptions.read();
            subs.contains(&pubkey)
        };

        if already_subscribed {
            false
        } else {
            // Write lock only when we need to modify
            let mut subs = self.subscriptions.write();
            subs.insert(pubkey);
            true
        }
    };
    if !inserted {
        trace!(pubkey = %pubkey, "Already subscribed to account");
    if self.stream_manager.is_subscribed(&pubkey) {
        debug!(
            pubkey = %pubkey,
            "Already subscribed to account"
        );
        sub_response.send(Ok(())).unwrap_or_else(|_| {
            warn!(pubkey = %pubkey, "Failed to send already subscribed response");
        });
    } else {
        if self.active_subscriptions.is_empty() {
            self.update_active_subscriptions();
        }
        sub_response.send(Ok(())).unwrap_or_else(|_| {
            warn!(pubkey = %pubkey, "Failed to send subscribe response");
        })
        return;
    }

    let from_slot = self.determine_from_slot().map(|(_, fs)| fs);
    let result = self
        .stream_manager
        .account_subscribe(&[pubkey], &self.commitment, from_slot)
        .await;

    let response = match result {
        Ok(()) => Ok(()),
        Err(e) => {
            error!(
                pubkey = %pubkey,
                error = ?e,
                "Failed to subscribe to account"
            );
            Err(e)
        }
    };
    sub_response.send(response).unwrap_or_else(|_| {
        warn!(
            pubkey = %pubkey,
            "Failed to send subscribe response"
        );
    });
}
add_sub updates last_activation_slot even if subscription fails.
determine_from_slot() (line 354) atomically swaps last_activation_slot before account_subscribe is attempted. If the subscribe call on line 355-358 fails, the slot has already advanced, which could cause a subsequent successful subscribe to miss backfill for the skipped interval. Consider moving the slot swap to after a successful subscribe, or accepting this as a known trade-off.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs`
around lines 338 - 377, The current add_sub implementation calls
determine_from_slot (which atomically swaps last_activation_slot) before
awaiting stream_manager.account_subscribe, causing last_activation_slot to
advance even if the subscribe fails; to fix, change the flow so
determine_from_slot does not perform the atomic swap until after
account_subscribe returns Ok — either by splitting determine_from_slot into a
read-only getter and a separate commit step or by deferring the swap/assignment
of last_activation_slot until after stream_manager.account_subscribe(&[pubkey],
&self.commitment, from_slot).await succeeds (refer to add_sub,
determine_from_slot, last_activation_slot, and stream_manager.account_subscribe
to locate the code).
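A minimal sketch of the suggested peek/commit split, under the assumption that last_activation_slot is an AtomicU64; the type and method names (SlotTracker, peek_from_slot, commit_activation_slot) are illustrative, not the crate's actual API:

use std::sync::atomic::{AtomicU64, Ordering};

struct SlotTracker {
    last_activation_slot: AtomicU64,
}

impl SlotTracker {
    // Read-only peek: report the slot a new subscription would backfill
    // from, without advancing anything.
    fn peek_from_slot(&self) -> u64 {
        self.last_activation_slot.load(Ordering::Acquire)
    }

    // Commit step: call only after account_subscribe returned Ok, so a
    // failed subscribe never skips a backfill interval.
    fn commit_activation_slot(&self, new_slot: u64) {
        self.last_activation_slot.store(new_slot, Ordering::Release);
    }
}

The actor would then call peek_from_slot() before account_subscribe and commit_activation_slot(...) only on Ok, leaving the backfill window intact when the subscribe fails.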
async fn update_subscriptions(
    handle: &S,
    task: &str,
    request: SubscribeRequest,
) -> RemoteAccountProviderResult<()> {
    const MAX_RETRIES: usize = 5;
    let mut retries = MAX_RETRIES;
    let initial_retries = retries;

    loop {
        match handle.write(request.clone()).await {
            Ok(()) => return Ok(()),
            Err(err) => {
                if retries > 0 {
                    retries -= 1;
                    // Linear backoff: sleep longer as retries decrease
                    let backoff_ms =
                        50u64 * (initial_retries - retries) as u64;
                    tokio::time::sleep(Duration::from_millis(backoff_ms))
                        .await;
                    continue;
                }
                return Err(RemoteAccountProviderError::GrpcSubscriptionUpdateFailed(
                    task.to_string(),
                    MAX_RETRIES,
                    format!("{err} ({err:?}"),
                ));
            }
        }
    }
}
Backoff is linear, not exponential as stated in PR objectives.
The doc comment accurately says "linear backoff" (50ms × attempt number), but the PR description claims "exponential backoff." If exponential was intended, the backoff calculation needs adjustment. If linear is the actual intent, no code change is needed — just a PR description correction.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 148 - 178, The backoff in update_subscriptions is currently linear
(backoff_ms = 50ms * attempt) but the PR claims exponential; change the backoff
calculation in update_subscriptions to exponential backoff: compute an attempt
index (e.g., attempt = initial_retries - retries or 0-based attempt) and set
backoff_ms = base_ms * 2u64.pow(attempt) (with base_ms = 50), and use
Duration::from_millis(backoff_ms) before sleep; keep MAX_RETRIES, retries, and
the retry loop logic intact and ensure backoff_ms uses saturating/checked
multiplication to avoid overflow.
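If exponential backoff is the intended behavior, here is a self-contained sketch of the retry loop; update_with_backoff and the String error type are stand-ins for the real handle.write(request.clone()) call and RemoteAccountProviderError:

use std::time::Duration;

const MAX_RETRIES: u32 = 5;
const BASE_MS: u64 = 50;

// `write_once` stands in for `handle.write(request.clone()).await`.
async fn update_with_backoff<F, Fut>(mut write_once: F) -> Result<(), String>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), String>>,
{
    let mut last_err = String::new();
    for attempt in 0..=MAX_RETRIES {
        match write_once().await {
            Ok(()) => return Ok(()),
            Err(err) => {
                last_err = err;
                if attempt < MAX_RETRIES {
                    // 50ms, 100ms, 200ms, 400ms, 800ms; the checked shift and
                    // saturating_mul guard against overflow for larger caps.
                    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
                    let backoff_ms = BASE_MS.saturating_mul(factor);
                    tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
                }
            }
        }
    }
    Err(format!("failed after {MAX_RETRIES} retries: {last_err}"))
}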
return Err(RemoteAccountProviderError::GrpcSubscriptionUpdateFailed(
    task.to_string(),
    MAX_RETRIES,
    format!("{err} ({err:?}"),
));
Missing closing parenthesis in error format string.
Line 173: format!("{err} ({err:?}") is missing a ) before the closing ", producing malformed error messages like "some error (SomeError { ... }".
Proposed fix
- format!("{err} ({err:?}"),
+ format!("{err} ({err:?})"),📝 Committable suggestion
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 170 - 174, In stream_manager.rs update the error formatting for
RemoteAccountProviderError::GrpcSubscriptionUpdateFailed so the formatted string
properly closes the parenthesis; replace the malformed format call that
currently uses format!("{err} ({err:?}") with one that includes the missing ')'
(e.g. format!("{err} ({err:?})") so the error message shows both display and
debug forms correctly for the variables used (task, MAX_RETRIES, err).
// Promote if current-new exceeds threshold.
if self.current_new_subs.len() > self.config.max_subs_in_new {
    let overflow_count =
        self.current_new_subs.len() - self.config.max_subs_in_new;
    // The overflow pubkeys are the tail of new_pks.
    let overflow_start = new_pks.len().saturating_sub(overflow_count);
    let overflow_pks = &new_pks[overflow_start..];

    // Move current-new to unoptimized old.
    if let Some(stream) = self.stream_map.remove(&StreamKey::CurrentNew)
    {
        let idx = self.unoptimized_old_handles.len();
        self.stream_map
            .insert(StreamKey::UnoptimizedOld(idx), stream);
    }
    if let Some(handle) = self.current_new_handle.take() {
        self.unoptimized_old_handles.push(handle);
    }
    self.current_new_subs.clear();

    // Start fresh current-new with overflow pubkeys.
    if !overflow_pks.is_empty() {
        for pk in overflow_pks {
            self.current_new_subs.insert(*pk);
        }
        self.insert_current_new_stream(
            &overflow_pks.iter().collect::<Vec<_>>(),
            commitment,
            from_slot,
        );
    }
Large single-batch subscribe can leave current_new_subs above threshold.
If a single call provides more than 2 × max_subs_in_new pubkeys (e.g., 12 with max=5), the overflow (7 pubkeys) written into the fresh current-new stream already exceeds the threshold. The promotion check doesn't re-run within the same call, so the overshoot persists until the next account_subscribe invocation. This is likely acceptable as a soft limit, but worth documenting or adding a while loop if strict enforcement is desired.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 238 - 268, The promotion logic only runs once, so a very large
single batch can seed current_new_subs with more than config.max_subs_in_new
(via overflow_pks and insert_current_new_stream), leaving current_new_subs above
the threshold; update the logic around current_new_subs/overflow_pks (and the
block that moves StreamKey::CurrentNew to StreamKey::UnoptimizedOld and pushes
into unoptimized_old_handles) to loop until current_new_subs.len() <=
config.max_subs_in_new (i.e., repeatedly compute overflow_start/overflow_pks,
promote the current_new stream and handle into unoptimized_old, clear
current_new_subs, repopulate with the next overflow_pks and call
insert_current_new_stream) so the threshold is strictly enforced within a single
account_subscribe call, or alternatively add a clear comment by the
current_new_subs/overflow_pks code explaining that the limit is a soft limit and
may be temporarily exceeded by large single-batch subscribes.
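For the strict-enforcement option, a small sketch of the repeated promotion; enforce_threshold and the Vec-based pools are illustrative stand-ins for the stream and handle bookkeeping that account_subscribe actually performs:

// Repeat the promotion until the current-new set fits the threshold.
// In the real code each iteration would also move the stream handle into
// the unoptimized-old pool and open a fresh current-new stream.
fn enforce_threshold<K>(
    current_new: &mut Vec<K>,
    max_subs_in_new: usize,
    unoptimized_old: &mut Vec<Vec<K>>,
) {
    assert!(max_subs_in_new > 0, "a zero threshold would never terminate");
    while current_new.len() > max_subs_in_new {
        // Everything past the threshold becomes the seed of the next batch.
        let overflow = current_new.split_off(max_subs_in_new);
        // Promote the now-full batch, then start fresh with the overflow.
        unoptimized_old.push(std::mem::take(current_new));
        *current_new = overflow;
    }
}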
if let Some((subs, handle)) = &self.program_sub {
    Self::update_subscriptions(handle, "program_subscribe", request)
        .await?;
    if let Some((subs, _)) = &mut self.program_sub {
        *subs = subscribed_programs;
    }
🧹 Nitpick | 🔵 Trivial
Unused binding subs in pattern.
On line 507, subs is bound but never read inside the if let body. Use _ to clarify intent.
Proposed fix
- if let Some((subs, handle)) = &self.program_sub {
+ if let Some((_, handle)) = &self.program_sub {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 507 - 512, The pattern in the first conditional binds `subs` but
never uses it; change the destructuring there to ignore the first element (e.g.
match Some((_, handle)) on self.program_sub) so the compiler warning goes away
while leaving the subsequent mutable `if let Some((subs, _)) = &mut
self.program_sub` intact to assign `*subs = subscribed_programs`; update the `if
let` that calls `Self::update_subscriptions(handle, "program_subscribe",
request).await?;` accordingly.
Summary
Implements a comprehensive StreamManager to manage multiple gRPC account and program
subscription streams with automatic optimization.
The solution handles subscription lifecycle, stream promotion/optimization when limits are exceeded, and unified
polling of updates from all streams.
CLOSES: #949
Details
The PR introduces the StreamManager struct, which separates stream management logic from the chain laser actor. This enables better testability and cleaner separation of concerns.

Key Features

Subscription Tracking: Maintains a canonical set of active account subscriptions across multiple stream generations using a shared SharedSubscriptions HashSet.

Stream Generational Management: Organizes account streams into three generations: a current-new stream that accepts fresh subscriptions up to the max_subs_in_new threshold, unoptimized old streams promoted out of current-new, and optimized old streams rebuilt into max_subs_in_old_optimized-sized groups.

Automatic Optimization: When unoptimized old streams exceed the max_old_unoptimized limit, all account subscriptions are rebuilt into optimized chunks.

Update Subscription Retry Logic: Implements exponential backoff (up to 5 retries) when writing subscription updates to stream handles fails.

Unified Stream Polling: All streams (account and program) are maintained in a StreamMap and polled together via next_update(), which returns updates tagged with their source type.

Program Subscriptions: Independent program subscription management that can coexist with account subscriptions.

A sketch of this layout follows below.
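The generational layout can be summarized in a minimal Rust sketch. This is a hedged illustration assuming tokio_stream::StreamMap for unified polling; the StreamKey variants and config field names follow the snippets quoted in this review, while everything else (the Pubkey stand-in, UpdateSource, the struct shape) is illustrative rather than the crate's actual API:

use std::collections::HashSet;

// Stand-in for solana_sdk::pubkey::Pubkey.
type Pubkey = [u8; 32];

#[derive(Clone, PartialEq, Eq, Hash)]
enum StreamKey {
    // Receives fresh subscriptions until max_subs_in_new is exceeded.
    CurrentNew,
    // Promoted current-new streams, indexed by promotion order.
    UnoptimizedOld(usize),
    // Rebuilt streams holding max_subs_in_old_optimized-sized chunks.
    Optimized(usize),
    // Independent program subscription stream.
    Program,
}

struct Config {
    max_subs_in_new: usize,
    max_subs_in_old_optimized: usize,
    // Rebuild trigger: too many unoptimized old streams.
    max_old_unoptimized: usize,
}

// Tag attached to every update yielded by unified polling.
enum UpdateSource {
    Account,
    Program,
}

struct StreamManagerSketch<S> {
    // Canonical set of active account subscriptions across all generations.
    subscriptions: HashSet<Pubkey>,
    // All live streams polled together; draining this map is what a
    // next_update() method would do, tagging items with an UpdateSource.
    stream_map: tokio_stream::StreamMap<StreamKey, S>,
    config: Config,
}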
Testing
Comprehensive test suite covering:
Error Handling
Added GrpcSubscriptionUpdateFailed error variant to handle gRPC subscription update failures with retry context.

Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Architecture Improvements