diff --git a/src/builder.rs b/src/builder.rs index 24c48e59..ab8f6f4e 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -9,18 +9,15 @@ use super::{client::Client, config::NodeConfig, node::Node}; #[cfg(feature = "rusqlite")] use crate::db::error::SqlInitializationError; #[cfg(feature = "rusqlite")] -use crate::db::sqlite::{headers::SqliteHeaderDb, peers::SqlitePeerDb}; +use crate::db::sqlite::peers::SqlitePeerDb; use crate::network::dns::{DnsResolver, DNS_RESOLVER_PORT}; use crate::network::ConnectionType; -use crate::{ - chain::checkpoints::HeaderCheckpoint, - db::traits::{HeaderStore, PeerStore}, -}; +use crate::{chain::checkpoints::HeaderCheckpoint, db::traits::PeerStore}; use crate::{LogLevel, PeerStoreSizeConfig, TrustedPeer}; #[cfg(feature = "rusqlite")] /// The default node returned from the [`NodeBuilder`]. -pub type NodeDefault = Node; +pub type NodeDefault = Node; const MIN_PEERS: u8 = 1; const MAX_PEERS: u8 = 15; @@ -219,26 +216,18 @@ impl NodeBuilder { #[cfg(feature = "rusqlite")] pub fn build(&mut self) -> Result<(NodeDefault, Client), SqlInitializationError> { let peer_store = SqlitePeerDb::new(self.network, self.config.data_path.clone())?; - let header_store = SqliteHeaderDb::new(self.network, self.config.data_path.clone())?; Ok(Node::new( self.network, core::mem::take(&mut self.config), peer_store, - header_store, )) } /// Consume the node builder by using custom database implementations, receiving a [`Node`] and [`Client`]. - pub fn build_with_databases( + pub fn build_with_databases( &mut self, peer_store: P, - header_store: H, - ) -> (Node, Client) { - Node::new( - self.network, - core::mem::take(&mut self.config), - peer_store, - header_store, - ) + ) -> (Node

, Client) { + Node::new(self.network, core::mem::take(&mut self.config), peer_store) } } diff --git a/src/chain/chain.rs b/src/chain/chain.rs index 0fdbefe2..4728577e 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -1,7 +1,6 @@ extern crate alloc; use std::{ collections::{BTreeMap, HashSet}, - ops::Range, sync::Arc, }; @@ -18,37 +17,33 @@ use super::{ error::{CFHeaderSyncError, CFilterSyncError, HeaderSyncError}, graph::{AcceptHeaderChanges, BlockTree, HeaderRejection}, CFHeaderChanges, Filter, FilterCheck, FilterHeaderRequest, FilterRequest, FilterRequestState, - HeaderChainChanges, HeightExt, HeightMonitor, PeerId, + HeaderChainChanges, HeightMonitor, PeerId, }; #[cfg(feature = "filter-control")] use crate::IndexedFilter; use crate::{ chain::header_batch::HeadersBatch, - db::{traits::HeaderStore, BlockHeaderChanges}, dialog::Dialog, - error::HeaderPersistenceError, messages::{Event, Warning}, Info, Progress, }; -const REORG_LOOKBACK: u32 = 7; const FILTER_BASIC: u8 = 0x00; const CF_HEADER_BATCH_SIZE: u32 = 1_999; const FILTER_BATCH_SIZE: u32 = 999; #[derive(Debug)] -pub(crate) struct Chain { +pub(crate) struct Chain { pub(crate) header_chain: BlockTree, request_state: FilterRequestState, checkpoints: HeaderCheckpoints, network: Network, - db: Arc>, heights: Arc>, scripts: HashSet, dialog: Arc

