diff --git a/src/ingester/persist/mod.rs b/src/ingester/persist/mod.rs index 2d64cd1f..e1baecb7 100644 --- a/src/ingester/persist/mod.rs +++ b/src/ingester/persist/mod.rs @@ -27,10 +27,12 @@ use sea_orm::{ sea_query::OnConflict, ColumnTrait, ConnectionTrait, DatabaseBackend, DatabaseTransaction, EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, QueryTrait, Set, Statement, }; +// Added imports for missing account logic +use std::{cmp::max, collections::HashMap, collections::HashSet}; use solana_pubkey::{pubkey, Pubkey}; use solana_signature::Signature; use sqlx::types::Decimal; -use std::{cmp::max, collections::HashMap}; + pub mod indexed_merkle_tree; pub mod persisted_indexed_merkle_tree; @@ -199,8 +201,13 @@ pub async fn persist_state_update( } } + // *************************************************************** + // * CRITICAL ORDER CHANGE: Transactions moved to the end + // *************************************************************** + let transactions_vec = transactions.into_iter().collect::>(); + // 1. INSERT MAIN TRANSACTIONS (parent table: transactions) debug!("Persisting transaction metadatas..."); let (compression_transactions, non_compression_transactions): (Vec<_>, Vec<_>) = transactions_vec @@ -222,12 +229,14 @@ pub async fn persist_state_update( persist_transactions(txn, chunk).await?; } + // 2. INSERT ACCOUNT-TRANSACTION LINKS (child table: account_transactions) debug!("Persisting account transactions..."); let account_transactions = account_transactions.into_iter().collect::>(); for chunk in account_transactions.chunks(MAX_SQL_INSERTS) { + // This function now uses logic to insert missing accounts! persist_account_transactions(txn, chunk).await?; } - + persist_batch_events(txn, batch_merkle_tree_events, &tree_info_cache).await?; metric! { @@ -279,6 +288,11 @@ pub struct EnrichedTokenAccount { pub hash: Hash, } +#[derive(Debug, Clone, PartialEq, Eq, sea_orm::FromQueryResult)] +struct MinimalAccountHash { + hash: Vec, +} + #[derive(Debug)] enum AccountType { Account, @@ -595,10 +609,109 @@ async fn persist_transactions( Ok(()) } +// *************************************************************** +// * NEW HELPER FUNCTIONS FOR MISSING ACCOUNT HANDLING +// *************************************************************** +async fn get_missing_hashes( + txn: &DatabaseTransaction, + all_hashes: Vec, +) -> Result, IngesterError> { + if all_hashes.is_empty() { + return Ok(Vec::new()); + } + + // 1. Check which hashes EXIST in the database + let existing_hashes: Vec> = accounts::Entity::find() + .select_only() + .column(accounts::Column::Hash) + .filter(accounts::Column::Hash.is_in(all_hashes.iter().map(|h| h.to_vec()))) + // Changed into_model() to into_model::() + .into_model::() + .all(txn) + .await + .map_err(|e| IngesterError::DatabaseError(format!("Failed to query existing accounts: {}", e)))? + .into_iter() + .map(|model| model.hash) // <-- Now the type 'model' is explicitly known as MinimalAccountHash + .collect(); + + let existing_set: HashSet> = existing_hashes.into_iter().collect(); + + // 2. Find missing hashes + let missing_hashes: Vec = all_hashes.into_iter() + .filter(|hash| !existing_set.contains(&hash.to_vec())) + .collect(); + + Ok(missing_hashes) +} + +async fn persist_missing_accounts( + txn: &DatabaseTransaction, + missing_hashes: &[Hash], +) -> Result<(), IngesterError> { + if missing_hashes.is_empty() { + return Ok(()); + } + + let missing_account_models: Vec = missing_hashes + .iter() + .map(|hash| { + // We insert minimal, safe data for NOT NULL columns. + accounts::ActiveModel { + hash: Set(hash.to_vec()), + owner: Set(vec![0; 32]), // Example empty bytea value (32 bytes) + tree: Set(vec![0; 32]), // Example empty bytea value + leaf_index: Set(0 as i64), + slot_created: Set(0 as i64), + spent: Set(true), // Mark as spent/incomplete + lamports: Set(Decimal::from(0)), + nullified_in_tree: Set(false), + in_output_queue: Set(false), + // Optional values (Option) default to Default::default() (None) + ..Default::default() + } + }) + .collect(); + + log::warn!("Inserting {} missing stub accounts to satisfy FK constraint.", missing_hashes.len()); + + // We use insert_many with DO NOTHING to avoid conflicts in case of concurrent insertion + let query = accounts::Entity::insert_many(missing_account_models) + .on_conflict( + OnConflict::column(accounts::Column::Hash) + .do_nothing() + .to_owned(), + ) + .build(txn.get_database_backend()); + + txn.execute(query).await?; + + Ok(()) +} + + +// *************************************************************** +// * MODIFIED persist_account_transactions FUNCTION +// *************************************************************** + async fn persist_account_transactions( txn: &DatabaseTransaction, account_transactions: &[AccountTransaction], ) -> Result<(), IngesterError> { + + // --- STEP 1: COLLECTING AND VERIFYING ACCOUNT HASHES --- + let all_hashes: Vec = account_transactions.iter().map(|tx| tx.hash.clone()).collect(); + + // Get unique hashes for verification + let unique_hashes: Vec = all_hashes.into_iter().collect::>().into_iter().collect(); + + let missing_hashes = get_missing_hashes(txn, unique_hashes).await?; + + // --- STEP 2: INSERT MISSING STUB ACCOUNTS --- + // This prevents the "account_transactions_hash_fk" foreign key violation + persist_missing_accounts(txn, &missing_hashes).await?; + + // --- STEP 3: INSERT LINKS (ORIGINAL LOGIC) --- + let account_transaction_models = account_transactions .iter() .map(|transaction| account_transactions::ActiveModel {