Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 79 additions & 47 deletions forester/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use solana_sdk::{
transaction::TransactionError,
};
use tokio::{
sync::{broadcast, broadcast::error::RecvError, mpsc, oneshot, Mutex},
sync::{mpsc, oneshot, Mutex},
task::JoinHandle,
time::{sleep, Instant, MissedTickBehavior},
};
Expand Down Expand Up @@ -259,7 +259,6 @@ pub struct EpochManager<R: Rpc + Indexer> {
trees: Arc<Mutex<Vec<TreeAccounts>>>,
slot_tracker: Arc<SlotTracker>,
processing_epochs: Arc<DashMap<u64, Arc<AtomicBool>>>,
new_tree_sender: broadcast::Sender<TreeAccounts>,
tx_cache: Arc<Mutex<ProcessedHashCache>>,
ops_cache: Arc<Mutex<ProcessedHashCache>>,
/// Proof caches for pre-warming during idle slots
Expand Down Expand Up @@ -293,7 +292,6 @@ impl<R: Rpc + Indexer> Clone for EpochManager<R> {
trees: self.trees.clone(),
slot_tracker: self.slot_tracker.clone(),
processing_epochs: self.processing_epochs.clone(),
new_tree_sender: self.new_tree_sender.clone(),
tx_cache: self.tx_cache.clone(),
ops_cache: self.ops_cache.clone(),
proof_caches: self.proof_caches.clone(),
Expand Down Expand Up @@ -322,7 +320,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
work_report_sender: mpsc::Sender<WorkReport>,
trees: Vec<TreeAccounts>,
slot_tracker: Arc<SlotTracker>,
new_tree_sender: broadcast::Sender<TreeAccounts>,
tx_cache: Arc<Mutex<ProcessedHashCache>>,
ops_cache: Arc<Mutex<ProcessedHashCache>>,
compressible_tracker: Option<Arc<CTokenAccountTracker>>,
Expand All @@ -344,7 +341,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
trees: Arc::new(Mutex::new(trees)),
slot_tracker,
processing_epochs: Arc::new(DashMap::new()),
new_tree_sender,
tx_cache,
ops_cache,
proof_caches: Arc::new(DashMap::new()),
Expand Down Expand Up @@ -384,9 +380,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
}
});

