Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bin/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ impl From<Args> for TxpoolConfig {
tracing_enabled: args.enable_transaction_tracing,
tracing_logs_enabled: args.enable_transaction_tracing_logs,
sequencer_rpc: args.rollup_args.sequencer,
flashblocks_config: args
.websocket_url
.map(|url| FlashblocksConfig::new(url, args.max_pending_blocks_depth)),
Comment on lines +63 to +65
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we're going to want to create a new Flashblocks config here, we can just pass through the shared state.

See how the bundle does this here:
https://github.com/base/node-reth/blob/main/bin/node/src/main.rs#L36

}
}
}
1 change: 1 addition & 0 deletions crates/client/flashblocks-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ reth-optimism-chainspec = { workspace = true, optional = true }


[dev-dependencies]
base-primitives.workspace = true
base-client-node = { workspace = true, features = ["test-utils"] }
base-flashblocks-node = { path = ".", features = ["test-utils"] }
rand.workspace = true
Expand Down
30 changes: 2 additions & 28 deletions crates/client/flashblocks-node/tests/eip7702_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! These tests verify that EIP-7702 authorization and delegation
//! transactions work correctly in the pending/flashblocks state.

use alloy_consensus::{SignableTransaction, TxEip1559, TxEip7702};
use alloy_consensus::{SignableTransaction, TxEip7702};
use alloy_eips::{eip2718::Encodable2718, eip7702::Authorization};
use alloy_primitives::{Address, B256, Bytes, U256};
use alloy_provider::Provider;
Expand All @@ -15,6 +15,7 @@ use base_flashblocks_node::test_harness::FlashblocksHarness;
use base_flashtypes::{
ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1, Flashblock, Metadata,
};
use base_primitives::build_eip1559_tx;
use eyre::Result;
use op_alloy_network::ReceiptResponse;

Expand Down Expand Up @@ -93,33 +94,6 @@ fn build_eip7702_tx(
signed.encoded_2718().into()
}

/// Build and sign an EIP-1559 transaction
fn build_eip1559_tx(
chain_id: u64,
nonce: u64,
to: Address,
value: U256,
input: Bytes,
account: Account,
) -> Bytes {
let tx = TxEip1559 {
chain_id,
nonce,
gas_limit: 200_000,
max_fee_per_gas: 1_000_000_000,
max_priority_fee_per_gas: 1_000_000_000,
to: alloy_primitives::TxKind::Call(to),
value,
access_list: Default::default(),
input,
};

let signature = account.signer().sign_hash_sync(&tx.signature_hash()).expect("signing works");
let signed = tx.into_signed(signature);

signed.encoded_2718().into()
}

