diff --git a/Cargo.lock b/Cargo.lock index f2e2f8b1..466510c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1984,6 +1984,7 @@ dependencies = [ "base-flashblocks", "base-flashblocks-node", "base-flashtypes", + "base-primitives", "criterion", "derive_more", "eyre", @@ -2136,8 +2137,15 @@ dependencies = [ name = "base-txpool" version = "0.0.0" dependencies = [ + "alloy-consensus", + "alloy-eips", "alloy-primitives", + "alloy-rpc-types-engine", "base-client-node", + "base-flashblocks", + "base-flashblocks-node", + "base-flashtypes", + "base-primitives", "chrono", "derive_more", "eyre", @@ -2147,11 +2155,15 @@ dependencies = [ "lru 0.16.3", "metrics", "metrics-derive", + "op-alloy-consensus", + "op-alloy-rpc-types", "reth-node-api", + "reth-optimism-primitives", "reth-primitives-traits", "reth-provider", "reth-tracing", "reth-transaction-pool", + "revm-database", "serde", "serde_json", "tokio", diff --git a/bin/node/src/cli.rs b/bin/node/src/cli.rs index 03177da5..a2e92e18 100644 --- a/bin/node/src/cli.rs +++ b/bin/node/src/cli.rs @@ -60,6 +60,9 @@ impl From for TxpoolConfig { tracing_enabled: args.enable_transaction_tracing, tracing_logs_enabled: args.enable_transaction_tracing_logs, sequencer_rpc: args.rollup_args.sequencer, + flashblocks_config: args + .websocket_url + .map(|url| FlashblocksConfig::new(url, args.max_pending_blocks_depth)), } } } diff --git a/crates/client/flashblocks-node/Cargo.toml b/crates/client/flashblocks-node/Cargo.toml index 81f648f7..ab369cff 100644 --- a/crates/client/flashblocks-node/Cargo.toml +++ b/crates/client/flashblocks-node/Cargo.toml @@ -45,6 +45,7 @@ reth-optimism-chainspec = { workspace = true, optional = true } [dev-dependencies] +base-primitives.workspace = true base-client-node = { workspace = true, features = ["test-utils"] } base-flashblocks-node = { path = ".", features = ["test-utils"] } rand.workspace = true diff --git a/crates/client/flashblocks-node/tests/eip7702_tests.rs b/crates/client/flashblocks-node/tests/eip7702_tests.rs index c30f6449..8259fcf9 100644 --- a/crates/client/flashblocks-node/tests/eip7702_tests.rs +++ b/crates/client/flashblocks-node/tests/eip7702_tests.rs @@ -3,7 +3,7 @@ //! These tests verify that EIP-7702 authorization and delegation //! transactions work correctly in the pending/flashblocks state. -use alloy_consensus::{SignableTransaction, TxEip1559, TxEip7702}; +use alloy_consensus::{SignableTransaction, TxEip7702}; use alloy_eips::{eip2718::Encodable2718, eip7702::Authorization}; use alloy_primitives::{Address, B256, Bytes, U256}; use alloy_provider::Provider; @@ -15,6 +15,7 @@ use base_flashblocks_node::test_harness::FlashblocksHarness; use base_flashtypes::{ ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, }; +use base_primitives::build_eip1559_tx; use eyre::Result; use op_alloy_network::ReceiptResponse; @@ -93,33 +94,6 @@ fn build_eip7702_tx( signed.encoded_2718().into() } -/// Build and sign an EIP-1559 transaction -fn build_eip1559_tx( - chain_id: u64, - nonce: u64, - to: Address, - value: U256, - input: Bytes, - account: Account, -) -> Bytes { - let tx = TxEip1559 { - chain_id, - nonce, - gas_limit: 200_000, - max_fee_per_gas: 1_000_000_000, - max_priority_fee_per_gas: 1_000_000_000, - to: alloy_primitives::TxKind::Call(to), - value, - access_list: Default::default(), - input, - }; - - let signature = account.signer().sign_hash_sync(&tx.signature_hash()).expect("signing works"); - let signed = tx.into_signed(signature); - - signed.encoded_2718().into() -} - fn create_base_flashblock(setup: &TestSetup) -> Flashblock { Flashblock { payload_id: alloy_rpc_types_engine::PayloadId::new([0; 8]), diff --git a/crates/client/txpool/Cargo.toml b/crates/client/txpool/Cargo.toml index 9f9f25b6..b025ab5d 100644 --- a/crates/client/txpool/Cargo.toml +++ b/crates/client/txpool/Cargo.toml @@ -13,7 +13,8 @@ workspace = true [dependencies] # workspace -base-client-node.workspace = true +base-client-node = { workspace = true, features = ["test-utils"] } +base-flashblocks.workspace = true # reth reth-tracing.workspace = true @@ -47,3 +48,13 @@ eyre.workspace = true httpmock.workspace = true serde_json.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } +base-flashblocks-node = { workspace = true, features = ["test-utils"] } +alloy-consensus.workspace = true +alloy-eips.workspace = true +op-alloy-consensus.workspace = true +op-alloy-rpc-types.workspace = true +reth-optimism-primitives.workspace = true +revm-database.workspace = true +base-flashtypes.workspace = true +alloy-rpc-types-engine.workspace = true +base-primitives.workspace = true diff --git a/crates/client/txpool/src/events.rs b/crates/client/txpool/src/events.rs index 7ca9faf5..c5d7b902 100644 --- a/crates/client/txpool/src/events.rs +++ b/crates/client/txpool/src/events.rs @@ -23,6 +23,9 @@ pub enum TxEvent { /// Transaction included on chain. #[display("block_inclusion")] BlockInclusion, + /// Transaction included in a flashblock. + #[display("fb_inclusion")] + FbInclusion, /// Transaction moved from pending -> queued. #[display("pending_to_queued")] PendingToQueued, diff --git a/crates/client/txpool/src/extension.rs b/crates/client/txpool/src/extension.rs index 0689dd49..50aee446 100644 --- a/crates/client/txpool/src/extension.rs +++ b/crates/client/txpool/src/extension.rs @@ -1,7 +1,10 @@ //! Contains the [`TxPoolExtension`] which wires up the transaction pool features //! (tracing subscription and status RPC) on the Base node builder. +use std::sync::Arc; + use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig}; +use base_flashblocks::{FlashblocksConfig, FlashblocksState}; use reth_provider::CanonStateSubscriptions; use tokio_stream::wrappers::BroadcastStream; use tracing::info; @@ -17,6 +20,8 @@ pub struct TxpoolConfig { pub tracing_logs_enabled: bool, /// Sequencer RPC endpoint for transaction status proxying. pub sequencer_rpc: Option, + /// Optional Flashblocks configuration (includes state). + pub flashblocks_config: Option, } /// Helper struct that wires the transaction pool features into the node builder. @@ -42,6 +47,7 @@ impl BaseNodeExtension for TxPoolExtension { let sequencer_rpc = config.sequencer_rpc; let tracing_enabled = config.tracing_enabled; let logs_enabled = config.tracing_logs_enabled; + let flashblocks_config = config.flashblocks_config; builder.add_rpc_module(move |ctx| { info!(message = "Starting Transaction Status RPC"); @@ -54,7 +60,12 @@ impl BaseNodeExtension for TxPoolExtension { let canonical_stream = BroadcastStream::new(ctx.provider().subscribe_to_canonical_state()); let pool = ctx.pool().clone(); - tokio::spawn(tracex_subscription(canonical_stream, pool, logs_enabled)); + + // Get flashblocks state from config, or create a default one if not configured + let fb_state: Arc = + flashblocks_config.as_ref().map(|cfg| cfg.state.clone()).unwrap_or_default(); + + tokio::spawn(tracex_subscription(canonical_stream, fb_state, pool, logs_enabled)); } Ok(()) diff --git a/crates/client/txpool/src/metrics.rs b/crates/client/txpool/src/metrics.rs index b27ef7fb..14913bb3 100644 --- a/crates/client/txpool/src/metrics.rs +++ b/crates/client/txpool/src/metrics.rs @@ -14,4 +14,9 @@ pub struct Metrics { describe = "Time taken for a transaction to be included in a block from when it's marked as pending" )] pub inclusion_duration: Histogram, + /// Time taken for a transaction to be included in a flashblock from when it's marked as pending. + #[metric( + describe = "Time taken for a transaction to be included in a flashblock from when it's marked as pending" + )] + pub fb_inclusion_duration: Histogram, } diff --git a/crates/client/txpool/src/subscription.rs b/crates/client/txpool/src/subscription.rs index f02666de..dbc47f61 100644 --- a/crates/client/txpool/src/subscription.rs +++ b/crates/client/txpool/src/subscription.rs @@ -1,10 +1,14 @@ //! Tracex canonical block subscription. +use std::sync::Arc; + +use base_flashblocks::{FlashblocksAPI, PendingBlocks}; use futures::StreamExt; use reth_node_api::NodePrimitives; use reth_provider::CanonStateNotification; use reth_tracing::tracing::debug; use reth_transaction_pool::TransactionPool; +use tokio::sync::broadcast::Receiver; use tokio_stream::wrappers::BroadcastStream; use crate::tracker::Tracker; @@ -12,14 +16,16 @@ use crate::tracker::Tracker; /// Subscription task that tracks transaction timing from mempool to block inclusion. /// /// Monitors transaction lifecycle events and records timing metrics by listening -/// to canonical state notifications and mempool events. -pub async fn tracex_subscription( +/// to canonical state notifications, flashblock updates, and mempool events. +pub async fn tracex_subscription( canonical_stream: BroadcastStream>, + flashblocks_api: Arc, pool: Pool, enable_logs: bool, ) where N: NodePrimitives, Pool: TransactionPool + 'static, + FB: FlashblocksAPI + 'static, { debug!(target: "tracex", "Starting transaction tracking subscription"); let mut tracker = Tracker::new(enable_logs); @@ -28,6 +34,10 @@ pub async fn tracex_subscription( let mut all_events_stream = pool.all_transactions_event_listener(); let mut canonical_stream = canonical_stream; + // Subscribe to flashblocks. + let mut flashblock_stream: Receiver> = + flashblocks_api.subscribe_to_flashblocks(); + loop { tokio::select! { // Track # of transactions dropped and replaced. @@ -37,6 +47,11 @@ pub async fn tracex_subscription( Some(Ok(notification)) = canonical_stream.next() => { tracker.handle_canon_state_notification(notification); } + + // Track flashblock inclusion timing. + Ok(pending_blocks) = flashblock_stream.recv() => { + tracker.handle_flashblock_notification(pending_blocks); + } } } } diff --git a/crates/client/txpool/src/tracker.rs b/crates/client/txpool/src/tracker.rs index 366530f8..7c057cdd 100644 --- a/crates/client/txpool/src/tracker.rs +++ b/crates/client/txpool/src/tracker.rs @@ -2,10 +2,12 @@ use std::{ num::NonZeroUsize, + sync::Arc, time::{Duration, Instant}, }; use alloy_primitives::TxHash; +use base_flashblocks::PendingBlocks; use chrono::Local; use lru::LruCache; use reth_node_api::{BlockBody, NodePrimitives}; @@ -75,6 +77,11 @@ impl Tracker { self.track_committed_chain(¬ification.committed()); } + /// Parse flashblock updates and track transaction inclusion in flashblocks. + pub fn handle_flashblock_notification(&mut self, pending_blocks: Arc) { + self.track_flashblock_transactions(&pending_blocks); + } + fn track_committed_chain(&mut self, chain: &Chain) { for block in chain.blocks().values() { for transaction in block.body().transactions() { @@ -83,6 +90,13 @@ impl Tracker { } } + fn track_flashblock_transactions(&mut self, pending_blocks: &PendingBlocks) { + // Get all transaction hashes from pending blocks + for tx_hash in pending_blocks.get_pending_transaction_hashes() { + self.transaction_fb_included(tx_hash); + } + } + /// Track the first time we see a transaction in the mempool. pub fn transaction_inserted(&mut self, tx_hash: TxHash, event: TxEvent) { // If we've seen the tx before, don't track it again. For example, @@ -170,6 +184,28 @@ impl Tracker { } } + /// Track a transaction being included in a flashblock. This will not remove + /// the tx from the cache. + pub fn transaction_fb_included(&mut self, tx_hash: TxHash) { + // Only track if we have seen this transaction before + if let Some(event_log) = self.txs.peek(&tx_hash) { + // Record `fb_inclusion_duration` metric if transaction was pending + if let Some(pending_time) = event_log.pending_time { + let time_pending_to_fb_inclusion = Instant::now().duration_since(pending_time); + self.metrics + .fb_inclusion_duration + .record(time_pending_to_fb_inclusion.as_millis() as f64); + + debug!( + target: "tracex", + tx_hash = ?tx_hash, + duration_ms = time_pending_to_fb_inclusion.as_millis(), + "Transaction included in flashblock" + ); + } + } + } + /// Track a transaction being replaced by removing it from the cache and adding the new tx. pub fn transaction_replaced(&mut self, tx_hash: TxHash, replaced_by: TxHash) { if let Some(mut event_log) = self.txs.pop(&tx_hash) { @@ -221,6 +257,18 @@ impl Tracker { #[cfg(test)] mod tests { + use std::ops::Deref; + + use alloy_primitives::{Address, B256, Bytes, U256}; + use base_client_node::test_utils::{Account, L1_BLOCK_INFO_DEPOSIT_TX}; + use base_flashblocks::FlashblocksAPI; + use base_flashblocks_node::test_harness::FlashblocksHarness; + use base_flashtypes::{ + ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata, + }; + use base_primitives::build_eip1559_tx; + use tokio::time; + use super::*; #[test] @@ -527,4 +575,71 @@ mod tests { assert_eq!(tracker.txs.len(), 1); assert!(tracker.txs.get(&tx_hash2).is_some()); } + + #[tokio::test] + async fn test_receive_fb() -> eyre::Result<()> { + // Setup + let harness = FlashblocksHarness::new().await?; + let mut tracker = Tracker::new(false); + + // Build transaction + let tx = build_eip1559_tx( + harness.chain_id(), + 0, + Account::Alice.address(), + U256::ZERO, + Bytes::new(), + Account::Alice, + ); + let tx_hash = alloy_primitives::keccak256(&tx); + let fb = Flashblock { + payload_id: alloy_rpc_types_engine::PayloadId::new([0; 8]), + index: 0, + base: Some(ExecutionPayloadBaseV1 { + parent_beacon_block_root: B256::default(), + parent_hash: B256::default(), + fee_recipient: Address::ZERO, + prev_randao: B256::default(), + block_number: 1, + gas_limit: 30_000_000, + timestamp: 0, + extra_data: Bytes::new(), + base_fee_per_gas: U256::ZERO, + }), + diff: ExecutionPayloadFlashblockDeltaV1 { + blob_gas_used: Some(0), + transactions: vec![L1_BLOCK_INFO_DEPOSIT_TX, tx], + ..Default::default() + }, + metadata: Metadata { block_number: 1 }, + }; + + // Mimic sending a tx to the mpool/builder + tracker.transaction_inserted(tx_hash, TxEvent::Pending); + + // Wait a bit to simulate builder picking and building the tx into the pending block + time::sleep(Duration::from_millis(10)).await; + harness.send_flashblock(fb).await?; + + let state = harness.flashblocks_state().get_pending_blocks(); + // Verify we have some pending transactions + let ptxs = state.as_ref().map(|pb| pb.get_pending_transaction_hashes()).unwrap_or_default(); + assert_eq!(ptxs.len(), 2); // L1Info + tx + assert_eq!(ptxs[1], tx_hash); + + let pb = state.as_ref().unwrap().deref(); + tracker.track_flashblock_transactions(pb); + + // It should still be in the tracker + assert!(tracker.txs.get(&tx_hash).is_some()); + + // Wait until its included in canonical block + time::sleep(Duration::from_millis(1500)).await; + tracker.transaction_completed(tx_hash, TxEvent::BlockInclusion); + + // It should be removed from the tracker + assert!(tracker.txs.get(&tx_hash).is_none()); + + Ok(()) + } } diff --git a/crates/shared/primitives/src/test_utils/mod.rs b/crates/shared/primitives/src/test_utils/mod.rs index 8626c87e..9a61e339 100644 --- a/crates/shared/primitives/src/test_utils/mod.rs +++ b/crates/shared/primitives/src/test_utils/mod.rs @@ -11,3 +11,6 @@ pub use contracts::{ AccessListContract, ContractFactory, DoubleCounter, Logic, Logic2, Minimal7702Account, MockERC20, Proxy, SimpleStorage, TransparentUpgradeableProxy, }; + +mod transactions; +pub use transactions::build_eip1559_tx; diff --git a/crates/shared/primitives/src/test_utils/transactions.rs b/crates/shared/primitives/src/test_utils/transactions.rs new file mode 100644 index 00000000..961d57f2 --- /dev/null +++ b/crates/shared/primitives/src/test_utils/transactions.rs @@ -0,0 +1,33 @@ +use alloy_consensus::{SignableTransaction, TxEip1559}; +use alloy_eips::eip2718::Encodable2718; +use alloy_primitives::{Address, Bytes, U256}; +use alloy_signer::SignerSync; + +use super::Account; + +/// Build and sign an EIP-1559 transaction +pub fn build_eip1559_tx( + chain_id: u64, + nonce: u64, + to: Address, + value: U256, + input: Bytes, + account: Account, +) -> Bytes { + let tx = TxEip1559 { + chain_id, + nonce, + gas_limit: 200_000, + max_fee_per_gas: 1_000_000_000, + max_priority_fee_per_gas: 1_000_000_000, + to: alloy_primitives::TxKind::Call(to), + value, + access_list: Default::default(), + input, + }; + + let signature = account.signer().sign_hash_sync(&tx.signature_hash()).expect("signing works"); + let signed = tx.into_signed(signature); + + signed.encoded_2718().into() +}