From a9f08ac3721131d95baf30fce0c00b8bb3f07c37 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 24 Oct 2024 08:41:19 +0000 Subject: [PATCH 01/18] add a health endpoint --- src/api/api.rs | 7 +++++++ src/api/rpc_server.rs | 10 +++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/api/api.rs b/src/api/api.rs index faf5f835..42e04548 100644 --- a/src/api/api.rs +++ b/src/api/api.rs @@ -140,6 +140,13 @@ impl PhotonApi { .map_err(Into::into) } + pub async fn health(&self) -> Result<(), PhotonApiError> { + self.get_indexer_health() + .await + .map(|_| ()) + .map_err(Into::into) + } + pub async fn get_compressed_account( &self, request: CompressedAccountRequest, diff --git a/src/api/rpc_server.rs b/src/api/rpc_server.rs index f29678aa..cf57e172 100644 --- a/src/api/rpc_server.rs +++ b/src/api/rpc_server.rs @@ -19,7 +19,8 @@ pub async fn run_server(api: PhotonApi, port: u16) -> Result Result, api.readiness().await.map_err(Into::into) })?; + module.register_async_method("health", |_rpc_params, rpc_context| async move { + debug!("Checking Health"); + let api = rpc_context.as_ref(); + api.health().await.map_err(Into::into) + })?; + + module.register_async_method( "getCompressedAccount", |rpc_params, rpc_context| async move { From 051e62b30185a1e5a42248cf3cce9ede86142f82 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Mon, 28 Oct 2024 13:24:08 +0100 Subject: [PATCH 02/18] reducing logging frequency --- src/ingester/indexer/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ingester/indexer/mod.rs b/src/ingester/indexer/mod.rs index fa696d56..01152fa1 100644 --- a/src/ingester/indexer/mod.rs +++ b/src/ingester/indexer/mod.rs @@ -12,8 +12,8 @@ use crate::{ }; use super::typedefs::block_info::BlockInfo; -const POST_BACKFILL_FREQUENCY: u64 = 10; -const PRE_BACKFILL_FREQUENCY: u64 = 10; +const POST_BACKFILL_FREQUENCY: u64 = 10000; +const PRE_BACKFILL_FREQUENCY: u64 = 10000; #[derive(FromQueryResult)] pub struct OptionalContextModel { From db403fd68d5e8e3393e8d36c86741bc86c8514e7 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Fri, 10 Jan 2025 09:25:27 +0000 Subject: [PATCH 03/18] Add metrics to figure out persistance bugs --- src/ingester/mod.rs | 2 +- src/monitor/mod.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index c9b953d0..1ccd1291 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -94,10 +94,10 @@ pub async fn index_block_batch( state_updates.push(derive_block_state_update(db, block).await?); } persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?; + tx.commit().await?; metric! { statsd_count!("blocks_indexed", blocks_len as i64); } - tx.commit().await?; Ok(()) } diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 7a487b32..db3e5948 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -82,6 +82,8 @@ pub fn continously_monitor_photon( 0 }; metric! { + statsd_count!("latest_slot", latest_slot); + statsd_count!("last_indexed_slot", last_indexed_slot); statsd_gauge!("indexing_lag", lag); } if lag < HEALTH_CHECK_SLOT_DISTANCE as u64 { From 5f0766f778d88648d2722fc053682098329d7dd7 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Fri, 10 Jan 2025 10:21:34 +0000 Subject: [PATCH 04/18] Add a flag to determine the snapshot age --- src/main.rs | 92 ++++++++++++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 39 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7e0f8cd2..d7c0623d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -51,8 +51,12 @@ struct Args { #[arg(short, long)] db_url: Option, + // Snapshot offset determines how much newer the snapshot from what is in the DB for us to start indexing from the snapshot instead of DB + #[arg(short, long)] + snapshot_offset: Option, + /// The start slot to begin indexing from. Defaults to the last indexed slot in the database plus - /// one. + /// one. #[arg(short, long)] start_slot: Option, @@ -209,6 +213,25 @@ async fn main() { let is_rpc_node_local = args.rpc_url.contains("127.0.0.1"); let rpc_client = get_rpc_client(&args.rpc_url); + let last_indexed_slot = match args.start_slot { + Some(start_slot) => match start_slot.as_str() { + "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, + _ => { + fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) + .await + } + }, + None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) + .await + .unwrap_or( + get_network_start_slot(&rpc_client) + .await + .try_into() + .unwrap(), + ) + .try_into() + .unwrap(), + }; if let Some(snapshot_dir) = args.snapshot_dir { let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir)); let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter) @@ -235,25 +258,34 @@ async fn main() { info!("Detected snapshot files. Loading snapshot..."); let last_slot = snapshot_files.last().unwrap().end_slot; - let block_stream = - load_block_stream_from_directory_adapter(directory_adapter.clone()).await; - pin_mut!(block_stream); - let first_blocks = block_stream.next().await.unwrap(); - let last_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot; - let block_stream = stream! { - yield first_blocks; - while let Some(blocks) = block_stream.next().await { - yield blocks; - } - }; - index_block_stream( - block_stream, - db_conn.clone(), - rpc_client.clone(), - last_indexed_slot, - Some(last_slot), - ) - .await; + // Compute the snapshot offset, if the snapshot is not more recent than this offset then don't fetch the snapshot + let snapshot_offset = last_slot - args.snapshot_offset.unwrap_or(0); + + if snapshot_offset >= last_indexed_slot { + info!("Snapshot is newer than the last indexed slot. Loading snapshot..."); + + let block_stream = + load_block_stream_from_directory_adapter(directory_adapter.clone()).await; + pin_mut!(block_stream); + let first_blocks = block_stream.next().await.unwrap(); + let last_stream_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot; + let block_stream = stream! { + yield first_blocks; + while let Some(blocks) = block_stream.next().await { + yield blocks; + } + }; + index_block_stream( + block_stream, + db_conn.clone(), + rpc_client.clone(), + last_stream_indexed_slot, + Some(last_slot), + ) + .await; + } else { + info!("Snapshot is already indexed. Skipping..."); + } } } @@ -275,25 +307,7 @@ async fn main() { } } }; - let last_indexed_slot = match args.start_slot { - Some(start_slot) => match start_slot.as_str() { - "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, - _ => { - fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) - .await - } - }, - None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) - .await - .unwrap_or( - get_network_start_slot(&rpc_client) - .await - .try_into() - .unwrap(), - ) - .try_into() - .unwrap(), - }; + let block_stream_config = BlockStreamConfig { rpc_client: rpc_client.clone(), From 6ee53a6844b0983e98716931a10809c14d3daeba Mon Sep 17 00:00:00 2001 From: bruswejn Date: Mon, 13 Jan 2025 12:37:34 +0100 Subject: [PATCH 05/18] latest slots to gauge --- src/monitor/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index db3e5948..7bf9d46d 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -82,8 +82,8 @@ pub fn continously_monitor_photon( 0 }; metric! { - statsd_count!("latest_slot", latest_slot); - statsd_count!("last_indexed_slot", last_indexed_slot); + statsd_gauge!("latest_slot", latest_slot); + statsd_gauge!("last_indexed_slot", last_indexed_slot); statsd_gauge!("indexing_lag", lag); } if lag < HEALTH_CHECK_SLOT_DISTANCE as u64 { From 319671efdebcf64916fb32861581b3a92ab8738d Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Mon, 20 Jan 2025 12:19:00 +0000 Subject: [PATCH 06/18] allow loader to only fetch snapshot if needed --- src/snapshot/loader/main.rs | 60 +++++++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 5 deletions(-) diff --git a/src/snapshot/loader/main.rs b/src/snapshot/loader/main.rs index fb44270e..3a0ae0f5 100644 --- a/src/snapshot/loader/main.rs +++ b/src/snapshot/loader/main.rs @@ -1,10 +1,13 @@ use anyhow::Context; use clap::Parser; use futures::StreamExt; -use log::error; +use log::{error, info}; use photon_indexer::common::{setup_logging, LoggingFormat}; -use photon_indexer::snapshot::{create_snapshot_from_byte_stream, DirectoryAdapter}; +use photon_indexer::snapshot::{ + create_snapshot_from_byte_stream,get_snapshot_files_with_metadata, DirectoryAdapter, +}; use std::path::Path; +use std::sync::Arc; /// Photon Loader: a utility to load snapshots from a snapshot server #[derive(Parser, Debug)] @@ -18,6 +21,10 @@ struct Args { #[arg(long)] snapshot_dir: String, + /// Minmum snapshot age to fetching from + #[arg(long, default_value = "1000")] + min_snapshot_age: Option, + /// Logging format #[arg(short, long, default_value_t = LoggingFormat::Standard)] logging_format: LoggingFormat, @@ -28,15 +35,58 @@ async fn main() -> anyhow::Result<()> { let args = Args::parse(); setup_logging(args.logging_format); + let snapshot_dir = &args.snapshot_dir; + // Create snapshot directory if it doesn't exist - if !Path::new(&args.snapshot_dir).exists() { - std::fs::create_dir_all(&args.snapshot_dir).unwrap(); + if !Path::new(snapshot_dir).exists() { + std::fs::create_dir_all(snapshot_dir).unwrap(); } let http_client = reqwest::Client::new(); + + // Get the snapshots from the local directory + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir.clone())); + let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter) + .await + .unwrap(); + if !snapshot_files.is_empty() { + info!("Detected snapshot files. Loading snapshot..."); + // Fetch the maximum end_slot from snapshot_files + let latest_snapshot = snapshot_files + .iter() + .max_by_key(|file| file.end_slot) + .unwrap(); + + // Get the remote snapshot + let response = http_client + .get(format!("{}/slot", args.snapshot_server_url)) + .send() + .await + .unwrap(); + + if response.status().is_success() { + // REad response body and return it as an integre + let remote_end_slot = response + .text() + .await + .unwrap() + .parse::() + .context("Failed to parse remote end slot")?; + + + if remote_end_slot <= latest_snapshot.end_slot+args.min_snapshot_age.unwrap_or(0) { + info!("Local snapshot is up to date"); + return Ok(()); + } + + } else { + error!("Failed to query snapshot status") + } + } + // Call the download snapshot endpoint let response = http_client - .get(&format!("{}/download", args.snapshot_server_url)) + .get(format!("{}/download", args.snapshot_server_url)) .send() .await .unwrap(); From 74e4073cc903861a2fa186187616a90133f762e9 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Tue, 28 Jan 2025 03:22:23 +0000 Subject: [PATCH 07/18] Add monitoring metrics - Track the number of root hash failures - Turn success metric into a gauge - Report 0/1 on the success metric --- src/monitor/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 7bf9d46d..b32237fb 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -244,12 +244,16 @@ async fn validate_tree_roots(rpc_client: &RpcClient, db_roots: Vec<(Pubkey, Hash db_hash, account_roots ); - return; + metric! { + statsd_count!("root_validation_failures", 1); + statsd_gauge!("root_validation_success", 0); + } + return; } } } } metric! { - statsd_count!("root_validation_success", 1); + statsd_gauge!("root_validation_success", 1); } } From 1f1253bd0f1851553b6a66eff66ff30b91befbdd Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 30 Jan 2025 14:38:15 +0530 Subject: [PATCH 08/18] Update mod.rs Identify which trees are failing --- src/monitor/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index b32237fb..9676ef4a 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -245,7 +245,7 @@ async fn validate_tree_roots(rpc_client: &RpcClient, db_roots: Vec<(Pubkey, Hash account_roots ); metric! { - statsd_count!("root_validation_failures", 1); + statsd_count!("root_validation_failures", 1, "pubkey" => pubkey); statsd_gauge!("root_validation_success", 0); } return; From 71d77fb423d99ad6dfba85da75fd93ce85bee167 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Wed, 29 Jan 2025 19:09:05 +0530 Subject: [PATCH 09/18] Move snapshot logic into the indexer branch --- src/main.rs | 155 ++++++++++++++++++++++++++-------------------------- 1 file changed, 79 insertions(+), 76 deletions(-) diff --git a/src/main.rs b/src/main.rs index d7c0623d..e5d11920 100644 --- a/src/main.rs +++ b/src/main.rs @@ -213,82 +213,6 @@ async fn main() { let is_rpc_node_local = args.rpc_url.contains("127.0.0.1"); let rpc_client = get_rpc_client(&args.rpc_url); - let last_indexed_slot = match args.start_slot { - Some(start_slot) => match start_slot.as_str() { - "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, - _ => { - fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) - .await - } - }, - None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) - .await - .unwrap_or( - get_network_start_slot(&rpc_client) - .await - .try_into() - .unwrap(), - ) - .try_into() - .unwrap(), - }; - if let Some(snapshot_dir) = args.snapshot_dir { - let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir)); - let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter) - .await - .unwrap(); - if !snapshot_files.is_empty() { - // Sync tree metadata from on-chain before processing snapshot - // This is REQUIRED so the indexer knows about all existing trees - info!("Syncing tree metadata from on-chain before loading snapshot..."); - if let Err(e) = photon_indexer::monitor::tree_metadata_sync::sync_tree_metadata( - rpc_client.as_ref(), - db_conn.as_ref(), - ) - .await - { - error!( - "Failed to sync tree metadata: {}. Cannot proceed with snapshot loading.", - e - ); - error!("Tree metadata must be synced before loading snapshots to avoid skipping transactions."); - std::process::exit(1); - } - info!("Tree metadata sync completed successfully"); - - info!("Detected snapshot files. Loading snapshot..."); - let last_slot = snapshot_files.last().unwrap().end_slot; - // Compute the snapshot offset, if the snapshot is not more recent than this offset then don't fetch the snapshot - let snapshot_offset = last_slot - args.snapshot_offset.unwrap_or(0); - - if snapshot_offset >= last_indexed_slot { - info!("Snapshot is newer than the last indexed slot. Loading snapshot..."); - - let block_stream = - load_block_stream_from_directory_adapter(directory_adapter.clone()).await; - pin_mut!(block_stream); - let first_blocks = block_stream.next().await.unwrap(); - let last_stream_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot; - let block_stream = stream! { - yield first_blocks; - while let Some(blocks) = block_stream.next().await { - yield blocks; - } - }; - index_block_stream( - block_stream, - db_conn.clone(), - rpc_client.clone(), - last_stream_indexed_slot, - Some(last_slot), - ) - .await; - } else { - info!("Snapshot is already indexed. Skipping..."); - } - } - } - let (indexer_handle, monitor_handle) = match args.disable_indexing { true => { info!("Indexing is disabled"); @@ -296,6 +220,85 @@ async fn main() { } false => { info!("Starting indexer..."); + + let last_indexed_slot = match args.start_slot { + Some(start_slot) => match start_slot.as_str() { + "latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await, + _ => { + fetch_block_parent_slot(&rpc_client, start_slot.parse::().unwrap()) + .await + } + }, + None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref()) + .await + .unwrap_or( + get_network_start_slot(&rpc_client) + .await + .try_into() + .unwrap(), + ) + .try_into() + .unwrap(), + }; + if let Some(snapshot_dir) = args.snapshot_dir { + let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir)); + let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter) + .await + .unwrap(); + if !snapshot_files.is_empty() { + + // Sync tree metadata from on-chain before processing snapshot + // This is REQUIRED so the indexer knows about all existing trees + info!("Syncing tree metadata from on-chain before loading snapshot..."); + if let Err(e) = photon_indexer::monitor::tree_metadata_sync::sync_tree_metadata( + rpc_client.as_ref(), + db_conn.as_ref(), + ) + .await + { + error!( + "Failed to sync tree metadata: {}. Cannot proceed with snapshot loading.", + e + ); + error!("Tree metadata must be synced before loading snapshots to avoid skipping transactions."); + std::process::exit(1); + } + info!("Tree metadata sync completed successfully"); + + info!("Detected snapshot files. Loading snapshot..."); + let last_slot = snapshot_files.last().unwrap().end_slot; + // Compute the snapshot offset, if the snapshot is not more recent than this offset then don't fetch the snapshot + let snapshot_offset = last_slot - args.snapshot_offset.unwrap_or(0); + + if snapshot_offset >= last_indexed_slot { + info!("Snapshot is newer than the last indexed slot. Loading snapshot..."); + + let block_stream = + load_block_stream_from_directory_adapter(directory_adapter.clone()).await; + pin_mut!(block_stream); + let first_blocks = block_stream.next().await.unwrap(); + let last_stream_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot; + let block_stream = stream! { + yield first_blocks; + while let Some(blocks) = block_stream.next().await { + yield blocks; + } + }; + index_block_stream( + block_stream, + db_conn.clone(), + rpc_client.clone(), + last_stream_indexed_slot, + Some(last_slot), + ) + .await; + } else { + info!("Snapshot is already indexed. Skipping..."); + } + } + } + + // For localnet we can safely use a large batch size to speed up indexing. let max_concurrent_block_fetches = match args.max_concurrent_block_fetches { Some(max_concurrent_block_fetches) => max_concurrent_block_fetches, From 7082c060de873f7bb1ab28ba0f776193bd02dac8 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Mon, 3 Feb 2025 00:08:54 +0100 Subject: [PATCH 10/18] fix --- src/monitor/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 9676ef4a..1ed416a4 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -231,6 +231,7 @@ async fn load_accounts_with_infinite_retry( } async fn validate_tree_roots(rpc_client: &RpcClient, db_roots: Vec<(Pubkey, Hash)>) { + let mut root_validation_errors = 0; for chunk in db_roots.chunks(CHUNK_SIZE) { let pubkeys = chunk.iter().map(|(pubkey, _)| *pubkey).collect(); let accounts = load_accounts_with_infinite_retry(rpc_client, pubkeys).await; From 228e25437b7fd9d18bc4ccc28316fd99c28afffd Mon Sep 17 00:00:00 2001 From: bruswejn Date: Mon, 3 Feb 2025 00:25:11 +0100 Subject: [PATCH 11/18] pubkey fix --- src/monitor/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 1ed416a4..0421899d 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -245,8 +245,9 @@ async fn validate_tree_roots(rpc_client: &RpcClient, db_roots: Vec<(Pubkey, Hash db_hash, account_roots ); + let pubkey_str = pubkey.to_string(); metric! { - statsd_count!("root_validation_failures", 1, "pubkey" => pubkey); + statsd_count!("root_validation_failures", 1, "pubkey" => &pubkey_str); statsd_gauge!("root_validation_success", 0); } return; From 695e138d558d3822bc7a243194f2088d7b5444d6 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Wed, 2 Apr 2025 11:33:44 +0200 Subject: [PATCH 12/18] db column change type --- .../migrations/standard/m20220101_000001_init.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/migration/migrations/standard/m20220101_000001_init.rs b/src/migration/migrations/standard/m20220101_000001_init.rs index 1f5fa7ff..67447660 100644 --- a/src/migration/migrations/standard/m20220101_000001_init.rs +++ b/src/migration/migrations/standard/m20220101_000001_init.rs @@ -267,10 +267,14 @@ impl MigrationTrait for Migration { "ALTER TABLE owner_balances ADD COLUMN lamports bigint2 NOT NULL;", ) .await?; - + + // HACK: We need to use a larger precision for the amount column to avoid overflow, appears when saving snapshot to database (if type is bigint2) + // DO UPDATE SET amount = token_owner_balances.amount + excluded.amount. + // ERROR: numeric field overflow + // DETAIL: A field with precision 20, scale 0 must round to an absolute value less than 10^20. execute_sql( manager, - "ALTER TABLE token_owner_balances ADD COLUMN amount bigint2 NOT NULL;", + "ALTER TABLE token_owner_balances ADD COLUMN amount numeric(22, 0) NOT NULL;", ) .await?; } From 1adfa6307c0b88b2fce07ce8d20905c10458b6c7 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Wed, 28 May 2025 11:53:29 +0200 Subject: [PATCH 13/18] add getSlot for healthcheck --- src/api/rpc_server.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/api/rpc_server.rs b/src/api/rpc_server.rs index cf57e172..167216ec 100644 --- a/src/api/rpc_server.rs +++ b/src/api/rpc_server.rs @@ -213,6 +213,12 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result, }, )?; + // Alias for getIndexerSlot, to enable compatibility with health check + module.register_async_method("getSlot", |_rpc_params, rpc_context| async move { + let api = rpc_context.as_ref(); + api.get_indexer_slot().await.map_err(Into::into) + })?; + module.register_async_method( "getCompressedAccountsByOwner", |rpc_params, rpc_context| async move { From 6c335a77245648a52a02f99de60ebc9139b07542 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Wed, 28 May 2025 12:29:26 +0200 Subject: [PATCH 14/18] also getHealth --- src/api/rpc_server.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/api/rpc_server.rs b/src/api/rpc_server.rs index 167216ec..4e7ff7b5 100644 --- a/src/api/rpc_server.rs +++ b/src/api/rpc_server.rs @@ -191,6 +191,15 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result, .map_err(Into::into) })?; + // Alias for getIndexerHealth, to enable compatibility with health check + module.register_async_method("getHealth", |_rpc_params, rpc_context| async move { + rpc_context + .as_ref() + .get_indexer_health() + .await + .map_err(Into::into) + })?; + module.register_async_method("getIndexerSlot", |_rpc_params, rpc_context| async move { let api = rpc_context.as_ref(); api.get_indexer_slot().await.map_err(Into::into) From d48861673ab23d6191eb93719cdb176e62b8a7c2 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Tue, 14 Oct 2025 22:40:20 +0200 Subject: [PATCH 15/18] decoding error grpc --- src/ingester/fetchers/grpc.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs index 5a4a6764..71a156ab 100644 --- a/src/ingester/fetchers/grpc.rs +++ b/src/ingester/fetchers/grpc.rs @@ -298,8 +298,23 @@ fn parse_block(block: SubscribeUpdateBlock) -> BlockInfo { fn parse_transaction(transaction: SubscribeUpdateTransactionInfo) -> TransactionInfo { let meta = transaction.meta.unwrap(); - let error = create_tx_error(meta.err.as_ref()).unwrap(); - let error = error.map(|e| e.to_string()); + + // We attempt to decode the TransactionError, but if it fails, we log the error and set error to None. + let tx_error_result = create_tx_error(meta.err.as_ref()); + + let error = match tx_error_result { + Ok(tx_error_option) => tx_error_option.map(|e| e.to_string()), + Err(e) => { + // We log the decoding error, but we don't panic. + // We treat this as a lack of decoded transaction error. + // You can use error! or info! depending on your preference. + error!("Failed to decode TransactionError: {}", e); + metric! { + statsd_count!("tx_error_decode_failure", 1); // It's worth adding a metric + } + None + } + }; let signature = Signature::try_from(transaction.signature).unwrap(); let message = transaction.transaction.unwrap().message.unwrap(); From 870e7cf88116f29cbeff09e6b83b987cf8ae366e Mon Sep 17 00:00:00 2001 From: bruswejn Date: Wed, 15 Oct 2025 10:34:19 +0200 Subject: [PATCH 16/18] update --- src/ingester/fetchers/grpc.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/ingester/fetchers/grpc.rs b/src/ingester/fetchers/grpc.rs index 71a156ab..f34625d0 100644 --- a/src/ingester/fetchers/grpc.rs +++ b/src/ingester/fetchers/grpc.rs @@ -306,8 +306,6 @@ fn parse_transaction(transaction: SubscribeUpdateTransactionInfo) -> Transaction Ok(tx_error_option) => tx_error_option.map(|e| e.to_string()), Err(e) => { // We log the decoding error, but we don't panic. - // We treat this as a lack of decoded transaction error. - // You can use error! or info! depending on your preference. error!("Failed to decode TransactionError: {}", e); metric! { statsd_count!("tx_error_decode_failure", 1); // It's worth adding a metric From 9522204c3fb7b48b40de1c3f3738536c90c67753 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Fri, 24 Oct 2025 00:34:51 +0200 Subject: [PATCH 17/18] fix --- src/monitor/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/monitor/mod.rs b/src/monitor/mod.rs index 0421899d..ff9bbfcd 100644 --- a/src/monitor/mod.rs +++ b/src/monitor/mod.rs @@ -231,7 +231,6 @@ async fn load_accounts_with_infinite_retry( } async fn validate_tree_roots(rpc_client: &RpcClient, db_roots: Vec<(Pubkey, Hash)>) { - let mut root_validation_errors = 0; for chunk in db_roots.chunks(CHUNK_SIZE) { let pubkeys = chunk.iter().map(|(pubkey, _)| *pubkey).collect(); let accounts = load_accounts_with_infinite_retry(rpc_client, pubkeys).await; From 3c51f13c3cca241098a7cc29ef85a1949e4a2d46 Mon Sep 17 00:00:00 2001 From: bruswejn Date: Sun, 26 Oct 2025 09:08:40 +0100 Subject: [PATCH 18/18] devnet test block batch --- src/common/mod.rs | 2 +- src/ingester/mod.rs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/common/mod.rs b/src/common/mod.rs index ac9942ae..bd2e8728 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -149,7 +149,7 @@ pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 { pub fn get_rpc_client(rpc_url: &str) -> Arc { Arc::new(RpcClient::new_with_timeout_and_commitment( rpc_url.to_string(), - Duration::from_secs(90), + Duration::from_secs(180), CommitmentConfig::confirmed(), )) } diff --git a/src/ingester/mod.rs b/src/ingester/mod.rs index 1ccd1291..afbab96a 100644 --- a/src/ingester/mod.rs +++ b/src/ingester/mod.rs @@ -105,18 +105,36 @@ pub async fn index_block_batch_with_infinite_retries( db: &DatabaseConnection, block_batch: Vec, ) { + let mut retry_count = 0; loop { match index_block_batch(db, &block_batch).await { Ok(()) => return, Err(e) => { let start_block = block_batch.first().unwrap().metadata.slot; let end_block = block_batch.last().unwrap().metadata.slot; + metric! { + statsd_count!("indexing_batch_failure_attempt", 1); + } log::error!( "Failed to index block batch {}-{}. Got error {}", start_block, end_block, e ); + + retry_count += 1; + if retry_count > 5 { + metric! { + statsd_count!("indexing_batch_permanently_skipped", 1); + } + + log::error!( + "Max retries exceeded for faulty batch. Skipping blocks {}-{}", + start_block, + end_block + ); + return; + } sleep(Duration::from_secs(1)); } }