Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 111 additions & 75 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use account_state::TransactionCandidate;
use futures::FutureExt;
use miden_node_proto::clients::{Builder, ValidatorClient};
use miden_node_proto::domain::account::NetworkAccountId;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
Expand All @@ -20,7 +19,7 @@ use miden_protocol::block::BlockNumber;
use miden_protocol::note::{Note, NoteScript, Nullifier};
use miden_protocol::transaction::TransactionId;
use miden_remote_prover_client::RemoteTransactionProver;
use tokio::sync::{AcquireError, RwLock, Semaphore, mpsc};
use tokio::sync::{AcquireError, Notify, RwLock, Semaphore, mpsc};
use tokio_util::sync::CancellationToken;
use url::Url;

Expand All @@ -29,16 +28,31 @@ use crate::builder::ChainState;
use crate::db::Db;
use crate::store::StoreClient;

// ACTOR NOTIFICATION
/// Translates the outcome of a database call into the actor's shutdown-reason error type.
///
/// A successful result is passed through untouched. On failure the error report is logged at
/// `error` level together with the account ID and the caller-supplied `context` message, and
/// `ActorShutdownReason::DbError` carrying `account_id` is returned in its place.
fn db_query<T>(
    account_id: NetworkAccountId,
    result: Result<T, miden_node_db::DatabaseError>,
    context: &str,
) -> Result<T, ActorShutdownReason> {
    match result {
        Ok(value) => Ok(value),
        Err(err) => {
            tracing::error!(err = err.as_report(), account_id = %account_id, "{context}");
            Err(ActorShutdownReason::DbError(account_id))
        },
    }
}

// ACTOR REQUESTS
// ================================================================================================

