diff --git a/src/io/utils.rs b/src/io/utils.rs index 4657688f5..46dd8639a 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -26,8 +26,8 @@ use lightning::routing::scoring::{ ChannelLiquidities, ProbabilisticScorer, ProbabilisticScoringDecayParameters, }; use lightning::util::persist::{ - migrate_kv_store_data_async, KVStore, KVSTORE_NAMESPACE_KEY_ALPHABET, - KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY, + migrate_kv_store_data_async, KVStore, PaginatedKVStore, PaginatedListResponse, + KVSTORE_NAMESPACE_KEY_ALPHABET, KVSTORE_NAMESPACE_KEY_MAX_LEN, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE, OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, @@ -222,7 +222,8 @@ where }) } -/// Read all objects of type `T` from the given namespace, spawning reads in parallel. +/// Read all objects of type `T` from the given namespace, listing keys page-by-page and spawning +/// reads in parallel. pub(crate) async fn read_all_objects( kv_store: &DynStore, primary_namespace: &str, secondary_namespace: &str, logger: L, ) -> Result, std::io::Error> @@ -234,55 +235,72 @@ where let type_name = std::any::type_name::(); let mut res = Vec::new(); - let mut stored_keys = KVStore::list(&*kv_store, primary_namespace, secondary_namespace).await?; - const BATCH_SIZE: usize = 50; let mut set = tokio::task::JoinSet::new(); - - // Fill JoinSet with tasks if possible - while set.len() < BATCH_SIZE && !stored_keys.is_empty() { - if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); + let mut page_token = None; + + loop { + let PaginatedListResponse { keys, next_page_token } = PaginatedKVStore::list_paginated( + &*kv_store, + primary_namespace, + secondary_namespace, + page_token, + ) + .await?; + let mut stored_keys = keys; + + // Fill JoinSet with tasks if possible + while set.len() < BATCH_SIZE && !stored_keys.is_empty() { + if let Some(next_key) = stored_keys.pop() { + let fut = + KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } } - } - while let Some(read_res) = set.join_next().await { - // Exit early if we get an IO error. - let reader = read_res - .map_err(|e| { - log_error!(logger, "Failed to read {}: {}", type_name, e); - set.abort_all(); - e - })? - .map_err(|e| { - log_error!(logger, "Failed to read {}: {}", type_name, e); - set.abort_all(); - e - })?; + while let Some(read_res) = set.join_next().await { + // Exit early if we get an IO error. + let reader = read_res + .map_err(|e| { + log_error!(logger, "Failed to read {}: {}", type_name, e); + set.abort_all(); + e + })? + .map_err(|e| { + log_error!(logger, "Failed to read {}: {}", type_name, e); + set.abort_all(); + e + })?; + + // Refill set for every finished future, if we still have something to do. + if let Some(next_key) = stored_keys.pop() { + let fut = + KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } - // Refill set for every finished future, if we still have something to do. - if let Some(next_key) = stored_keys.pop() { - let fut = KVStore::read(kv_store, primary_namespace, secondary_namespace, &next_key); - set.spawn(fut); - debug_assert!(set.len() <= BATCH_SIZE); + // Handle result. + let object = T::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize {}: {}", type_name, e); + std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!("Failed to deserialize {}", type_name), + ) + })?; + res.push(object); } - // Handle result. - let object = T::read(&mut &*reader).map_err(|e| { - log_error!(logger, "Failed to deserialize {}: {}", type_name, e); - std::io::Error::new( - std::io::ErrorKind::InvalidData, - format!("Failed to deserialize {}", type_name), - ) - })?; - res.push(object); - } + debug_assert!(set.is_empty()); + debug_assert!(stored_keys.is_empty()); - debug_assert!(set.is_empty()); - debug_assert!(stored_keys.is_empty()); + page_token = next_page_token; + if page_token.is_none() { + break; + } + } Ok(res) } @@ -718,19 +736,56 @@ fn recover_incomplete_fs_store_migration(storage_dir_path: &Path) -> Result<(), mod tests { use std::fs; use std::path::{Path, PathBuf}; + use std::sync::Arc; use lightning::util::persist::{migrate_kv_store_data_async, KVStore}; + use lightning::util::ser::Writeable; + use lightning::util::test_utils::TestLogger; use lightning_persister::fs_store::v1::FilesystemStore; use lightning_persister::fs_store::v2::FilesystemStoreV2; use super::test_utils::random_storage_path; - use super::{open_or_migrate_fs_store, read_or_generate_seed_file}; + use super::{open_or_migrate_fs_store, read_all_objects, read_or_generate_seed_file}; + use crate::io::test_utils::InMemoryStore; + use crate::types::{DynStore, DynStoreWrapper}; const TEST_PRIMARY_NAMESPACE: &str = "test_primary_namespace"; const TEST_SECONDARY_NAMESPACE: &str = "test_secondary_namespace"; const TEST_KEY: &str = "test_key"; const TEST_VALUE: &[u8] = b"test_value"; + #[tokio::test] + async fn read_all_objects_reads_across_pages() { + let store: Arc = Arc::new(DynStoreWrapper(InMemoryStore::new())); + let logger = Arc::new(TestLogger::new()); + + // Write more objects than fit in a single page to exercise the pagination loop. + let num_objects = 120u64; + for i in 0..num_objects { + let key = format!("key_{:03}", i); + KVStore::write( + &*store, + TEST_PRIMARY_NAMESPACE, + TEST_SECONDARY_NAMESPACE, + &key, + i.encode(), + ) + .await + .unwrap(); + } + + let mut read: Vec = read_all_objects( + &*store, + TEST_PRIMARY_NAMESPACE, + TEST_SECONDARY_NAMESPACE, + Arc::clone(&logger), + ) + .await + .unwrap(); + read.sort_unstable(); + assert_eq!(read, (0..num_objects).collect::>()); + } + #[test] fn generated_seed_is_readable() { let mut rand_path = random_storage_path();