let new_tree_handle = tokio::spawn({
let tree_discovery_handle = tokio::spawn({
let self_clone = Arc::clone(&self);
async move { self_clone.handle_new_trees().await }
async move { self_clone.discover_trees_periodically().await }
});

let balance_check_handle = tokio::spawn({
Expand All @@ -397,7 +393,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
let _guard = scopeguard::guard(
(
current_previous_handle,
new_tree_handle,
tree_discovery_handle,
balance_check_handle,
),
|(h2, h3, h4)| {
Expand Down Expand Up @@ -548,48 +544,87 @@ impl<R: Rpc + Indexer> EpochManager<R> {
}
}

async fn handle_new_trees(self: Arc<Self>) -> Result<()> {
let mut receiver = self.new_tree_sender.subscribe();
/// Periodically fetches trees from on-chain and adds newly discovered ones.
async fn discover_trees_periodically(self: Arc<Self>) -> Result<()> {
let interval_secs = self.config.general_config.tree_discovery_interval_seconds;
if interval_secs == 0 {
info!(event = "tree_discovery_disabled", run_id = %self.run_id, "Tree discovery disabled (interval=0)");
return Ok(());
}
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
// Skip the first immediate tick — initial trees are already loaded at startup
interval.tick().await;

info!(
event = "tree_discovery_started",
run_id = %self.run_id,
interval_secs,
"Starting periodic tree discovery"
);

let mut group_authority: Option<Pubkey> = self.config.general_config.group_authority;

loop {
match receiver.recv().await {
Ok(new_tree) => {
info!(
event = "new_tree_received",
interval.tick().await;

let rpc = match self.rpc_pool.get_connection().await {
Ok(rpc) => rpc,
Err(e) => {
warn!(event = "tree_discovery_rpc_failed", run_id = %self.run_id, error = ?e, "Tree discovery: failed to get RPC connection");
continue;
}
};

// Lazily resolve group authority (retry each tick until successful)
if group_authority.is_none() {
group_authority = fetch_protocol_group_authority(&*rpc, &self.run_id)
.await
.ok();
}

let mut fetched_trees = match fetch_trees(&*rpc).await {
Ok(trees) => trees,
Err(e) => {
warn!(event = "tree_discovery_fetch_failed", run_id = %self.run_id, error = ?e, "Tree discovery: failed to fetch trees");
continue;
}
};

if let Some(ga) = group_authority {
fetched_trees.retain(|tree| tree.owner == ga);
}
if !self.config.general_config.tree_ids.is_empty() {
let tree_ids = &self.config.general_config.tree_ids;
fetched_trees.retain(|tree| tree_ids.contains(&tree.merkle_tree));
}

let known_trees = self.trees.lock().await;
let known_pubkeys: std::collections::HashSet<Pubkey> =
known_trees.iter().map(|t| t.merkle_tree).collect();
drop(known_trees);

for tree in fetched_trees {
if known_pubkeys.contains(&tree.merkle_tree) {
continue;
}
info!(
event = "tree_discovery_new_tree",
run_id = %self.run_id,
tree = %tree.merkle_tree,
tree_type = ?tree.tree_type,
queue = %tree.queue,
"Discovered new tree"
);
if let Err(e) = self.add_new_tree(tree).await {
error!(
event = "tree_discovery_add_failed",
run_id = %self.run_id,
tree = %new_tree.merkle_tree,
tree_type = ?new_tree.tree_type,
"Received new tree"
error = ?e,
"Failed to add discovered tree"
);
if let Err(e) = self.add_new_tree(new_tree).await {
error!(
event = "new_tree_add_failed",
run_id = %self.run_id,
error = ?e,
"Failed to add new tree"
);
// Continue processing other trees instead of crashing
}
}
Err(e) => match e {
RecvError::Lagged(lag) => {
warn!(
event = "new_tree_receiver_lagged",
run_id = %self.run_id,
lag, "Lagged while receiving new trees"
);
}
RecvError::Closed => {
info!(
event = "new_tree_receiver_closed",
run_id = %self.run_id,
"New tree receiver closed"
);
break;
}
},
}
}
Ok(())
}

async fn add_new_tree(&self, new_tree: TreeAccounts) -> Result<()> {
Expand Down Expand Up @@ -4186,8 +4221,6 @@ pub async fn run_service<R: Rpc + Indexer>(
};
trace!("Fetched initial trees: {:?}", trees);

let (new_tree_sender, _) = broadcast::channel(100);

if !config.general_config.tree_ids.is_empty() {
info!(
event = "tree_discovery_limited_to_explicit_ids",
Expand Down Expand Up @@ -4235,7 +4268,6 @@ pub async fn run_service<R: Rpc + Indexer>(
work_report_sender.clone(),
trees.clone(),
slot_tracker.clone(),
new_tree_sender.clone(),
tx_cache.clone(),
ops_cache.clone(),
compressible_tracker.clone(),
Expand Down
38 changes: 27 additions & 11 deletions forester/src/tree_data_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,9 @@ pub async fn fetch_trees_filtered(rpc_url: &str) -> Result<Vec<TreeAccounts>> {
Ok(accounts) => {
debug!("Fetched {} batched tree accounts", accounts.len());
for (pubkey, mut account) in accounts {
// Try state first, then address
if let Ok(tree) = process_batch_state_account(&mut account, pubkey) {
if let Ok(Some(tree)) = process_batch_state_account(&mut account, pubkey) {
all_trees.push(tree);
} else if let Ok(tree) = process_batch_address_account(&mut account, pubkey) {
} else if let Ok(Some(tree)) = process_batch_address_account(&mut account, pubkey) {
all_trees.push(tree);
}
}
Expand All @@ -107,7 +106,7 @@ pub async fn fetch_trees_filtered(rpc_url: &str) -> Result<Vec<TreeAccounts>> {
Ok(accounts) => {
debug!("Fetched {} state V1 tree accounts", accounts.len());
for (pubkey, account) in accounts {
if let Ok(tree) = process_state_account(&account, pubkey) {
if let Ok(Some(tree)) = process_state_account(&account, pubkey) {
all_trees.push(tree);
}
}
Expand All @@ -127,7 +126,7 @@ pub async fn fetch_trees_filtered(rpc_url: &str) -> Result<Vec<TreeAccounts>> {
Ok(accounts) => {
debug!("Fetched {} address V1 tree accounts", accounts.len());
for (pubkey, account) in accounts {
if let Ok(tree) = process_address_account(&account, pubkey) {
if let Ok(Some(tree)) = process_address_account(&account, pubkey) {
all_trees.push(tree);
}
}
Expand Down Expand Up @@ -245,9 +244,10 @@ fn process_account(pubkey: Pubkey, mut account: Account) -> Option<TreeAccounts>
.or_else(|_| process_address_account(&account, pubkey))
.or_else(|_| process_batch_address_account(&mut account, pubkey))
.ok()
.flatten()
}

fn process_state_account(account: &Account, pubkey: Pubkey) -> Result<TreeAccounts> {
fn process_state_account(account: &Account, pubkey: Pubkey) -> Result<Option<TreeAccounts>> {
check_discriminator::<StateMerkleTreeAccount>(&account.data)?;
let tree_account = StateMerkleTreeAccount::deserialize(&mut &account.data[8..])?;
Ok(create_tree_accounts(
Expand All @@ -257,7 +257,7 @@ fn process_state_account(account: &Account, pubkey: Pubkey) -> Result<TreeAccoun
))
}

fn process_address_account(account: &Account, pubkey: Pubkey) -> Result<TreeAccounts> {
fn process_address_account(account: &Account, pubkey: Pubkey) -> Result<Option<TreeAccounts>> {
check_discriminator::<AddressMerkleTreeAccount>(&account.data)?;
let tree_account = AddressMerkleTreeAccount::deserialize(&mut &account.data[8..])?;
Ok(create_tree_accounts(
Expand All @@ -267,7 +267,10 @@ fn process_address_account(account: &Account, pubkey: Pubkey) -> Result<TreeAcco
))
}

fn process_batch_state_account(account: &mut Account, pubkey: Pubkey) -> Result<TreeAccounts> {
fn process_batch_state_account(
account: &mut Account,
pubkey: Pubkey,
) -> Result<Option<TreeAccounts>> {
light_account_checks::checks::check_discriminator::<BatchedMerkleTreeAccount>(&account.data)
.map_err(|_| AccountDeserializationError::BatchStateMerkleTree {
error: "Invalid discriminator".to_string(),
Expand All @@ -286,7 +289,10 @@ fn process_batch_state_account(account: &mut Account, pubkey: Pubkey) -> Result<
))
}

fn process_batch_address_account(account: &mut Account, pubkey: Pubkey) -> Result<TreeAccounts> {
fn process_batch_address_account(
account: &mut Account,
pubkey: Pubkey,
) -> Result<Option<TreeAccounts>> {
light_account_checks::checks::check_discriminator::<BatchedMerkleTreeAccount>(&account.data)
.map_err(|_| AccountDeserializationError::BatchAddressMerkleTree {
error: "Invalid discriminator".to_string(),
Expand All @@ -309,7 +315,17 @@ fn create_tree_accounts(
pubkey: Pubkey,
metadata: &MerkleTreeMetadata,
tree_type: TreeType,
) -> TreeAccounts {
) -> Option<TreeAccounts> {
if metadata.rollover_metadata.network_fee == 0 {
debug!(
event = "tree_skipped_no_network_fee",
tree = %pubkey,
tree_type = ?tree_type,
"Skipping tree with network_fee=0"
);
return None;
}

let tree_accounts = TreeAccounts::new(
pubkey,
metadata.associated_queue.into(),
Expand All @@ -326,7 +342,7 @@ fn create_tree_accounts(
tree_accounts.is_rolledover,
tree_accounts.owner
);
tree_accounts
Some(tree_accounts)
}

pub async fn fetch_protocol_group_authority<R: Rpc>(rpc: &R, run_id: &str) -> Result<Pubkey> {
Expand Down