, } -impl Chain { +impl Chain { #[allow(clippy::too_many_arguments)] pub(crate) fn new( network: Network, @@ -57,7 +52,6 @@ impl Chain { checkpoints: HeaderCheckpoints, dialog: Arc, height_monitor: Arc>, - db: H, quorum_required: u8, ) -> Self { let header_chain = BlockTree::new(anchor, network); @@ -66,7 +60,6 @@ impl Chain { checkpoints, request_state: FilterRequestState::new(quorum_required), network, - db: Arc::new(Mutex::new(db)), heights: height_monitor, scripts, dialog, @@ -93,76 +86,6 @@ impl Chain { } } - // Load in headers, ideally allowing the difficulty adjustment to be audited and - // reorganizations to be handled gracefully. - pub(crate) async fn load_headers(&mut self) -> Result<(), HeaderPersistenceError> { - let mut db = self.db.lock().await; - // The original height the user requested a scan after - let scan_height = self.header_chain.height(); - // The header relevant to compute the next adjustment - let last_adjustment = scan_height.last_epoch_start(self.network); - // Seven blocks ago - let reorg = scan_height.saturating_sub(REORG_LOOKBACK); - // To handle adjustments and reorgs, we would have the minimum of each of these heights - let min_interesting_height = last_adjustment.min(reorg); - let max_interesting_height = last_adjustment.max(reorg); - // Get the maximum of the two interesting heights. In case the minimum is not available - if let Some(header) = db.header_at(max_interesting_height).await.ok().flatten() { - self.header_chain = - BlockTree::from_header(max_interesting_height, header, self.network); - } - // If this succeeds, both reorgs and difficulty adjustments can be handled gracefully. - if let Some(header) = db.header_at(min_interesting_height).await.ok().flatten() { - self.header_chain = - BlockTree::from_header(min_interesting_height, header, self.network); - } - // Now that the block tree is updated to the appropriate start, load in the rest of - // the history from this point onward. This is either: from the user start height, - // from the last difficulty adjustment, or seven blocks ago, depending on what the - // header store was able to provide. - let loaded_headers = db - .load(self.header_chain.height().increment()..) - .await - .map_err(HeaderPersistenceError::Database)?; - for (height, header) in loaded_headers { - let apply_header_changes = self.header_chain.accept_header(header); - match apply_header_changes { - AcceptHeaderChanges::Accepted { connected_at } => { - if height.ne(&connected_at.height) { - self.dialog.send_warning(Warning::CorruptedHeaders); - return Err(HeaderPersistenceError::HeadersDoNotLink); - } - if let Some(checkpoint) = self.checkpoints.next() { - if connected_at.header.block_hash().eq(&checkpoint.hash) { - self.checkpoints.advance() - } - } - } - AcceptHeaderChanges::Rejected(reject_reason) => match reject_reason { - HeaderRejection::UnknownPrevHash(_) => { - return Err(HeaderPersistenceError::CannotLocateHistory); - } - HeaderRejection::InvalidPow { expected, got } => { - crate::log!( - self.dialog, - format!( - "Unexpected invalid proof of work when importing a block header. expected {}, got: {}", - expected.to_consensus(), - got.to_consensus() - ) - ); - } - }, - _ => (), - } - } - // Because the user requested a scan after the `scan_height`, the filters below this point - // may be assumed as checked. Note that in a reorg, filters below this height may still be - // retrieved, as this only considers the canonical chain as checked. - self.header_chain.assume_checked_to(scan_height); - Ok(()) - } - // Sync the chain with headers from a peer, adjusting to reorgs if needed pub(crate) async fn sync_chain( &mut self, @@ -176,7 +99,6 @@ impl Chain { // We check first if the peer is sending us nonsense self.sanity_check(&header_batch)?; let next_checkpoint = self.checkpoints.next().copied(); - let mut db = self.db.lock().await; let mut reorged_hashes = None; let mut fork_added = None; for header in header_batch.into_iter() { @@ -191,7 +113,6 @@ impl Chain { connected_at.header.block_hash() ) ); - db.stage(BlockHeaderChanges::Connected(connected_at)); if let Some(checkpoint) = next_checkpoint { if connected_at.height.eq(&checkpoint.height) { if connected_at.header.block_hash().eq(&checkpoint.hash) { @@ -238,10 +159,6 @@ impl Chain { .map(|index| index.header.block_hash()) .collect(); reorged_hashes = Some(removed_hashes); - db.stage(BlockHeaderChanges::Reorganized { - accepted: accepted.clone(), - reorganized: disconnected.clone(), - }); let disconnected_event = Event::BlocksDisconnected { accepted, disconnected, @@ -260,12 +177,6 @@ impl Chain { }, } } - if let Err(e) = db.write().await { - self.dialog.send_warning(Warning::FailedPersistence { - warning: format!("Could not save headers to disk: {e}"), - }); - } - drop(db); match reorged_hashes { Some(reorgs) => { self.clear_compact_filter_queue(); @@ -530,44 +441,6 @@ impl Chain { self.scripts.insert(script); } - // Fetch a header from the cache or disk. - pub(crate) async fn fetch_header( - &mut self, - height: u32, - ) -> Result, HeaderPersistenceError> { - match self.header_chain.header_at_height(height) { - Some(header) => Ok(Some(header)), - None => { - let mut db = self.db.lock().await; - let header_opt = db.header_at(height).await; - if header_opt.is_err() { - self.dialog - .send_warning(Warning::FailedPersistence { - warning: format!( - "Unexpected error fetching a header from the header store at height {height}" - ), - }); - } - header_opt.map_err(HeaderPersistenceError::Database) - } - } - } - - pub(crate) async fn fetch_header_range( - &self, - range: Range, - ) -> Result, HeaderPersistenceError> { - let mut db = self.db.lock().await; - let range_opt = db.load(range).await; - if range_opt.is_err() { - self.dialog.send_warning(Warning::FailedPersistence { - warning: "Unexpected error fetching a range of headers from the header store" - .to_string(), - }); - } - range_opt.map_err(HeaderPersistenceError::Database) - } - // Reset the compact filter queue because we received a new block pub(crate) fn clear_compact_filter_queue(&mut self) { self.request_state.agreement_state.reset_agreements(); @@ -638,7 +511,7 @@ mod tests { anchor: HeaderCheckpoint, height_monitor: Arc>, peers: u8, - ) -> Chain<()> { + ) -> Chain { let (log_tx, _) = tokio::sync::mpsc::channel::(1); let (info_tx, _) = tokio::sync::mpsc::channel::(1); let (warn_tx, _) = tokio::sync::mpsc::unbounded_channel::(); @@ -658,7 +531,6 @@ mod tests { event_tx, )), height_monitor, - (), peers, ) } diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 07a61407..418e3eff 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -239,8 +239,6 @@ trait HeightExt: Clone + Copy + std::hash::Hash + PartialEq + Eq + PartialOrd + fn from_u64_checked(height: u64) -> Option; fn is_adjustment_multiple(&self, params: impl AsRef) -> bool; - - fn last_epoch_start(&self, params: impl AsRef) -> Self; } impl HeightExt for u32 { @@ -255,12 +253,6 @@ impl HeightExt for u32 { fn from_u64_checked(height: u64) -> Option { height.try_into().ok() } - - fn last_epoch_start(&self, params: impl AsRef) -> Self { - let diff_adjustment_interval = params.as_ref().difficulty_adjustment_interval() as u32; - let floor = self / diff_adjustment_interval; - floor * diff_adjustment_interval - } } // Emulation of `GetBlockSubsidy` in Bitcoin Core: https://github.com/bitcoin/bitcoin/blob/master/src/validation.cpp#L1944 @@ -277,7 +269,6 @@ pub(crate) fn block_subsidy(height: u32) -> Amount { #[cfg(test)] mod tests { use super::*; - use bitcoin::Network; #[test] fn test_height_monitor() { @@ -306,18 +297,6 @@ mod tests { assert!(height_monitor.max().unwrap().eq(&12)); } - #[test] - fn test_height_ext() { - assert!(2016.is_adjustment_multiple(Network::Bitcoin)); - assert!(4032.is_adjustment_multiple(Network::Bitcoin)); - let height = 2300; - assert_eq!(height.last_epoch_start(Network::Bitcoin), 2016); - let height = 4033; - assert_eq!(height.last_epoch_start(Network::Bitcoin), 4032); - let height = 4032; - assert_eq!(height.last_epoch_start(Network::Bitcoin), 4032); - } - #[test] fn test_subsidy_calculation() { let first_subsidy = block_subsidy(2); diff --git a/src/client.rs b/src/client.rs index 151f0586..02d29734 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,8 +1,8 @@ #[cfg(not(feature = "filter-control"))] use bitcoin::ScriptBuf; -use bitcoin::{block::Header, BlockHash, FeeRate}; use bitcoin::{Amount, Transaction}; -use std::{collections::BTreeMap, ops::Range, time::Duration}; +use bitcoin::{BlockHash, FeeRate}; +use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; @@ -12,8 +12,8 @@ use crate::{Event, Info, TrustedPeer, TxBroadcast, Warning}; use super::{error::FetchBlockError, messages::BlockRequest, IndexedBlock}; use super::{ - error::{ClientError, FetchFeeRateError, FetchHeaderError}, - messages::{BatchHeaderRequest, ClientMessage, HeaderRequest}, + error::{ClientError, FetchFeeRateError}, + messages::ClientMessage, }; /// A [`Client`] allows for communication with a running node. @@ -133,43 +133,6 @@ impl Requester { .map_err(|_| ClientError::SendError) } - /// Get a header at the specified height, if it exists. - /// - /// # Note - /// - /// The height of the chain is the canonical index of the header in the chain. - /// For example, the genesis block is at a height of zero. - /// - /// # Errors - /// - /// If the node has stopped running. - pub async fn get_header(&self, height: u32) -> Result { - let (tx, rx) = tokio::sync::oneshot::channel::>(); - let message = HeaderRequest::new(tx, height); - self.ntx - .send(ClientMessage::GetHeader(message)) - .map_err(|_| FetchHeaderError::SendError)?; - rx.await.map_err(|_| FetchHeaderError::RecvError)? - } - - /// Get a range of headers by the specified range. - /// - /// # Errors - /// - /// If the node has stopped running. - pub async fn get_header_range( - &self, - range: Range, - ) -> Result, FetchHeaderError> { - let (tx, rx) = - tokio::sync::oneshot::channel::, FetchHeaderError>>(); - let message = BatchHeaderRequest::new(tx, range); - self.ntx - .send(ClientMessage::GetHeaderBatch(message)) - .map_err(|_| FetchHeaderError::SendError)?; - rx.await.map_err(|_| FetchHeaderError::RecvError)? - } - /// Request a block be fetched. Note that this method will request a block /// from a connected peer's inventory, and may take an indefinite amount of /// time, until a peer responds. diff --git a/src/db/sqlite/headers.rs b/src/db/sqlite/headers.rs deleted file mode 100644 index c6d6cfbb..00000000 --- a/src/db/sqlite/headers.rs +++ /dev/null @@ -1,394 +0,0 @@ -use std::collections::{BTreeMap, HashSet}; -use std::fs; -use std::ops::{Bound, RangeBounds}; -use std::path::PathBuf; -use std::sync::Arc; - -use bitcoin::block::Header; -use bitcoin::{consensus, BlockHash, Network}; -use rusqlite::{params, params_from_iter, Connection, Result}; -use tokio::sync::Mutex; - -use crate::db::error::{SqlHeaderStoreError, SqlInitializationError}; -use crate::db::traits::HeaderStore; -use crate::db::BlockHeaderChanges; -use crate::prelude::FutureResult; - -use super::{DATA_DIR, DEFAULT_CWD}; - -const FILE_NAME: &str = "headers.db"; -// Labels for the schema table -const SCHEMA_TABLE_NAME: &str = "header_schema_versions"; -const SCHEMA_COLUMN: &str = "schema_key"; -const VERSION_COLUMN: &str = "version"; -const SCHEMA_KEY: &str = "current_version"; -// Update this in the case of schema changes -const SCHEMA_VERSION: u8 = 0; -// Always execute this query and adjust the schema with migrations -const INITIAL_HEADER_SCHEMA: &str = "CREATE TABLE IF NOT EXISTS headers ( - height INTEGER PRIMARY KEY, - block_hash BLOB NOT NULL, - header BLOB NOT NULL -) STRICT"; - -const LOAD_QUERY_SELECT_PREFIX: &str = "SELECT * FROM headers "; -const LOAD_QUERY_ORDERBY_SUFFIX: &str = "ORDER BY height"; - -/// Header storage implementation with SQL Lite. -#[derive(Debug)] -pub struct SqliteHeaderDb { - conn: Arc>, - accepted: BTreeMap, - disconnected: HashSet, -} - -impl SqliteHeaderDb { - /// Create a new [`SqliteHeaderDb`] with an optional file path. If no path is provided, - /// the file will be stored in a `data` subdirectory where the program is ran. - pub fn new(network: Network, path: Option) -> Result { - let mut path = path.unwrap_or_else(|| PathBuf::from(DEFAULT_CWD)); - path.push(DATA_DIR); - path.push(network.to_string()); - if !path.exists() { - fs::create_dir_all(&path)?; - } - let conn = Connection::open(path.join(FILE_NAME))?; - // Create the schema version - let schema_table_query = format!( - "CREATE TABLE IF NOT EXISTS {SCHEMA_TABLE_NAME} ({SCHEMA_COLUMN} TEXT PRIMARY KEY, {VERSION_COLUMN} INTEGER NOT NULL)"); - // Update the schema version - conn.execute(&schema_table_query, [])?; - let schema_init_version = format!( - "INSERT OR REPLACE INTO {SCHEMA_TABLE_NAME} ({SCHEMA_COLUMN}, {VERSION_COLUMN}) VALUES (?1, ?2)"); - conn.execute(&schema_init_version, params![SCHEMA_KEY, SCHEMA_VERSION])?; - // Build the table if it doesn't exist - conn.execute(INITIAL_HEADER_SCHEMA, [])?; - // Migrate to any new schema versions - Self::migrate(&conn)?; - - Ok(Self { - conn: Arc::new(Mutex::new(conn)), - accepted: BTreeMap::new(), - disconnected: HashSet::new(), - }) - } - - // This function currently does nothing, but if new columns are required this may be used to alter the tables - // without breaking older tables. - fn migrate(conn: &Connection) -> Result<(), SqlInitializationError> { - let version_query = - format!("SELECT {VERSION_COLUMN} FROM {SCHEMA_TABLE_NAME} WHERE {SCHEMA_COLUMN} = ?1"); - let _current_version: u8 = - conn.query_row(&version_query, [SCHEMA_KEY], |row| row.get(0))?; - // Match on the version and migrate to new schemas in the future - Ok(()) - } - - async fn load<'a>( - &mut self, - range: impl RangeBounds + Send + Sync + 'a, - ) -> Result, SqlHeaderStoreError> { - let mut param_list = Vec::new(); - let mut stmt = LOAD_QUERY_SELECT_PREFIX.to_string(); - - match range.start_bound() { - Bound::Unbounded => { - stmt.push_str("WHERE height >= 0 "); - } - Bound::Included(h) => { - stmt.push_str("WHERE height >= ? "); - param_list.push(*h); - } - Bound::Excluded(h) => { - stmt.push_str("WHERE height > ? "); - param_list.push(*h); - } - }; - - match range.end_bound() { - Bound::Unbounded => (), - Bound::Included(h) => { - stmt.push_str("AND height <= ? "); - param_list.push(*h); - } - Bound::Excluded(h) => { - stmt.push_str("AND height < ? "); - param_list.push(*h); - } - }; - - stmt.push_str(LOAD_QUERY_ORDERBY_SUFFIX); - - let mut headers = BTreeMap::::new(); - let write_lock = self.conn.lock().await; - let mut query = write_lock.prepare(&stmt)?; - let mut rows = query.query(params_from_iter(param_list.iter()))?; - while let Some(row) = rows.next()? { - let height: u32 = row.get(0)?; - let header: [u8; 80] = row.get(2)?; - let next_header: Header = consensus::deserialize(&header)?; - if let Some(header) = headers.values().last() { - if header.block_hash().ne(&next_header.prev_blockhash) { - return Err(SqlHeaderStoreError::Corruption); - } - } - headers.insert(height, next_header); - } - Ok(headers) - } - - fn stage(&mut self, changes: BlockHeaderChanges) { - match changes { - BlockHeaderChanges::Connected(indexed_header) => { - self.accepted - .insert(indexed_header.height, indexed_header.header); - } - BlockHeaderChanges::Reorganized { - accepted, - reorganized, - } => { - for indexed_header in reorganized { - let removed_hash = indexed_header.header.block_hash(); - self.accepted - .retain(|_, header| header.block_hash().ne(&removed_hash)); - self.disconnected.insert(removed_hash); - } - for indexed_header in accepted { - self.accepted - .insert(indexed_header.height, indexed_header.header); - } - } - } - } - - async fn write(&mut self) -> Result<(), SqlHeaderStoreError> { - let mut write_lock = self.conn.lock().await; - let tx = write_lock.transaction()?; - for removed in core::mem::take(&mut self.disconnected) { - let hash: Vec = consensus::serialize(&removed); - let stmt = "DELETE FROM headers WHERE block_hash = ?1"; - tx.execute(stmt, params![hash])?; - } - for (height, header) in core::mem::take(&mut self.accepted) { - let hash: Vec = consensus::serialize(&header.block_hash()); - let header: Vec = consensus::serialize(&header); - let stmt = - "INSERT OR REPLACE INTO headers (height, block_hash, header) VALUES (?1, ?2, ?3)"; - tx.execute(stmt, params![height, hash, header])?; - } - tx.commit()?; - Ok(()) - } - - async fn height_of( - &mut self, - block_hash: &BlockHash, - ) -> Result, SqlHeaderStoreError> { - let write_lock = self.conn.lock().await; - let stmt = "SELECT height FROM headers WHERE block_hash = ?1"; - let hash: Vec = consensus::serialize(&block_hash); - let row: Option = write_lock.query_row(stmt, params![hash], |row| row.get(0))?; - Ok(row) - } - - async fn hash_at(&mut self, height: u32) -> Result, SqlHeaderStoreError> { - let write_lock = self.conn.lock().await; - let stmt = "SELECT block_hash FROM headers WHERE height = ?1"; - let row: Option<[u8; 32]> = - write_lock.query_row(stmt, params![height], |row| row.get(0))?; - match row { - Some(hash) => Ok(Some(consensus::deserialize(&hash)?)), - None => Ok(None), - } - } - - async fn header_at(&mut self, height: u32) -> Result, SqlHeaderStoreError> { - let write_lock = self.conn.lock().await; - let stmt = "SELECT * FROM headers WHERE height = ?1"; - let query = write_lock.query_row(stmt, params![height], |row| { - let header_slice: [u8; 80] = row.get(2)?; - consensus::deserialize(&header_slice) - .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e))) - }); - match query { - Ok(header) => Ok(Some(header)), - Err(e) => match e { - rusqlite::Error::QueryReturnedNoRows => Ok(None), - _ => Err(SqlHeaderStoreError::SQL(e)), - }, - } - } -} - -impl HeaderStore for SqliteHeaderDb { - type Error = SqlHeaderStoreError; - - fn load<'a>( - &'a mut self, - range: impl RangeBounds + Send + Sync + 'a, - ) -> FutureResult<'a, BTreeMap, Self::Error> { - Box::pin(self.load(range)) - } - - fn stage(&mut self, changes: BlockHeaderChanges) { - self.stage(changes) - } - - fn write(&mut self) -> FutureResult<'_, (), Self::Error> { - Box::pin(self.write()) - } - - fn height_of<'a>( - &'a mut self, - hash: &'a BlockHash, - ) -> FutureResult<'a, Option, Self::Error> { - Box::pin(self.height_of(hash)) - } - - fn hash_at(&mut self, height: u32) -> FutureResult<'_, Option, Self::Error> { - Box::pin(self.hash_at(height)) - } - - fn header_at(&mut self, height: u32) -> FutureResult<'_, Option
, Self::Error> { - Box::pin(self.header_at(height)) - } -} - -#[cfg(test)] -mod tests { - use crate::chain::IndexedHeader; - - use super::*; - use bitcoin::consensus::deserialize; - - #[tokio::test] - async fn test_sql_header_store_normal_use() { - let binding = tempfile::tempdir().unwrap(); - let path = binding.path(); - let mut db = SqliteHeaderDb::new(Network::Regtest, Some(path.into())).unwrap(); - let block_8: Header = deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap(); - let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap(); - let block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap(); - let changes_8 = IndexedHeader::new(8, block_8); - let changes_9 = IndexedHeader::new(9, block_9); - let changes_10 = IndexedHeader::new(10, block_10); - let mut map = BTreeMap::new(); - map.insert(8, block_8); - map.insert(9, block_9); - map.insert(10, block_10); - let block_hash_8 = block_8.block_hash(); - let block_hash_9 = block_9.block_hash(); - db.stage(BlockHeaderChanges::Connected(changes_8)); - db.stage(BlockHeaderChanges::Connected(changes_9)); - db.stage(BlockHeaderChanges::Connected(changes_10)); - let w = db.write().await; - assert!(w.is_ok()); - let get_hash_9 = db.hash_at(9).await.unwrap().unwrap(); - assert_eq!(get_hash_9, block_hash_9); - let get_height_8 = db.height_of(&block_hash_8).await.unwrap().unwrap(); - assert_eq!(get_height_8, 8); - let load = db.load(7..).await.unwrap(); - - assert_eq!(map, load); - let get_header_9 = db.header_at(9).await.unwrap().unwrap(); - assert_eq!(get_header_9, block_9); - let get_header_11 = db.header_at(11).await.unwrap(); - assert!(get_header_11.is_none()); - let get_header_7 = db.header_at(7).await.unwrap(); - assert!(get_header_7.is_none()); - drop(db); - binding.close().unwrap(); - } - - #[tokio::test] - async fn test_sql_header_loads_with_fork() { - let binding = tempfile::tempdir().unwrap(); - let path = binding.path(); - let mut db = SqliteHeaderDb::new(Network::Regtest, Some(path.into())).unwrap(); - let block_8: Header = deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap(); - let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap(); - let block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap(); - let mut map = BTreeMap::new(); - map.insert(8, block_8); - map.insert(9, block_9); - map.insert(10, block_10); - let changes_8 = IndexedHeader::new(8, block_8); - let changes_9 = IndexedHeader::new(9, block_9); - let changes_10 = IndexedHeader::new(10, block_10); - db.stage(BlockHeaderChanges::Connected(changes_8)); - db.stage(BlockHeaderChanges::Connected(changes_9)); - db.stage(BlockHeaderChanges::Connected(changes_10)); - let w = db.write().await; - assert!(w.is_ok()); - let get_height_10 = db.header_at(10).await.unwrap().unwrap(); - assert_eq!(block_10, get_height_10); - let new_block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093792151c0e9ce4e4c789ca98427d7740cc7acf30d2ca0c08baef266bf152289d814567e5e66ffff7f2001000000").unwrap()).unwrap(); - let block_11: Header = deserialize(&hex::decode("00000020efcf8b12221fccc735b9b0b657ce15b31b9c50aff530ce96a5b4cfe02d8c0068496c1b8a89cf5dec22e46c35ea1035f80f5b666a1b3aa7f3d6f0880d0061adcc567e5e66ffff7f2001000000").unwrap()).unwrap(); - let mut map = BTreeMap::new(); - map.insert(10, new_block_10); - map.insert(11, block_11); - let accepted = vec![ - IndexedHeader::new(10, new_block_10), - IndexedHeader::new(11, block_11), - ]; - let reorganized = vec![IndexedHeader::new(10, block_10)]; - db.stage(BlockHeaderChanges::Reorganized { - accepted, - reorganized, - }); - let w = db.write().await; - assert!(w.is_ok()); - let block_hash_11 = block_11.block_hash(); - let block_hash_10 = new_block_10.block_hash(); - let get_height_10 = db.header_at(10).await.unwrap().unwrap(); - assert_eq!(new_block_10, get_height_10); - let get_height_12 = db.header_at(12).await.unwrap(); - assert!(get_height_12.is_none()); - let get_hash_10 = db.hash_at(10).await.unwrap().unwrap(); - assert_eq!(get_hash_10, block_hash_10); - let get_height_11 = db.height_of(&block_hash_11).await.unwrap().unwrap(); - assert_eq!(get_height_11, 11); - let mut map = BTreeMap::new(); - map.insert(8, block_8); - map.insert(9, block_9); - map.insert(10, new_block_10); - map.insert(11, block_11); - let load = db.load(7..).await.unwrap(); - assert_eq!(map, load); - drop(db); - binding.close().unwrap(); - } - - #[tokio::test] - async fn test_range_loads_properly() { - let binding = tempfile::tempdir().unwrap(); - let path = binding.path(); - let mut db = SqliteHeaderDb::new(Network::Regtest, Some(path.into())).unwrap(); - let block_8: Header = deserialize(&hex::decode("0000002016fe292517eecbbd63227d126a6b1db30ebc5262c61f8f3a4a529206388fc262dfd043cef8454f71f30b5bbb9eb1a4c9aea87390f429721e435cf3f8aa6e2a9171375166ffff7f2000000000").unwrap()).unwrap(); - let block_9: Header = deserialize(&hex::decode("000000205708a90197d93475975545816b2229401ccff7567cb23900f14f2bd46732c605fd8de19615a1d687e89db365503cdf58cb649b8e935a1d3518fa79b0d408704e71375166ffff7f2000000000").unwrap()).unwrap(); - let block_10: Header = deserialize(&hex::decode("000000201d062f2162835787db536c55317e08df17c58078c7610328bdced198574093790c9f554a7780a6043a19619d2a4697364bb62abf6336c0568c31f1eedca3c3e171375166ffff7f2000000000").unwrap()).unwrap(); - let mut map = BTreeMap::new(); - map.insert(8, block_8); - map.insert(9, block_9); - map.insert(10, block_10); - let changes_8 = IndexedHeader::new(8, block_8); - let changes_9 = IndexedHeader::new(9, block_9); - let changes_10 = IndexedHeader::new(10, block_10); - db.stage(BlockHeaderChanges::Connected(changes_8)); - db.stage(BlockHeaderChanges::Connected(changes_9)); - db.stage(BlockHeaderChanges::Connected(changes_10)); - let w = db.write().await; - assert!(w.is_ok()); - let load = db.load(7..).await.unwrap(); - assert_eq!(map, load); - let load = db.load(8..).await.unwrap(); - assert_eq!(map, load); - let load = db.load(8..10).await.unwrap(); - map.remove(&10); - assert_eq!(map, load); - let load = db.load(..10).await.unwrap(); - assert_eq!(map, load); - drop(db); - binding.close().unwrap(); - } -} diff --git a/src/db/sqlite/mod.rs b/src/db/sqlite/mod.rs index 8976fa29..c41a13f9 100644 --- a/src/db/sqlite/mod.rs +++ b/src/db/sqlite/mod.rs @@ -1,5 +1,3 @@ -/// SQL block header storage. -pub mod headers; /// SQL peer storage. pub mod peers; diff --git a/src/db/traits.rs b/src/db/traits.rs index 635495d6..21360603 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -1,41 +1,8 @@ -use std::fmt::Debug; -use std::ops::RangeBounds; -use std::{collections::BTreeMap, fmt::Display}; - -use bitcoin::{block::Header, BlockHash}; +use std::fmt::{Debug, Display}; use crate::prelude::FutureResult; -use super::{BlockHeaderChanges, PersistedPeer}; - -/// Methods required to persist the chain of block headers. -pub trait HeaderStore: Debug + Send + Sync { - /// Errors that may occur within a [`HeaderStore`]. - type Error: Debug + Display; - /// Load the headers of the canonical chain for the specified range. - fn load<'a>( - &'a mut self, - range: impl RangeBounds + Send + Sync + 'a, - ) -> FutureResult<'a, BTreeMap, Self::Error>; - - /// Stage changes to the chain to be written in the future. - fn stage(&mut self, changes: BlockHeaderChanges); - - /// Commit the changes by writing them to disk. - fn write(&mut self) -> FutureResult<'_, (), Self::Error>; - - /// Return the height of a block hash in the database, if it exists. - fn height_of<'a>( - &'a mut self, - hash: &'a BlockHash, - ) -> FutureResult<'a, Option, Self::Error>; - - /// Return the hash at the height in the database, if it exists. - fn hash_at(&mut self, height: u32) -> FutureResult<'_, Option, Self::Error>; - - /// Return the header at the height in the database, if it exists. - fn header_at(&mut self, height: u32) -> FutureResult<'_, Option
, Self::Error>; -} +use super::PersistedPeer; /// Methods that define a list of peers on the Bitcoin P2P network. pub trait PeerStore: Debug + Send + Sync { @@ -54,7 +21,6 @@ pub trait PeerStore: Debug + Send + Sync { #[cfg(test)] mod test { use super::*; - use std::convert::Infallible; /// Errors for the [`PeerStore`](crate) of unit type. #[derive(Debug)] @@ -94,50 +60,4 @@ mod test { Box::pin(do_num_unbanned()) } } - - impl HeaderStore for () { - type Error = Infallible; - fn load<'a>( - &'a mut self, - _range: impl RangeBounds + Send + Sync + 'a, - ) -> FutureResult<'a, BTreeMap, Self::Error> { - async fn do_load() -> Result, Infallible> { - Ok(BTreeMap::new()) - } - Box::pin(do_load()) - } - - fn stage(&mut self, _changes: BlockHeaderChanges) {} - - fn write(&mut self) -> FutureResult<'_, (), Self::Error> { - async fn do_write() -> Result<(), Infallible> { - Ok(()) - } - Box::pin(do_write()) - } - - fn height_of<'a>( - &'a mut self, - _block_hash: &'a BlockHash, - ) -> FutureResult<'a, Option, Self::Error> { - async fn do_height_of() -> Result, Infallible> { - Ok(None) - } - Box::pin(do_height_of()) - } - - fn hash_at(&mut self, _height: u32) -> FutureResult<'_, Option, Self::Error> { - async fn do_hast_at() -> Result, Infallible> { - Ok(None) - } - Box::pin(do_hast_at()) - } - - fn header_at(&mut self, _height: u32) -> FutureResult<'_, Option
, Self::Error> { - async fn do_header_at() -> Result, Infallible> { - Ok(None) - } - Box::pin(do_header_at()) - } - } } diff --git a/src/error.rs b/src/error.rs index a2ed98bc..58b92905 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,35 +4,26 @@ use crate::impl_sourceless_error; /// Errors that prevent the node from running. #[derive(Debug)] -pub enum NodeError { - /// The persistence layer experienced a critical error. - HeaderDatabase(HeaderPersistenceError), +pub enum NodeError { /// The persistence layer experienced a critical error. PeerDatabase(PeerManagerError

), } -impl core::fmt::Display for NodeError { +impl core::fmt::Display for NodeError

{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - NodeError::HeaderDatabase(e) => write!(f, "block headers: {e}"), NodeError::PeerDatabase(e) => write!(f, "peer manager: {e}"), } } } -impl std::error::Error for NodeError { +impl std::error::Error for NodeError

{ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { None } } -impl From> for NodeError { - fn from(value: HeaderPersistenceError) -> Self { - NodeError::HeaderDatabase(value) - } -} - -impl From> for NodeError { +impl From> for NodeError

{ fn from(value: PeerManagerError

) -> Self { NodeError::PeerDatabase(value) } @@ -67,36 +58,6 @@ impl From

for PeerManagerError

{ } } -/// Errors with the block header representation that prevent the node from operating. -#[derive(Debug)] -pub enum HeaderPersistenceError { - /// The block headers do not point to each other in a list. - HeadersDoNotLink, - /// Some predefined checkpoint does not match. - MismatchedCheckpoints, - /// A user tried to retrieve headers too far in the past for what is in their database. - CannotLocateHistory, - /// A database error. - Database(H), -} - -impl core::fmt::Display for HeaderPersistenceError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - HeaderPersistenceError::HeadersDoNotLink => write!(f, "the headers loaded from persistence do not link together."), - HeaderPersistenceError::MismatchedCheckpoints => write!(f, "the headers loaded do not match a known checkpoint."), - HeaderPersistenceError::CannotLocateHistory => write!(f, "the configured checkpoint is too far in the past compared to previous syncs. The database cannot reconstruct the chain."), - HeaderPersistenceError::Database(e) => write!(f, "database: {e}"), - } - } -} - -impl std::error::Error for HeaderPersistenceError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - None - } -} - /// Errors occuring when the client is talking to the node. #[derive(Debug)] pub enum ClientError { @@ -116,48 +77,6 @@ impl core::fmt::Display for ClientError { impl_sourceless_error!(ClientError); -/// Errors occuring when the client is fetching headers from the node. -#[derive(Debug)] -pub enum FetchHeaderError { - /// The channel to the node was likely closed and dropped from memory. - /// This implies the node is not running. - SendError, - /// The database operation failed while attempting to find the header. - DatabaseOptFailed { - /// The message from the backend describing the failure. - error: String, - }, - /// The channel to the client was likely closed by the node and dropped from memory. - RecvError, - /// The header at the requested height does not yet exist. - UnknownHeight, -} - -impl core::fmt::Display for FetchHeaderError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - FetchHeaderError::SendError => { - write!(f, "the receiver of this message was dropped from memory.") - } - FetchHeaderError::DatabaseOptFailed { error } => { - write!( - f, - "the database operation failed while attempting to find the header: {error}" - ) - } - FetchHeaderError::RecvError => write!( - f, - "the channel to the client was likely closed by the node and dropped from memory." - ), - FetchHeaderError::UnknownHeight => { - write!(f, "the header at the requested height does not yet exist.") - } - } - } -} - -impl_sourceless_error!(FetchHeaderError); - /// Errors occuring when the client is fetching blocks from the node. #[derive(Debug)] pub enum FetchBlockError { diff --git a/src/lib.rs b/src/lib.rs index 6039f291..542b177f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -116,10 +116,10 @@ pub use chain::checkpoints::{ #[cfg(feature = "rusqlite")] #[doc(inline)] -pub use db::sqlite::{headers::SqliteHeaderDb, peers::SqlitePeerDb}; +pub use db::sqlite::peers::SqlitePeerDb; #[doc(inline)] -pub use db::traits::{HeaderStore, PeerStore}; +pub use db::traits::PeerStore; #[doc(inline)] pub use tokio::sync::mpsc::Receiver; diff --git a/src/messages.rs b/src/messages.rs index 5bb80f03..b40b15e7 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, ops::Range, time::Duration}; +use std::{collections::BTreeMap, time::Duration}; use bitcoin::{ block::Header, p2p::message_network::RejectReason, BlockHash, FeeRate, ScriptBuf, Wtxid, @@ -12,7 +12,7 @@ use crate::{ IndexedBlock, NodeState, TrustedPeer, TxBroadcast, }; -use super::error::{FetchBlockError, FetchHeaderError}; +use super::error::FetchBlockError; /// Informational messages emitted by a node #[derive(Debug, Clone)] @@ -166,45 +166,12 @@ pub(crate) enum ClientMessage { SetDuration(Duration), /// Add another known peer to connect to. AddPeer(TrustedPeer), - /// Request a header from a specified height. - GetHeader(HeaderRequest), - /// Request a range of headers. - GetHeaderBatch(BatchHeaderRequest), /// Request the broadcast minimum fee rate. GetBroadcastMinFeeRate(FeeRateSender), /// Send an empty message to see if the node is running. NoOp, } -type HeaderSender = tokio::sync::oneshot::Sender>; - -#[derive(Debug)] -pub(crate) struct HeaderRequest { - pub(crate) oneshot: HeaderSender, - pub(crate) height: u32, -} - -impl HeaderRequest { - pub(crate) fn new(oneshot: HeaderSender, height: u32) -> Self { - Self { oneshot, height } - } -} - -type BatchHeaderSender = - tokio::sync::oneshot::Sender, FetchHeaderError>>; - -#[derive(Debug)] -pub(crate) struct BatchHeaderRequest { - pub(crate) oneshot: BatchHeaderSender, - pub(crate) range: Range, -} - -impl BatchHeaderRequest { - pub(crate) fn new(oneshot: BatchHeaderSender, range: Range) -> Self { - Self { oneshot, range } - } -} - pub(crate) type FeeRateSender = tokio::sync::oneshot::Sender; #[derive(Debug)] diff --git a/src/node.rs b/src/node.rs index ad50b529..a2e98c45 100644 --- a/src/node.rs +++ b/src/node.rs @@ -26,8 +26,8 @@ use crate::{ error::{CFilterSyncError, HeaderSyncError}, CFHeaderChanges, FilterCheck, HeaderChainChanges, HeightMonitor, }, - db::traits::{HeaderStore, PeerStore}, - error::{FetchBlockError, FetchHeaderError}, + db::traits::PeerStore, + error::FetchBlockError, network::{peer_map::PeerMap, LastBlockMonitor, PeerId}, IndexedBlock, NodeState, TxBroadcast, TxBroadcastPolicy, }; @@ -48,9 +48,9 @@ type PeerRequirement = usize; /// A compact block filter node. Nodes download Bitcoin block headers, block filters, and blocks to send relevant events to a client. #[derive(Debug)] -pub struct Node { +pub struct Node { state: Arc>, - chain: Arc>>, + chain: Arc>, peer_map: Arc>>, required_peers: PeerRequirement, dialog: Arc

, @@ -59,13 +59,8 @@ pub struct Node { peer_recv: Arc>>, } -impl Node { - pub(crate) fn new( - network: Network, - config: NodeConfig, - peer_store: P, - header_store: H, - ) -> (Self, Client) { +impl Node

{ + pub(crate) fn new(network: Network, config: NodeConfig, peer_store: P) -> (Self, Client) { let NodeConfig { required_peers, white_list, @@ -116,7 +111,6 @@ impl Node { checkpoints, Arc::clone(&dialog), height_monitor, - header_store, required_peers, ); let chain = Arc::new(Mutex::new(chain)); @@ -140,7 +134,7 @@ impl Node { /// # Errors /// /// A node will cease running if a fatal error is encountered with either the [`PeerStore`] or [`HeaderStore`]. - pub async fn run(&self) -> Result<(), NodeError> { + pub async fn run(&self) -> Result<(), NodeError> { crate::log!(self.dialog, "Starting node"); crate::log!( self.dialog, @@ -149,7 +143,6 @@ impl Node { self.required_peers ) ); - self.fetch_headers().await?; let mut last_block = LastBlockMonitor::new(); let mut peer_recv = self.peer_recv.lock().await; let mut client_recv = self.client_recv.lock().await; @@ -265,22 +258,6 @@ impl Node { let mut peer_map = self.peer_map.lock().await; peer_map.add_trusted_peer(peer); }, - ClientMessage::GetHeader(request) => { - let mut chain = self.chain.lock().await; - let header_opt = chain.fetch_header(request.height).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }).and_then(|opt| opt.ok_or(FetchHeaderError::UnknownHeight)); - let send_result = request.oneshot.send(header_opt); - if send_result.is_err() { - self.dialog.send_warning(Warning::ChannelDropped); - }; - }, - ClientMessage::GetHeaderBatch(request) => { - let chain = self.chain.lock().await; - let range_opt = chain.fetch_header_range(request.range).await.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() }); - let send_result = request.oneshot.send(range_opt); - if send_result.is_err() { - self.dialog.send_warning(Warning::ChannelDropped); - }; - }, ClientMessage::GetBroadcastMinFeeRate(request) => { let peer_map = self.peer_map.lock().await; let fee_rate = peer_map.broadcast_min(); @@ -316,7 +293,7 @@ impl Node { } // Connect to a new peer if we are not connected to enough - async fn dispatch(&self) -> Result<(), NodeError> { + async fn dispatch(&self) -> Result<(), NodeError> { let mut peer_map = self.peer_map.lock().await; peer_map.clean().await; let live = peer_map.live(); @@ -440,7 +417,7 @@ impl Node { // After we receiving some chain-syncing message, we decide what chain of data needs to be // requested next. - async fn next_stateful_message(&self, chain: &mut Chain) -> Option { + async fn next_stateful_message(&self, chain: &mut Chain) -> Option { if !chain.is_synced().await { let headers = GetHeaderConfig { locators: chain.header_chain.locators(), @@ -462,7 +439,7 @@ impl Node { &self, nonce: PeerId, version_message: VersionMessage, - ) -> Result> { + ) -> Result> { if version_message.version < WTXID_VERSION { return Ok(MainThreadMessage::Disconnect); } @@ -769,14 +746,4 @@ impl Node { } } } - - // When the application starts, fetch any headers we know about from the database. - async fn fetch_headers(&self) -> Result<(), NodeError> { - crate::log!(self.dialog, "Attempting to load headers from the database"); - let mut chain = self.chain.lock().await; - chain - .load_headers() - .await - .map_err(NodeError::HeaderDatabase) - } } diff --git a/tests/core.rs b/tests/core.rs index 7e4024e4..e581dc9e 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -20,8 +20,7 @@ use corepc_node::serde_json; use corepc_node::{anyhow, exe_path}; use kyoto::{ chain::checkpoints::HeaderCheckpoint, client::Client, lookup_host, node::Node, Address, - BlockHash, Event, Info, ServiceFlags, SqliteHeaderDb, SqlitePeerDb, Transaction, TrustedPeer, - Warning, + BlockHash, Event, Info, ServiceFlags, SqlitePeerDb, Transaction, TrustedPeer, Warning, }; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::UnboundedReceiver; @@ -54,7 +53,7 @@ fn new_node( socket_addr: SocketAddrV4, tempdir_path: PathBuf, checkpoint: Option, -) -> (Node, Client) { +) -> (Node, Client) { let host = (IpAddr::V4(*socket_addr.ip()), Some(socket_addr.port())); let mut trusted: TrustedPeer = host.into(); trusted.set_services(ServiceFlags::P2P_V2); @@ -206,8 +205,6 @@ async fn live_reorg_additional_sync() { // Reorganize the blocks let old_best = best; let old_height = num_blocks(rpc); - let fetched_header = requester.get_header(10).await.unwrap(); - assert_eq!(old_best, fetched_header.block_hash()); invalidate_block(rpc, &best).await; mine_blocks(rpc, &miner, 2, 1).await; let best = best_hash(rpc); @@ -259,10 +256,7 @@ async fn various_client_methods() { } = client; tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; - let batch = requester.get_header_range(10_000..10_002).await.unwrap(); - assert!(batch.is_empty()); let _ = requester.broadcast_min_feerate().await.unwrap(); - let _ = requester.get_header(3).await.unwrap(); let script = rpc.new_address().unwrap(); requester.add_script(script).unwrap(); assert!(requester.is_running()); @@ -293,8 +287,6 @@ async fn stop_reorg_resync() { } = client; tokio::task::spawn(async move { print_logs(log_rx, warn_rx).await }); sync_assert(&best, &mut channel).await; - let batch = requester.get_header_range(0..10).await.unwrap(); - assert!(!batch.is_empty()); requester.shutdown().unwrap(); // Reorganize the blocks let old_best = best;