
feat: implement StreamManager for gRPC subscription management #985

Open

thlorenz wants to merge 75 commits into master from thlorenz/grpc-generational

Conversation

@thlorenz (Collaborator) commented Feb 19, 2026

Summary

Implements a StreamManager that manages multiple gRPC account and program
subscription streams. It handles the subscription lifecycle, promotes and
optimizes streams when limits are exceeded, and unifies polling of updates
from all streams.

CLOSES: #949

Details

The PR introduces the StreamManager struct which separates stream management logic from the chain laser actor. This enables better testability and cleaner separation of concerns.

Key Features

Subscription Tracking: Maintains a canonical set of active account subscriptions across multiple stream generations via a SharedSubscriptions handle (a shared HashSet).
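
A plausible shape for that handle, as a sketch (the non-async read()/write() calls visible in the actor code later in this review suggest a parking_lot lock, but the exact alias is an assumption):

    use std::{collections::HashSet, sync::Arc};

    use parking_lot::RwLock;
    use solana_sdk::pubkey::Pubkey;

    // Shared between the actor and the StreamManager so both sides observe
    // the same canonical set of active account subscriptions.
    pub type SharedSubscriptions = Arc<RwLock<HashSet<Pubkey>>>;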

Generational Stream Management: Organizes account streams into three generations:

  • current-new stream: Collects new subscriptions up to max_subs_in_new threshold
  • unoptimized old streams: Streams that accumulate when current-new is promoted
  • optimized old streams: Results of optimization that chunks subscriptions into max_subs_in_old_optimized sized groups

Automatic Optimization: When unoptimized old streams exceed max_old_unoptimized limit, all account subscriptions are rebuilt into optimized chunks.
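
As a sketch, the three limits and the chunking step might look like this (the field names follow the identifiers used throughout this PR; the struct layout and the helper are assumptions for illustration):

    use solana_sdk::pubkey::Pubkey;

    pub struct StreamManagerConfig {
        // Subscriptions a current-new stream may collect before promotion.
        pub max_subs_in_new: usize,
        // Unoptimized old streams tolerated before an optimization pass runs.
        pub max_old_unoptimized: usize,
        // Chunk size for the streams rebuilt by an optimization pass.
        pub max_subs_in_old_optimized: usize,
    }

    // Optimization rebuilds the full subscription set into fixed-size chunks,
    // one optimized-old stream per chunk.
    pub fn optimized_chunks(
        all_subs: &[Pubkey],
        config: &StreamManagerConfig,
    ) -> Vec<Vec<Pubkey>> {
        all_subs
            .chunks(config.max_subs_in_old_optimized)
            .map(|chunk| chunk.to_vec())
            .collect()
    }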

Update Subscription Retry Logic: Implements exponential backoff (up to 5 retries) when writing subscription updates to stream handles fails.

Unified Stream Polling: All streams (account and program) are maintained in a StreamMap and polled together via next_update() which returns updates tagged with their source type.
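
A minimal sketch of that polling model using tokio_stream's StreamMap (the StreamKey variants echo the names visible in the review excerpts below; the Update payload and the boxing are assumptions):

    use std::pin::Pin;

    use tokio_stream::{Stream, StreamExt, StreamMap};

    // Placeholder for the laser update payload carried by each stream.
    pub struct Update;

    type BoxedUpdateStream = Pin<Box<dyn Stream<Item = Update> + Send>>;

    #[derive(Clone, PartialEq, Eq, Hash)]
    pub enum StreamKey {
        CurrentNew,
        UnoptimizedOld(usize),
        OptimizedOld(usize),
        Program,
    }

    // Polling the map yields (key, update) pairs, so every update arrives
    // already tagged with the generation/kind of the stream that produced it.
    pub async fn next_update(
        streams: &mut StreamMap<StreamKey, BoxedUpdateStream>,
    ) -> Option<(StreamKey, Update)> {
        streams.next().await
    }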

Program Subscriptions: Independent program subscription management that can coexist with account subscriptions.

Testing

Comprehensive test suite covering:

  • Subscription tracking and deduplication
  • Current-new stream lifecycle and promotion behavior
  • Optimization triggering and execution
  • Unsubscribe operations (subscription set only, streams unmodified)
  • Stream counting across generations
  • Factory interaction verification
  • from_slot parameter forwarding
  • Mixed account and program stream updates

Error Handling

Added GrpcSubscriptionUpdateFailed error variant to handle gRPC subscription update failures with retry context.
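
The call sites in the review excerpts below pass a task name, the retry count, and the formatted source error, so the variant plausibly has this shape (a sketch; the exact message string is an assumption):

    use thiserror::Error;

    #[derive(Debug, Error)]
    pub enum RemoteAccountProviderError {
        // ... existing variants ...
        #[error("gRPC subscription update '{0}' failed after {1} retries: {2}")]
        GrpcSubscriptionUpdateFailed(String, usize, String),
    }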

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced blockchain account and program subscription management with improved stream optimization and lifecycle handling.
  • Bug Fixes

    • Improved error reporting for failed subscription updates with detailed retry information.
  • Architecture Improvements

    • Refactored subscription tracking for better testability and maintainability.

thlorenz and others added 30 commits February 6, 2026 12:36
* master:
  chore: add subscription activation and per-program metrics (#929)
thlorenz and others added 26 commits February 16, 2026 14:12
…ational

* master:
  chore: improve subscription reconciler (#945)
  feat: implement pubsub connection pooling (#931)
  fix: project ata from eata delegation update (#963)
github-actions bot commented Feb 19, 2026

Manual Deploy Available

You can trigger a manual deploy of this PR branch to testnet:

Deploy to Testnet 🚀

Alternative: Comment /deploy on this PR to trigger deployment directly.

⚠️ Note: Manual deploy requires authorization. Only authorized users can trigger deployments.

Comment updated automatically when the PR is synchronized.

* master:
  hotfix: integration tests (#983)
  hotfix: integration tests (#982)
  chore: revert offset change (#981)
  fix: stop setting loaded accounts data size limit (#980)
  Fix: flaky ledger reset integration test (#977)
  Feat: tui interface for the validator (#972)
  fix: slack ready-for-review reviewer display (#978)
  fix: SetLoadedAccountsDataSize (#976)
  feat: add call_handler_v2 support for ScheduleIntentBundle actions (#946)
  feat: implement ephemeral accounts (#915)
  fix: prevent overrides on subscription updates (#970)
  fix: ignore error on delete in fat truncation. Case when the disk is … (#924)
coderabbitai bot (Contributor) commented Feb 19, 2026

📝 Walkthrough

This pull request refactors the ChainLaserActor to implement a generational subscription model for gRPC laser streams. The changes introduce a StreamManager abstraction to handle multi-generation stream management, replacing previous in-process subscription tracking. The actor is genericized to accept injectable stream factories and handles, enabling dependency injection for testability. Core modifications include immediate subscription activation to streams, dynamic stream promotion from new to old generations based on subscription count thresholds, and periodic optimization to consolidate streams. Supporting infrastructure includes new trait abstractions (StreamFactory, StreamHandle), mock implementations for testing, and expanded error handling.
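
Illustrative shapes for those two injectable abstractions (a sketch: write mirrors the call visible in update_subscriptions below, while the factory signature and the placeholder types are assumptions):

    use async_trait::async_trait;

    // Placeholders for types owned by the real crate.
    pub struct SubscribeRequest;
    pub struct StreamError;
    pub struct UpdateStream;

    #[async_trait]
    pub trait StreamHandle: Send + Sync {
        // Push an updated subscription request onto an existing stream.
        async fn write(&self, request: SubscribeRequest) -> Result<(), StreamError>;
    }

    #[async_trait]
    pub trait StreamFactory: Send + Sync {
        type Handle: StreamHandle;
        // Open a new gRPC stream, returning its write handle plus the
        // update stream to be polled.
        async fn create(
            &self,
            request: SubscribeRequest,
        ) -> Result<(Self::Handle, UpdateStream), StreamError>;
    }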

Assessment against linked issues

| Objective | Addressed | Explanation |
| --- | --- | --- |
| Implement generational subscription approach with current new stream, unoptimized old streams, and optimized old streams [#949] | ✅ | |
| Immediately activate subscriptions when requested instead of deferring activation [#949] | ✅ | |
| Implement stream optimization logic to consolidate subscriptions into chunks [#949] | ✅ | |
| Apply generational approach to account subscriptions and keep program subscriptions in separate single stream [#949] | ✅ | |
| Maintain subscriptions HashSet for tracking active subscriptions independently from streams [#949] | ✅ | |
| Implement configurable limits for stream management (MAX_SUBS_IN_OLD_OPTIMIZED, MAX_OLD_UNOPTIMIZED, MAX_SUBS_IN_NEW) [#949] | ✅ | |

Suggested reviewers

  • Reviewer with expertise in subscription lifecycle management and GRPC stream handling
  • Reviewer familiar with multi-generational resource management patterns
  • Reviewer experienced with Rust async/await patterns and trait abstractions

@coderabbitai bot left a comment

Actionable comments posted: 6


Comment on lines 294 to 308

             ProgramSubscribe { pubkey, response } => {
-                let commitment = self.commitment;
-                let laser_client_config = self.laser_client_config.clone();
-                self.add_program_sub(pubkey, commitment, laser_client_config);
-                let _ = response.send(Ok(())).inspect_err(|_| {
-                    warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
-                });
+                let result = self
+                    .stream_manager
+                    .add_program_subscription(pubkey, &self.commitment)
+                    .await;
+                if let Err(e) = result {
+                    let _ = response.send(Err(e)).inspect_err(|_| {
+                        warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
+                    });
+                } else {
+                    let _ = response.send(Ok(())).inspect_err(|_| {
+                        warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
+                    });
+                };
                 false

🧹 Nitpick | 🔵 Trivial

Consider simplifying the ProgramSubscribe response path.

The if let Err / else branches both send the result and log the same warning on failure. This can be collapsed.

Proposed simplification
             ProgramSubscribe { pubkey, response } => {
-                let result = self
-                    .stream_manager
-                    .add_program_subscription(pubkey, &self.commitment)
-                    .await;
-                if let Err(e) = result {
-                    let _ = response.send(Err(e)).inspect_err(|_| {
-                        warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
-                    });
-                } else {
-                    let _ = response.send(Ok(())).inspect_err(|_| {
-                        warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
-                    });
-                };
+                let result = self
+                    .stream_manager
+                    .add_program_subscription(pubkey, &self.commitment)
+                    .await;
+                let _ = response.send(result).inspect_err(|_| {
+                    warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
+                });
                 false
             }
📝 Committable suggestion

Suggested change
             ProgramSubscribe { pubkey, response } => {
                 let result = self
                     .stream_manager
                     .add_program_subscription(pubkey, &self.commitment)
                     .await;
                 let _ = response.send(result).inspect_err(|_| {
                     warn!(client_id = self.client_id, program_id = %pubkey, "Failed to send program subscribe response");
                 });
                 false
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs`
around lines 294 - 308, The two branches in the ProgramSubscribe handler
duplicate the same response send and warning logic; replace the if/else with a
single send of the operation result: after calling
self.stream_manager.add_program_subscription(pubkey, &self.commitment).await
store it in result and call response.send(result) once, attaching the same
.inspect_err closure that logs client_id and program_id (pubkey) on send
failure; remove the duplicated if let Err / else branches around response.send
while keeping the rest of the handler (including returning false).

Comment on lines +338 to 377
    async fn add_sub(
        &mut self,
        pubkey: Pubkey,
        sub_response: oneshot::Sender<RemoteAccountProviderResult<()>>,
    ) {
        let inserted = {
            // Fast path: check with read lock first
            let already_subscribed = {
                let subs = self.subscriptions.read();
                subs.contains(&pubkey)
            };

            if already_subscribed {
                false
            } else {
                // Write lock only when we need to modify
                let mut subs = self.subscriptions.write();
                subs.insert(pubkey);
                true
            }
        };
        if !inserted {
            trace!(pubkey = %pubkey, "Already subscribed to account");
            if self.stream_manager.is_subscribed(&pubkey) {
                debug!(
                    pubkey = %pubkey,
                    "Already subscribed to account"
                );
                sub_response.send(Ok(())).unwrap_or_else(|_| {
                    warn!(pubkey = %pubkey, "Failed to send already subscribed response");
                });
            } else {
                if self.active_subscriptions.is_empty() {
                    self.update_active_subscriptions();
                }
                sub_response.send(Ok(())).unwrap_or_else(|_| {
                    warn!(pubkey = %pubkey, "Failed to send subscribe response");
                });
            }
            return;
        }

        let from_slot = self.determine_from_slot().map(|(_, fs)| fs);
        let result = self
            .stream_manager
            .account_subscribe(&[pubkey], &self.commitment, from_slot)
            .await;

        let response = match result {
            Ok(()) => Ok(()),
            Err(e) => {
                error!(
                    pubkey = %pubkey,
                    error = ?e,
                    "Failed to subscribe to account"
                );
                Err(e)
            }
        };
        sub_response.send(response).unwrap_or_else(|_| {
            warn!(
                pubkey = %pubkey,
                "Failed to send subscribe response"
            );
        });
    }

⚠️ Potential issue | 🟡 Minor

add_sub updates last_activation_slot even if subscription fails.

determine_from_slot() (line 354) atomically swaps last_activation_slot before account_subscribe is attempted. If the subscribe call on lines 355-358 fails, the slot has already advanced, which could cause a subsequent successful subscribe to miss backfill for the skipped interval. Consider moving the slot swap to after a successful subscribe, or accepting this as a known trade-off.
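
One possible shape for the suggested split (a sketch; peek_from_slot and commit_activation_slot are hypothetical names, not functions in this PR):

    // Read the candidate slot without mutating last_activation_slot.
    let from_slot = self.peek_from_slot().map(|(_, fs)| fs);
    let result = self
        .stream_manager
        .account_subscribe(&[pubkey], &self.commitment, from_slot)
        .await;
    // Only advance last_activation_slot once the subscribe has succeeded,
    // so a failed attempt cannot create a backfill gap.
    if result.is_ok() {
        self.commit_activation_slot();
    }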

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/actor.rs`
around lines 338 - 377, The current add_sub implementation calls
determine_from_slot (which atomically swaps last_activation_slot) before
awaiting stream_manager.account_subscribe, causing last_activation_slot to
advance even if the subscribe fails; to fix, change the flow so
determine_from_slot does not perform the atomic swap until after
account_subscribe returns Ok — either by splitting determine_from_slot into a
read-only getter and a separate commit step or by deferring the swap/assignment
of last_activation_slot until after stream_manager.account_subscribe(&[pubkey],
&self.commitment, from_slot).await succeeds (refer to add_sub,
determine_from_slot, last_activation_slot, and stream_manager.account_subscribe
to locate the code).

Comment on lines +148 to +178

    async fn update_subscriptions(
        handle: &S,
        task: &str,
        request: SubscribeRequest,
    ) -> RemoteAccountProviderResult<()> {
        const MAX_RETRIES: usize = 5;
        let mut retries = MAX_RETRIES;
        let initial_retries = retries;

        loop {
            match handle.write(request.clone()).await {
                Ok(()) => return Ok(()),
                Err(err) => {
                    if retries > 0 {
                        retries -= 1;
                        // Linear backoff: sleep longer as retries decrease
                        let backoff_ms =
                            50u64 * (initial_retries - retries) as u64;
                        tokio::time::sleep(Duration::from_millis(backoff_ms))
                            .await;
                        continue;
                    }
                    return Err(RemoteAccountProviderError::GrpcSubscriptionUpdateFailed(
                        task.to_string(),
                        MAX_RETRIES,
                        format!("{err} ({err:?}"),
                    ));
                }
            }
        }
    }

⚠️ Potential issue | 🟡 Minor

Backoff is linear, not exponential as stated in PR objectives.

The doc comment accurately says "linear backoff" (50ms × attempt number), but the PR description claims "exponential backoff." If exponential was intended, the backoff calculation needs adjustment. If linear is the actual intent, no code change is needed — just a PR description correction.
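
If exponential was the intent, a minimal sketch of the adjusted computation (a hypothetical variant reusing the retries/initial_retries counters above): the linear schedule sleeps 50, 100, 150, 200, 250 ms across the five retries, whereas this one sleeps 50, 100, 200, 400, 800 ms.

    // attempt is 1-based, computed after the `retries -= 1` decrement.
    let attempt = (initial_retries - retries) as u32;
    // 50 ms base, doubled per attempt; saturating ops guard against overflow.
    let backoff_ms = 50u64.saturating_mul(2u64.saturating_pow(attempt - 1));
    tokio::time::sleep(Duration::from_millis(backoff_ms)).await;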

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 148 - 178, The backoff in update_subscriptions is currently linear
(backoff_ms = 50ms * attempt) but the PR claims exponential; change the backoff
calculation in update_subscriptions to exponential backoff: compute an attempt
index (e.g., attempt = initial_retries - retries or 0-based attempt) and set
backoff_ms = base_ms * 2u64.pow(attempt) (with base_ms = 50), and use
Duration::from_millis(backoff_ms) before sleep; keep MAX_RETRIES, retries, and
the retry loop logic intact and ensure backoff_ms uses saturating/checked
multiplication to avoid overflow.

Comment on lines +170 to +174

                    return Err(RemoteAccountProviderError::GrpcSubscriptionUpdateFailed(
                        task.to_string(),
                        MAX_RETRIES,
                        format!("{err} ({err:?}"),
                    ));

⚠️ Potential issue | 🟡 Minor

Missing closing parenthesis in error format string.

Line 173: format!("{err} ({err:?}") is missing a ) before the closing ", producing malformed error messages like "some error (SomeError { ... }".

Proposed fix
-                        format!("{err} ({err:?}"),
+                        format!("{err} ({err:?})"),
📝 Committable suggestion

Suggested change
                    return Err(RemoteAccountProviderError::GrpcSubscriptionUpdateFailed(
                        task.to_string(),
                        MAX_RETRIES,
                        format!("{err} ({err:?})"),
                    ));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 170 - 174, In stream_manager.rs update the error formatting for
RemoteAccountProviderError::GrpcSubscriptionUpdateFailed so the formatted string
properly closes the parenthesis; replace the malformed format call that
currently uses format!("{err} ({err:?}") with one that includes the missing ')'
(e.g. format!("{err} ({err:?})") so the error message shows both display and
debug forms correctly for the variables used (task, MAX_RETRIES, err).

Comment on lines +238 to +268

        // Promote if current-new exceeds threshold.
        if self.current_new_subs.len() > self.config.max_subs_in_new {
            let overflow_count =
                self.current_new_subs.len() - self.config.max_subs_in_new;
            // The overflow pubkeys are the tail of new_pks.
            let overflow_start = new_pks.len().saturating_sub(overflow_count);
            let overflow_pks = &new_pks[overflow_start..];

            // Move current-new to unoptimized old.
            if let Some(stream) = self.stream_map.remove(&StreamKey::CurrentNew)
            {
                let idx = self.unoptimized_old_handles.len();
                self.stream_map
                    .insert(StreamKey::UnoptimizedOld(idx), stream);
            }
            if let Some(handle) = self.current_new_handle.take() {
                self.unoptimized_old_handles.push(handle);
            }
            self.current_new_subs.clear();

            // Start fresh current-new with overflow pubkeys.
            if !overflow_pks.is_empty() {
                for pk in overflow_pks {
                    self.current_new_subs.insert(*pk);
                }
                self.insert_current_new_stream(
                    &overflow_pks.iter().collect::<Vec<_>>(),
                    commitment,
                    from_slot,
                );
            }

⚠️ Potential issue | 🟡 Minor

Large single-batch subscribe can leave current_new_subs above threshold.

If a single call provides more than 2 × max_subs_in_new pubkeys (e.g., 12 with max=5), the overflow (7 pubkeys) written into the fresh current-new stream already exceeds the threshold. The promotion check doesn't re-run within the same call, so the overshoot persists until the next account_subscribe invocation. This is likely acceptable as a soft limit, but worth documenting or adding a while loop if strict enforcement is desired.
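
For illustration, strict enforcement amounts to splitting the incoming batch into max_subs_in_new-sized chunks before seeding streams (a standalone sketch, not the PR's code): with max = 5 and 12 keys this yields chunks of 5, 5, and 2, instead of one promoted stream of 5 plus a current-new of 7.

    // Split an oversized batch so no single generation exceeds `max`.
    fn split_for_promotion<T: Clone>(pks: &[T], max: usize) -> Vec<Vec<T>> {
        assert!(max > 0, "max_subs_in_new must be positive");
        // e.g. split_for_promotion(&keys, 5) on 12 keys -> lengths [5, 5, 2]
        pks.chunks(max).map(|chunk| chunk.to_vec()).collect()
    }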

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 238 - 268, The promotion logic only runs once, so a very large
single batch can seed current_new_subs with more than config.max_subs_in_new
(via overflow_pks and insert_current_new_stream), leaving current_new_subs above
the threshold; update the logic around current_new_subs/overflow_pks (and the
block that moves StreamKey::CurrentNew to StreamKey::UnoptimizedOld and pushes
into unoptimized_old_handles) to loop until current_new_subs.len() <=
config.max_subs_in_new (i.e., repeatedly compute overflow_start/overflow_pks,
promote the current_new stream and handle into unoptimized_old, clear
current_new_subs, repopulate with the next overflow_pks and call
insert_current_new_stream) so the threshold is strictly enforced within a single
account_subscribe call, or alternatively add a clear comment by the
current_new_subs/overflow_pks code explaining that the limit is a soft limit and
may be temporarily exceeded by large single-batch subscribes.

Comment on lines +507 to +512

        if let Some((subs, handle)) = &self.program_sub {
            Self::update_subscriptions(handle, "program_subscribe", request)
                .await?;
            if let Some((subs, _)) = &mut self.program_sub {
                *subs = subscribed_programs;
            }

🧹 Nitpick | 🔵 Trivial

Unused binding subs in pattern.

On line 507, subs is bound but never read inside the if let body. Use _ to clarify intent.

Proposed fix
-        if let Some((subs, handle)) = &self.program_sub {
+        if let Some((_, handle)) = &self.program_sub {
📝 Committable suggestion

Suggested change
        if let Some((_, handle)) = &self.program_sub {
            Self::update_subscriptions(handle, "program_subscribe", request)
                .await?;
            if let Some((subs, _)) = &mut self.program_sub {
                *subs = subscribed_programs;
            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@magicblock-chainlink/src/remote_account_provider/chain_laser_actor/stream_manager.rs`
around lines 507 - 512, The pattern in the first conditional binds `subs` but
never uses it; change the destructuring there to ignore the first element (e.g.
match Some((_, handle)) on self.program_sub) so the compiler warning goes away
while leaving the subsequent mutable `if let Some((subs, _)) = &mut
self.program_sub` intact to assign `*subs = subscribed_programs`; update the `if
let` that calls `Self::update_subscriptions(handle, "program_subscribe",
request).await?;` accordingly.

Development

Successfully merging this pull request may close these issues:

feat: GRPC generational subscription approach to allow direct account subscriptions