From a943e5a31c2c3caa1b31f07d36857630afda84ad Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Mon, 23 Feb 2026 14:42:23 -0300 Subject: [PATCH 1/5] refactor(ntx-builder): simplify coordinator-actor messaging with Notify --- crates/ntx-builder/src/actor/mod.rs | 88 ++++++------ crates/ntx-builder/src/builder.rs | 51 +++---- crates/ntx-builder/src/coordinator.rs | 134 ++++++------------ crates/ntx-builder/src/db/mod.rs | 14 ++ .../src/db/models/queries/accounts.rs | 28 ++++ crates/ntx-builder/src/lib.rs | 23 +-- 6 files changed, 155 insertions(+), 183 deletions(-) diff --git a/crates/ntx-builder/src/actor/mod.rs b/crates/ntx-builder/src/actor/mod.rs index ecb72552b..a848f47a1 100644 --- a/crates/ntx-builder/src/actor/mod.rs +++ b/crates/ntx-builder/src/actor/mod.rs @@ -11,7 +11,6 @@ use account_state::TransactionCandidate; use futures::FutureExt; use miden_node_proto::clients::{Builder, ValidatorClient}; use miden_node_proto::domain::account::NetworkAccountId; -use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; use miden_protocol::Word; @@ -20,7 +19,7 @@ use miden_protocol::block::BlockNumber; use miden_protocol::note::{Note, NoteScript, Nullifier}; use miden_protocol::transaction::TransactionId; use miden_remote_prover_client::RemoteTransactionProver; -use tokio::sync::{AcquireError, RwLock, Semaphore, mpsc}; +use tokio::sync::{AcquireError, Notify, RwLock, Semaphore, mpsc}; use tokio_util::sync::CancellationToken; use url::Url; @@ -29,16 +28,19 @@ use crate::builder::ChainState; use crate::db::Db; use crate::store::StoreClient; -// ACTOR NOTIFICATION +// ACTOR REQUESTS // ================================================================================================ -/// A notification sent from an account actor to the coordinator. -pub enum ActorNotification { +/// A request sent from an account actor to the coordinator via a shared mpsc channel. +pub enum ActorRequest { /// One or more notes failed during transaction execution and should have their attempt - /// counters incremented. + /// counters incremented. The actor waits for the coordinator to acknowledge the DB write via + /// the oneshot channel, preventing race conditions where the actor could re-select the same + /// notes before the failure is persisted. NotesFailed { nullifiers: Vec, block_num: BlockNumber, + ack_tx: tokio::sync::oneshot::Sender<()>, }, /// A note script was fetched from the remote store and should be persisted to the local DB. CacheNoteScript { script_root: Word, script: NoteScript }, @@ -49,9 +51,6 @@ pub enum ActorNotification { /// The reason an actor has shut down. pub enum ActorShutdownReason { - /// Occurs when an account actor detects failure in the messaging channel used by the - /// coordinator. - EventChannelClosed, /// Occurs when an account actor detects failure in acquiring the rate-limiting semaphore. SemaphoreFailed(AcquireError), /// Occurs when an account actor detects its corresponding cancellation token has been triggered @@ -87,8 +86,8 @@ pub struct AccountActorContext { pub max_note_attempts: usize, /// Database for persistent state. pub db: Db, - /// Channel for sending notifications to the coordinator (via the builder event loop). - pub notification_tx: mpsc::Sender, + /// Channel for sending requests to the coordinator (via the builder event loop). + pub request_tx: mpsc::Sender, } // ACCOUNT ORIGIN @@ -179,7 +178,7 @@ pub struct AccountActor { store: StoreClient, db: Db, mode: ActorMode, - event_rx: mpsc::Receiver>, + notify: Arc, cancel_token: CancellationToken, block_producer: BlockProducerClient, validator: ValidatorClient, @@ -190,17 +189,16 @@ pub struct AccountActor { max_notes_per_tx: NonZeroUsize, /// Maximum number of note execution attempts before dropping a note. max_note_attempts: usize, - /// Channel for sending notifications to the coordinator. - notification_tx: mpsc::Sender, + /// Channel for sending requests to the coordinator. + request_tx: mpsc::Sender, } impl AccountActor { - /// Constructs a new account actor and corresponding messaging channel with the given - /// configuration. + /// Constructs a new account actor with the given configuration. pub fn new( origin: AccountOrigin, actor_context: &AccountActorContext, - event_rx: mpsc::Receiver>, + notify: Arc, cancel_token: CancellationToken, ) -> Self { let block_producer = BlockProducerClient::new(actor_context.block_producer_url.clone()); @@ -217,7 +215,7 @@ impl AccountActor { store: actor_context.store.clone(), db: actor_context.db.clone(), mode: ActorMode::NoViableNotes, - event_rx, + notify, cancel_token, block_producer, validator, @@ -226,7 +224,7 @@ impl AccountActor { script_cache: actor_context.script_cache.clone(), max_notes_per_tx: actor_context.max_notes_per_tx, max_note_attempts: actor_context.max_note_attempts, - notification_tx: actor_context.notification_tx.clone(), + request_tx: actor_context.request_tx.clone(), } } @@ -261,28 +259,22 @@ impl AccountActor { _ = self.cancel_token.cancelled() => { return ActorShutdownReason::Cancelled(account_id); } - // Handle mempool events. - event = self.event_rx.recv() => { - let Some(event) = event else { - return ActorShutdownReason::EventChannelClosed; - }; - // Re-enable transaction execution if the transaction being waited on has - // been resolved (added to mempool, committed in a block, or reverted). - if let ActorMode::TransactionInflight(awaited_id) = self.mode { - let should_wake = match event.as_ref() { - MempoolEvent::TransactionAdded { id, .. } => *id == awaited_id, - MempoolEvent::BlockCommitted { txs, .. } => { - txs.contains(&awaited_id) - }, - MempoolEvent::TransactionsReverted(tx_ids) => { - tx_ids.contains(&awaited_id) - }, - }; - if should_wake { + // Handle coordinator notifications. On notification, re-evaluate state from DB. + _ = self.notify.notified() => { + match self.mode { + ActorMode::TransactionInflight(awaited_id) => { + // Check DB: is the inflight tx still pending? + let resolved = self.db + .is_transaction_resolved(account_id, awaited_id) + .await + .expect("should be able to check tx status"); + if resolved { + self.mode = ActorMode::NotesAvailable; + } + }, + _ => { self.mode = ActorMode::NotesAvailable; } - } else { - self.mode = ActorMode::NotesAvailable; } }, // Execute transactions. @@ -395,25 +387,31 @@ impl AccountActor { } } - /// Sends notifications to the coordinator to cache note scripts fetched from the remote store. + /// Sends requests to the coordinator to cache note scripts fetched from the remote store. async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) { for (script_root, script) in scripts { let _ = self - .notification_tx - .send(ActorNotification::CacheNoteScript { script_root, script }) + .request_tx + .send(ActorRequest::CacheNoteScript { script_root, script }) .await; } } - /// Sends a notification to the coordinator to mark notes as failed. + /// Sends a request to the coordinator to mark notes as failed and waits for the DB write to + /// complete. This prevents a race condition where the actor could re-select the same notes + /// before the failure counts are updated in the database. async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) { + let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); let _ = self - .notification_tx - .send(ActorNotification::NotesFailed { + .request_tx + .send(ActorRequest::NotesFailed { nullifiers: nullifiers.to_vec(), block_num, + ack_tx, }) .await; + // Wait for the coordinator to confirm the DB write. + let _ = ack_rx.await; } } diff --git a/crates/ntx-builder/src/builder.rs b/crates/ntx-builder/src/builder.rs index 20090c5b9..6ef7a92c2 100644 --- a/crates/ntx-builder/src/builder.rs +++ b/crates/ntx-builder/src/builder.rs @@ -14,7 +14,7 @@ use tokio_stream::StreamExt; use tonic::Status; use crate::NtxBuilderConfig; -use crate::actor::{AccountActorContext, AccountOrigin, ActorNotification}; +use crate::actor::{AccountActorContext, AccountOrigin, ActorRequest}; use crate::coordinator::Coordinator; use crate::db::Db; use crate::store::StoreClient; @@ -98,8 +98,8 @@ pub struct NetworkTransactionBuilder { actor_context: AccountActorContext, /// Stream of mempool events from the block producer. mempool_events: MempoolEventStream, - /// Receiver for notifications from account actors (e.g., note failures). - notification_rx: mpsc::Receiver, + /// Receiver for requests from account actors (note failures, script caching). + actor_request_rx: mpsc::Receiver, } impl NetworkTransactionBuilder { @@ -112,7 +112,7 @@ impl NetworkTransactionBuilder { chain_state: Arc>, actor_context: AccountActorContext, mempool_events: MempoolEventStream, - notification_rx: mpsc::Receiver, + actor_request_rx: mpsc::Receiver, ) -> Self { Self { config, @@ -122,7 +122,7 @@ impl NetworkTransactionBuilder { chain_state, actor_context, mempool_events, - notification_rx, + actor_request_rx, } } @@ -164,7 +164,7 @@ impl NetworkTransactionBuilder { .context("mempool event stream ended")? .context("mempool event stream failed")?; - self.handle_mempool_event(event.into()).await?; + self.handle_mempool_event(event).await?; }, // Handle account batches loaded from the store. // Once all accounts are loaded, the channel closes and this branch @@ -172,9 +172,9 @@ impl NetworkTransactionBuilder { Some(account_id) = account_rx.recv() => { self.handle_loaded_account(account_id).await?; }, - // Handle actor notifications (DB writes delegated from actors). - Some(notification) = self.notification_rx.recv() => { - self.handle_actor_notification(notification).await; + // Handle requests from actors. + Some(request) = self.actor_request_rx.recv() => { + self.handle_actor_request(request).await; }, // Handle account loader task completion/failure. // If the task fails, we abort since the builder would be in a degraded state @@ -227,18 +227,14 @@ impl NetworkTransactionBuilder { .context("failed to sync account to DB")?; self.coordinator - .spawn_actor(AccountOrigin::store(account_id), &self.actor_context) - .await?; + .spawn_actor(AccountOrigin::store(account_id), &self.actor_context); Ok(()) } - /// Handles mempool events by writing to DB first, then routing to actors. + /// Handles mempool events by writing to DB first, then notifying actors. #[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))] - async fn handle_mempool_event( - &mut self, - event: Arc, - ) -> Result<(), anyhow::Error> { - match event.as_ref() { + async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> { + match &event { MempoolEvent::TransactionAdded { account_delta, .. } => { // Write event effects to DB first. self.coordinator @@ -253,13 +249,11 @@ impl NetworkTransactionBuilder { // Spawn new actors if a transaction creates a new network account. let is_creating_account = delta.is_full_state(); if is_creating_account { - self.coordinator - .spawn_actor(network_account, &self.actor_context) - .await?; + self.coordinator.spawn_actor(network_account, &self.actor_context); } } } - self.coordinator.send_targeted(&event).await?; + self.coordinator.send_targeted(&event); Ok(()) }, // Update chain state and broadcast. @@ -271,7 +265,7 @@ impl NetworkTransactionBuilder { .context("failed to write BlockCommitted to DB")?; self.update_chain_tip(header.as_ref().clone()).await; - self.coordinator.broadcast(event.clone()).await; + self.coordinator.broadcast(); Ok(()) }, // Broadcast to all actors. @@ -283,7 +277,7 @@ impl NetworkTransactionBuilder { .await .context("failed to write TransactionsReverted to DB")?; - self.coordinator.broadcast(event.clone()).await; + self.coordinator.broadcast(); // Cancel actors for reverted account creations. for account_id in &reverted_accounts { @@ -294,15 +288,16 @@ impl NetworkTransactionBuilder { } } - /// Processes a notification from an account actor by performing the corresponding DB write. - async fn handle_actor_notification(&mut self, notification: ActorNotification) { - match notification { - ActorNotification::NotesFailed { nullifiers, block_num } => { + /// Processes a request from an account actor. + async fn handle_actor_request(&mut self, request: ActorRequest) { + match request { + ActorRequest::NotesFailed { nullifiers, block_num, ack_tx } => { if let Err(err) = self.db.notes_failed(nullifiers, block_num).await { tracing::error!(err = %err, "failed to mark notes as failed"); } + let _ = ack_tx.send(()); }, - ActorNotification::CacheNoteScript { script_root, script } => { + ActorRequest::CacheNoteScript { script_root, script } => { if let Err(err) = self.db.insert_note_script(script_root, &script).await { tracing::error!(err = %err, "failed to cache note script"); } diff --git a/crates/ntx-builder/src/coordinator.rs b/crates/ntx-builder/src/coordinator.rs index a857bdc64..c464b8389 100644 --- a/crates/ntx-builder/src/coordinator.rs +++ b/crates/ntx-builder/src/coordinator.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use anyhow::Context; @@ -7,8 +7,7 @@ use miden_node_proto::domain::account::NetworkAccountId; use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_proto::domain::note::{NetworkNote, SingleTargetNetworkNote}; use miden_protocol::account::delta::AccountUpdateDetails; -use tokio::sync::mpsc::error::SendError; -use tokio::sync::{Semaphore, mpsc}; +use tokio::sync::{Notify, Semaphore}; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; @@ -21,20 +20,20 @@ use crate::db::Db; /// Handle to account actors that are spawned by the coordinator. #[derive(Clone)] struct ActorHandle { - event_tx: mpsc::Sender>, + notify: Arc, cancel_token: CancellationToken, } impl ActorHandle { - fn new(event_tx: mpsc::Sender>, cancel_token: CancellationToken) -> Self { - Self { event_tx, cancel_token } + fn new(notify: Arc, cancel_token: CancellationToken) -> Self { + Self { notify, cancel_token } } } // COORDINATOR // ================================================================================================ -/// Coordinator for managing [`AccountActor`] instances, tasks, and associated communication. +/// Coordinator for managing [`AccountActor`] instances, tasks, and notifications. /// /// The `Coordinator` is the central orchestrator of the network transaction builder system. /// It manages the lifecycle of account actors. Each actor is responsible for handling transactions @@ -43,14 +42,15 @@ impl ActorHandle { /// /// ## Actor Management /// - Spawns new [`AccountActor`] instances for network accounts as needed. -/// - Maintains a registry of active actors with their communication channels. +/// - Maintains a registry of active actors with their notification handles. /// - Gracefully handles actor shutdown and cleanup when actors complete or fail. /// - Monitors actor tasks through a join set to detect completion or errors. /// -/// ## Event Broadcasting -/// - Distributes mempool events to all account actors. -/// - Handles communication failures by canceling disconnected actors. -/// - Maintains reliable message delivery through dedicated channels per actor. +/// ## Event Notification +/// - Notifies actors via [`Notify`] when state may have changed. +/// - The DB is the source of truth: actors re-evaluate their state from DB on notification. +/// - Notifications are coalesced: multiple notifications while an actor is busy result in a single +/// wake-up. /// /// ## Resource Management /// - Controls transaction concurrency across all network accounts using a semaphore. @@ -58,15 +58,14 @@ impl ActorHandle { /// /// The coordinator operates in an event-driven manner: /// 1. Network accounts are registered and actors spawned as needed. -/// 2. Mempool events are broadcast to all active actors. +/// 2. Mempool events are written to DB, then actors are notified. /// 3. Actor completion/failure events are monitored and handled. /// 4. Failed or completed actors are cleaned up from the registry. pub struct Coordinator { - /// Mapping of network account IDs to their respective message channels and cancellation - /// tokens. + /// Mapping of network account IDs to their notification handles and cancellation tokens. /// - /// This registry serves as the primary directory for communicating with active account actors. - /// When actors are spawned, they register their communication channel here. When events need + /// This registry serves as the primary directory for notifying active account actors. + /// When actors are spawned, they register their notification handle here. When events need /// to be broadcast, this registry is used to locate the appropriate actors. The registry is /// automatically cleaned up when actors complete their execution. actor_registry: HashMap, @@ -89,21 +88,16 @@ pub struct Coordinator { /// Database for persistent state. db: Db, - - /// Channel size for each actor's event channel. - actor_channel_size: usize, } impl Coordinator { - /// Creates a new coordinator with the specified maximum number of inflight transactions - /// and actor channel size. - pub fn new(max_inflight_transactions: usize, actor_channel_size: usize, db: Db) -> Self { + /// Creates a new coordinator with the specified maximum number of inflight transactions. + pub fn new(max_inflight_transactions: usize, db: Db) -> Self { Self { actor_registry: HashMap::new(), actor_join_set: JoinSet::new(), semaphore: Arc::new(Semaphore::new(max_inflight_transactions)), db, - actor_channel_size, } } @@ -113,11 +107,7 @@ impl Coordinator { /// and adds it to the coordinator's management system. The actor will be responsible for /// processing transactions and managing state for the network account. #[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self, origin, actor_context))] - pub async fn spawn_actor( - &mut self, - origin: AccountOrigin, - actor_context: &AccountActorContext, - ) -> Result<(), SendError>> { + pub fn spawn_actor(&mut self, origin: AccountOrigin, actor_context: &AccountActorContext) { let account_id = origin.id(); // If an actor already exists for this account ID, something has gone wrong. @@ -129,10 +119,10 @@ impl Coordinator { handle.cancel_token.cancel(); } - let (event_tx, event_rx) = mpsc::channel(self.actor_channel_size); + let notify = Arc::new(Notify::new()); let cancel_token = tokio_util::sync::CancellationToken::new(); - let actor = AccountActor::new(origin, actor_context, event_rx, cancel_token.clone()); - let handle = ActorHandle::new(event_tx, cancel_token); + let actor = AccountActor::new(origin, actor_context, notify.clone(), cancel_token.clone()); + let handle = ActorHandle::new(notify, cancel_token); // Run the actor. Actor reads state from DB on startup. let semaphore = self.semaphore.clone(); @@ -140,39 +130,16 @@ impl Coordinator { self.actor_registry.insert(account_id, handle); tracing::info!(account_id = %account_id, "Created actor for account prefix"); - Ok(()) } - /// Broadcasts a mempool event to all active account actors. - /// - /// This method distributes the provided event to every actor currently registered - /// with the coordinator. Each actor will receive the event through its dedicated - /// message channel and can process it accordingly. + /// Notifies all active account actors that state may have changed. /// - /// If an actor fails to receive the event, it will be canceled. - #[tracing::instrument(name = "ntx.coordinator.broadcast", skip_all, fields( - actor.count = self.actor_registry.len(), - event.kind = %event.kind() - ))] - pub async fn broadcast(&mut self, event: Arc) { - let mut failed_actors = Vec::new(); - - // Send event to all actors. - for (account_id, handle) in &self.actor_registry { - if let Err(err) = Self::send(handle, event.clone()).await { - tracing::error!( - account_id = %account_id, - error = %err, - "Failed to send event to actor" - ); - failed_actors.push(*account_id); - } - } - // Remove failed actors from registry and cancel them. - for account_id in failed_actors { - let handle = - self.actor_registry.remove(&account_id).expect("actor found in send loop above"); - handle.cancel_token.cancel(); + /// Each actor will re-evaluate its state from the DB on the next iteration of its run loop. + /// Notifications are coalesced: multiple notifications while an actor is busy result in a + /// single wake-up. + pub fn broadcast(&self) { + for handle in self.actor_registry.values() { + handle.notify.notify_one(); } } @@ -194,9 +161,6 @@ impl Coordinator { tracing::info!(account_id = %account_id, "Account actor cancelled"); Ok(()) }, - ActorShutdownReason::EventChannelClosed => { - anyhow::bail!("event channel closed"); - }, ActorShutdownReason::SemaphoreFailed(err) => Err(err).context("semaphore failed"), }, Some(Err(err)) => { @@ -210,19 +174,14 @@ impl Coordinator { } } - /// Sends a mempool event to all network account actors that are found in the corresponding - /// transaction's notes. + /// Notifies account actors that are affected by a `TransactionAdded` event. /// - /// Events are sent only to actors that are currently active. Since event effects are already + /// Only actors that are currently active are notified. Since event effects are already /// persisted in the DB by `write_event()`, actors that spawn later read their state from the /// DB and do not need predating events. - pub async fn send_targeted( - &mut self, - event: &Arc, - ) -> Result<(), SendError>> { - let mut target_actors = HashMap::new(); - if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event.as_ref() - { + pub fn send_targeted(&self, event: &MempoolEvent) { + let mut target_account_ids = HashSet::new(); + if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event { // We need to inform the account if it was updated. This lets it know that its own // transaction has been applied, and in the future also resolves race conditions with // external network transactions (once these are allowed). @@ -231,8 +190,8 @@ impl Coordinator { if account_id.is_network() { let network_account_id = account_id.try_into().expect("account is network account"); - if let Some(actor) = self.actor_registry.get(&network_account_id) { - target_actors.insert(network_account_id, actor); + if self.actor_registry.contains_key(&network_account_id) { + target_account_ids.insert(network_account_id); } } } @@ -241,16 +200,17 @@ impl Coordinator { for note in network_notes { let NetworkNote::SingleTarget(note) = note; let network_account_id = note.account_id(); - if let Some(actor) = self.actor_registry.get(&network_account_id) { - target_actors.insert(network_account_id, actor); + if self.actor_registry.contains_key(&network_account_id) { + target_account_ids.insert(network_account_id); } } } - // Send event to target actors. - for actor in target_actors.values() { - Self::send(actor, event.clone()).await?; + // Notify target actors. + for account_id in &target_account_ids { + if let Some(handle) = self.actor_registry.get(account_id) { + handle.notify.notify_one(); + } } - Ok(()) } /// Writes mempool event effects to the database. @@ -303,12 +263,4 @@ impl Coordinator { handle.cancel_token.cancel(); } } - - /// Helper function to send an event to a single account actor. - async fn send( - handle: &ActorHandle, - event: Arc, - ) -> Result<(), SendError>> { - handle.event_tx.send(event).await - } } diff --git a/crates/ntx-builder/src/db/mod.rs b/crates/ntx-builder/src/db/mod.rs index 47352e29e..6af5349c3 100644 --- a/crates/ntx-builder/src/db/mod.rs +++ b/crates/ntx-builder/src/db/mod.rs @@ -77,6 +77,20 @@ impl Db { .await } + /// Returns `true` when the given transaction has been resolved (committed or reverted) for the + /// given account, i.e. no inflight account row exists with that transaction ID. + pub async fn is_transaction_resolved( + &self, + account_id: NetworkAccountId, + tx_id: TransactionId, + ) -> Result { + self.inner + .query("is_transaction_resolved", move |conn| { + queries::is_transaction_resolved(conn, account_id, &tx_id) + }) + .await + } + /// Returns the latest account state and available notes for the given account. pub async fn select_candidate( &self, diff --git a/crates/ntx-builder/src/db/models/queries/accounts.rs b/crates/ntx-builder/src/db/models/queries/accounts.rs index 833f60ed8..8a2df3174 100644 --- a/crates/ntx-builder/src/db/models/queries/accounts.rs +++ b/crates/ntx-builder/src/db/models/queries/accounts.rs @@ -4,6 +4,7 @@ use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::account::Account; +use miden_protocol::transaction::TransactionId; use crate::db::models::conv as conversions; use crate::db::schema; @@ -100,3 +101,30 @@ pub fn get_account( row.map(|AccountRow { account_data, .. }| conversions::account_from_bytes(&account_data)) .transpose() } + +/// Returns `true` when no inflight account row exists with the given `transaction_id`, meaning +/// the transaction was committed or reverted. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT COUNT(*) +/// FROM accounts +/// WHERE account_id = ?1 AND transaction_id = ?2 +/// ``` +pub fn is_transaction_resolved( + conn: &mut SqliteConnection, + account_id: NetworkAccountId, + tx_id: &TransactionId, +) -> Result { + let account_id_bytes = conversions::network_account_id_to_bytes(account_id); + let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); + + let count: i64 = schema::accounts::table + .filter(schema::accounts::account_id.eq(&account_id_bytes)) + .filter(schema::accounts::transaction_id.eq(&tx_id_bytes)) + .count() + .get_result(conn)?; + + Ok(count == 0) +} diff --git a/crates/ntx-builder/src/lib.rs b/crates/ntx-builder/src/lib.rs index 02c9f547c..bf8772f0b 100644 --- a/crates/ntx-builder/src/lib.rs +++ b/crates/ntx-builder/src/lib.rs @@ -44,9 +44,6 @@ const DEFAULT_MAX_BLOCK_COUNT: usize = 4; /// Default channel capacity for account loading from the store. const DEFAULT_ACCOUNT_CHANNEL_CAPACITY: usize = 1_000; -/// Default channel size for actor event channels. -const DEFAULT_ACTOR_CHANNEL_SIZE: usize = 100; - /// Default maximum number of attempts to execute a failing note before dropping it. const DEFAULT_MAX_NOTE_ATTEMPTS: usize = 30; @@ -95,9 +92,6 @@ pub struct NtxBuilderConfig { /// Channel capacity for loading accounts from the store during startup. pub account_channel_capacity: usize, - /// Channel size for each actor's event channel. - pub actor_channel_size: usize, - /// Path to the SQLite database file used for persistent state. pub database_filepath: PathBuf, } @@ -120,7 +114,6 @@ impl NtxBuilderConfig { max_note_attempts: DEFAULT_MAX_NOTE_ATTEMPTS, max_block_count: DEFAULT_MAX_BLOCK_COUNT, account_channel_capacity: DEFAULT_ACCOUNT_CHANNEL_CAPACITY, - actor_channel_size: DEFAULT_ACTOR_CHANNEL_SIZE, database_filepath, } } @@ -186,13 +179,6 @@ impl NtxBuilderConfig { self } - /// Sets the actor event channel size. - #[must_use] - pub fn with_actor_channel_size(mut self, size: usize) -> Self { - self.actor_channel_size = size; - self - } - /// Builds and initializes the network transaction builder. /// /// This method connects to the store and block producer services, fetches the current @@ -212,8 +198,7 @@ impl NtxBuilderConfig { db.purge_inflight().await.context("failed to purge inflight state")?; let script_cache = LruCache::new(self.script_cache_size); - let coordinator = - Coordinator::new(self.max_concurrent_txs, self.actor_channel_size, db.clone()); + let coordinator = Coordinator::new(self.max_concurrent_txs, db.clone()); let store = StoreClient::new(self.store_url.clone()); let block_producer = BlockProducerClient::new(self.block_producer_url.clone()); @@ -249,7 +234,7 @@ impl NtxBuilderConfig { let chain_state = Arc::new(RwLock::new(ChainState::new(chain_tip_header, chain_mmr))); - let (notification_tx, notification_rx) = mpsc::channel(1); + let (request_tx, actor_request_rx) = mpsc::channel(1); let actor_context = AccountActorContext { block_producer_url: self.block_producer_url.clone(), @@ -261,7 +246,7 @@ impl NtxBuilderConfig { max_notes_per_tx: self.max_notes_per_tx, max_note_attempts: self.max_note_attempts, db: db.clone(), - notification_tx, + request_tx, }; Ok(NetworkTransactionBuilder::new( @@ -272,7 +257,7 @@ impl NtxBuilderConfig { chain_state, actor_context, mempool_events, - notification_rx, + actor_request_rx, )) } } From 4419820e3c82b86dec0dcb44ba36dc315112d359 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Tue, 24 Feb 2026 11:40:59 -0300 Subject: [PATCH 2/5] review: add DbError error case --- crates/ntx-builder/src/actor/mod.rs | 70 ++++++++++++++++++++------- crates/ntx-builder/src/coordinator.rs | 4 ++ 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/crates/ntx-builder/src/actor/mod.rs b/crates/ntx-builder/src/actor/mod.rs index a848f47a1..d14ad37b7 100644 --- a/crates/ntx-builder/src/actor/mod.rs +++ b/crates/ntx-builder/src/actor/mod.rs @@ -57,6 +57,8 @@ pub enum ActorShutdownReason { /// by the coordinator. Cancellation tokens are triggered by the coordinator to initiate /// graceful shutdown of actors. Cancelled(NetworkAccountId), + /// Occurs when the actor encounters a database error it cannot recover from. + DbError(NetworkAccountId), } // ACCOUNT ACTOR CONFIG @@ -235,11 +237,17 @@ impl AccountActor { // Determine initial mode by checking DB for available notes. let block_num = self.chain_state.read().await.chain_tip_header.block_num(); - let has_notes = self + let has_notes = match self .db .has_available_notes(account_id, block_num, self.max_note_attempts) .await - .expect("actor should be able to check for available notes"); + { + Ok(has_notes) => has_notes, + Err(err) => { + tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check for available notes"); + return ActorShutdownReason::DbError(account_id); + }, + }; if has_notes { self.mode = ActorMode::NotesAvailable; @@ -264,10 +272,16 @@ impl AccountActor { match self.mode { ActorMode::TransactionInflight(awaited_id) => { // Check DB: is the inflight tx still pending? - let resolved = self.db + let resolved = match self.db .is_transaction_resolved(account_id, awaited_id) .await - .expect("should be able to check tx status"); + { + Ok(resolved) => resolved, + Err(err) => { + tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check transaction status"); + return ActorShutdownReason::DbError(account_id); + }, + }; if resolved { self.mode = ActorMode::NotesAvailable; } @@ -285,10 +299,13 @@ impl AccountActor { let chain_state = self.chain_state.read().await.clone(); // Query DB for latest account and available notes. - let tx_candidate = self.select_candidate_from_db( + let tx_candidate = match self.select_candidate_from_db( account_id, chain_state, - ).await; + ).await { + Ok(candidate) => candidate, + Err(shutdown_reason) => return shutdown_reason, + }; if let Some(tx_candidate) = tx_candidate { self.execute_transactions(account_id, tx_candidate).await; @@ -311,7 +328,7 @@ impl AccountActor { &self, account_id: NetworkAccountId, chain_state: ChainState, - ) -> Option { + ) -> Result, ActorShutdownReason> { let block_num = chain_state.chain_tip_header.block_num(); let max_notes = self.max_notes_per_tx.get(); @@ -319,22 +336,27 @@ impl AccountActor { .db .select_candidate(account_id, block_num, self.max_note_attempts) .await - .expect("actor should be able to query DB for candidate"); + .map_err(|err| { + tracing::error!(err = err.as_report(), account_id = %account_id, "failed to query DB for transaction candidate"); + ActorShutdownReason::DbError(account_id) + })?; - let account = latest_account?; + let Some(account) = latest_account else { + return Ok(None); + }; let notes: Vec<_> = notes.into_iter().take(max_notes).collect(); if notes.is_empty() { - return None; + return Ok(None); } let (chain_tip_header, chain_mmr) = chain_state.into_parts(); - Some(TransactionCandidate { + Ok(Some(TransactionCandidate { account, notes, chain_tip_header, chain_mmr, - }) + })) } /// Execute a transaction candidate and mark notes as failed as required. @@ -390,10 +412,17 @@ impl AccountActor { /// Sends requests to the coordinator to cache note scripts fetched from the remote store. async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) { for (script_root, script) in scripts { - let _ = self + if self .request_tx .send(ActorRequest::CacheNoteScript { script_root, script }) - .await; + .await + .is_err() + { + tracing::warn!( + "failed to send cache note script request, coordinator is shutting down" + ); + break; + } } } @@ -402,16 +431,23 @@ impl AccountActor { /// before the failure counts are updated in the database. async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) { let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); - let _ = self + if self .request_tx .send(ActorRequest::NotesFailed { nullifiers: nullifiers.to_vec(), block_num, ack_tx, }) - .await; + .await + .is_err() + { + tracing::warn!("failed to send notes failed request, coordinator is shutting down"); + return; + } // Wait for the coordinator to confirm the DB write. - let _ = ack_rx.await; + if ack_rx.await.is_err() { + tracing::warn!("failed to receive notes failed ack from coordinator"); + } } } diff --git a/crates/ntx-builder/src/coordinator.rs b/crates/ntx-builder/src/coordinator.rs index c464b8389..2f04d7fdc 100644 --- a/crates/ntx-builder/src/coordinator.rs +++ b/crates/ntx-builder/src/coordinator.rs @@ -162,6 +162,10 @@ impl Coordinator { Ok(()) }, ActorShutdownReason::SemaphoreFailed(err) => Err(err).context("semaphore failed"), + ActorShutdownReason::DbError(account_id) => { + tracing::error!(account_id = %account_id, "Account actor shut down due to DB error"); + Ok(()) + }, }, Some(Err(err)) => { tracing::error!(err = %err, "actor task failed"); From 2e765a9001742c310a156f196bd6bae502101661 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Tue, 24 Feb 2026 11:45:32 -0300 Subject: [PATCH 3/5] chore: simplify matches --- crates/ntx-builder/src/actor/mod.rs | 74 +++++++++++++++-------------- 1 file changed, 38 insertions(+), 36 deletions(-) diff --git a/crates/ntx-builder/src/actor/mod.rs b/crates/ntx-builder/src/actor/mod.rs index d14ad37b7..e74702c20 100644 --- a/crates/ntx-builder/src/actor/mod.rs +++ b/crates/ntx-builder/src/actor/mod.rs @@ -28,6 +28,18 @@ use crate::builder::ChainState; use crate::db::Db; use crate::store::StoreClient; +/// Converts a database result into an `ActorShutdownReason` error, logging the error on failure. +fn db_query( + account_id: NetworkAccountId, + result: Result, + context: &str, +) -> Result { + result.map_err(|err| { + tracing::error!(err = err.as_report(), account_id = %account_id, "{context}"); + ActorShutdownReason::DbError(account_id) + }) +} + // ACTOR REQUESTS // ================================================================================================ @@ -237,16 +249,13 @@ impl AccountActor { // Determine initial mode by checking DB for available notes. let block_num = self.chain_state.read().await.chain_tip_header.block_num(); - let has_notes = match self - .db - .has_available_notes(account_id, block_num, self.max_note_attempts) - .await - { - Ok(has_notes) => has_notes, - Err(err) => { - tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check for available notes"); - return ActorShutdownReason::DbError(account_id); - }, + let has_notes = match db_query( + account_id, + self.db.has_available_notes(account_id, block_num, self.max_note_attempts).await, + "failed to check for available notes", + ) { + Ok(v) => v, + Err(reason) => return reason, }; if has_notes { @@ -272,15 +281,15 @@ impl AccountActor { match self.mode { ActorMode::TransactionInflight(awaited_id) => { // Check DB: is the inflight tx still pending? - let resolved = match self.db - .is_transaction_resolved(account_id, awaited_id) - .await - { - Ok(resolved) => resolved, - Err(err) => { - tracing::error!(err = err.as_report(), account_id = %account_id, "failed to check transaction status"); - return ActorShutdownReason::DbError(account_id); - }, + let resolved = match db_query( + account_id, + self.db + .is_transaction_resolved(account_id, awaited_id) + .await, + "failed to check transaction status", + ) { + Ok(v) => v, + Err(reason) => return reason, }; if resolved { self.mode = ActorMode::NotesAvailable; @@ -332,14 +341,11 @@ impl AccountActor { let block_num = chain_state.chain_tip_header.block_num(); let max_notes = self.max_notes_per_tx.get(); - let (latest_account, notes) = self - .db - .select_candidate(account_id, block_num, self.max_note_attempts) - .await - .map_err(|err| { - tracing::error!(err = err.as_report(), account_id = %account_id, "failed to query DB for transaction candidate"); - ActorShutdownReason::DbError(account_id) - })?; + let (latest_account, notes) = db_query( + account_id, + self.db.select_candidate(account_id, block_num, self.max_note_attempts).await, + "failed to query DB for transaction candidate", + )?; let Some(account) = latest_account else { return Ok(None); @@ -383,17 +389,13 @@ impl AccountActor { let notes = tx_candidate.notes.clone(); let execution_result = context.execute_transaction(tx_candidate).await; match execution_result { - // Execution completed without failed notes. - Ok((tx_id, failed, scripts_to_cache)) if failed.is_empty() => { - self.cache_note_scripts(scripts_to_cache).await; - self.mode = ActorMode::TransactionInflight(tx_id); - }, - // Execution completed with some failed notes. Ok((tx_id, failed, scripts_to_cache)) => { self.cache_note_scripts(scripts_to_cache).await; - let nullifiers: Vec<_> = - failed.into_iter().map(|note| note.note.nullifier()).collect(); - self.mark_notes_failed(&nullifiers, block_num).await; + if !failed.is_empty() { + let nullifiers: Vec<_> = + failed.into_iter().map(|note| note.note.nullifier()).collect(); + self.mark_notes_failed(&nullifiers, block_num).await; + } self.mode = ActorMode::TransactionInflight(tx_id); }, // Transaction execution failed. From 6472314ff013f55517363686222100e48a363264 Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Wed, 25 Feb 2026 11:32:11 -0300 Subject: [PATCH 4/5] review: rename transaction_exists, update query, improve docs --- crates/ntx-builder/src/actor/mod.rs | 14 ++++------- crates/ntx-builder/src/builder.rs | 2 +- crates/ntx-builder/src/db/mod.rs | 13 +++-------- .../src/db/models/queries/accounts.rs | 23 ++++++++----------- 4 files changed, 17 insertions(+), 35 deletions(-) diff --git a/crates/ntx-builder/src/actor/mod.rs b/crates/ntx-builder/src/actor/mod.rs index e74702c20..cac7fcb3d 100644 --- a/crates/ntx-builder/src/actor/mod.rs +++ b/crates/ntx-builder/src/actor/mod.rs @@ -281,17 +281,17 @@ impl AccountActor { match self.mode { ActorMode::TransactionInflight(awaited_id) => { // Check DB: is the inflight tx still pending? - let resolved = match db_query( + let exists = match db_query( account_id, self.db - .is_transaction_resolved(account_id, awaited_id) + .transaction_exists(awaited_id) .await, "failed to check transaction status", ) { Ok(v) => v, Err(reason) => return reason, }; - if resolved { + if !exists { self.mode = ActorMode::NotesAvailable; } }, @@ -420,9 +420,6 @@ impl AccountActor { .await .is_err() { - tracing::warn!( - "failed to send cache note script request, coordinator is shutting down" - ); break; } } @@ -443,13 +440,10 @@ impl AccountActor { .await .is_err() { - tracing::warn!("failed to send notes failed request, coordinator is shutting down"); return; } // Wait for the coordinator to confirm the DB write. - if ack_rx.await.is_err() { - tracing::warn!("failed to receive notes failed ack from coordinator"); - } + let _ = ack_rx.await; } } diff --git a/crates/ntx-builder/src/builder.rs b/crates/ntx-builder/src/builder.rs index 6ef7a92c2..0e4f61304 100644 --- a/crates/ntx-builder/src/builder.rs +++ b/crates/ntx-builder/src/builder.rs @@ -98,7 +98,7 @@ pub struct NetworkTransactionBuilder { actor_context: AccountActorContext, /// Stream of mempool events from the block producer. mempool_events: MempoolEventStream, - /// Receiver for requests from account actors (note failures, script caching). + /// Database update requests from account actors. actor_request_rx: mpsc::Receiver, } diff --git a/crates/ntx-builder/src/db/mod.rs b/crates/ntx-builder/src/db/mod.rs index 6af5349c3..4cae5cac6 100644 --- a/crates/ntx-builder/src/db/mod.rs +++ b/crates/ntx-builder/src/db/mod.rs @@ -77,17 +77,10 @@ impl Db { .await } - /// Returns `true` when the given transaction has been resolved (committed or reverted) for the - /// given account, i.e. no inflight account row exists with that transaction ID. - pub async fn is_transaction_resolved( - &self, - account_id: NetworkAccountId, - tx_id: TransactionId, - ) -> Result { + /// Returns `true` when an inflight account row exists with the given transaction ID. + pub async fn transaction_exists(&self, tx_id: TransactionId) -> Result { self.inner - .query("is_transaction_resolved", move |conn| { - queries::is_transaction_resolved(conn, account_id, &tx_id) - }) + .query("transaction_exists", move |conn| queries::transaction_exists(conn, &tx_id)) .await } diff --git a/crates/ntx-builder/src/db/models/queries/accounts.rs b/crates/ntx-builder/src/db/models/queries/accounts.rs index 8a2df3174..7d52c6554 100644 --- a/crates/ntx-builder/src/db/models/queries/accounts.rs +++ b/crates/ntx-builder/src/db/models/queries/accounts.rs @@ -1,5 +1,6 @@ //! Account-related queries and models. +use diesel::dsl::exists; use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; @@ -102,29 +103,23 @@ pub fn get_account( .transpose() } -/// Returns `true` when no inflight account row exists with the given `transaction_id`, meaning -/// the transaction was committed or reverted. +/// Returns `true` when an inflight account row exists with the given `transaction_id`. /// /// # Raw SQL /// /// ```sql -/// SELECT COUNT(*) -/// FROM accounts -/// WHERE account_id = ?1 AND transaction_id = ?2 +/// SELECT EXISTS (SELECT 1 FROM accounts WHERE transaction_id = ?1) /// ``` -pub fn is_transaction_resolved( +pub fn transaction_exists( conn: &mut SqliteConnection, - account_id: NetworkAccountId, tx_id: &TransactionId, ) -> Result { - let account_id_bytes = conversions::network_account_id_to_bytes(account_id); let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - let count: i64 = schema::accounts::table - .filter(schema::accounts::account_id.eq(&account_id_bytes)) - .filter(schema::accounts::transaction_id.eq(&tx_id_bytes)) - .count() - .get_result(conn)?; + let result: bool = diesel::select(exists( + schema::accounts::table.filter(schema::accounts::transaction_id.eq(&tx_id_bytes)), + )) + .get_result(conn)?; - Ok(count == 0) + Ok(result) } From 4d5f47eb3b9acf7d238eb06823f6f54403df695a Mon Sep 17 00:00:00 2001 From: SantiagoPittella Date: Wed, 25 Feb 2026 11:48:29 -0300 Subject: [PATCH 5/5] review: update receiver docs --- crates/ntx-builder/src/builder.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/ntx-builder/src/builder.rs b/crates/ntx-builder/src/builder.rs index 0e4f61304..8e857b733 100644 --- a/crates/ntx-builder/src/builder.rs +++ b/crates/ntx-builder/src/builder.rs @@ -99,6 +99,9 @@ pub struct NetworkTransactionBuilder { /// Stream of mempool events from the block producer. mempool_events: MempoolEventStream, /// Database update requests from account actors. + /// + /// We keep database writes centralized so this is how actors communicate + /// items to write. actor_request_rx: mpsc::Receiver, }