From c940c0501da69383875443f126ab60a562d3f60a Mon Sep 17 00:00:00 2001 From: evalir Date: Mon, 5 Jan 2026 15:31:16 -0400 Subject: [PATCH 1/5] feat(utils): `BlockWatcher` --- Cargo.toml | 2 + src/lib.rs | 4 ++ src/utils/block_watcher.rs | 84 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 src/utils/block_watcher.rs diff --git a/Cargo.toml b/Cargo.toml index 7e53b66..6f5604c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ chrono = "0.4.40" # OAuth oauth2 = { version = "5.0.0", optional = true } tokio = { version = "1.36.0", optional = true } +tokio-stream = { version = "0.1", optional = true } # Other axum = "0.8.1" @@ -72,6 +73,7 @@ default = ["alloy", "rustls"] alloy = ["dep:alloy"] aws = ["alloy", "alloy?/signer-aws", "dep:async-trait", "dep:aws-config", "dep:aws-sdk-kms"] perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache"] +block_watcher = ["dep:tokio", "dep:tokio-stream"] rustls = ["dep:rustls", "rustls/aws-lc-rs"] [[example]] diff --git a/src/lib.rs b/src/lib.rs index a114ff5..cf1efc0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,10 @@ pub mod utils { /// Tracing utilities. pub mod tracing; + + /// Block watcher utilities. + #[cfg(feature = "block_watcher")] + pub mod block_watcher; } /// Re-exports of common dependencies. diff --git a/src/utils/block_watcher.rs b/src/utils/block_watcher.rs new file mode 100644 index 0000000..1e17f89 --- /dev/null +++ b/src/utils/block_watcher.rs @@ -0,0 +1,84 @@ +//! Host chain block watcher that subscribes to new blocks and tracks the +//! current host block number. + +use alloy::{ + network::Ethereum, + providers::{Provider, RootProvider}, + transports::TransportError, +}; +use tokio::{sync::watch, task::JoinHandle}; +use tokio_stream::StreamExt; +use tracing::{debug, error, trace}; + +/// Errors that can occur on the [`BlockWatcher`] task. +#[derive(Debug, thiserror::Error)] +pub enum BlockWatcherError { + /// Failed to subscribe to host chain blocks. + #[error("failed to subscribe to host chain blocks: {0}")] + SubscribeError(TransportError), +} + +impl From for BlockWatcherError { + fn from(err: TransportError) -> Self { + BlockWatcherError::SubscribeError(err) + } +} + +/// Host chain block watcher that subscribes to new blocks and broadcasts +/// updates via a watch channel. +#[derive(Debug)] +pub struct BlockWatcher { + /// Watch channel responsible for broadcasting block number updates. + block_number: watch::Sender, + + /// Host chain provider. + host_provider: RootProvider, +} + +impl BlockWatcher { + /// Creates a new [`BlockWatcher`] with the given provider and initial + /// block number. + pub fn new(host_provider: RootProvider, initial: u64) -> Self { + Self { + block_number: watch::channel(initial).0, + host_provider, + } + } + + /// Creates a new [`BlockWatcher`], fetching the current block number first. + pub async fn with_current_block( + host_provider: RootProvider, + ) -> Result { + let block_number = host_provider.get_block_number().await?; + Ok(Self::new(host_provider, block_number)) + } + + /// Subscribe to block number updates. + pub fn subscribe(&self) -> watch::Receiver { + self.block_number.subscribe() + } + + /// Spawns the block watcher task. + pub fn spawn(self) -> JoinHandle<()> { + tokio::spawn(self.task_future()) + } + + async fn task_future(self) { + let sub = match self.host_provider.subscribe_blocks().await { + Ok(sub) => sub, + Err(err) => { + error!(error = ?err, "failed to subscribe to host chain blocks"); + return; + } + }; + let mut stream = sub.into_stream(); + + debug!("subscribed to host chain blocks"); + + while let Some(header) = stream.next().await { + let block_number = header.number; + self.block_number.send_replace(block_number); + trace!(block_number, "updated host block number"); + } + } +} From 2a9df760baee0c3b7e979d5fa4a24edf0826ff0b Mon Sep 17 00:00:00 2001 From: evalir Date: Mon, 5 Jan 2026 15:37:42 -0400 Subject: [PATCH 2/5] feat: add helper type for subscribing to block number upates --- src/utils/block_watcher.rs | 42 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/src/utils/block_watcher.rs b/src/utils/block_watcher.rs index 1e17f89..09667fc 100644 --- a/src/utils/block_watcher.rs +++ b/src/utils/block_watcher.rs @@ -54,8 +54,8 @@ impl BlockWatcher { } /// Subscribe to block number updates. - pub fn subscribe(&self) -> watch::Receiver { - self.block_number.subscribe() + pub fn subscribe(&self) -> SharedBlockNumber { + self.block_number.subscribe().into() } /// Spawns the block watcher task. @@ -82,3 +82,41 @@ impl BlockWatcher { } } } + +/// A shared block number, wrapped in a [`tokio::sync::watch`] Receiver. +/// +/// The block number is periodically updated by a [`BlockWatcher`] task, and +/// can be read or awaited for changes. This allows multiple tasks to observe +/// block number updates. +#[derive(Debug, Clone)] +pub struct SharedBlockNumber(watch::Receiver); + +impl From> for SharedBlockNumber { + fn from(inner: watch::Receiver) -> Self { + Self(inner) + } +} + +impl SharedBlockNumber { + /// Get the current block number. + pub fn get(&self) -> u64 { + *self.0.borrow() + } + + /// Wait for the block number to change, then return the new value. + /// + /// This is implemented using [`Receiver::changed`]. + /// + /// [`Receiver::changed`]: tokio::sync::watch::Receiver::changed + pub async fn changed(&mut self) -> Result { + self.0.changed().await?; + Ok(*self.0.borrow_and_update()) + } + + /// Wait for the block number to reach at least `target`. + /// + /// Returns the block number once it is >= `target`. + pub async fn wait_until(&mut self, target: u64) -> Result { + self.0.wait_for(|&n| n >= target).await.map(|r| *r) + } +} From 4fb76461a0ec6081849a742df0af4096afa33088 Mon Sep 17 00:00:00 2001 From: evalir Date: Mon, 5 Jan 2026 20:20:54 -0400 Subject: [PATCH 3/5] chore: do not use tokio streams, handle loop --- Cargo.toml | 3 +-- src/utils/block_watcher.rs | 34 +++++++++++++++++++++++----------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6f5604c..26ff04e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,7 +45,6 @@ chrono = "0.4.40" # OAuth oauth2 = { version = "5.0.0", optional = true } tokio = { version = "1.36.0", optional = true } -tokio-stream = { version = "0.1", optional = true } # Other axum = "0.8.1" @@ -73,7 +72,7 @@ default = ["alloy", "rustls"] alloy = ["dep:alloy"] aws = ["alloy", "alloy?/signer-aws", "dep:async-trait", "dep:aws-config", "dep:aws-sdk-kms"] perms = ["dep:oauth2", "dep:tokio", "dep:reqwest", "dep:signet-tx-cache"] -block_watcher = ["dep:tokio", "dep:tokio-stream"] +block_watcher = ["dep:tokio"] rustls = ["dep:rustls", "rustls/aws-lc-rs"] [[example]] diff --git a/src/utils/block_watcher.rs b/src/utils/block_watcher.rs index 09667fc..7d85dc8 100644 --- a/src/utils/block_watcher.rs +++ b/src/utils/block_watcher.rs @@ -6,9 +6,11 @@ use alloy::{ providers::{Provider, RootProvider}, transports::TransportError, }; -use tokio::{sync::watch, task::JoinHandle}; -use tokio_stream::StreamExt; -use tracing::{debug, error, trace}; +use tokio::{ + sync::{broadcast::error::RecvError, watch}, + task::JoinHandle, +}; +use tracing::{debug, error, trace, warn}; /// Errors that can occur on the [`BlockWatcher`] task. #[derive(Debug, thiserror::Error)] @@ -64,21 +66,31 @@ impl BlockWatcher { } async fn task_future(self) { - let sub = match self.host_provider.subscribe_blocks().await { + let mut sub = match self.host_provider.subscribe_blocks().await { Ok(sub) => sub, - Err(err) => { - error!(error = ?err, "failed to subscribe to host chain blocks"); + Err(error) => { + error!(%error); return; } }; - let mut stream = sub.into_stream(); debug!("subscribed to host chain blocks"); - while let Some(header) = stream.next().await { - let block_number = header.number; - self.block_number.send_replace(block_number); - trace!(block_number, "updated host block number"); + loop { + match sub.recv().await { + Ok(header) => { + let block_number = header.number; + self.block_number.send_replace(block_number); + trace!(block_number, "updated host block number"); + } + Err(RecvError::Lagged(missed)) => { + warn!(%missed, "block subscription lagged"); + } + Err(RecvError::Closed) => { + error!("block subscription closed"); + break; + } + } } } } From 27e6144b19c9bcdf7cfcd251d9ff6be04934255a Mon Sep 17 00:00:00 2001 From: evalir Date: Fri, 9 Jan 2026 07:15:37 +0100 Subject: [PATCH 4/5] chore: code review changes --- src/utils/block_watcher.rs | 24 +++++------------------- 1 file changed, 5 insertions(+), 19 deletions(-) diff --git a/src/utils/block_watcher.rs b/src/utils/block_watcher.rs index 7d85dc8..ab209f7 100644 --- a/src/utils/block_watcher.rs +++ b/src/utils/block_watcher.rs @@ -10,21 +10,7 @@ use tokio::{ sync::{broadcast::error::RecvError, watch}, task::JoinHandle, }; -use tracing::{debug, error, trace, warn}; - -/// Errors that can occur on the [`BlockWatcher`] task. -#[derive(Debug, thiserror::Error)] -pub enum BlockWatcherError { - /// Failed to subscribe to host chain blocks. - #[error("failed to subscribe to host chain blocks: {0}")] - SubscribeError(TransportError), -} - -impl From for BlockWatcherError { - fn from(err: TransportError) -> Self { - BlockWatcherError::SubscribeError(err) - } -} +use tracing::{debug, error, info, trace, warn}; /// Host chain block watcher that subscribes to new blocks and broadcasts /// updates via a watch channel. @@ -50,7 +36,7 @@ impl BlockWatcher { /// Creates a new [`BlockWatcher`], fetching the current block number first. pub async fn with_current_block( host_provider: RootProvider, - ) -> Result { + ) -> Result { let block_number = host_provider.get_block_number().await?; Ok(Self::new(host_provider, block_number)) } @@ -61,8 +47,8 @@ impl BlockWatcher { } /// Spawns the block watcher task. - pub fn spawn(self) -> JoinHandle<()> { - tokio::spawn(self.task_future()) + pub fn spawn(self) -> (SharedBlockNumber, JoinHandle<()>) { + (self.subscribe(), tokio::spawn(self.task_future())) } async fn task_future(self) { @@ -87,7 +73,7 @@ impl BlockWatcher { warn!(%missed, "block subscription lagged"); } Err(RecvError::Closed) => { - error!("block subscription closed"); + info!("block subscription closed"); break; } } From bb99abfd9d0c88a7dceb7e0f3a264ae38ac07ba8 Mon Sep 17 00:00:00 2001 From: evalir Date: Tue, 13 Jan 2026 13:53:56 +0100 Subject: [PATCH 5/5] chore: comments --- src/utils/block_watcher.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/utils/block_watcher.rs b/src/utils/block_watcher.rs index ab209f7..9eb6727 100644 --- a/src/utils/block_watcher.rs +++ b/src/utils/block_watcher.rs @@ -10,7 +10,7 @@ use tokio::{ sync::{broadcast::error::RecvError, watch}, task::JoinHandle, }; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, trace}; /// Host chain block watcher that subscribes to new blocks and broadcasts /// updates via a watch channel. @@ -70,10 +70,10 @@ impl BlockWatcher { trace!(block_number, "updated host block number"); } Err(RecvError::Lagged(missed)) => { - warn!(%missed, "block subscription lagged"); + debug!(%missed, "block subscription lagged"); } Err(RecvError::Closed) => { - info!("block subscription closed"); + debug!("block subscription closed"); break; } }