From 6daa1c1747ff01e3439334ae48679286e88f6d37 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 14 Jan 2026 13:15:12 -0500 Subject: [PATCH 1/9] spike --- Cargo.lock | 1 + bin/node/src/cli.rs | 6 ++++ crates/client/txpool/Cargo.toml | 1 + crates/client/txpool/src/events.rs | 3 ++ crates/client/txpool/src/extension.rs | 18 +++++++++++- crates/client/txpool/src/metrics.rs | 5 ++++ crates/client/txpool/src/subscription.rs | 24 ++++++++++++++-- crates/client/txpool/src/tracker.rs | 35 ++++++++++++++++++++++++ 8 files changed, 90 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2e2f8b1..1e30b121 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2138,6 +2138,7 @@ version = "0.0.0" dependencies = [ "alloy-primitives", "base-client-node", + "base-flashblocks", "chrono", "derive_more", "eyre", diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index 03177da5..fb3588c9 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -56,10 +56,16 @@ impl From for Option { impl From for TxpoolConfig { fn from(args: Args) -> Self { + let flashblocks_config = args + .websocket_url + .as_ref() + .map(|url| FlashblocksConfig::new(url.clone(), args.max_pending_blocks_depth)); + Self { tracing_enabled: args.enable_transaction_tracing, tracing_logs_enabled: args.enable_transaction_tracing_logs, sequencer_rpc: args.rollup_args.sequencer, + flashblocks_config, } } } diff --git a/crates/client/txpool/Cargo.toml b/crates/client/txpool/Cargo.toml index 9f9f25b6..990f00f6 100644 --- a/crates/client/txpool/Cargo.toml +++ b/crates/client/txpool/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] # workspace base-client-node.workspace = true +base-flashblocks.workspace = true # reth reth-tracing.workspace = true diff --git a/crates/client/txpool/src/events.rs b/crates/client/txpool/src/events.rs index 7ca9faf5..c5d7b902 100644 --- a/crates/client/txpool/src/events.rs +++ b/crates/client/txpool/src/events.rs @@ -23,6 +23,9 @@ pub enum TxEvent { /// Transaction included on chain. #[display("block_inclusion")] BlockInclusion, + /// Transaction included in a flashblock. + #[display("fb_inclusion")] + FbInclusion, /// Transaction moved from pending -> queued. #[display("pending_to_queued")] PendingToQueued, diff --git a/crates/client/txpool/src/extension.rs b/crates/client/txpool/src/extension.rs index 0689dd49..e1747a96 100644 --- a/crates/client/txpool/src/extension.rs +++ b/crates/client/txpool/src/extension.rs @@ -1,7 +1,10 @@ //! Contains the [`TxPoolExtension`] which wires up the transaction pool features //! (tracing subscription and status RPC) on the Base node builder. +use std::sync::Arc; + use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig}; +use base_flashblocks::{FlashblocksConfig, FlashblocksState}; use reth_provider::CanonStateSubscriptions; use tokio_stream::wrappers::BroadcastStream; use tracing::info; @@ -17,6 +20,8 @@ pub struct TxpoolConfig { pub tracing_logs_enabled: bool, /// Sequencer RPC endpoint for transaction status proxying. pub sequencer_rpc: Option, + /// Optional Flashblocks configuration (includes state). + pub flashblocks_config: Option, } /// Helper struct that wires the transaction pool features into the node builder. @@ -42,6 +47,7 @@ impl BaseNodeExtension for TxPoolExtension { let sequencer_rpc = config.sequencer_rpc; let tracing_enabled = config.tracing_enabled; let logs_enabled = config.tracing_logs_enabled; + let flashblocks_config = config.flashblocks_config; builder.add_rpc_module(move |ctx| { info!(message = "Starting Transaction Status RPC"); @@ -54,7 +60,17 @@ impl BaseNodeExtension for TxPoolExtension { let canonical_stream = BroadcastStream::new(ctx.provider().subscribe_to_canonical_state()); let pool = ctx.pool().clone(); - tokio::spawn(tracex_subscription(canonical_stream, pool, logs_enabled)); + + // Get flashblocks state from config if available + let flashblocks_state: Option> = + flashblocks_config.as_ref().map(|cfg| cfg.state.clone()); + + tokio::spawn(tracex_subscription( + canonical_stream, + flashblocks_state, + pool, + logs_enabled, + )); } Ok(()) diff --git a/crates/client/txpool/src/metrics.rs b/crates/client/txpool/src/metrics.rs index b27ef7fb..14913bb3 100644 --- a/crates/client/txpool/src/metrics.rs +++ b/crates/client/txpool/src/metrics.rs @@ -14,4 +14,9 @@ pub struct Metrics { describe = "Time taken for a transaction to be included in a block from when it's marked as pending" )] pub inclusion_duration: Histogram, + /// Time taken for a transaction to be included in a flashblock from when it's marked as pending. + #[metric( + describe = "Time taken for a transaction to be included in a flashblock from when it's marked as pending" + )] + pub fb_inclusion_duration: Histogram, } diff --git a/crates/client/txpool/src/subscription.rs b/crates/client/txpool/src/subscription.rs index f02666de..7dff5679 100644 --- a/crates/client/txpool/src/subscription.rs +++ b/crates/client/txpool/src/subscription.rs @@ -1,10 +1,14 @@ //! Tracex canonical block subscription. +use std::sync::Arc; + +use base_flashblocks::{FlashblocksAPI, PendingBlocks}; use futures::StreamExt; use reth_node_api::NodePrimitives; use reth_provider::CanonStateNotification; use reth_tracing::tracing::debug; use reth_transaction_pool::TransactionPool; +use tokio::sync::broadcast::Receiver; use tokio_stream::wrappers::BroadcastStream; use crate::tracker::Tracker; @@ -12,14 +16,16 @@ use crate::tracker::Tracker; /// Subscription task that tracks transaction timing from mempool to block inclusion. /// /// Monitors transaction lifecycle events and records timing metrics by listening -/// to canonical state notifications and mempool events. -pub async fn tracex_subscription( +/// to canonical state notifications, flashblock updates, and mempool events. +pub async fn tracex_subscription( canonical_stream: BroadcastStream>, + flashblocks_api: Option>, pool: Pool, enable_logs: bool, ) where N: NodePrimitives, Pool: TransactionPool + 'static, + FB: FlashblocksAPI + 'static, { debug!(target: "tracex", "Starting transaction tracking subscription"); let mut tracker = Tracker::new(enable_logs); @@ -28,6 +34,10 @@ pub async fn tracex_subscription( let mut all_events_stream = pool.all_transactions_event_listener(); let mut canonical_stream = canonical_stream; + // Subscribe to flashblocks if available + let mut flashblock_stream: Option>> = + flashblocks_api.map(|api| api.subscribe_to_flashblocks()); + loop { tokio::select! { // Track # of transactions dropped and replaced. @@ -37,6 +47,16 @@ pub async fn tracex_subscription( Some(Ok(notification)) = canonical_stream.next() => { tracker.handle_canon_state_notification(notification); } + + // Track flashblock inclusion timing. + Ok(pending_blocks) = async { + match &mut flashblock_stream { + Some(stream) => stream.recv().await, + None => std::future::pending().await, + } + } => { + tracker.handle_flashblock_notification(pending_blocks); + } } } } diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index 366530f8..d7420d29 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -2,10 +2,12 @@ use std::{ num::NonZeroUsize, + sync::Arc, time::{Duration, Instant}, }; use alloy_primitives::TxHash; +use base_flashblocks::PendingBlocks; use chrono::Local; use lru::LruCache; use reth_node_api::{BlockBody, NodePrimitives}; @@ -75,6 +77,11 @@ impl Tracker { self.track_committed_chain(¬ification.committed()); } + /// Parse flashblock updates and track transaction inclusion in flashblocks. + pub fn handle_flashblock_notification(&mut self, pending_blocks: Arc) { + self.track_flashblock_transactions(&pending_blocks); + } + fn track_committed_chain(&mut self, chain: &Chain) { for block in chain.blocks().values() { for transaction in block.body().transactions() { @@ -83,6 +90,13 @@ impl Tracker { } } + fn track_flashblock_transactions(&mut self, pending_blocks: &PendingBlocks) { + // Get all transaction hashes from pending blocks + for tx_hash in pending_blocks.get_pending_transaction_hashes() { + self.transaction_fb_included(tx_hash); + } + } + /// Track the first time we see a transaction in the mempool. pub fn transaction_inserted(&mut self, tx_hash: TxHash, event: TxEvent) { // If we've seen the tx before, don't track it again. For example, @@ -170,6 +184,27 @@ impl Tracker { } } + /// Track a transaction being included in a flashblock. + pub fn transaction_fb_included(&mut self, tx_hash: TxHash) { + // Only track if we have seen this transaction before + if let Some(event_log) = self.txs.peek(&tx_hash) { + // Record `fb_inclusion_duration` metric if transaction was pending + if let Some(pending_time) = event_log.pending_time { + let time_pending_to_fb_inclusion = Instant::now().duration_since(pending_time); + self.metrics + .fb_inclusion_duration + .record(time_pending_to_fb_inclusion.as_millis() as f64); + + debug!( + target: "tracex", + tx_hash = ?tx_hash, + duration_ms = time_pending_to_fb_inclusion.as_millis(), + "Transaction included in flashblock" + ); + } + } + } + /// Track a transaction being replaced by removing it from the cache and adding the new tx. pub fn transaction_replaced(&mut self, tx_hash: TxHash, replaced_by: TxHash) { if let Some(mut event_log) = self.txs.pop(&tx_hash) { From d7703cfb44085f535ae6b23990e92bf716af78cf Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 14 Jan 2026 13:47:33 -0500 Subject: [PATCH 2/9] add test --- Cargo.lock | 6 ++++ crates/client/txpool/Cargo.toml | 7 +++++ crates/client/txpool/src/tracker.rs | 48 +++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 1e30b121..daf8a1d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2136,6 +2136,8 @@ dependencies = [ name = "base-txpool" version = "0.0.0" dependencies = [ + "alloy-consensus", + "alloy-eips", "alloy-primitives", "base-client-node", "base-flashblocks", @@ -2148,11 +2150,15 @@ dependencies = [ "lru 0.16.3", "metrics", "metrics-derive", + "op-alloy-consensus", + "op-alloy-rpc-types", "reth-node-api", + "reth-optimism-primitives", "reth-primitives-traits", "reth-provider", "reth-tracing", "reth-transaction-pool", + "revm-database 9.0.6", "serde", "serde_json", "tokio", diff --git a/crates/client/txpool/Cargo.toml b/crates/client/txpool/Cargo.toml index 990f00f6..cec938bb 100644 --- a/crates/client/txpool/Cargo.toml +++ b/crates/client/txpool/Cargo.toml @@ -48,3 +48,10 @@ eyre.workspace = true httpmock.workspace = true serde_json.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } +base-flashblocks = { workspace = true, features = ["test-utils"] } +alloy-consensus.workspace = true +alloy-eips.workspace = true +op-alloy-consensus.workspace = true +op-alloy-rpc-types.workspace = true +reth-optimism-primitives.workspace = true +revm-database.workspace = true diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index d7420d29..00e862ca 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -256,8 +256,32 @@ impl Tracker { #[cfg(test)] mod tests { + use std::sync::Arc; + + use alloy_primitives::{map::HashMap, Address}; + use base_client_node::test_utils::TestHarness; + use reth_optimism_primitives::OpTransactionSigned; + use reth_transaction_pool::test_utils::TransactionBuilder; + use super::*; + // Test helper to create a transaction + fn create_test_transaction() -> OpTransactionSigned { + TransactionBuilder::default() + .nonce(0) + .gas_limit(21_000) + .max_fee_per_gas(1_000_000_000) + .max_priority_fee_per_gas(1_000_000_000) + .to(Address::random()) + .value(1000) + .chain_id(901) + .into_eip1559() + .as_eip1559() + .unwrap() + .clone() + .into() + } + #[test] fn test_transaction_inserted_pending() { let mut tracker = Tracker::new(false); @@ -562,4 +586,28 @@ mod tests { assert_eq!(tracker.txs.len(), 1); assert!(tracker.txs.get(&tx_hash2).is_some()); } + + #[test] + fn test_transaction_fb_included_with_pending_time() { + let mut tracker = Tracker::new(false); + let tx_hash = TxHash::random(); + + // Insert a pending transaction + tracker.transaction_inserted(tx_hash, TxEvent::Pending); + tracker.transaction_moved(tx_hash, Pool::Pending); + + // Verify pending_time is set + assert!(tracker.txs.get(&tx_hash).unwrap().pending_time.is_some()); + let initial_metric_count = tracker.metrics.fb_inclusion_duration.get_sample_count(); + + // Track FB inclusion + tracker.transaction_fb_included(tx_hash); + + // Verify transaction is still in cache (FB inclusion doesn't remove it) + assert!(tracker.txs.get(&tx_hash).is_some()); + + // Verify metric was recorded + let final_metric_count = tracker.metrics.fb_inclusion_duration.get_sample_count(); + assert_eq!(final_metric_count, initial_metric_count + 1); + } } From eadf4e4dac7107d2c6ce97ac2f071e4b7d3e4022 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 15 Jan 2026 11:42:57 -0500 Subject: [PATCH 3/9] diffs --- Cargo.lock | 2 +- bin/node/src/cli.rs | 7 +------ crates/client/txpool/src/extension.rs | 2 +- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index daf8a1d2..6db8ab62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,7 +2158,7 @@ dependencies = [ "reth-provider", "reth-tracing", "reth-transaction-pool", - "revm-database 9.0.6", + "revm-database", "serde", "serde_json", "tokio", diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index fb3588c9..fa4a0973 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -56,16 +56,11 @@ impl From for Option { impl From for TxpoolConfig { fn from(args: Args) -> Self { - let flashblocks_config = args - .websocket_url - .as_ref() - .map(|url| FlashblocksConfig::new(url.clone(), args.max_pending_blocks_depth)); - Self { tracing_enabled: args.enable_transaction_tracing, tracing_logs_enabled: args.enable_transaction_tracing_logs, sequencer_rpc: args.rollup_args.sequencer, - flashblocks_config, + flashblocks_config: args.flashblocks_config, } } } diff --git a/crates/client/txpool/src/extension.rs b/crates/client/txpool/src/extension.rs index e1747a96..73bb598e 100644 --- a/crates/client/txpool/src/extension.rs +++ b/crates/client/txpool/src/extension.rs @@ -21,7 +21,7 @@ pub struct TxpoolConfig { /// Sequencer RPC endpoint for transaction status proxying. pub sequencer_rpc: Option, /// Optional Flashblocks configuration (includes state). - pub flashblocks_config: Option, + pub flashblocks_config: Option, } /// Helper struct that wires the transaction pool features into the node builder. From e2fbf87cbf3c70bbcf1e6cded070a691081a2ec3 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 15 Jan 2026 14:01:26 -0500 Subject: [PATCH 4/9] fix --- Cargo.lock | 1 + bin/node/src/cli.rs | 2 +- crates/client/txpool/Cargo.toml | 2 +- crates/client/txpool/src/tracker.rs | 24 ------------------------ 4 files changed, 3 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6db8ab62..3b9fb2e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2141,6 +2141,7 @@ dependencies = [ "alloy-primitives", "base-client-node", "base-flashblocks", + "base-flashblocks-node", "chrono", "derive_more", "eyre", diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index fa4a0973..879d8efb 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -60,7 +60,7 @@ impl From for TxpoolConfig { tracing_enabled: args.enable_transaction_tracing, tracing_logs_enabled: args.enable_transaction_tracing_logs, sequencer_rpc: args.rollup_args.sequencer, - flashblocks_config: args.flashblocks_config, + flashblocks_config: args.websocket_url.map(|url| FlashblocksConfig::new(url, args.max_pending_blocks_depth)), } } } diff --git a/crates/client/txpool/Cargo.toml b/crates/client/txpool/Cargo.toml index cec938bb..f55aac13 100644 --- a/crates/client/txpool/Cargo.toml +++ b/crates/client/txpool/Cargo.toml @@ -48,7 +48,7 @@ eyre.workspace = true httpmock.workspace = true serde_json.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } -base-flashblocks = { workspace = true, features = ["test-utils"] } +base-flashblocks-node = { workspace = true, features = ["test-utils"] } alloy-consensus.workspace = true alloy-eips.workspace = true op-alloy-consensus.workspace = true diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index 00e862ca..0eeb8a87 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -586,28 +586,4 @@ mod tests { assert_eq!(tracker.txs.len(), 1); assert!(tracker.txs.get(&tx_hash2).is_some()); } - - #[test] - fn test_transaction_fb_included_with_pending_time() { - let mut tracker = Tracker::new(false); - let tx_hash = TxHash::random(); - - // Insert a pending transaction - tracker.transaction_inserted(tx_hash, TxEvent::Pending); - tracker.transaction_moved(tx_hash, Pool::Pending); - - // Verify pending_time is set - assert!(tracker.txs.get(&tx_hash).unwrap().pending_time.is_some()); - let initial_metric_count = tracker.metrics.fb_inclusion_duration.get_sample_count(); - - // Track FB inclusion - tracker.transaction_fb_included(tx_hash); - - // Verify transaction is still in cache (FB inclusion doesn't remove it) - assert!(tracker.txs.get(&tx_hash).is_some()); - - // Verify metric was recorded - let final_metric_count = tracker.metrics.fb_inclusion_duration.get_sample_count(); - assert_eq!(final_metric_count, initial_metric_count + 1); - } } From 88b4fa7e51a5aec34b386707c713c7166e299552 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 15 Jan 2026 15:39:33 -0500 Subject: [PATCH 5/9] add test --- Cargo.lock | 2 + crates/client/txpool/Cargo.toml | 4 +- crates/client/txpool/src/tracker.rs | 136 +++++++++++++++++++++++----- 3 files changed, 118 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b9fb2e7..16724d4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2139,9 +2139,11 @@ dependencies = [ "alloy-consensus", "alloy-eips", "alloy-primitives", + "alloy-rpc-types-engine", "base-client-node", "base-flashblocks", "base-flashblocks-node", + "base-flashtypes", "chrono", "derive_more", "eyre", diff --git a/crates/client/txpool/Cargo.toml b/crates/client/txpool/Cargo.toml index f55aac13..2efeacdf 100644 --- a/crates/client/txpool/Cargo.toml +++ b/crates/client/txpool/Cargo.toml @@ -13,7 +13,7 @@ workspace = true [dependencies] # workspace -base-client-node.workspace = true +base-client-node = { workspace = true, features = ["test-utils"] } base-flashblocks.workspace = true # reth @@ -55,3 +55,5 @@ op-alloy-consensus.workspace = true op-alloy-rpc-types.workspace = true reth-optimism-primitives.workspace = true revm-database.workspace = true +base-flashtypes.workspace = true +alloy-rpc-types-engine.workspace = true diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index 0eeb8a87..00146c2c 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -256,32 +256,23 @@ impl Tracker { #[cfg(test)] mod tests { - use std::sync::Arc; - - use alloy_primitives::{map::HashMap, Address}; - use base_client_node::test_utils::TestHarness; - use reth_optimism_primitives::OpTransactionSigned; - use reth_transaction_pool::test_utils::TransactionBuilder; + use std::ops::Deref; + + use alloy_consensus::{SignableTransaction, TxEip1559}; + use alloy_primitives::Address; + use base_client_node::test_utils::{SignerSync, L1_BLOCK_INFO_DEPOSIT_TX}; + use alloy_eips::eip2718::Encodable2718; + use base_flashblocks::FlashblocksAPI; + use base_client_node::test_utils::Account; + use base_flashblocks_node::test_harness::FlashblocksHarness; + use base_flashtypes::{ + ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, + }; + use alloy_primitives::{Bytes, B256, U256}; + use tokio::time; use super::*; - // Test helper to create a transaction - fn create_test_transaction() -> OpTransactionSigned { - TransactionBuilder::default() - .nonce(0) - .gas_limit(21_000) - .max_fee_per_gas(1_000_000_000) - .max_priority_fee_per_gas(1_000_000_000) - .to(Address::random()) - .value(1000) - .chain_id(901) - .into_eip1559() - .as_eip1559() - .unwrap() - .clone() - .into() - } - #[test] fn test_transaction_inserted_pending() { let mut tracker = Tracker::new(false); @@ -586,4 +577,103 @@ mod tests { assert_eq!(tracker.txs.len(), 1); assert!(tracker.txs.get(&tx_hash2).is_some()); } + + fn build_eip1559_tx( + chain_id: u64, + nonce: u64, + to: Address, + value: U256, + input: Bytes, + account: Account, + ) -> Bytes { + let tx = TxEip1559 { + chain_id, + nonce, + gas_limit: 200_000, + max_fee_per_gas: 1_000_000_000, + max_priority_fee_per_gas: 1_000_000_000, + to: alloy_primitives::TxKind::Call(to), + value, + access_list: Default::default(), + input, + }; + + let signature = account.signer().sign_hash_sync(&tx.signature_hash()).expect("signing works"); + let signed = tx.into_signed(signature); + + signed.encoded_2718().into() + } + + #[tokio::test] + async fn test_receive_fb() -> eyre::Result<()> { + // Setup + let harness = FlashblocksHarness::new().await?; + let mut tracker = Tracker::new(false); + + // Build transaction + let tx = build_eip1559_tx( + harness.chain_id(), + 0, + Account::Alice.address(), + U256::ZERO, + Bytes::new(), + Account::Alice, + ); + let tx_hash = alloy_primitives::keccak256(&tx); + let fb = Flashblock { + payload_id: alloy_rpc_types_engine::PayloadId::new([0; 8]), + index: 0, + base: Some(ExecutionPayloadBaseV1 { + parent_beacon_block_root: B256::default(), + parent_hash: B256::default(), + fee_recipient: Address::ZERO, + prev_randao: B256::default(), + block_number: 1, + gas_limit: 30_000_000, + timestamp: 0, + extra_data: Bytes::new(), + base_fee_per_gas: U256::ZERO, + }), + diff: ExecutionPayloadFlashblockDeltaV1 { + blob_gas_used: Some(0), + transactions: vec![L1_BLOCK_INFO_DEPOSIT_TX, tx], + ..Default::default() + }, + metadata: Metadata { block_number: 1 }, + }; + + // Mimic sending a tx to the mpool/builder + tracker.transaction_inserted(tx_hash, TxEvent::Pending); + + // Wait a bit to simulate builder picking and building the tx into the pending block + time::sleep(Duration::from_millis(10)).await; + harness.send_flashblock(fb).await?; + + let state = harness.flashblocks_state().get_pending_blocks(); + // Verify we have some pending transactions + let ptxs = state + .as_ref() + .map(|pb| pb.get_pending_transaction_hashes()) + .unwrap_or_default(); + assert_eq!(ptxs.len(), 2); // L1Info + tx + assert_eq!(ptxs[1], tx_hash); + + let pb = state + .as_ref() + .unwrap() + .deref(); + tracker.track_flashblock_transactions(pb); + + // It should still be in the tracker + assert!(tracker.txs.get(&tx_hash).is_some()); + + // Wait until its included in canonical block + time::sleep(Duration::from_millis(1500)).await; + tracker.transaction_completed(tx_hash, TxEvent::BlockInclusion); + + // It should be removed from the tracker + assert!(tracker.txs.get(&tx_hash).is_none()); + + Ok(()) + } } From 9c335b75819e4e1da95fe9abd4259e302b880d3d Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 15 Jan 2026 15:47:16 -0500 Subject: [PATCH 6/9] move out build_eip155_tx --- Cargo.lock | 2 ++ crates/client/flashblocks-node/Cargo.toml | 1 + .../flashblocks-node/tests/eip7702_tests.rs | 30 ++--------------- crates/client/txpool/Cargo.toml | 1 + crates/client/txpool/src/tracker.rs | 31 ++--------------- .../shared/primitives/src/test_utils/mod.rs | 3 ++ .../primitives/src/test_utils/transactions.rs | 33 +++++++++++++++++++ 7 files changed, 44 insertions(+), 57 deletions(-) create mode 100644 crates/shared/primitives/src/test_utils/transactions.rs diff --git a/Cargo.lock b/Cargo.lock index 16724d4a..466510c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1984,6 +1984,7 @@ dependencies = [ "base-flashblocks", "base-flashblocks-node", "base-flashtypes", + "base-primitives", "criterion", "derive_more", "eyre", @@ -2144,6 +2145,7 @@ dependencies = [ "base-flashblocks", "base-flashblocks-node", "base-flashtypes", + "base-primitives", "chrono", "derive_more", "eyre", diff --git a/crates/client/flashblocks-node/Cargo.toml b/crates/client/flashblocks-node/Cargo.toml index 81f648f7..b18080ea 100644 --- a/crates/client/flashblocks-node/Cargo.toml +++ b/crates/client/flashblocks-node/Cargo.toml @@ -29,6 +29,7 @@ workspace = true # workspace base-flashblocks.workspace = true base-client-node.workspace = true +base-primitives.workspace = true # reth reth-chain-state.workspace = true diff --git a/crates/client/flashblocks-node/tests/eip7702_tests.rs b/crates/client/flashblocks-node/tests/eip7702_tests.rs index c30f6449..8259fcf9 100644 --- a/crates/client/flashblocks-node/tests/eip7702_tests.rs +++ b/crates/client/flashblocks-node/tests/eip7702_tests.rs @@ -3,7 +3,7 @@ //! These tests verify that EIP-7702 authorization and delegation //! transactions work correctly in the pending/flashblocks state. -use alloy_consensus::{SignableTransaction, TxEip1559, TxEip7702}; +use alloy_consensus::{SignableTransaction, TxEip7702}; use alloy_eips::{eip2718::Encodable2718, eip7702::Authorization}; use alloy_primitives::{Address, B256, Bytes, U256}; use alloy_provider::Provider; @@ -15,6 +15,7 @@ use base_flashblocks_node::test_harness::FlashblocksHarness; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; +use base_primitives::build_eip1559_tx; use eyre::Result; use op_alloy_network::ReceiptResponse; @@ -93,33 +94,6 @@ fn build_eip7702_tx( signed.encoded_2718().into() } -/// Build and sign an EIP-1559 transaction -fn build_eip1559_tx( - chain_id: u64, - nonce: u64, - to: Address, - value: U256, - input: Bytes, - account: Account, -) -> Bytes { - let tx = TxEip1559 { - chain_id, - nonce, - gas_limit: 200_000, - max_fee_per_gas: 1_000_000_000, - max_priority_fee_per_gas: 1_000_000_000, - to: alloy_primitives::TxKind::Call(to), - value, - access_list: Default::default(), - input, - }; - - let signature = account.signer().sign_hash_sync(&tx.signature_hash()).expect("signing works"); - let signed = tx.into_signed(signature); - - signed.encoded_2718().into() -} - fn create_base_flashblock(setup: &TestSetup) -> Flashblock { Flashblock { payload_id: alloy_rpc_types_engine::PayloadId::new([0; 8]), diff --git a/crates/client/txpool/Cargo.toml b/crates/client/txpool/Cargo.toml index 2efeacdf..b025ab5d 100644 --- a/crates/client/txpool/Cargo.toml +++ b/crates/client/txpool/Cargo.toml @@ -57,3 +57,4 @@ reth-optimism-primitives.workspace = true revm-database.workspace = true base-flashtypes.workspace = true alloy-rpc-types-engine.workspace = true +base-primitives.workspace = true diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index 00146c2c..49988628 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -258,16 +258,15 @@ impl Tracker { mod tests { use std::ops::Deref; - use alloy_consensus::{SignableTransaction, TxEip1559}; use alloy_primitives::Address; - use base_client_node::test_utils::{SignerSync, L1_BLOCK_INFO_DEPOSIT_TX}; - use alloy_eips::eip2718::Encodable2718; + use base_client_node::test_utils::L1_BLOCK_INFO_DEPOSIT_TX; use base_flashblocks::FlashblocksAPI; use base_client_node::test_utils::Account; use base_flashblocks_node::test_harness::FlashblocksHarness; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; + use base_primitives::build_eip1559_tx; use alloy_primitives::{Bytes, B256, U256}; use tokio::time; @@ -578,32 +577,6 @@ mod tests { assert!(tracker.txs.get(&tx_hash2).is_some()); } - fn build_eip1559_tx( - chain_id: u64, - nonce: u64, - to: Address, - value: U256, - input: Bytes, - account: Account, - ) -> Bytes { - let tx = TxEip1559 { - chain_id, - nonce, - gas_limit: 200_000, - max_fee_per_gas: 1_000_000_000, - max_priority_fee_per_gas: 1_000_000_000, - to: alloy_primitives::TxKind::Call(to), - value, - access_list: Default::default(), - input, - }; - - let signature = account.signer().sign_hash_sync(&tx.signature_hash()).expect("signing works"); - let signed = tx.into_signed(signature); - - signed.encoded_2718().into() - } - #[tokio::test] async fn test_receive_fb() -> eyre::Result<()> { // Setup diff --git a/crates/shared/primitives/src/test_utils/mod.rs b/crates/shared/primitives/src/test_utils/mod.rs index 8626c87e..9a61e339 100644 --- a/crates/shared/primitives/src/test_utils/mod.rs +++ b/crates/shared/primitives/src/test_utils/mod.rs @@ -11,3 +11,6 @@ pub use contracts::{ AccessListContract, ContractFactory, DoubleCounter, Logic, Logic2, Minimal7702Account, MockERC20, Proxy, SimpleStorage, TransparentUpgradeableProxy, }; + +mod transactions; +pub use transactions::build_eip1559_tx; diff --git a/crates/shared/primitives/src/test_utils/transactions.rs b/crates/shared/primitives/src/test_utils/transactions.rs new file mode 100644 index 00000000..961d57f2 --- /dev/null +++ b/crates/shared/primitives/src/test_utils/transactions.rs @@ -0,0 +1,33 @@ +use alloy_consensus::{SignableTransaction, TxEip1559}; +use alloy_eips::eip2718::Encodable2718; +use alloy_primitives::{Address, Bytes, U256}; +use alloy_signer::SignerSync; + +use super::Account; + +/// Build and sign an EIP-1559 transaction +pub fn build_eip1559_tx( + chain_id: u64, + nonce: u64, + to: Address, + value: U256, + input: Bytes, + account: Account, +) -> Bytes { + let tx = TxEip1559 { + chain_id, + nonce, + gas_limit: 200_000, + max_fee_per_gas: 1_000_000_000, + max_priority_fee_per_gas: 1_000_000_000, + to: alloy_primitives::TxKind::Call(to), + value, + access_list: Default::default(), + input, + }; + + let signature = account.signer().sign_hash_sync(&tx.signature_hash()).expect("signing works"); + let signed = tx.into_signed(signature); + + signed.encoded_2718().into() +} From dd93848e812baafa802f9df58365474f5bea3ddc Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 15 Jan 2026 15:48:13 -0500 Subject: [PATCH 7/9] fmt --- bin/node/src/cli.rs | 4 +++- crates/client/txpool/src/extension.rs | 2 +- crates/client/txpool/src/tracker.rs | 18 +++++------------- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index 879d8efb..a2e92e18 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -60,7 +60,9 @@ impl From for TxpoolConfig { tracing_enabled: args.enable_transaction_tracing, tracing_logs_enabled: args.enable_transaction_tracing_logs, sequencer_rpc: args.rollup_args.sequencer, - flashblocks_config: args.websocket_url.map(|url| FlashblocksConfig::new(url, args.max_pending_blocks_depth)), + flashblocks_config: args + .websocket_url + .map(|url| FlashblocksConfig::new(url, args.max_pending_blocks_depth)), } } } diff --git a/crates/client/txpool/src/extension.rs b/crates/client/txpool/src/extension.rs index 73bb598e..e1747a96 100644 --- a/crates/client/txpool/src/extension.rs +++ b/crates/client/txpool/src/extension.rs @@ -21,7 +21,7 @@ pub struct TxpoolConfig { /// Sequencer RPC endpoint for transaction status proxying. pub sequencer_rpc: Option, /// Optional Flashblocks configuration (includes state). - pub flashblocks_config: Option, + pub flashblocks_config: Option, } /// Helper struct that wires the transaction pool features into the node builder. diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index 49988628..2a75f866 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -258,16 +258,14 @@ impl Tracker { mod tests { use std::ops::Deref; - use alloy_primitives::Address; - use base_client_node::test_utils::L1_BLOCK_INFO_DEPOSIT_TX; + use alloy_primitives::{Address, B256, Bytes, U256}; + use base_client_node::test_utils::{Account, L1_BLOCK_INFO_DEPOSIT_TX}; use base_flashblocks::FlashblocksAPI; - use base_client_node::test_utils::Account; use base_flashblocks_node::test_harness::FlashblocksHarness; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; use base_primitives::build_eip1559_tx; - use alloy_primitives::{Bytes, B256, U256}; use tokio::time; use super::*; @@ -621,20 +619,14 @@ mod tests { // Wait a bit to simulate builder picking and building the tx into the pending block time::sleep(Duration::from_millis(10)).await; harness.send_flashblock(fb).await?; - + let state = harness.flashblocks_state().get_pending_blocks(); // Verify we have some pending transactions - let ptxs = state - .as_ref() - .map(|pb| pb.get_pending_transaction_hashes()) - .unwrap_or_default(); + let ptxs = state.as_ref().map(|pb| pb.get_pending_transaction_hashes()).unwrap_or_default(); assert_eq!(ptxs.len(), 2); // L1Info + tx assert_eq!(ptxs[1], tx_hash); - let pb = state - .as_ref() - .unwrap() - .deref(); + let pb = state.as_ref().unwrap().deref(); tracker.track_flashblock_transactions(pb); // It should still be in the tracker From 0ac1b7357981b59f58c1d1e3f11ba342e7dda647 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 15 Jan 2026 15:58:54 -0500 Subject: [PATCH 8/9] clippy --- crates/client/flashblocks-node/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/client/flashblocks-node/Cargo.toml b/crates/client/flashblocks-node/Cargo.toml index b18080ea..ab369cff 100644 --- a/crates/client/flashblocks-node/Cargo.toml +++ b/crates/client/flashblocks-node/Cargo.toml @@ -29,7 +29,6 @@ workspace = true # workspace base-flashblocks.workspace = true base-client-node.workspace = true -base-primitives.workspace = true # reth reth-chain-state.workspace = true @@ -46,6 +45,7 @@ reth-optimism-chainspec = { workspace = true, optional = true } [dev-dependencies] +base-primitives.workspace = true base-client-node = { workspace = true, features = ["test-utils"] } base-flashblocks-node = { path = ".", features = ["test-utils"] } rand.workspace = true From b268090ced5a0295e97b6e8262783a9b2184ff36 Mon Sep 17 00:00:00 2001 From: William Law Date: Thu, 15 Jan 2026 16:05:11 -0500 Subject: [PATCH 9/9] opt into fb wss --- crates/client/txpool/src/extension.rs | 13 ++++--------- crates/client/txpool/src/subscription.rs | 15 +++++---------- crates/client/txpool/src/tracker.rs | 3 ++- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/crates/client/txpool/src/extension.rs b/crates/client/txpool/src/extension.rs index e1747a96..50aee446 100644 --- a/crates/client/txpool/src/extension.rs +++ b/crates/client/txpool/src/extension.rs @@ -61,16 +61,11 @@ impl BaseNodeExtension for TxPoolExtension { BroadcastStream::new(ctx.provider().subscribe_to_canonical_state()); let pool = ctx.pool().clone(); - // Get flashblocks state from config if available - let flashblocks_state: Option> = - flashblocks_config.as_ref().map(|cfg| cfg.state.clone()); + // Get flashblocks state from config, or create a default one if not configured + let fb_state: Arc = + flashblocks_config.as_ref().map(|cfg| cfg.state.clone()).unwrap_or_default(); - tokio::spawn(tracex_subscription( - canonical_stream, - flashblocks_state, - pool, - logs_enabled, - )); + tokio::spawn(tracex_subscription(canonical_stream, fb_state, pool, logs_enabled)); } Ok(()) diff --git a/crates/client/txpool/src/subscription.rs b/crates/client/txpool/src/subscription.rs index 7dff5679..dbc47f61 100644 --- a/crates/client/txpool/src/subscription.rs +++ b/crates/client/txpool/src/subscription.rs @@ -19,7 +19,7 @@ use crate::tracker::Tracker; /// to canonical state notifications, flashblock updates, and mempool events. pub async fn tracex_subscription( canonical_stream: BroadcastStream>, - flashblocks_api: Option>, + flashblocks_api: Arc, pool: Pool, enable_logs: bool, ) where @@ -34,9 +34,9 @@ pub async fn tracex_subscription( let mut all_events_stream = pool.all_transactions_event_listener(); let mut canonical_stream = canonical_stream; - // Subscribe to flashblocks if available - let mut flashblock_stream: Option>> = - flashblocks_api.map(|api| api.subscribe_to_flashblocks()); + // Subscribe to flashblocks. + let mut flashblock_stream: Receiver> = + flashblocks_api.subscribe_to_flashblocks(); loop { tokio::select! { @@ -49,12 +49,7 @@ pub async fn tracex_subscription( } // Track flashblock inclusion timing. - Ok(pending_blocks) = async { - match &mut flashblock_stream { - Some(stream) => stream.recv().await, - None => std::future::pending().await, - } - } => { + Ok(pending_blocks) = flashblock_stream.recv() => { tracker.handle_flashblock_notification(pending_blocks); } } diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index 2a75f866..7c057cdd 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -184,7 +184,8 @@ impl Tracker { } } - /// Track a transaction being included in a flashblock. + /// Track a transaction being included in a flashblock. This will not remove + /// the tx from the cache. pub fn transaction_fb_included(&mut self, tx_hash: TxHash) { // Only track if we have seen this transaction before if let Some(event_log) = self.txs.peek(&tx_hash) {