fn create_base_flashblock(setup: &TestSetup) -> Flashblock {
Flashblock {
payload_id: alloy_rpc_types_engine::PayloadId::new([0; 8]),
Expand Down
13 changes: 12 additions & 1 deletion crates/client/txpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ workspace = true

[dependencies]
# workspace
base-client-node.workspace = true
base-client-node = { workspace = true, features = ["test-utils"] }
base-flashblocks.workspace = true

# reth
reth-tracing.workspace = true
Expand Down Expand Up @@ -47,3 +48,13 @@ eyre.workspace = true
httpmock.workspace = true
serde_json.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
base-flashblocks-node = { workspace = true, features = ["test-utils"] }
alloy-consensus.workspace = true
alloy-eips.workspace = true
op-alloy-consensus.workspace = true
op-alloy-rpc-types.workspace = true
reth-optimism-primitives.workspace = true
revm-database.workspace = true
base-flashtypes.workspace = true
alloy-rpc-types-engine.workspace = true
base-primitives.workspace = true
3 changes: 3 additions & 0 deletions crates/client/txpool/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum TxEvent {
/// Transaction included on chain.
#[display("block_inclusion")]
BlockInclusion,
/// Transaction included in a flashblock.
#[display("fb_inclusion")]
FbInclusion,
/// Transaction moved from pending -> queued.
#[display("pending_to_queued")]
PendingToQueued,
Expand Down
13 changes: 12 additions & 1 deletion crates/client/txpool/src/extension.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! Contains the [`TxPoolExtension`] which wires up the transaction pool features
//! (tracing subscription and status RPC) on the Base node builder.
use std::sync::Arc;

use base_client_node::{BaseBuilder, BaseNodeExtension, FromExtensionConfig};
use base_flashblocks::{FlashblocksConfig, FlashblocksState};
use reth_provider::CanonStateSubscriptions;
use tokio_stream::wrappers::BroadcastStream;
use tracing::info;
Expand All @@ -17,6 +20,8 @@ pub struct TxpoolConfig {
pub tracing_logs_enabled: bool,
/// Sequencer RPC endpoint for transaction status proxying.
pub sequencer_rpc: Option<String>,
/// Optional Flashblocks configuration (includes state).
pub flashblocks_config: Option<FlashblocksConfig>,
}

/// Helper struct that wires the transaction pool features into the node builder.
Expand All @@ -42,6 +47,7 @@ impl BaseNodeExtension for TxPoolExtension {
let sequencer_rpc = config.sequencer_rpc;
let tracing_enabled = config.tracing_enabled;
let logs_enabled = config.tracing_logs_enabled;
let flashblocks_config = config.flashblocks_config;

builder.add_rpc_module(move |ctx| {
info!(message = "Starting Transaction Status RPC");
Expand All @@ -54,7 +60,12 @@ impl BaseNodeExtension for TxPoolExtension {
let canonical_stream =
BroadcastStream::new(ctx.provider().subscribe_to_canonical_state());
let pool = ctx.pool().clone();
tokio::spawn(tracex_subscription(canonical_stream, pool, logs_enabled));

// Get flashblocks state from config, or create a default one if not configured
let fb_state: Arc<FlashblocksState> =
flashblocks_config.as_ref().map(|cfg| cfg.state.clone()).unwrap_or_default();

tokio::spawn(tracex_subscription(canonical_stream, fb_state, pool, logs_enabled));
}

Ok(())
Expand Down
5 changes: 5 additions & 0 deletions crates/client/txpool/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ pub struct Metrics {
describe = "Time taken for a transaction to be included in a block from when it's marked as pending"
)]
pub inclusion_duration: Histogram,
/// Time taken for a transaction to be included in a flashblock from when it's marked as pending.
#[metric(
describe = "Time taken for a transaction to be included in a flashblock from when it's marked as pending"
)]
pub fb_inclusion_duration: Histogram,
}
19 changes: 17 additions & 2 deletions crates/client/txpool/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
//! Tracex canonical block subscription.
use std::sync::Arc;

use base_flashblocks::{FlashblocksAPI, PendingBlocks};
use futures::StreamExt;
use reth_node_api::NodePrimitives;
use reth_provider::CanonStateNotification;
use reth_tracing::tracing::debug;
use reth_transaction_pool::TransactionPool;
use tokio::sync::broadcast::Receiver;
use tokio_stream::wrappers::BroadcastStream;

use crate::tracker::Tracker;

/// Subscription task that tracks transaction timing from mempool to block inclusion.
///
/// Monitors transaction lifecycle events and records timing metrics by listening
/// to canonical state notifications and mempool events.
pub async fn tracex_subscription<N, Pool>(
/// to canonical state notifications, flashblock updates, and mempool events.
pub async fn tracex_subscription<N, Pool, FB>(
canonical_stream: BroadcastStream<CanonStateNotification<N>>,
flashblocks_api: Arc<FB>,
pool: Pool,
enable_logs: bool,
) where
N: NodePrimitives,
Pool: TransactionPool + 'static,
FB: FlashblocksAPI + 'static,
{
debug!(target: "tracex", "Starting transaction tracking subscription");
let mut tracker = Tracker::new(enable_logs);
Expand All @@ -28,6 +34,10 @@ pub async fn tracex_subscription<N, Pool>(
let mut all_events_stream = pool.all_transactions_event_listener();
let mut canonical_stream = canonical_stream;

// Subscribe to flashblocks.
let mut flashblock_stream: Receiver<Arc<PendingBlocks>> =
flashblocks_api.subscribe_to_flashblocks();

loop {
tokio::select! {
// Track # of transactions dropped and replaced.
Expand All @@ -37,6 +47,11 @@ pub async fn tracex_subscription<N, Pool>(
Some(Ok(notification)) = canonical_stream.next() => {
tracker.handle_canon_state_notification(notification);
}

// Track flashblock inclusion timing.
Ok(pending_blocks) = flashblock_stream.recv() => {
tracker.handle_flashblock_notification(pending_blocks);
}
}
}
}
Loading
Loading