/// A notification sent from an account actor to the coordinator.
pub enum ActorNotification {
/// A request sent from an account actor to the coordinator via a shared mpsc channel.
pub enum ActorRequest {
/// One or more notes failed during transaction execution and should have their attempt
/// counters incremented.
/// counters incremented. The actor waits for the coordinator to acknowledge the DB write via
/// the oneshot channel, preventing race conditions where the actor could re-select the same
/// notes before the failure is persisted.
NotesFailed {
nullifiers: Vec<Nullifier>,
block_num: BlockNumber,
ack_tx: tokio::sync::oneshot::Sender<()>,
},
/// A note script was fetched from the remote store and should be persisted to the local DB.
CacheNoteScript { script_root: Word, script: NoteScript },
Expand All @@ -49,15 +63,14 @@ pub enum ActorNotification {

/// The reason an actor has shut down.
pub enum ActorShutdownReason {
/// Occurs when an account actor detects failure in the messaging channel used by the
/// coordinator.
EventChannelClosed,
/// Occurs when an account actor detects failure in acquiring the rate-limiting semaphore.
SemaphoreFailed(AcquireError),
/// Occurs when an account actor detects its corresponding cancellation token has been triggered
/// by the coordinator. Cancellation tokens are triggered by the coordinator to initiate
/// graceful shutdown of actors.
Cancelled(NetworkAccountId),
/// Occurs when the actor encounters a database error it cannot recover from.
DbError(NetworkAccountId),
}

// ACCOUNT ACTOR CONFIG
Expand Down Expand Up @@ -87,8 +100,8 @@ pub struct AccountActorContext {
pub max_note_attempts: usize,
/// Database for persistent state.
pub db: Db,
/// Channel for sending notifications to the coordinator (via the builder event loop).
pub notification_tx: mpsc::Sender<ActorNotification>,
/// Channel for sending requests to the coordinator (via the builder event loop).
pub request_tx: mpsc::Sender<ActorRequest>,
}

// ACCOUNT ORIGIN
Expand Down Expand Up @@ -179,7 +192,7 @@ pub struct AccountActor {
store: StoreClient,
db: Db,
mode: ActorMode,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
notify: Arc<Notify>,
cancel_token: CancellationToken,
block_producer: BlockProducerClient,
validator: ValidatorClient,
Expand All @@ -190,17 +203,16 @@ pub struct AccountActor {
max_notes_per_tx: NonZeroUsize,
/// Maximum number of note execution attempts before dropping a note.
max_note_attempts: usize,
/// Channel for sending notifications to the coordinator.
notification_tx: mpsc::Sender<ActorNotification>,
/// Channel for sending requests to the coordinator.
request_tx: mpsc::Sender<ActorRequest>,
}

impl AccountActor {
/// Constructs a new account actor and corresponding messaging channel with the given
/// configuration.
/// Constructs a new account actor with the given configuration.
pub fn new(
origin: AccountOrigin,
actor_context: &AccountActorContext,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
notify: Arc<Notify>,
cancel_token: CancellationToken,
) -> Self {
let block_producer = BlockProducerClient::new(actor_context.block_producer_url.clone());
Expand All @@ -217,7 +229,7 @@ impl AccountActor {
store: actor_context.store.clone(),
db: actor_context.db.clone(),
mode: ActorMode::NoViableNotes,
event_rx,
notify,
cancel_token,
block_producer,
validator,
Expand All @@ -226,7 +238,7 @@ impl AccountActor {
script_cache: actor_context.script_cache.clone(),
max_notes_per_tx: actor_context.max_notes_per_tx,
max_note_attempts: actor_context.max_note_attempts,
notification_tx: actor_context.notification_tx.clone(),
request_tx: actor_context.request_tx.clone(),
}
}

Expand All @@ -237,11 +249,14 @@ impl AccountActor {

// Determine initial mode by checking DB for available notes.
let block_num = self.chain_state.read().await.chain_tip_header.block_num();
let has_notes = self
.db
.has_available_notes(account_id, block_num, self.max_note_attempts)
.await
.expect("actor should be able to check for available notes");
let has_notes = match db_query(
account_id,
self.db.has_available_notes(account_id, block_num, self.max_note_attempts).await,
"failed to check for available notes",
) {
Ok(v) => v,
Err(reason) => return reason,
};

if has_notes {
self.mode = ActorMode::NotesAvailable;
Expand All @@ -261,28 +276,28 @@ impl AccountActor {
_ = self.cancel_token.cancelled() => {
return ActorShutdownReason::Cancelled(account_id);
}
// Handle mempool events.
event = self.event_rx.recv() => {
let Some(event) = event else {
return ActorShutdownReason::EventChannelClosed;
};
// Re-enable transaction execution if the transaction being waited on has
// been resolved (added to mempool, committed in a block, or reverted).
if let ActorMode::TransactionInflight(awaited_id) = self.mode {
let should_wake = match event.as_ref() {
MempoolEvent::TransactionAdded { id, .. } => *id == awaited_id,
MempoolEvent::BlockCommitted { txs, .. } => {
txs.contains(&awaited_id)
},
MempoolEvent::TransactionsReverted(tx_ids) => {
tx_ids.contains(&awaited_id)
},
};
if should_wake {
// Handle coordinator notifications. On notification, re-evaluate state from DB.
_ = self.notify.notified() => {
match self.mode {
ActorMode::TransactionInflight(awaited_id) => {
// Check DB: is the inflight tx still pending?
let resolved = match db_query(
account_id,
self.db
.is_transaction_resolved(account_id, awaited_id)
.await,
"failed to check transaction status",
) {
Ok(v) => v,
Err(reason) => return reason,
};
if resolved {
self.mode = ActorMode::NotesAvailable;
}
},
_ => {
self.mode = ActorMode::NotesAvailable;
}
} else {
self.mode = ActorMode::NotesAvailable;
}
},
// Execute transactions.
Expand All @@ -293,10 +308,13 @@ impl AccountActor {
let chain_state = self.chain_state.read().await.clone();

// Query DB for latest account and available notes.
let tx_candidate = self.select_candidate_from_db(
let tx_candidate = match self.select_candidate_from_db(
account_id,
chain_state,
).await;
).await {
Ok(candidate) => candidate,
Err(shutdown_reason) => return shutdown_reason,
};

if let Some(tx_candidate) = tx_candidate {
self.execute_transactions(account_id, tx_candidate).await;
Expand All @@ -319,30 +337,32 @@ impl AccountActor {
&self,
account_id: NetworkAccountId,
chain_state: ChainState,
) -> Option<TransactionCandidate> {
) -> Result<Option<TransactionCandidate>, ActorShutdownReason> {
let block_num = chain_state.chain_tip_header.block_num();
let max_notes = self.max_notes_per_tx.get();

let (latest_account, notes) = self
.db
.select_candidate(account_id, block_num, self.max_note_attempts)
.await
.expect("actor should be able to query DB for candidate");
let (latest_account, notes) = db_query(
account_id,
self.db.select_candidate(account_id, block_num, self.max_note_attempts).await,
"failed to query DB for transaction candidate",
)?;

let account = latest_account?;
let Some(account) = latest_account else {
return Ok(None);
};

let notes: Vec<_> = notes.into_iter().take(max_notes).collect();
if notes.is_empty() {
return None;
return Ok(None);
}

let (chain_tip_header, chain_mmr) = chain_state.into_parts();
Some(TransactionCandidate {
Ok(Some(TransactionCandidate {
account,
notes,
chain_tip_header,
chain_mmr,
})
}))
}

/// Execute a transaction candidate and mark notes as failed as required.
Expand All @@ -369,17 +389,13 @@ impl AccountActor {
let notes = tx_candidate.notes.clone();
let execution_result = context.execute_transaction(tx_candidate).await;
match execution_result {
// Execution completed without failed notes.
Ok((tx_id, failed, scripts_to_cache)) if failed.is_empty() => {
self.cache_note_scripts(scripts_to_cache).await;
self.mode = ActorMode::TransactionInflight(tx_id);
},
// Execution completed with some failed notes.
Ok((tx_id, failed, scripts_to_cache)) => {
self.cache_note_scripts(scripts_to_cache).await;
let nullifiers: Vec<_> =
failed.into_iter().map(|note| note.note.nullifier()).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
if !failed.is_empty() {
let nullifiers: Vec<_> =
failed.into_iter().map(|note| note.note.nullifier()).collect();
self.mark_notes_failed(&nullifiers, block_num).await;
}
self.mode = ActorMode::TransactionInflight(tx_id);
},
// Transaction execution failed.
Expand All @@ -395,25 +411,45 @@ impl AccountActor {
}
}

/// Sends notifications to the coordinator to cache note scripts fetched from the remote store.
/// Sends requests to the coordinator to cache note scripts fetched from the remote store.
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
for (script_root, script) in scripts {
let _ = self
.notification_tx
.send(ActorNotification::CacheNoteScript { script_root, script })
.await;
if self
.request_tx
.send(ActorRequest::CacheNoteScript { script_root, script })
.await
.is_err()
{
tracing::warn!(
"failed to send cache note script request, coordinator is shutting down"
);
break;
}
}
}

/// Sends a notification to the coordinator to mark notes as failed.
/// Sends a request to the coordinator to mark notes as failed and waits for the DB write to
/// complete. This prevents a race condition where the actor could re-select the same notes
/// before the failure counts are updated in the database.
async fn mark_notes_failed(&self, nullifiers: &[Nullifier], block_num: BlockNumber) {
let _ = self
.notification_tx
.send(ActorNotification::NotesFailed {
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
if self
.request_tx
.send(ActorRequest::NotesFailed {
nullifiers: nullifiers.to_vec(),
block_num,
ack_tx,
})
.await;
.await
.is_err()
{
tracing::warn!("failed to send notes failed request, coordinator is shutting down");
return;
}
// Wait for the coordinator to confirm the DB write.
if ack_rx.await.is_err() {
tracing::warn!("failed to receive notes failed ack from coordinator");
}
}
}

Expand Down
Loading
Loading