Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/aggregator/src/committee_finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ impl Handler<CommitteeRequested> for CommitteeFinalizer {
let bus = act.bus.clone();
let e3_id_clone = e3_id_for_async.clone();

// XXX: refactor to use blockchain time
let handle = ctx.run_later(
Duration::from_secs(seconds_until_deadline),
move |act, _ctx| {
Expand Down
1 change: 1 addition & 0 deletions crates/evm-helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ futures.workspace = true
futures-util.workspace = true
once_cell.workspace = true
tokio.workspace = true
tracing.workspace = true
41 changes: 29 additions & 12 deletions crates/evm-helpers/src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use alloy::network::NetworkWallet;
use alloy::providers::fillers::BlobGasFiller;
use alloy::providers::WalletProvider;
use alloy::{
network::{Ethereum, EthereumWallet},
primitives::{Address, Bytes, U256},
Expand Down Expand Up @@ -206,6 +208,8 @@ impl EnclaveContract<ReadWrite> {
pub fn address(&self) -> &Address {
&self.contract_address
}

// pub fn create_new_read_contract()
}

impl EnclaveContract<ReadOnly> {
Expand Down Expand Up @@ -261,18 +265,26 @@ impl EnclaveContractFactory {
contract_address: &str,
private_key: &str,
) -> Result<EnclaveContract<ReadWrite>> {
let contract_address = contract_address.parse()?;

let signer: PrivateKeySigner = private_key.parse()?;
let wallet_address = signer.address();
let wallet = EthereumWallet::from(signer);
let provider = ProviderBuilder::new()
.wallet(wallet)
.connect(http_rpc_url)
.await?;
let provider = Arc::new(
ProviderBuilder::new()
.wallet(wallet)
.connect(http_rpc_url)
.await?,
);
EnclaveContractFactory::create_write_from_provider(contract_address, provider)
}

pub fn create_write_from_provider(
contract_address: &str,
provider: Arc<EnclaveWriteProvider>,
) -> Result<EnclaveContract<ReadWrite>> {
let contract_address = contract_address.parse()?;
let wallet = provider.wallet();
let wallet_address = wallet.default_signer().address();
Ok(EnclaveContract::<ReadWrite> {
provider: Arc::new(provider),
provider,
contract_address,
wallet_address: Some(wallet_address),
_marker: PhantomData,
Expand All @@ -284,12 +296,17 @@ impl EnclaveContractFactory {
http_rpc_url: &str,
contract_address: &str,
) -> Result<EnclaveContract<ReadOnly>> {
let contract_address = contract_address.parse()?;

let provider = ProviderBuilder::new().connect(http_rpc_url).await?;
let provider = Arc::new(ProviderBuilder::new().connect(http_rpc_url).await?);
EnclaveContractFactory::create_read_from_provider(contract_address, provider)
}

pub fn create_read_from_provider(
contract_address: &str,
provider: Arc<EnclaveReadOnlyProvider>,
) -> Result<EnclaveContract<ReadOnly>> {
let contract_address = contract_address.parse()?;
Ok(EnclaveContract::<ReadOnly> {
provider: Arc::new(provider),
provider,
contract_address,
wallet_address: None,
_marker: PhantomData,
Expand Down
1 change: 1 addition & 0 deletions crates/evm-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
pub mod contracts;
pub mod events;
pub mod listener;
pub mod threshold_queue;
108 changes: 103 additions & 5 deletions crates/evm-helpers/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// or FITNESS FOR A PARTICULAR PURPOSE.

use alloy::{
consensus::Header,
network::Ethereum,
primitives::{Address, B256},
providers::{Provider, ProviderBuilder},
Expand All @@ -14,17 +15,27 @@ use alloy::{
use eyre::Result;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use tokio::{sync::RwLock, task::JoinHandle};
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration};
use tokio::{sync::RwLock, time::sleep};
use tracing::{error, info, warn};

use crate::contracts::{EnclaveContractFactory, EnclaveReadOnlyProvider};

type EventHandler =
Box<dyn Fn(&Log) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;

type BlockHandler =
Box<dyn Fn(&Header) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;

#[derive(Clone)]
/// Listens for contract events
pub struct EventListener {
provider: Arc<dyn Provider<Ethereum>>,
filter: Filter,
handlers: Arc<RwLock<HashMap<B256, Vec<EventHandler>>>>,
block_handlers: Arc<RwLock<Vec<BlockHandler>>>,
event_started: bool,
block_started: bool,
}

impl EventListener {
Expand All @@ -33,6 +44,9 @@ impl EventListener {
provider,
filter,
handlers: Arc::new(RwLock::new(HashMap::new())),
block_handlers: Arc::new(RwLock::new(Vec::new())),
event_started: false,
block_started: false,
}
}

Expand Down Expand Up @@ -63,7 +77,20 @@ impl EventListener {
.push(wrapped_handler);
}

async fn listen(&self) -> Result<()> {
pub async fn add_block_handler<F, Fut>(&mut self, handler: F)
where
F: Fn(&Header) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
info!("add_block_handler");
self.block_handlers
.write()
.await
.push(Box::new(move |h: &Header| Box::pin(handler(h))));
}

async fn event_listen_once(&self) -> Result<()> {
info!("event_listen_once()");
let mut stream = self
.provider
.subscribe_logs(&self.filter)
Expand All @@ -89,13 +116,84 @@ impl EventListener {
Ok(())
}

pub fn start(&self) -> JoinHandle<Result<()>> {
async fn block_listen_once(&self) -> Result<()> {
info!("block_listen_once()");
let mut stream = self.provider.subscribe_blocks().await?.into_stream();
while let Some(block) = stream.next().await {
let handlers = self.block_handlers.read().await;
for handler in handlers.iter() {
let fut = handler(&block);
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("Error processing block: {:?}", e);
}
});
}
}
Ok(())
}

fn ensure_block_listen_loop(&mut self) {
info!("start_block_listen_loop");
self.block_started = true;
let this = self.clone();
tokio::spawn(async move { this.listen().await })
tokio::spawn(async move {
let len = { this.block_handlers.read().await.len() };

if len > 0 {
this.retry_loop(|| this.block_listen_once()).await;
}
});
}

fn ensure_event_listen_loop(&mut self) {
info!("ensure_event_listen_loop");
self.event_started = true;
let this = self.clone();
tokio::spawn(async move {
let len = { this.handlers.read().await.len() };
if len > 0 {
this.retry_loop(|| this.event_listen_once()).await;
}
});
}

async fn retry_loop<F, Fut, E>(&self, mut operation: F)
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<(), E>>,
E: std::fmt::Display,
{
loop {
match operation().await {
Ok(_) => {
sleep(Duration::from_secs(1)).await;
}
Err(e) => {
error!("\n**********************************************************");
error!("Error occurred: {}. Retrying in 5 seconds...", e);
error!("**********************************************************\n\n");
sleep(Duration::from_secs(5)).await;
}
}
warn!("Ongoing operation finished unexpectedly");
}
}

pub fn start(&mut self) {
self.ensure_event_listen_loop();
self.ensure_block_listen_loop();
}

pub async fn create_contract_listener(ws_url: &str, contract_address: &str) -> Result<Self> {
let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?);
EventListener::create_contract_listener_from_provider(contract_address, provider)
}

pub fn create_contract_listener_from_provider(
contract_address: &str,
provider: Arc<EnclaveReadOnlyProvider>,
) -> Result<Self> {
let address = contract_address.parse::<Address>()?;
let filter = Filter::new()
.address(address)
Expand Down
125 changes: 125 additions & 0 deletions crates/evm-helpers/src/threshold_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// SPDX-License-Identifier: LGPL-3.0-only
//
// This file is provided WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use std::{
cmp::Reverse,
collections::BinaryHeap,
sync::{Arc, RwLock},
};

#[derive(Clone)]
/// An implementation of a ThresholdQueue
pub struct ThresholdQueue<T> {
inner: Arc<RwLock<BinaryHeap<Reverse<T>>>>,
}

/// An item that can be added to a threshold queue
pub trait ThresholdItem: Ord {
type Item;
fn within_threshold(&self, threshold: u64) -> bool;
fn item(&self) -> Self::Item;
}

impl<T, U> ThresholdQueue<T>
where
T: ThresholdItem<Item = U>,
{
/// Create a new ThresholdQueue
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(BinaryHeap::new())),
}
}

/// Push an item onto the queue
pub fn push(&self, item: T) {
self.inner
.write()
.expect("Poisoned write in ThresholdQueue")
.push(Reverse(item));
}

/// Keep taking items off the queue until `item.within_threshold(threshold)` returns false
pub fn take_until_including(&self, threshold: u64) -> Vec<T::Item> {
let mut found = Vec::new();
let mut inner = self
.inner
.write()
.expect("Poisoned write in ThresholdQueue");

while let Some(Reverse(item)) = inner.peek() {
if item.within_threshold(threshold) {
if let Some(Reverse(item)) = inner.pop() {
found.push(item.item());
}
} else {
break;
}
}

found
}
}

#[cfg(test)]
mod tests {
use super::{ThresholdItem, ThresholdQueue};

struct ThreshItem {
val: u64,
rank: u64,
}

impl Eq for ThreshItem {}

impl PartialEq for ThreshItem {
fn eq(&self, other: &Self) -> bool {
self.rank == other.rank
}
}

impl PartialOrd for ThreshItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ThreshItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.rank.cmp(&other.rank)
}
}

impl ThresholdItem for ThreshItem {
type Item = u64;
fn item(&self) -> Self::Item {
self.val
}

fn within_threshold(&self, threshold: u64) -> bool {
self.rank <= threshold
}
}

#[test]
fn test_collection_is_ordered() {
let queue = ThresholdQueue::new();
queue.push(ThreshItem { val: 111, rank: 25 });
queue.push(ThreshItem {
val: 666,
rank: 100,
});
queue.push(ThreshItem { val: 444, rank: 70 });
queue.push(ThreshItem { val: 222, rank: 26 });
let items = queue.take_until_including(70);

assert_eq!(items, vec![111, 222, 444]);

let items = queue.take_until_including(101);

assert_eq!(items, vec![666]);
}
}
Loading
Loading