diff --git a/src/api/api.rs b/src/api/api.rs index faf5f835..42e04548 100644 --- a/src/api/api.rs +++ b/src/api/api.rs @@ -140,6 +140,13 @@ impl PhotonApi { .map_err(Into::into) } + pub async fn health(&self) -> Result<(), PhotonApiError> { + self.get_indexer_health() + .await + .map(|_| ()) + .map_err(Into::into) + } + pub async fn get_compressed_account( &self, request: CompressedAccountRequest, diff --git a/src/api/rpc_server.rs b/src/api/rpc_server.rs index f29678aa..4e7ff7b5 100644 --- a/src/api/rpc_server.rs +++ b/src/api/rpc_server.rs @@ -19,7 +19,8 @@ pub async fn run_server(api: PhotonApi, port: u16) -> Result Result, api.readiness().await.map_err(Into::into) })?; + module.register_async_method("health", |_rpc_params, rpc_context| async move { + debug!("Checking Health"); + let api = rpc_context.as_ref(); + api.health().await.map_err(Into::into) + })?; + + module.register_async_method( "getCompressedAccount", |rpc_params, rpc_context| async move { @@ -183,6 +191,15 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result, .map_err(Into::into) })?; + // Alias for getIndexerHealth, to enable compatibility with health check + module.register_async_method("getHealth", |_rpc_params, rpc_context| async move { + rpc_context + .as_ref() + .get_indexer_health() + .await + .map_err(Into::into) + })?; + module.register_async_method("getIndexerSlot", |_rpc_params, rpc_context| async move { let api = rpc_context.as_ref(); api.get_indexer_slot().await.map_err(Into::into) @@ -205,6 +222,12 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result, }, )?; + // Alias for getIndexerSlot, to enable compatibility with health check + module.register_async_method("getSlot", |_rpc_params, rpc_context| async move { + let api = rpc_context.as_ref(); + api.get_indexer_slot().await.map_err(Into::into) + })?; + module.register_async_method( "getCompressedAccountsByOwner", |rpc_params, rpc_context| async move { diff --git a/src/common/mod.rs b/src/common/mod.rs index 
ac9942ae..bd2e8728 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -149,7 +149,7 @@ pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 { pub fn get_rpc_client(rpc_url: &str) -> Arc { Arc::new(RpcClient::new_with_timeout_and_commitment( rpc_url.to_string(), - Duration::from_secs(90), + Duration::from_secs(180), CommitmentConfig::confirmed(), )) } diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs index 5a4a6764..f34625d0 100644 --- a/src/ingester/fetchers/grpc.rs +++ b/src/ingester/fetchers/grpc.rs @@ -298,8 +298,21 @@ fn parse_block(block: SubscribeUpdateBlock) -> BlockInfo { fn parse_transaction(transaction: SubscribeUpdateTransactionInfo) -> TransactionInfo { let meta = transaction.meta.unwrap(); - let error = create_tx_error(meta.err.as_ref()).unwrap(); - let error = error.map(|e| e.to_string()); + + // We attempt to decode the TransactionError, but if it fails, we log the error and set error to None. + let tx_error_result = create_tx_error(meta.err.as_ref()); + + let error = match tx_error_result { + Ok(tx_error_option) => tx_error_option.map(|e| e.to_string()), + Err(e) => { + // We log the decoding error, but we don't panic. + error!("Failed to decode TransactionError: {}", e); + metric! 
{ + statsd_count!("tx_error_decode_failure", 1); // It's worth adding a metric + } + None + } + }; let signature = Signature::try_from(transaction.signature).unwrap(); let message = transaction.transaction.unwrap().message.unwrap(); diff --git a/src/ingester/indexer/mod.rs b/src/ingester/indexer/mod.rs index fa696d56..01152fa1 100644 --- a/src/ingester/indexer/mod.rs +++ b/src/ingester/indexer/mod.rs @@ -12,8 +12,8 @@ use crate::{ }; use super::typedefs::block_info::BlockInfo; -const POST_BACKFILL_FREQUENCY: u64 = 10; -const PRE_BACKFILL_FREQUENCY: u64 = 10; +const POST_BACKFILL_FREQUENCY: u64 = 10000; +const PRE_BACKFILL_FREQUENCY: u64 = 10000; #[derive(FromQueryResult)] pub struct OptionalContextModel { diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index c9b953d0..afbab96a 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -94,10 +94,10 @@ pub async fn index_block_batch( state_updates.push(derive_block_state_update(db, block).await?); } persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?; + tx.commit().await?; metric! { statsd_count!("blocks_indexed", blocks_len as i64); } - tx.commit().await?; Ok(()) } @@ -105,18 +105,36 @@ pub async fn index_block_batch_with_infinite_retries( db: &DatabaseConnection, block_batch: Vec, ) { + let mut retry_count = 0; loop { match index_block_batch(db, &block_batch).await { Ok(()) => return, Err(e) => { let start_block = block_batch.first().unwrap().metadata.slot; let end_block = block_batch.last().unwrap().metadata.slot; + metric! { + statsd_count!("indexing_batch_failure_attempt", 1); + } log::error!( "Failed to index block batch {}-{}. Got error {}", start_block, end_block, e ); + + retry_count += 1; + if retry_count > 5 { + metric! { + statsd_count!("indexing_batch_permanently_skipped", 1); + } + + log::error!( + "Max retries exceeded for faulty batch. 
Skipping blocks {}-{}", + start_block, + end_block + ); + return; + } sleep(Duration::from_secs(1)); } } diff --git a/src/main.rs b/src/main.rs index 7e0f8cd2..e5d11920 100644 --- a/src/main.rs +++ b/src/main.rs @@ -51,8 +51,12 @@ struct Args { #[arg(short, long)] db_url: Option, + // Snapshot offset determines how much newer the snapshot from what is in the DB for us to start indexing from the snapshot instead of DB + #[arg(short, long)] + snapshot_offset: Option, + /// The start slot to begin indexing from. Defaults to the last indexed slot in the database plus - /// one. + /// one. #[arg(short, long)] start_slot: Option, @@ -209,54 +213,6 @@ async fn main() { let is_rpc_node_local = args.rpc_url.contains("127.0.0.1"); let rpc_client = get_rpc_client(&args.rpc_url); - if let Some(snapshot_dir) = args.snapshot_dir { - let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir)); - let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter) - .await - .unwrap(); - if !snapshot_files.is_empty() { - // Sync tree metadata from on-chain before processing snapshot - // This is REQUIRED so the indexer knows about all existing trees - info!("Syncing tree metadata from on-chain before loading snapshot..."); - if let Err(e) = photon_indexer::monitor::tree_metadata_sync::sync_tree_metadata( - rpc_client.as_ref(), - db_conn.as_ref(), - ) - .await - { - error!( - "Failed to sync tree metadata: {}. Cannot proceed with snapshot loading.", - e - ); - error!("Tree metadata must be synced before loading snapshots to avoid skipping transactions."); - std::process::exit(1); - } - info!("Tree metadata sync completed successfully"); - - info!("Detected snapshot files. 
Loading snapshot..."); - let last_slot = snapshot_files.last().unwrap().end_slot; - let block_stream = - load_block_stream_from_directory_adapter(directory_adapter.clone()).await; - pin_mut!(block_stream); - let first_blocks = block_stream.next().await.unwrap(); - let last_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot; - let block_stream = stream! { - yield first_blocks; - while let Some(blocks) = block_stream.next().await { - yield blocks; - } - }; - index_block_stream( - block_stream, - db_conn.clone(), - rpc_client.clone(), - last_indexed_slot, - Some(last_slot), - ) - .await; - } - } - let (indexer_handle, monitor_handle) = match args.disable_indexing { true => { info!("Indexing is disabled"); @@ -264,6 +220,85 @@ async fn main() { } false => { info!("Starting indexer..."); + + let last_indexed_slot = match args.start_slot { + Some(start_slot) => match start_slot.as_str() { + "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, + _ => { + fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) + .await + } + }, + None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) + .await + .unwrap_or( + get_network_start_slot(&rpc_client) + .await + .try_into() + .unwrap(), + ) + .try_into() + .unwrap(), + }; + if let Some(snapshot_dir) = args.snapshot_dir { + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir)); + let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter) + .await + .unwrap(); + if !snapshot_files.is_empty() { + + // Sync tree metadata from on-chain before processing snapshot + // This is REQUIRED so the indexer knows about all existing trees + info!("Syncing tree metadata from on-chain before loading snapshot..."); + if let Err(e) = photon_indexer::monitor::tree_metadata_sync::sync_tree_metadata( + rpc_client.as_ref(), + db_conn.as_ref(), + ) + .await + { + error!( + "Failed to sync tree metadata: {}. 
Cannot proceed with snapshot loading.", + e + ); + error!("Tree metadata must be synced before loading snapshots to avoid skipping transactions."); + std::process::exit(1); + } + info!("Tree metadata sync completed successfully"); + + info!("Detected snapshot files. Loading snapshot..."); + let last_slot = snapshot_files.last().unwrap().end_slot; + // Compute the snapshot offset, if the snapshot is not more recent than this offset then don't fetch the snapshot + let snapshot_offset = last_slot - args.snapshot_offset.unwrap_or(0); + + if snapshot_offset >= last_indexed_slot { + info!("Snapshot is newer than the last indexed slot. Loading snapshot..."); + + let block_stream = + load_block_stream_from_directory_adapter(directory_adapter.clone()).await; + pin_mut!(block_stream); + let first_blocks = block_stream.next().await.unwrap(); + let last_stream_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot; + let block_stream = stream! { + yield first_blocks; + while let Some(blocks) = block_stream.next().await { + yield blocks; + } + }; + index_block_stream( + block_stream, + db_conn.clone(), + rpc_client.clone(), + last_stream_indexed_slot, + Some(last_slot), + ) + .await; + } else { + info!("Snapshot is already indexed. Skipping..."); + } + } + } + + // For localnet we can safely use a large batch size to speed up indexing. 
let max_concurrent_block_fetches = match args.max_concurrent_block_fetches { Some(max_concurrent_block_fetches) => max_concurrent_block_fetches, @@ -275,25 +310,7 @@ async fn main() { } } }; - let last_indexed_slot = match args.start_slot { - Some(start_slot) => match start_slot.as_str() { - "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, - _ => { - fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) - .await - } - }, - None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) - .await - .unwrap_or( - get_network_start_slot(&rpc_client) - .await - .try_into() - .unwrap(), - ) - .try_into() - .unwrap(), - }; + let block_stream_config = BlockStreamConfig { rpc_client: rpc_client.clone(), diff --git a/src/migration/migrations/standard/m20220101_000001_init.rs b/src/migration/migrations/standard/m20220101_000001_init.rs index 1f5fa7ff..67447660 100644 --- a/src/migration/migrations/standard/m20220101_000001_init.rs +++ b/src/migration/migrations/standard/m20220101_000001_init.rs @@ -267,10 +267,14 @@ impl MigrationTrait for Migration { "ALTER TABLE owner_balances ADD COLUMN lamports bigint2 NOT NULL;", ) .await?; - + + // HACK: We need to use a larger precision for the amount column to avoid overflow, appears when saving snapshot to database (if type is bigint2) + // DO UPDATE SET amount = token_owner_balances.amount + excluded.amount. + // ERROR: numeric field overflow + // DETAIL: A field with precision 20, scale 0 must round to an absolute value less than 10^20. execute_sql( manager, - "ALTER TABLE token_owner_balances ADD COLUMN amount bigint2 NOT NULL;", + "ALTER TABLE token_owner_balances ADD COLUMN amount numeric(22, 0) NOT NULL;", ) .await?; } diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 7a487b32..ff9bbfcd 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -82,6 +82,8 @@ pub fn continously_monitor_photon( 0 }; metric! 
{ + statsd_gauge!("latest_slot", latest_slot); + statsd_gauge!("last_indexed_slot", last_indexed_slot); statsd_gauge!("indexing_lag", lag); } if lag < HEALTH_CHECK_SLOT_DISTANCE as u64 { @@ -242,12 +244,17 @@ async fn validate_tree_roots(rpc_client: &RpcClient, db_roots: Vec<(Pubkey, Hash db_hash, account_roots ); - return; + let pubkey_str = pubkey.to_string(); + metric! { + statsd_count!("root_validation_failures", 1, "pubkey" => &pubkey_str); + statsd_gauge!("root_validation_success", 0); + } + return; } } } } metric! { - statsd_count!("root_validation_success", 1); + statsd_gauge!("root_validation_success", 1); } } diff --git a/src/snapshot/loader/main.rs b/src/snapshot/loader/main.rs index fb44270e..3a0ae0f5 100644 --- a/src/snapshot/loader/main.rs +++ b/src/snapshot/loader/main.rs @@ -1,10 +1,13 @@ use anyhow::Context; use clap::Parser; use futures::StreamExt; -use log::error; +use log::{error, info}; use photon_indexer::common::{setup_logging, LoggingFormat}; -use photon_indexer::snapshot::{create_snapshot_from_byte_stream, DirectoryAdapter}; +use photon_indexer::snapshot::{ + create_snapshot_from_byte_stream,get_snapshot_files_with_metadata, DirectoryAdapter, +}; use std::path::Path; +use std::sync::Arc; /// Photon Loader: a utility to load snapshots from a snapshot server #[derive(Parser, Debug)] @@ -18,6 +21,10 @@ struct Args { #[arg(long)] snapshot_dir: String, + /// Minimum snapshot age to fetch from + #[arg(long, default_value = "1000")] + min_snapshot_age: Option, + /// Logging format #[arg(short, long, default_value_t = LoggingFormat::Standard)] logging_format: LoggingFormat, @@ -28,15 +35,58 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); setup_logging(args.logging_format); + let snapshot_dir = &args.snapshot_dir; + // Create snapshot directory if it doesn't exist - if !Path::new(&args.snapshot_dir).exists() { - std::fs::create_dir_all(&args.snapshot_dir).unwrap(); + if !Path::new(snapshot_dir).exists() { +
std::fs::create_dir_all(snapshot_dir).unwrap(); } let http_client = reqwest::Client::new(); + + // Get the snapshots from the local directory + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir.clone())); + let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter) + .await + .unwrap(); + if !snapshot_files.is_empty() { + info!("Detected snapshot files. Loading snapshot..."); + // Fetch the maximum end_slot from snapshot_files + let latest_snapshot = snapshot_files + .iter() + .max_by_key(|file| file.end_slot) + .unwrap(); + + // Get the remote snapshot + let response = http_client + .get(format!("{}/slot", args.snapshot_server_url)) + .send() + .await + .unwrap(); + + if response.status().is_success() { + // Read the response body and parse it as an integer + let remote_end_slot = response + .text() + .await + .unwrap() + .parse::() + .context("Failed to parse remote end slot")?; + + + if remote_end_slot <= latest_snapshot.end_slot+args.min_snapshot_age.unwrap_or(0) { + info!("Local snapshot is up to date"); + return Ok(()); + } + + } else { + error!("Failed to query snapshot status") + } + } + // Call the download snapshot endpoint let response = http_client - .get(&format!("{}/download", args.snapshot_server_url)) + .get(format!("{}/download", args.snapshot_server_url)) .send() .await .unwrap();