Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/api/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ impl PhotonApi {
.map_err(Into::into)
}

/// Lightweight liveness probe for the HTTP `/health` endpoint.
///
/// Delegates to the indexer health check and discards its payload: callers
/// only care whether the check succeeded. Any underlying error is converted
/// into a `PhotonApiError`.
pub async fn health(&self) -> Result<(), PhotonApiError> {
    match self.get_indexer_health().await {
        Ok(_) => Ok(()),
        Err(e) => Err(e.into()),
    }
}

pub async fn get_compressed_account(
&self,
request: CompressedAccountRequest,
Expand Down
25 changes: 24 additions & 1 deletion src/api/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ pub async fn run_server(api: PhotonApi, port: u16) -> Result<ServerHandle, anyho
let middleware = tower::ServiceBuilder::new()
.layer(cors)
.layer(ProxyGetRequestLayer::new("/liveness", "liveness")?)
.layer(ProxyGetRequestLayer::new("/readiness", "readiness")?);
.layer(ProxyGetRequestLayer::new("/readiness", "readiness")?)
.layer(ProxyGetRequestLayer::new("/health", "health")?);
let server = ServerBuilder::default()
.set_middleware(middleware)
.build(addr)
Expand All @@ -43,6 +44,13 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result<RpcModule<PhotonApi>,
api.readiness().await.map_err(Into::into)
})?;

module.register_async_method("health", |_rpc_params, rpc_context| async move {
debug!("Checking Health");
let api = rpc_context.as_ref();
api.health().await.map_err(Into::into)
})?;


module.register_async_method(
"getCompressedAccount",
|rpc_params, rpc_context| async move {
Expand Down Expand Up @@ -183,6 +191,15 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result<RpcModule<PhotonApi>,
.map_err(Into::into)
})?;

// Alias for getIndexerHealth, to enable compatibility with health check
module.register_async_method("getHealth", |_rpc_params, rpc_context| async move {
rpc_context
.as_ref()
.get_indexer_health()
.await
.map_err(Into::into)
})?;

module.register_async_method("getIndexerSlot", |_rpc_params, rpc_context| async move {
let api = rpc_context.as_ref();
api.get_indexer_slot().await.map_err(Into::into)
Expand All @@ -205,6 +222,12 @@ fn build_rpc_module(api_and_indexer: PhotonApi) -> Result<RpcModule<PhotonApi>,
},
)?;

// Alias for getIndexerSlot, to enable compatibility with health check
module.register_async_method("getSlot", |_rpc_params, rpc_context| async move {
let api = rpc_context.as_ref();
api.get_indexer_slot().await.map_err(Into::into)
})?;

