Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions crates/evm-helpers/src/block_listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// SPDX-License-Identifier: LGPL-3.0-only
//
// This file is provided WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use alloy::{network::Ethereum, providers::Provider, rpc::types::Header};
use eyre::Result;
use futures::stream::StreamExt;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::RwLock;
use tracing::info;

type BlockHandler =
Box<dyn Fn(&Header) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;

#[derive(Clone)]
pub struct BlockListener {
provider: Arc<dyn Provider<Ethereum>>,
block_handlers: Arc<RwLock<Vec<BlockHandler>>>,
}

impl BlockListener {
pub fn new(provider: Arc<dyn Provider<Ethereum>>) -> Self {
Self {
provider,
block_handlers: Arc::new(RwLock::new(Vec::new())),
}
}

pub async fn add_block_handler<F, Fut>(&self, handler: F)
where
F: Fn(&Header) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<()>> + Send + 'static,
{
info!("add_block_handler");
self.block_handlers
.write()
.await
.push(Box::new(move |h: &Header| Box::pin(handler(h))));
}

pub async fn listen(&self) -> Result<()> {
let mut stream = self.provider.subscribe_blocks().await?.into_stream();
while let Some(block) = stream.next().await {
let handlers = self.block_handlers.read().await;
for handler in handlers.iter() {
let fut = handler(&block);
tokio::spawn(async move {
if let Err(e) = fut.await {
eprintln!("Error processing block: {:?}", e);
}
});
}
}
Ok(())
}
}
8 changes: 4 additions & 4 deletions crates/evm-helpers/src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub struct EnclaveContractFactory;
impl EnclaveContractFactory {
/// Create a write-capable contract
pub async fn create_write(
http_rpc_url: &str,
rpc_url: &str,
contract_address: &str,
private_key: &str,
) -> Result<EnclaveContract<ReadWrite>> {
Expand All @@ -261,7 +261,7 @@ impl EnclaveContractFactory {
let wallet = EthereumWallet::from(signer);
let provider = ProviderBuilder::new()
.wallet(wallet)
.connect(http_rpc_url)
.connect(rpc_url)
.await?;

Ok(EnclaveContract::<ReadWrite> {
Expand All @@ -274,12 +274,12 @@ impl EnclaveContractFactory {

/// Create a read-only contract
pub async fn create_read(
http_rpc_url: &str,
rpc_url: &str,
contract_address: &str,
) -> Result<EnclaveContract<ReadOnly>> {
let contract_address = contract_address.parse()?;

let provider = ProviderBuilder::new().connect(http_rpc_url).await?;
let provider = ProviderBuilder::new().connect(rpc_url).await?;

Ok(EnclaveContract::<ReadOnly> {
provider: Arc::new(provider),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use tokio::sync::RwLock;
use tracing::info;

type EventHandler =
Box<dyn Fn(&Log) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>;
Expand Down Expand Up @@ -90,9 +89,13 @@ impl EventListener {
Ok(())
}

pub fn provider(&self) -> Arc<dyn Provider<Ethereum>> {
self.provider.clone()
}

/// Create a contract listener that will listen to events from all addresses.
pub async fn create_contract_listener(ws_url: &str, addresses: &[&str]) -> Result<Self> {
let provider = Arc::new(ProviderBuilder::new().connect(ws_url).await?);
pub async fn create_contract_listener(rpc_url: &str, addresses: &[&str]) -> Result<Self> {
let provider = Arc::new(ProviderBuilder::new().connect(rpc_url).await?);

let address = addresses
.iter()
Expand Down
4 changes: 3 additions & 1 deletion crates/evm-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

pub mod block_listener;
pub mod contracts;
pub mod event_listener;
pub mod events;
pub mod listener;
pub mod threshold_queue;
130 changes: 130 additions & 0 deletions crates/evm-helpers/src/threshold_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// SPDX-License-Identifier: LGPL-3.0-only
//
// This file is provided WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY
// or FITNESS FOR A PARTICULAR PURPOSE.

use std::{
cmp::Reverse,
collections::BinaryHeap,
sync::{Arc, RwLock},
};

#[derive(Clone)]
/// An implementation of a ThresholdQueue. This enables releasing elements from a collection when
/// the given timestamp is reached or a given count has reached a certain threshold.
pub struct ThresholdQueue<T> {
inner: Arc<RwLock<BinaryHeap<Reverse<T>>>>,
}

/// An item that can be added to a threshold queue
pub trait ThresholdItem: Ord {
type Item;

/// Defines what it means to be withing the threshold eg self.myprop <= threshold
Comment thread
ryardley marked this conversation as resolved.
fn within_threshold(&self, threshold: u64) -> bool;

/// Access the inner item this wraps
fn item(&self) -> Self::Item;
}

impl<T, U> ThresholdQueue<T>
where
T: ThresholdItem<Item = U>,
{
/// Create a new ThresholdQueue
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(BinaryHeap::new())),
}
}

/// Push an item onto the queue
pub fn push(&self, item: T) {
self.inner
.write()
.expect("Poisoned write in ThresholdQueue")
.push(Reverse(item));
}

/// Keep taking items off the queue until `item.within_threshold(threshold)` returns false
pub fn take_until_including(&self, threshold: u64) -> Vec<T::Item> {
let mut found = Vec::new();
let mut inner = self
.inner
.write()
.expect("Poisoned write in ThresholdQueue");

while let Some(Reverse(item)) = inner.peek() {
if item.within_threshold(threshold) {
if let Some(Reverse(item)) = inner.pop() {
found.push(item.item());
}
} else {
break;
}
}

found
}
}

#[cfg(test)]
mod tests {
use super::{ThresholdItem, ThresholdQueue};

struct ThreshItem {
val: u64,
rank: u64,
}

impl Eq for ThreshItem {}

impl PartialEq for ThreshItem {
fn eq(&self, other: &Self) -> bool {
self.rank == other.rank
}
}

impl PartialOrd for ThreshItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}

impl Ord for ThreshItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.rank.cmp(&other.rank)
}
}

impl ThresholdItem for ThreshItem {
type Item = u64;
fn item(&self) -> Self::Item {
self.val
}

fn within_threshold(&self, threshold: u64) -> bool {
self.rank <= threshold
}
}

#[test]
fn test_collection_is_ordered() {
let queue = ThresholdQueue::new();
queue.push(ThreshItem { val: 111, rank: 25 });
queue.push(ThreshItem {
val: 666,
rank: 100,
});
queue.push(ThreshItem { val: 444, rank: 70 });
queue.push(ThreshItem { val: 222, rank: 26 });
let items = queue.take_until_including(70);

assert_eq!(items, vec![111, 222, 444]);

let items = queue.take_until_including(101);

assert_eq!(items, vec![666]);
}
}
52 changes: 50 additions & 2 deletions crates/evm-helpers/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
// or FITNESS FOR A PARTICULAR PURPOSE.

mod helpers;
use alloy::sol;
use e3_evm_helpers::listener::EventListener;
use alloy::consensus::BlockHeader;
use alloy::providers::ext::AnvilApi;
use alloy::{node_bindings::Anvil, providers::ProviderBuilder, sol};
use e3_evm_helpers::block_listener;
use e3_evm_helpers::{block_listener::BlockListener, event_listener::EventListener};
use eyre::Result;
use helpers::setup_logs_contract;
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::Mutex;
use tokio::time::sleep;

sol!(
Expand Down Expand Up @@ -202,3 +206,47 @@ async fn test_overlapping_listener_handlers() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_block_listener() -> Result<()> {
let anvil = Anvil::new().try_spawn()?;
let provider = Arc::new(ProviderBuilder::new().connect(&anvil.ws_endpoint()).await?);
let block_listener = Arc::new(BlockListener::new(provider.clone()));
let events: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(vec![]));
let events_handler = events.clone();

// Save each block number to a vector.
block_listener
.add_block_handler(move |block| {
let events = events_handler.clone();
let blockheight = block.number();
async move {
let mut events = events.lock().await;
events.push(blockheight);
Ok(())
}
})
.await;

// Start up a listener
let listen_handle = tokio::spawn(async move {
let _ = block_listener.listen().await;
});

// Give the listener time to start
sleep(Duration::from_millis(100)).await;

// Mine a few blocks
provider.anvil_mine(Some(5), None).await?;

// Wait for the block to be processed
sleep(Duration::from_secs(1)).await;

// Cancel the listener
listen_handle.abort();

let guard = events.lock().await;
assert_eq!(*guard, vec![1, 2, 3, 4, 5]);

Ok(())
}
Loading
Loading