Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/evm-helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ repository = "https://github.com/gnosisguild/enclave/crates/evm-helpers"

[dependencies]
alloy.workspace = true
anyhow.workspace = true
async-trait.workspace = true
eyre.workspace = true
futures.workspace = true
futures-util.workspace = true
once_cell.workspace = true
tokio.workspace = true
tracing.workspace = true
e3-utils.workspace = true
61 changes: 30 additions & 31 deletions crates/evm-helpers/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use eyre::Result;
use e3_utils::{retry_with_backoff, RetryError};
use std::future::Future;
use tokio::time::{sleep, Duration};
use tracing::info;

const READ_RETRY_MAX_ATTEMPTS: u32 = 3;
const READ_RETRY_INITIAL_DELAY_MS: u64 = 2000;
const RETRY_MAX_ATTEMPTS: u32 = 3;
const RETRY_INITIAL_DELAY_MS: u64 = 2000;

fn should_retry_error(error: &str, retry_on_errors: &[&str]) -> bool {
if retry_on_errors.is_empty() {
Expand All @@ -22,39 +21,39 @@ fn should_retry_error(error: &str, retry_on_errors: &[&str]) -> bool {
pub async fn call_with_retry<F, Fut, T>(
operation_name: &str,
retry_on_errors: &[&str],
read_fn: F,
) -> Result<T>
operation_fn: F,
) -> anyhow::Result<T>
where
F: Fn() -> Fut,
Fut: Future<Output = Result<T>>,
Fut: Future<Output = anyhow::Result<T>>,
{
let op_name = operation_name.to_string();
let retry_codes: Vec<String> = retry_on_errors.iter().map(|s| s.to_string()).collect();
let mut attempts = 0;
let mut delay = READ_RETRY_INITIAL_DELAY_MS;

loop {
attempts += 1;
let result = read_fn().await;

match result {
Ok(value) => return Ok(value),
Err(e) => {
let error_str = format!("{}", e);
let retry_refs: Vec<&str> = retry_codes.iter().map(|s| s.as_str()).collect();

if should_retry_error(&error_str, &retry_refs) && attempts < READ_RETRY_MAX_ATTEMPTS
{
info!(
"{}: error (attempt {}/{}), will retry after {}ms: {}",
op_name, attempts, READ_RETRY_MAX_ATTEMPTS, delay, e
);
sleep(Duration::from_millis(delay)).await;
delay *= 2;
} else {
return Err(e);
retry_with_backoff(
|| {
let op_name = op_name.clone();
let retry_codes = retry_codes.clone();
let fut = operation_fn();
async move {
match fut.await {
Ok(value) => Ok(value),
Err(e) => {
let error_str = format!("{}", e);
let retry_refs: Vec<&str> =
retry_codes.iter().map(|s| s.as_str()).collect();
if should_retry_error(&error_str, &retry_refs) {
info!("{}: error, will retry: {}", op_name, e);
Err(RetryError::Retry(e))
} else {
Err(RetryError::Failure(e))
}
}
}
}
}
}
},
RETRY_MAX_ATTEMPTS,
RETRY_INITIAL_DELAY_MS,
)
.await
}
1 change: 1 addition & 0 deletions crates/evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ pub use enclave_sol::EnclaveSol;
pub use enclave_sol_reader::EnclaveSolReader;
pub use enclave_sol_writer::EnclaveSolWriter;
pub use event_reader::{EnclaveEvmEvent, EvmEventReader, EvmEventReaderState, ExtractorFn};
pub use helpers::send_tx_with_retry;
pub use historical_event_coordinator::{CoordinatorStart, HistoricalEventCoordinator};
pub use repo::*;
85 changes: 73 additions & 12 deletions examples/CRISP/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 31 additions & 5 deletions examples/CRISP/server/src/server/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,15 @@ pub async fn register_e3_requested(
let e3 = call_with_retry("get_e3", &["0xcd6f4a4f"], || {
let contract = contract.clone();
let event_e3_id = event.e3Id;
async move { contract.get_e3(event_e3_id).await }
async move {
contract
.get_e3(event_e3_id)
.await
.map_err(|e| anyhow::anyhow!("{}", e))
}
})
.await?;
.await
.map_err(|e| eyre::eyre!("{}", e))?;

// Convert custom params bytes back to token address and balance threshold.

Expand Down Expand Up @@ -341,9 +347,15 @@ pub async fn register_committee_published(
let e3 = call_with_retry("get_e3", &["0xcd6f4a4f"], || {
let contract = contract.clone();
let event_e3_id = event.e3Id;
async move { contract.get_e3(event_e3_id).await }
async move {
contract
.get_e3(event_e3_id)
.await
.map_err(|e| anyhow::anyhow!("{}", e))
}
})
.await?;
.await
.map_err(|e| eyre::eyre!("{}", e))?;
if u64::try_from(e3.expiration)? > 0 {
info!("[e3_id={}] E3 already activated", event.e3Id);
return Ok(());
Expand Down Expand Up @@ -375,7 +387,21 @@ async fn handle_committee_time_expired(
ctx: Arc<IndexerContext<impl DataStore, ReadWrite>>,
) -> eyre::Result<()> {
// If not activated activate
let tx = ctx.contract().activate(event.e3Id).await?;
let tx = call_with_retry("activate", &["0x45ccf3c6"], || {
let value = ctx.clone();
async move {
info!("[e3_id={}] Calling Enclave.Activate", event.e3Id);
let receipt = value
.contract()
.activate(event.e3Id)
.await
.map_err(|e| anyhow::anyhow!("{:?}", e))?;
anyhow::Ok(receipt)
}
})
.await
.map_err(|e| eyre::eyre!("{:?}", e))?;

info!(
"[e3_id={}] E3 activated with tx: {:?}",
event.e3Id, tx.transaction_hash
Expand Down
15 changes: 9 additions & 6 deletions examples/CRISP/server/src/server/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,23 @@ impl<S: DataStore> CurrentRoundRepository<S> {
}

/// Get the current (most recent) round for a specific requester
///
///
/// # Arguments
/// * `requester` - The requester address to find the current round for
///
///
/// # Returns
/// * The CurrentRound object for the most recent round by this requester, or None if not found
pub async fn get_current_round_for_requester(&self, requester: String) -> Result<Option<CurrentRound>> {
pub async fn get_current_round_for_requester(
&self,
requester: String,
) -> Result<Option<CurrentRound>> {
// Get the current round count to iterate through all rounds
let round_count = self.get_current_round_id().await?;

// Iterate backwards from the most recent round to find the latest one for this requester
for round_id in (0..=round_count).rev() {
let crisp_repo = CrispE3Repository::new(self.store.clone(), round_id);

match crisp_repo.get_e3_state_lite().await {
Ok(state) => {
if state.requester == requester {
Expand Down Expand Up @@ -180,7 +183,7 @@ impl<S: DataStore> CrispE3Repository<S> {
token_address,
balance_threshold,
ciphertext_inputs: vec![],
requester
requester,
})
.await
}
Expand Down
Loading
Loading