module.register_async_method(
"getCompressedAccountsByOwner",
|rpc_params, rpc_context| async move {
Expand Down
2 changes: 1 addition & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub async fn fetch_current_slot_with_infinite_retry(client: &RpcClient) -> u64 {
/// Builds a shared Solana RPC client for the given URL.
///
/// Uses a 180-second request timeout (raised from 90 s in this change, to
/// tolerate slow block fetches) and `confirmed` commitment.
///
/// NOTE(review): the scraped diff left both the old (90 s) and new (180 s)
/// timeout lines in place, which is not valid code; this keeps only the
/// post-change 180 s value.
pub fn get_rpc_client(rpc_url: &str) -> Arc<RpcClient> {
    Arc::new(RpcClient::new_with_timeout_and_commitment(
        rpc_url.to_string(),
        Duration::from_secs(180),
        CommitmentConfig::confirmed(),
    ))
}
Expand Down
17 changes: 15 additions & 2 deletions src/ingester/fetchers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,21 @@ fn parse_block(block: SubscribeUpdateBlock) -> BlockInfo {

fn parse_transaction(transaction: SubscribeUpdateTransactionInfo) -> TransactionInfo {
let meta = transaction.meta.unwrap();
let error = create_tx_error(meta.err.as_ref()).unwrap();
let error = error.map(|e| e.to_string());

// We attempt to decode the TransactionError, but if it fails, we log the error and set error to None.
let tx_error_result = create_tx_error(meta.err.as_ref());

let error = match tx_error_result {
Ok(tx_error_option) => tx_error_option.map(|e| e.to_string()),
Err(e) => {
// We log the decoding error, but we don't panic.
error!("Failed to decode TransactionError: {}", e);
metric! {
statsd_count!("tx_error_decode_failure", 1); // It's worth adding a metric
}
None
}
};

let signature = Signature::try_from(transaction.signature).unwrap();
let message = transaction.transaction.unwrap().message.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/ingester/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::{
};

use super::typedefs::block_info::BlockInfo;
const POST_BACKFILL_FREQUENCY: u64 = 10;
const PRE_BACKFILL_FREQUENCY: u64 = 10;
const POST_BACKFILL_FREQUENCY: u64 = 10000;
const PRE_BACKFILL_FREQUENCY: u64 = 10000;

#[derive(FromQueryResult)]
pub struct OptionalContextModel {
Expand Down
20 changes: 19 additions & 1 deletion src/ingester/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,29 +94,47 @@ pub async fn index_block_batch(
state_updates.push(derive_block_state_update(db, block).await?);
}
persist::persist_state_update(&tx, StateUpdate::merge_updates(state_updates)).await?;
tx.commit().await?;
metric! {
statsd_count!("blocks_indexed", blocks_len as i64);
}
tx.commit().await?;
Ok(())
}

pub async fn index_block_batch_with_infinite_retries(
db: &DatabaseConnection,
block_batch: Vec<BlockInfo>,
) {
let mut retry_count = 0;
loop {
match index_block_batch(db, &block_batch).await {
Ok(()) => return,
Err(e) => {
let start_block = block_batch.first().unwrap().metadata.slot;
let end_block = block_batch.last().unwrap().metadata.slot;
metric! {
statsd_count!("indexing_batch_failure_attempt", 1);
}
log::error!(
"Failed to index block batch {}-{}. Got error {}",
start_block,
end_block,
e
);

retry_count += 1;
if retry_count > 5 {
metric! {
statsd_count!("indexing_batch_permanently_skipped", 1);
}

log::error!(
"Max retries exceeded for faulty batch. Skipping blocks {}-{}",
start_block,
end_block
);
return;
}
sleep(Duration::from_secs(1));
}
}
Expand Down
153 changes: 85 additions & 68 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ struct Args {
#[arg(short, long)]
db_url: Option<String>,

// Snapshot offset: how many slots newer than the DB's last indexed slot the
// snapshot must be before we index from the snapshot instead of from the DB.
#[arg(short, long)]
snapshot_offset: Option<u64>,

/// The start slot to begin indexing from. Defaults to the last indexed slot in the database plus
/// one.
/// one.
#[arg(short, long)]
start_slot: Option<String>,

Expand Down Expand Up @@ -209,61 +213,92 @@ async fn main() {
let is_rpc_node_local = args.rpc_url.contains("127.0.0.1");
let rpc_client = get_rpc_client(&args.rpc_url);

if let Some(snapshot_dir) = args.snapshot_dir {
let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir));
let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter)
.await
.unwrap();
if !snapshot_files.is_empty() {
// Sync tree metadata from on-chain before processing snapshot
// This is REQUIRED so the indexer knows about all existing trees
info!("Syncing tree metadata from on-chain before loading snapshot...");
if let Err(e) = photon_indexer::monitor::tree_metadata_sync::sync_tree_metadata(
rpc_client.as_ref(),
db_conn.as_ref(),
)
.await
{
error!(
"Failed to sync tree metadata: {}. Cannot proceed with snapshot loading.",
e
);
error!("Tree metadata must be synced before loading snapshots to avoid skipping transactions.");
std::process::exit(1);
}
info!("Tree metadata sync completed successfully");

info!("Detected snapshot files. Loading snapshot...");
let last_slot = snapshot_files.last().unwrap().end_slot;
let block_stream =
load_block_stream_from_directory_adapter(directory_adapter.clone()).await;
pin_mut!(block_stream);
let first_blocks = block_stream.next().await.unwrap();
let last_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot;
let block_stream = stream! {
yield first_blocks;
while let Some(blocks) = block_stream.next().await {
yield blocks;
}
};
index_block_stream(
block_stream,
db_conn.clone(),
rpc_client.clone(),
last_indexed_slot,
Some(last_slot),
)
.await;
}
}

let (indexer_handle, monitor_handle) = match args.disable_indexing {
true => {
info!("Indexing is disabled");
(None, None)
}
false => {
info!("Starting indexer...");

let last_indexed_slot = match args.start_slot {
Some(start_slot) => match start_slot.as_str() {
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await,
_ => {
fetch_block_parent_slot(&rpc_client, start_slot.parse::<u64>().unwrap())
.await
}
},
None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref())
.await
.unwrap_or(
get_network_start_slot(&rpc_client)
.await
.try_into()
.unwrap(),
)
.try_into()
.unwrap(),
};
if let Some(snapshot_dir) = args.snapshot_dir {
let directory_adapter = Arc::new(DirectoryAdapter::from_local_directory(snapshot_dir));
let snapshot_files = get_snapshot_files_with_metadata(&directory_adapter)
.await
.unwrap();
if !snapshot_files.is_empty() {

// Sync tree metadata from on-chain before processing snapshot
// This is REQUIRED so the indexer knows about all existing trees
info!("Syncing tree metadata from on-chain before loading snapshot...");
if let Err(e) = photon_indexer::monitor::tree_metadata_sync::sync_tree_metadata(
rpc_client.as_ref(),
db_conn.as_ref(),
)
.await
{
error!(
"Failed to sync tree metadata: {}. Cannot proceed with snapshot loading.",
e
);
error!("Tree metadata must be synced before loading snapshots to avoid skipping transactions.");
std::process::exit(1);
}
info!("Tree metadata sync completed successfully");

info!("Detected snapshot files. Loading snapshot...");
let last_slot = snapshot_files.last().unwrap().end_slot;
// Compute the snapshot threshold; if the snapshot is not newer than this
// threshold, skip loading it and index from the DB instead.
let snapshot_offset = last_slot - args.snapshot_offset.unwrap_or(0);

if snapshot_offset >= last_indexed_slot {
info!("Snapshot is newer than the last indexed slot. Loading snapshot...");

let block_stream =
load_block_stream_from_directory_adapter(directory_adapter.clone()).await;
pin_mut!(block_stream);
let first_blocks = block_stream.next().await.unwrap();
let last_stream_indexed_slot = first_blocks.first().unwrap().metadata.parent_slot;
let block_stream = stream! {
yield first_blocks;
while let Some(blocks) = block_stream.next().await {
yield blocks;
}
};
index_block_stream(
block_stream,
db_conn.clone(),
rpc_client.clone(),
last_stream_indexed_slot,
Some(last_slot),
)
.await;
} else {
info!("Snapshot is already indexed. Skipping...");
}
}
}


// For localnet we can safely use a large batch size to speed up indexing.
let max_concurrent_block_fetches = match args.max_concurrent_block_fetches {
Some(max_concurrent_block_fetches) => max_concurrent_block_fetches,
Expand All @@ -275,25 +310,7 @@ async fn main() {
}
}
};
let last_indexed_slot = match args.start_slot {
Some(start_slot) => match start_slot.as_str() {
"latest" => fetch_current_slot_with_infinite_retry(&rpc_client).await,
_ => {
fetch_block_parent_slot(&rpc_client, start_slot.parse::<u64>().unwrap())
.await
}
},
None => fetch_last_indexed_slot_with_infinite_retry(db_conn.as_ref())
.await
.unwrap_or(
get_network_start_slot(&rpc_client)
.await
.try_into()
.unwrap(),
)
.try_into()
.unwrap(),
};


let block_stream_config = BlockStreamConfig {
rpc_client: rpc_client.clone(),
Expand Down
8 changes: 6 additions & 2 deletions src/migration/migrations/standard/m20220101_000001_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,14 @@ impl MigrationTrait for Migration {
"ALTER TABLE owner_balances ADD COLUMN lamports bigint2 NOT NULL;",
)
.await?;


// HACK: We need to use a larger precision for the amount column to avoid overflow, appears when saving snapshot to database (if type is bigint2)
// DO UPDATE SET amount = token_owner_balances.amount + excluded.amount.
// ERROR: numeric field overflow
// DETAIL: A field with precision 20, scale 0 must round to an absolute value less than 10^20.
execute_sql(
manager,
"ALTER TABLE token_owner_balances ADD COLUMN amount bigint2 NOT NULL;",
"ALTER TABLE token_owner_balances ADD COLUMN amount numeric(22, 0) NOT NULL;",
)
.await?;
}
Expand Down
11 changes: 9 additions & 2 deletions src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ pub fn continously_monitor_photon(
0
};
metric! {
statsd_gauge!("latest_slot", latest_slot);
statsd_gauge!("last_indexed_slot", last_indexed_slot);
statsd_gauge!("indexing_lag", lag);
}
if lag < HEALTH_CHECK_SLOT_DISTANCE as u64 {
Expand Down Expand Up @@ -242,12 +244,17 @@ async fn validate_tree_roots(rpc_client: &RpcClient, db_roots: Vec<(Pubkey, Hash
db_hash,
account_roots
);
return;
let pubkey_str = pubkey.to_string();
metric! {
statsd_count!("root_validation_failures", 1, "pubkey" => &pubkey_str);
statsd_gauge!("root_validation_success", 0);
}
return;
}
}
}
}
metric! {
statsd_count!("root_validation_success", 1);
statsd_gauge!("root_validation_success", 1);
}
}
Loading