From d8c219884680d42bdcc6a6029f6e125e73bef0a1 Mon Sep 17 00:00:00 2001 From: Matias Fontanini Date: Tue, 24 Feb 2026 13:49:31 -0800 Subject: [PATCH] fix: don't iterate unnecessarily over absolutely all HTXs every time --- keeper/src/l2/escalator.rs | 92 +++++--------------------------------- keeper/src/l2/events.rs | 21 +++------ keeper/src/l2/mod.rs | 4 +- 3 files changed, 17 insertions(+), 100 deletions(-) diff --git a/keeper/src/l2/escalator.rs b/keeper/src/l2/escalator.rs index 1f44fb9..4a817fb 100644 --- a/keeper/src/l2/escalator.rs +++ b/keeper/src/l2/escalator.rs @@ -1,12 +1,11 @@ use crate::{clients::L2KeeperClient, l2::KeeperState, metrics}; use alloy::primitives::{B256, Bytes}; use blacklight_contract_clients::heartbeat_manager::HeartbeatManagerErrors; -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::Mutex; -use tracing::{debug, info, warn}; - use contract_clients_common::errors::decode_any_error; use contract_clients_common::tx_submitter::TransactionSubmitter; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::Mutex; +use tracing::{info, warn}; pub(crate) struct RoundEscalator { client: Arc, @@ -25,27 +24,19 @@ impl RoundEscalator { } pub(crate) async fn process_escalations(&self, block_timestamp: u64) -> anyhow::Result<()> { - let (candidates, fallback_heartbeats) = { + let candidates = { let state = self.state.lock().await; let mut best_rounds: HashMap = HashMap::new(); - let mut fallback = Vec::new(); for (key, round) in state.rounds.iter() { if round.outcome.is_some() { continue; } - let deadline = match round.deadline { - Some(value) => value, - None => continue, - }; - let raw_htx = round - .raw_htx - .clone() - .or_else(|| state.raw_htx_by_heartbeat.get(&key.heartbeat_key).cloned()); - let Some(raw_htx) = raw_htx else { + if block_timestamp <= round.deadline { continue; - }; - + } + let deadline = round.deadline; + let raw_htx = round.raw_htx.clone(); let entry = best_rounds .entry(key.heartbeat_key) .or_insert_with(|| (key.round, deadline, raw_htx.clone())); @@ -53,37 +44,10 @@ impl RoundEscalator { *entry = (key.round, deadline, raw_htx); } } - - if best_rounds.is_empty() { - fallback = state - .raw_htx_by_heartbeat - .iter() - .map(|(k, v)| (*k, v.clone())) - .collect(); - } - - ( - best_rounds - .into_iter() - .map(|(k, (round, deadline, raw_htx))| (k, round, deadline, raw_htx)) - .collect::>(), - fallback, - ) + best_rounds }; - for (heartbeat_key, round, deadline, raw_htx) in candidates { - if block_timestamp <= deadline { - debug!( - heartbeat_key = ?heartbeat_key, - round, - deadline, - block_timestamp, - remaining_secs = deadline - block_timestamp, - "Round not yet past deadline, skipping" - ); - continue; - } - + for (heartbeat_key, (round, deadline, raw_htx)) in candidates { info!( heartbeat_key = ?heartbeat_key, round, @@ -114,42 +78,6 @@ impl RoundEscalator { } } - for (heartbeat_key, raw_htx) in fallback_heartbeats { - let should_escalate = self - .client - .heartbeat_manager() - .isPastDeadline(heartbeat_key) - .call() - .await?; - if !should_escalate { - continue; - } - - info!(heartbeat_key = ?heartbeat_key, "Escalating or expiring round"); - let call = self - .client - .heartbeat_manager() - .escalateOrExpire(heartbeat_key, raw_htx.clone()); - - match self.submitter.invoke("escalateOrExpire", call).await { - Ok(tx_hash) => { - info!( - heartbeat_key = ?heartbeat_key, - tx_hash = ?tx_hash, - "Escalate/expire confirmed" - ); - metrics::get().l2.escalations.inc_escalations(); - } - Err(e) => { - warn!( - heartbeat_key = ?heartbeat_key, - error = %decode_any_error(&e), - "Escalate/expire failed" - ); - } - } - } - Ok(()) } } diff --git a/keeper/src/l2/events.rs b/keeper/src/l2/events.rs index 3a29f8c..9503327 100644 --- a/keeper/src/l2/events.rs +++ b/keeper/src/l2/events.rs @@ -34,9 +34,6 @@ impl EventListener { to_block: u64, state: &mut KeeperState, ) -> anyhow::Result<()> { - let enqueued = self - .query_events::(from_block, to_block) - .await?; let rounds_started = self .query_events::(from_block, to_block) .await?; @@ -50,11 +47,6 @@ impl EventListener { .query_events::(from_block, to_block) .await?; - for (event, _log) in enqueued { - state - .raw_htx_by_heartbeat - .insert(event.heartbeatKey, event.rawHTX); - } for (event, _log) in rounds_started { let key = RoundKey { heartbeat_key: event.heartbeatKey, @@ -62,11 +54,8 @@ impl EventListener { }; let entry = state.rounds.entry(key).or_default(); entry.members = event.members; - entry.raw_htx = Some(event.rawHTX.clone()); - entry.deadline = Some(event.deadline); - state - .raw_htx_by_heartbeat - .insert(event.heartbeatKey, event.rawHTX); + entry.raw_htx = event.rawHTX.clone(); + entry.deadline = event.deadline; } for (event, _log) in rounds_finalized { let key = RoundKey { @@ -214,8 +203,8 @@ impl EventListener { .insert(event.heartbeatKey, event.rawHTX.clone()); let entry = guard.rounds.entry(key).or_default(); entry.members = event.members.clone(); - entry.raw_htx = Some(event.rawHTX.clone()); - entry.deadline = Some(event.deadline); + entry.raw_htx = event.rawHTX; + entry.deadline = event.deadline; info!( heartbeat_key = ?event.heartbeatKey, round = event.round, @@ -225,7 +214,7 @@ impl EventListener { ); // Detect ERC-8004 HTXs and track them with HeartbeatManager's heartbeat_key - if let Ok(erc8004_htx) = Erc8004Htx::try_decode(&event.rawHTX) { + if let Ok(erc8004_htx) = Erc8004Htx::try_decode(&entry.raw_htx) { // Only add if not already tracked (first round) if event.round == 1 && !guard diff --git a/keeper/src/l2/mod.rs b/keeper/src/l2/mod.rs index 24a46b3..330f86d 100644 --- a/keeper/src/l2/mod.rs +++ b/keeper/src/l2/mod.rs @@ -19,8 +19,8 @@ struct RoundKey { #[derive(Debug, Clone, Default)] struct RoundState { members: Vec
, - raw_htx: Option, - deadline: Option, + raw_htx: Bytes, + deadline: u64, outcome: Option, rewards_done: bool, jailing_done: bool,