diff --git a/Cargo.lock b/Cargo.lock index 51c3fd3d..06a33cdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1251,9 +1251,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" dependencies = [ "serde", ] @@ -1967,7 +1967,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" dependencies = [ "data-encoding", - "syn 2.0.111", + "syn 1.0.109", ] [[package]] @@ -6790,9 +6790,9 @@ dependencies = [ [[package]] name = "rsa" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40a0376c50d0358279d9d643e4bf7b7be212f1f4ff1da9070a7b54d22ef75c88" +checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" dependencies = [ "const-oid", "digest 0.10.7", @@ -7931,12 +7931,12 @@ dependencies = [ "fuel-streams-types", "fuel-web-utils", "futures", - "itertools 0.14.0", "pretty_assertions", "rand 0.9.2", "serde", "serde_bytes", "serde_json", + "tempfile", "thiserror 2.0.17", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 0464b5f7..57495dd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ version = "0.0.34" anyhow = "1.0.97" apache-avro = { version = "0.17.0", features = ["derive"] } async-trait = "0.1.88" -axum = { version = "0.8.3", features = ["json", "ws", "query"] } chrono = { version = "0.4.40", features = ["serde"] } clap = { version = "4.5.35", features = ["derive", "env"] } derive_more = { version = "2.0.1", features = [ @@ -44,11 +43,9 @@ fuel-data-parser = { version = "0.0.34", path = "crates/data-parser" } fuel-streams-domains = { version = "0.0.34", path = "crates/domains" } fuel-streams-types = { version = "0.0.34", path = "crates/types" } fuel-web-utils = { version = "0.0.34", path = "crates/web-utils" } -futures = "0.3.31" hex = "0.4.3" pretty_assertions = "1.4.1" rand = "0.9.0" -rayon = "1.10.0" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" serde_urlencoded = "0.7.1" @@ -61,7 +58,6 @@ sqlx = { version = "0.8.3", default-features = false, features = [ ] } strum = { version = "0.27.1", features = ["derive"] } strum_macros = "0.27.1" -test-case = "3.3.1" thiserror = "2.0.12" tokio = { version = "1.44.1", features = [ "io-util", diff --git a/cluster/docker/sv-dune.Dockerfile b/cluster/docker/sv-dune.Dockerfile index e77ec3f4..14437c4a 100644 --- a/cluster/docker/sv-dune.Dockerfile +++ b/cluster/docker/sv-dune.Dockerfile @@ -56,7 +56,7 @@ FROM debian:bookworm-slim AS run WORKDIR /usr/src RUN apt-get update -y \ - && apt-get install -y --no-install-recommends ca-certificates curl \ + && apt-get install -y --no-install-recommends ca-certificates curl jq less unzip zip procps \ # Clean up && apt-get autoremove -y \ && apt-get clean -y \ @@ -65,4 +65,7 @@ RUN apt-get update -y \ COPY --from=builder /root/sv-dune . COPY --from=builder /root/sv-dune.d . +# run app as non-root user +USER nobody + CMD ["./sv-dune"] diff --git a/services/dune/Cargo.toml b/services/dune/Cargo.toml index d7c26a19..5e995b56 100644 --- a/services/dune/Cargo.toml +++ b/services/dune/Cargo.toml @@ -34,7 +34,6 @@ fuel-streams-domains = { workspace = true, features = ["test-helpers"] } fuel-streams-types = { workspace = true, features = ["test-helpers"] } fuel-web-utils = { workspace = true, features = ["test-helpers"] } futures = "0.3.31" -itertools = "0.14.0" rand.workspace = true serde.workspace = true serde_bytes = "0.11.17" @@ -46,6 +45,7 @@ url = "2.5.7" [dev-dependencies] pretty_assertions.workspace = true +tempfile = "3" tokio = { workspace = true, features = [ "rt-multi-thread", "macros", diff --git a/services/dune/src/alloc_counter.rs b/services/dune/src/alloc_counter.rs new file mode 100644 index 00000000..8bde82b7 --- /dev/null +++ b/services/dune/src/alloc_counter.rs @@ -0,0 +1,47 @@ +//! Diagnostic allocation counters for tracking object lifetimes. +//! +//! Each counter is incremented on object creation and decremented **only** via +//! `Drop` impls, so a counter that trends upward over time is proof of a leak — +//! the object was created but never deallocated. +//! +//! Call [`log_all`] periodically to emit all live counts. + +use std::sync::atomic::{AtomicI64, Ordering}; + +macro_rules! define_counters { + ($($name:ident),+ $(,)?) => { + $(pub static $name: AtomicI64 = AtomicI64::new(0);)+ + + /// Log all counters at INFO level, including tokio task count. + pub fn log_all() { + let tokio_tasks = tokio::runtime::Handle::current() + .metrics() + .num_alive_tasks(); + tracing::info!( + concat!("alloc_counters: ", $(stringify!($name), "={} ",)+ "TOKIO_TASKS={}"), + $($name.load(Ordering::Relaxed),)+ + tokio_tasks, + ); + } + }; +} + +define_counters! { + GRAPHQL_FETCHER, + BLOCK_STREAM, + AVRO_FILE_WRITERS, + AVRO_FILE_WRITER, + FINALIZED_BATCH_FILES, +} + +/// Increment a counter (call from constructors). +#[inline] +pub fn inc(counter: &AtomicI64) { + counter.fetch_add(1, Ordering::Relaxed); +} + +/// Decrement a counter (call **only** from `Drop` impls). +#[inline] +pub fn dec(counter: &AtomicI64) { + counter.fetch_sub(1, Ordering::Relaxed); +} diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs new file mode 100644 index 00000000..fef7320b --- /dev/null +++ b/services/dune/src/block_buffer.rs @@ -0,0 +1,496 @@ +use std::{ + fs, + path::{ + Path, + PathBuf, + }, +}; + +use fuel_streams_types::BlockHeight; + +use crate::{ + DuneError, + DuneResult, + alloc_counter, + helpers::{ + AvroFileWriter, + AvroParser, + }, + schemas::{ + AvroBlock, + AvroReceipt, + AvroTransaction, + ReceiptMetadata, + }, +}; +use fuel_streams_domains::{ + blocks::Block, + transactions::Transaction, +}; + +/// The result of finalizing a batch to files, containing paths to Avro files. +/// Files are streamed directly to S3 without loading into memory. +pub struct FinalizedBatchFiles { + pub first_height: BlockHeight, + pub last_height: BlockHeight, + /// Path to the blocks Avro file + pub blocks_path: PathBuf, + /// Path to the transactions Avro file + pub transactions_path: PathBuf, + /// Path to the receipts Avro file + pub receipts_path: PathBuf, + /// Temporary directory containing the files (for cleanup) + temp_dir: PathBuf, +} + +impl FinalizedBatchFiles { + /// Cleans up all temporary files after upload + pub fn cleanup(&self) { + let _ = fs::remove_dir_all(&self.temp_dir); + } +} + +impl Drop for FinalizedBatchFiles { + fn drop(&mut self) { + self.cleanup(); + alloc_counter::dec(&alloc_counter::FINALIZED_BATCH_FILES); + } +} + +// ============================================================================ +// Avro file writers for disk-based buffering +// ============================================================================ + +/// Paths to finalized Avro files ready for upload. +/// Note: Does NOT implement Drop - ownership of temp_dir is transferred to FinalizedBatchFiles. +struct FinalizedAvroFiles { + temp_dir: PathBuf, + blocks_path: PathBuf, + transactions_path: PathBuf, + receipts_path: PathBuf, +} + +/// Manages Avro file writers for blocks, transactions, and receipts. +/// Writes directly to disk to avoid memory accumulation. +/// +/// Implements Drop to clean up temp directory on error. On success, +/// ownership is transferred via `finalize_to_paths()` and Drop becomes a no-op. +struct AvroFileWriters { + /// Temp directory path. Set to None after successful finalize_to_paths() + /// to transfer ownership and prevent cleanup on drop. + temp_dir: Option, + blocks_writer: Option>, + transactions_writer: Option>, + receipts_writer: Option>, +} + +impl Drop for AvroFileWriters { + fn drop(&mut self) { + // Clean up temp directory if we still own it (i.e., finalize_to_paths wasn't called) + if let Some(ref temp_dir) = self.temp_dir { + let _ = fs::remove_dir_all(temp_dir); + } + alloc_counter::dec(&alloc_counter::AVRO_FILE_WRITERS); + } +} + +impl AvroFileWriters { + /// Creates new Avro file writers in a temporary directory + fn new() -> DuneResult { + let temp_dir = std::env::temp_dir().join(format!( + "dune-avro-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + )); + Self::with_dir(&temp_dir) + } + + /// Creates new Avro file writers in the specified directory. + /// Cleans up the directory if any writer creation fails. + fn with_dir(dir: impl AsRef) -> DuneResult { + let temp_dir = dir.as_ref().to_path_buf(); + fs::create_dir_all(&temp_dir)?; + + // Use inner function to enable cleanup on any failure after dir creation + let result = Self::create_writers(&temp_dir); + + if result.is_err() { + // Clean up the directory we created + let _ = fs::remove_dir_all(&temp_dir); + } + + result + } + + /// Helper to create all writers. Called by with_dir(). + fn create_writers(temp_dir: &Path) -> DuneResult { + let parser = AvroParser::default(); + + let blocks_writer = parser + .file_writer_with_schema(temp_dir.join("blocks.avro")) + .map_err(|e| { + DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e)) + })?; + let transactions_writer = parser + .file_writer_with_schema(temp_dir.join("transactions.avro")) + .map_err(|e| { + DuneError::Other(anyhow::anyhow!( + "Failed to create transactions writer: {}", + e + )) + })?; + let receipts_writer = parser + .file_writer_with_schema(temp_dir.join("receipts.avro")) + .map_err(|e| { + DuneError::Other(anyhow::anyhow!( + "Failed to create receipts writer: {}", + e + )) + })?; + + alloc_counter::inc(&alloc_counter::AVRO_FILE_WRITERS); + Ok(Self { + temp_dir: Some(temp_dir.to_path_buf()), + blocks_writer: Some(blocks_writer), + transactions_writer: Some(transactions_writer), + receipts_writer: Some(receipts_writer), + }) + } + + /// Appends block data directly to the Avro writers. + /// + /// After writing all data for a block, the writers are flushed to disk + /// to prevent memory accumulation. Without flushing, the Avro Writer + /// buffers all data in memory until finalize_to_paths() is called. + fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()> { + let blocks_writer = self.blocks_writer.as_mut().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("blocks_writer not available")) + })?; + let transactions_writer = self.transactions_writer.as_mut().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("transactions_writer not available")) + })?; + let receipts_writer = self.receipts_writer.as_mut().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("receipts_writer not available")) + })?; + + // Convert and write block + let avro_block = AvroBlock::new(block); + blocks_writer.append(&avro_block)?; + + // Convert and write transactions + for tx in transactions { + let avro_tx = AvroTransaction::new( + tx, + Some(block.height.into()), + Some(block.header.get_timestamp_utc().timestamp()), + Some(block.id.as_ref().to_vec().into()), + Some(block.version.to_string()), + Some(block.producer.as_ref().to_vec().into()), + ); + transactions_writer.append(&avro_tx)?; + } + + // Convert and write receipts + for tx in transactions { + let receipt_metadata = ReceiptMetadata { + block_time: Some(block.header.get_timestamp_utc().timestamp()), + block_height: Some(block.height.0 as i64), + block_version: Some(block.version.to_string()), + block_producer: Some(block.producer.clone().into()), + transaction_id: Some(tx.id.clone().into()), + }; + for receipt in &tx.receipts { + let avro_receipt = AvroReceipt::new(receipt, &receipt_metadata); + receipts_writer.append(&avro_receipt)?; + } + } + + // Flush all writers to disk after each block to prevent memory accumulation. + // The Avro Writer buffers data internally for performance, but without + // periodic flushing this buffer grows unboundedly until finalize. + blocks_writer.flush()?; + transactions_writer.flush()?; + receipts_writer.flush()?; + + Ok(()) + } + + /// Finalizes all writers and returns paths to the Avro files. + /// Does NOT load files into memory - use this for large batches. + /// + /// On success, ownership of temp_dir is transferred to FinalizedAvroFiles, + /// and this struct's Drop will not clean up the directory. + /// On error, Drop will clean up the temp directory. + fn finalize_to_paths(&mut self) -> DuneResult { + let blocks_writer = self.blocks_writer.take().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("blocks_writer already taken")) + })?; + let transactions_writer = self.transactions_writer.take().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("transactions_writer already taken")) + })?; + let receipts_writer = self.receipts_writer.take().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("receipts_writer already taken")) + })?; + + let blocks_path = blocks_writer.finalize_path()?; + let transactions_path = transactions_writer.finalize_path()?; + let receipts_path = receipts_writer.finalize_path()?; + + // Take ownership of temp_dir so Drop won't clean it up + let temp_dir = self + .temp_dir + .take() + .ok_or_else(|| DuneError::Other(anyhow::anyhow!("temp_dir already taken")))?; + + Ok(FinalizedAvroFiles { + temp_dir, + blocks_path, + transactions_path, + receipts_path, + }) + } +} + +// ============================================================================ +// Disk-based buffer implementation +// ============================================================================ + +/// Disk-based block buffer that writes blocks directly to Avro files. +/// Uses minimal memory by streaming data directly to disk. +pub struct DiskBuffer { + writers: Option, + first_height: Option, + last_height: Option, + block_count: usize, +} + +impl DiskBuffer { + pub fn new() -> DuneResult { + let writers = AvroFileWriters::new()?; + Ok(Self { + writers: Some(writers), + first_height: None, + last_height: None, + block_count: 0, + }) + } + + #[cfg(test)] + pub fn with_dir(dir: impl AsRef) -> DuneResult { + let writers = AvroFileWriters::with_dir(dir)?; + Ok(Self { + writers: Some(writers), + first_height: None, + last_height: None, + block_count: 0, + }) + } + + /// Returns the number of blocks in the buffer + pub fn len(&self) -> usize { + self.block_count + } + + /// Returns true if the buffer is empty + pub fn is_empty(&self) -> bool { + self.block_count == 0 + } + + /// Returns the first block height in the buffer + pub fn first_height(&self) -> Option { + self.first_height + } + + /// Returns the last block height in the buffer + pub fn last_height(&self) -> Option { + self.last_height + } + + /// Appends a block and its transactions to the buffer. + /// Data is written directly to Avro files on disk. + pub fn append( + &mut self, + block: &Block, + transactions: &[Transaction], + ) -> DuneResult<()> { + let height = block.height; + + if self.first_height.is_none() { + self.first_height = Some(height); + } + self.last_height = Some(height); + + let writers = self + .writers + .as_mut() + .ok_or_else(|| DuneError::Other(anyhow::anyhow!("Writers not available")))?; + + writers.append(block, transactions)?; + self.block_count += 1; + + Ok(()) + } + + /// Finalizes the buffer, returning paths to the Avro files for upload. + /// + /// WARNING: This method consumes the internal writers and transfers ownership + /// of the temp directory to `FinalizedBatchFiles`. Once called: + /// - The writers are no longer available (subsequent calls will error) + /// - When `FinalizedBatchFiles` is dropped, the temp files are deleted + /// + /// This means retry is NOT possible after calling finalize(). If upload fails, + /// call `reset()` to create new writers and re-buffer the data from scratch, + /// or let the service reconnect the stream to resume from the last saved height. + pub fn finalize(&mut self) -> DuneResult { + let first_height = self.first_height.ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) + })?; + let last_height = self.last_height.ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) + })?; + + let writers = self + .writers + .as_mut() + .ok_or_else(|| DuneError::Other(anyhow::anyhow!("Writers not available")))?; + + let avro_files = writers.finalize_to_paths()?; + + alloc_counter::inc(&alloc_counter::FINALIZED_BATCH_FILES); + Ok(FinalizedBatchFiles { + first_height, + last_height, + blocks_path: avro_files.blocks_path, + transactions_path: avro_files.transactions_path, + receipts_path: avro_files.receipts_path, + temp_dir: avro_files.temp_dir, + }) + } + + /// Resets the buffer for reuse, clearing all data. + /// Call this after successful upload to prepare for the next batch. + pub fn reset(&mut self) -> DuneResult<()> { + // Drop old writers (this cleans up the temp directory) + let _ = self.writers.take(); + + // Create new writers + self.writers = Some(AvroFileWriters::new()?); + + self.first_height = None; + self.last_height = None; + self.block_count = 0; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use fuel_streams_domains::mocks::{ + MockBlock, + MockReceipt, + MockTransaction, + }; + use pretty_assertions::assert_eq; + use tempfile::tempdir; + + #[test] + fn test_disk_buffer_basic_operations() -> DuneResult<()> { + let dir = tempdir().unwrap(); + let mut buffer = DiskBuffer::with_dir(dir.path())?; + + assert!(buffer.is_empty()); + assert_eq!(buffer.len(), 0); + + // Add some blocks + for i in 1..=10 { + let mut block = MockBlock::random(); + block.height = BlockHeight::from(i); + let txs = vec![MockTransaction::script(vec![], vec![], vec![])]; + buffer.append(&block, &txs)?; + } + + assert_eq!(buffer.len(), 10); + assert_eq!(buffer.first_height(), Some(BlockHeight::from(1))); + assert_eq!(buffer.last_height(), Some(BlockHeight::from(10))); + + // Finalize and verify + let finalized = buffer.finalize()?; + assert_eq!(*finalized.first_height, 1); + assert_eq!(*finalized.last_height, 10); + + // Verify files exist + assert!(finalized.blocks_path.exists()); + assert!(finalized.transactions_path.exists()); + assert!(finalized.receipts_path.exists()); + + Ok(()) + } + + #[test] + fn test_disk_buffer_reset() -> DuneResult<()> { + let dir = tempdir().unwrap(); + let mut buffer = DiskBuffer::with_dir(dir.path())?; + + // Add blocks + for i in 1..=5 { + let mut block = MockBlock::random(); + block.height = BlockHeight::from(i); + let txs = vec![MockTransaction::script(vec![], vec![], vec![])]; + buffer.append(&block, &txs)?; + } + + assert_eq!(buffer.len(), 5); + + // Reset + buffer.reset()?; + + assert!(buffer.is_empty()); + assert_eq!(buffer.first_height(), None); + assert_eq!(buffer.last_height(), None); + + // Can add more blocks after reset + let mut block = MockBlock::random(); + block.height = BlockHeight::from(100); + let txs = vec![MockTransaction::script(vec![], vec![], vec![])]; + buffer.append(&block, &txs)?; + + assert_eq!(buffer.len(), 1); + assert_eq!(buffer.first_height(), Some(BlockHeight::from(100))); + + Ok(()) + } + + #[test] + fn test_disk_buffer_with_receipts() -> DuneResult<()> { + let dir = tempdir().unwrap(); + let mut buffer = DiskBuffer::with_dir(dir.path())?; + + for i in 1..=3 { + let mut block = MockBlock::random(); + block.height = BlockHeight::from(i); + let txs = vec![MockTransaction::script(vec![], vec![], MockReceipt::all())]; + buffer.append(&block, &txs)?; + } + + let finalized = buffer.finalize()?; + + // Verify receipts file exists and has content + assert!(finalized.receipts_path.exists()); + let receipts_size = std::fs::metadata(&finalized.receipts_path)?.len(); + assert!(receipts_size > 0, "Receipts file should not be empty"); + + Ok(()) + } + + #[test] + fn test_disk_buffer_new() -> DuneResult<()> { + let buffer = DiskBuffer::new()?; + assert!(buffer.is_empty()); + Ok(()) + } +} diff --git a/services/dune/src/helpers/avro.rs b/services/dune/src/helpers/avro.rs index 42060f7f..a4511d83 100644 --- a/services/dune/src/helpers/avro.rs +++ b/services/dune/src/helpers/avro.rs @@ -1,21 +1,55 @@ -use std::sync::Arc; +use std::{ + any::TypeId, + collections::HashMap, + fs::File, + io::{BufWriter, Write}, + path::{Path, PathBuf}, + sync::RwLock, +}; + +use crate::alloc_counter; use apache_avro::{ - AvroSchema, - Codec, - Reader, - Schema, - Writer, - from_value, - schema::{ - Namespace, - derive::AvroSchemaComponent, - }, -}; -use serde::{ - Serialize, - de::DeserializeOwned, + AvroSchema, Codec, Reader, Schema, Writer, from_value, + schema::{Namespace, derive::AvroSchemaComponent}, }; +use serde::{Serialize, de::DeserializeOwned}; + +/// Global cache for Avro schemas, keyed by TypeId. +/// Schemas are immutable and type-specific, so we cache them to avoid +/// repeated allocations and memory leaks from Box::leak. +static SCHEMA_CACHE: RwLock>> = RwLock::new(None); + +/// Gets or creates a cached static schema for type T. +/// The schema is created once per type and cached globally. +fn get_cached_schema() -> &'static Schema { + let type_id = TypeId::of::(); + + // Fast path: try to read from cache + { + let cache = SCHEMA_CACHE.read().unwrap(); + if let Some(ref map) = *cache + && let Some(schema) = map.get(&type_id) + { + return schema; + } + } + + // Slow path: create schema and cache it + let mut cache = SCHEMA_CACHE.write().unwrap(); + let map = cache.get_or_insert_with(HashMap::new); + + // Double-check after acquiring write lock + if let Some(schema) = map.get(&type_id) { + return schema; + } + + // Create and cache the schema + let schema = T::get_schema_in_ctxt(&mut Default::default(), &Namespace::default()); + let schema_static: &'static Schema = Box::leak(Box::new(schema)); + map.insert(type_id, schema_static); + schema_static +} /// Data parser error types. #[derive(Debug, thiserror::Error)] @@ -24,6 +58,8 @@ pub enum AvroParserError { Avro(Box), #[error("Schema not found {0}")] SchemaNotFound(String), + #[error("IO error: {0}")] + Io(String), } impl From for AvroParserError { @@ -33,7 +69,7 @@ impl From for AvroParserError { } pub struct AvroWriter { - writer: Writer<'static, Vec>, // We'll adjust this + writer: Writer<'static, Vec>, _phantom: std::marker::PhantomData, } @@ -41,10 +77,10 @@ impl AvroWriter where T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static, { - pub fn new(schema: Schema, codec: Codec) -> Self { - let schema_static: &'static Schema = Box::leak(Box::new(schema)); + pub fn new(codec: Codec) -> Self { + let schema = get_cached_schema::(); let writer = Writer::builder() - .schema(schema_static) + .schema(schema) .codec(codec) .writer(Vec::new()) .build(); @@ -64,6 +100,91 @@ where } } +/// An Avro writer that writes directly to a file on disk. +/// This reduces memory usage by not accumulating data in memory. +pub struct AvroFileWriter { + writer: Option>>, + file_path: PathBuf, + _phantom: std::marker::PhantomData, +} + +impl AvroFileWriter +where + T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static, +{ + /// Creates a new file-based Avro writer at the specified path + pub fn new(path: impl AsRef, codec: Codec) -> Result { + let file_path = path.as_ref().to_path_buf(); + let file = File::create(&file_path) + .map_err(|e| AvroParserError::Io(format!("Failed to create file: {}", e)))?; + let buf_writer = BufWriter::new(file); + + let schema = get_cached_schema::(); + let writer = Writer::builder() + .schema(schema) + .codec(codec) + .writer(buf_writer) + .build(); + + alloc_counter::inc(&alloc_counter::AVRO_FILE_WRITER); + Ok(Self { + writer: Some(writer), + file_path, + _phantom: std::marker::PhantomData, + }) + } + + /// Appends a value to the file. + /// + /// Note: Data is buffered internally by the Avro Writer. Call `flush()` + /// periodically to write buffered data to disk and prevent memory accumulation. + pub fn append(&mut self, value: &T) -> Result<(), AvroParserError> { + self.writer + .as_mut() + .ok_or_else(|| AvroParserError::Io("Writer already finalized".into()))? + .append_ser(value)?; + Ok(()) + } + + /// Flushes buffered data to disk. + /// + /// The Avro Writer buffers data internally for performance. Without + /// periodic flushing, all data accumulates in memory until finalize_path(). + /// Call this after processing each block to bound memory usage. + pub fn flush(&mut self) -> Result<(), AvroParserError> { + self.writer + .as_mut() + .ok_or_else(|| AvroParserError::Io("Writer already finalized".into()))? + .flush()?; + Ok(()) + } + + /// Finalizes the file and returns just the path. + /// The file is flushed and closed, ready for streaming to its destination. + /// The inner Writer is taken via `.take()`, consumed by `into_inner()`, + /// and deallocated. `self` then drops with `writer: None`, firing `Drop` + /// which decrements the counter. + pub fn finalize_path(mut self) -> Result { + let writer = self + .writer + .take() + .ok_or_else(|| AvroParserError::Io("Writer already finalized".into()))?; + let mut inner = writer.into_inner().map_err(|e| { + AvroParserError::Io(format!("Failed to finalize writer: {}", e)) + })?; + inner.flush().map_err(|e| { + AvroParserError::Io(format!("Failed to flush final data: {}", e)) + })?; + Ok(self.file_path.clone()) + } +} + +impl Drop for AvroFileWriter { + fn drop(&mut self) { + alloc_counter::dec(&alloc_counter::AVRO_FILE_WRITER); + } +} + #[derive(Clone)] pub struct AvroParser { codec: Option, @@ -78,26 +199,23 @@ impl Default for AvroParser { } impl AvroParser { - pub fn arc(self) -> Arc { - Arc::new(self) - } - - pub fn with_codec(&mut self, codec: Codec) -> &mut Self { - self.codec = Some(codec); - self - } - pub fn writer_with_schema< T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static, >( &self, ) -> Result, AvroParserError> { - let schema = - T::get_schema_in_ctxt(&mut Default::default(), &Namespace::default()); - Ok(AvroWriter::new( - schema, - self.codec.unwrap_or(Codec::Deflate), - )) + Ok(AvroWriter::new(self.codec.unwrap_or(Codec::Deflate))) + } + + /// Creates a file-based Avro writer that writes directly to disk. + /// Use this to avoid accumulating large amounts of data in memory. + pub fn file_writer_with_schema< + T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static, + >( + &self, + path: impl AsRef, + ) -> Result, AvroParserError> { + AvroFileWriter::new(path, self.codec.unwrap_or(Codec::Deflate)) } pub fn reader_with_schema< diff --git a/services/dune/src/lib.rs b/services/dune/src/lib.rs index 0378aac3..eca0da2d 100644 --- a/services/dune/src/lib.rs +++ b/services/dune/src/lib.rs @@ -4,6 +4,8 @@ // Used in the `main.rs` use fuel_web_utils as _; +pub mod alloc_counter; +pub mod block_buffer; mod cli; mod error; pub mod helpers; @@ -11,6 +13,8 @@ pub mod processor; pub mod s3; pub mod schemas; pub mod service; +pub mod tracked; +pub use block_buffer::*; pub use cli::*; pub use error::*; diff --git a/services/dune/src/processor.rs b/services/dune/src/processor.rs index 9358ef26..8bb9d9af 100644 --- a/services/dune/src/processor.rs +++ b/services/dune/src/processor.rs @@ -205,6 +205,53 @@ impl Processor { Ok(file_paths) } + /// Process pre-serialized data directly (from disk buffer) + /// This method takes data that has already been serialized to Avro format + pub async fn process_data( + &self, + start_height: BlockHeight, + end_height: BlockHeight, + data: Vec, + table: S3TableName, + ) -> DuneResult { + let network = FuelNetwork::load_from_env(); + let key_builder = S3KeyBuilder::new(network).with_table(table); + let key = key_builder.build_key_from_heights(start_height, end_height); + let file_path = self.create_output(data, &key).await?; + tracing::info!("New file saved: {}", file_path); + Ok(file_path) + } + + /// Process data from a file path, streaming directly to S3. + /// This avoids loading the entire file into memory - ideal for large batches. + pub async fn process_data_from_file( + &self, + start_height: BlockHeight, + end_height: BlockHeight, + file_path: impl AsRef, + table: S3TableName, + ) -> DuneResult { + let network = FuelNetwork::load_from_env(); + let key_builder = S3KeyBuilder::new(network).with_table(table); + let key = key_builder.build_key_from_heights(start_height, end_height); + + match &self.storage_type { + StorageType::File => { + // For file storage, read and write locally + let data = std::fs::read(file_path.as_ref())?; + let output_path = self.create_output(data, &key).await?; + tracing::info!("New file saved: {}", output_path); + Ok(output_path) + } + StorageType::S3(s3_storage) => { + // Stream directly from file to S3 + s3_storage.store_from_file(&key, file_path).await?; + tracing::info!("New file uploaded to S3: {}", key); + Ok(key) + } + } + } + fn spli_batches< T: serde::Serialize + serde::de::DeserializeOwned diff --git a/services/dune/src/s3/client.rs b/services/dune/src/s3/client.rs index 7d6231f2..5b967d4c 100644 --- a/services/dune/src/s3/client.rs +++ b/services/dune/src/s3/client.rs @@ -19,7 +19,9 @@ use aws_sdk_s3::{ config::retry::RetryConfig as S3RetryConfig, error::SdkError, operation::get_object::GetObjectError, + primitives::ByteStream, }; +use std::path::Path; #[derive(Debug, Clone)] pub struct S3Storage { @@ -239,7 +241,7 @@ impl S3Storage { for (i, chunk) in chunks.enumerate() { let part_number = (i + 1) as i32; - match self + let response = match self .client .upload_part() .bucket(self.config.bucket()) @@ -250,38 +252,49 @@ impl S3Storage { .send() .await { - Ok(response) => { - if let Some(e_tag) = response.e_tag() { - completed_parts.push( - aws_sdk_s3::types::CompletedPart::builder() - .e_tag(e_tag) - .part_number(part_number) - .build(), - ); - } - } + Ok(response) => response, Err(err) => { // Abort the multipart upload if a part fails - self.client + let _ = self + .client .abort_multipart_upload() .bucket(self.config.bucket()) .key(key) .upload_id(upload_id) .send() - .await - .map_err(|e| { - StorageError::StoreError(format!( - "Failed to abort multipart upload: {:?}", - e.as_service_error() - )) - })?; + .await; return Err(StorageError::StoreError(format!( "Failed to upload part: {:?}", err.as_service_error() ))); } - } + }; + + // ETag is required to complete multipart upload. If missing, abort to prevent + // silent data corruption from incomplete uploads. + let Some(e_tag) = response.e_tag() else { + let _ = self + .client + .abort_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .send() + .await; + + return Err(StorageError::StoreError(format!( + "Upload part {} succeeded but returned no ETag", + part_number + ))); + }; + + completed_parts.push( + aws_sdk_s3::types::CompletedPart::builder() + .e_tag(e_tag) + .part_number(part_number) + .build(), + ); tracing::debug!( "Uploaded part {}/{} for key={}", @@ -409,6 +422,235 @@ impl S3Storage { self.retry_config = config; self } + + /// Stores a file by streaming directly from disk to S3. + /// This avoids loading the entire file into memory, making it suitable + /// for large files that would otherwise cause OOM errors. + pub async fn store_from_file( + &self, + key: &str, + file_path: impl AsRef, + ) -> Result<(), StorageError> { + let file_path = file_path.as_ref().to_path_buf(); + let file_size = std::fs::metadata(&file_path) + .map_err(|e| { + StorageError::StoreError(format!("Failed to get file metadata: {}", e)) + })? + .len() as usize; + + with_retry(&self.retry_config, "store_from_file", || { + let file_path = file_path.clone(); + async move { + #[allow(clippy::identity_op)] + const LARGE_FILE_THRESHOLD: usize = 100 * 1024 * 1024; // 100MB + + let result = if file_size >= LARGE_FILE_THRESHOLD { + tracing::debug!( + "Uploading file {} to S3 using multipart streaming (size: {} bytes)", + file_path.display(), + file_size + ); + self.upload_multipart_from_file(key, &file_path, file_size) + .await + } else { + tracing::debug!( + "Uploading file {} to S3 using streaming put_object (size: {} bytes)", + file_path.display(), + file_size + ); + self.put_object_from_file(key, &file_path).await + }; + if let Err(ref e) = result { + tracing::error!("Storage error: {:?}", e); + } + result + } + }) + .await + } + + async fn put_object_from_file( + &self, + key: &str, + file_path: &Path, + ) -> Result<(), StorageError> { + let body = ByteStream::from_path(file_path).await.map_err(|e| { + StorageError::StoreError(format!("Failed to create byte stream: {}", e)) + })?; + + let result = self + .client + .put_object() + .bucket(self.config.bucket()) + .key(key) + .body(body) + .send() + .await; + + match result { + Ok(_) => { + tracing::info!("Successfully stored object with key: {}", key); + Ok(()) + } + Err(err) => { + tracing::error!( + "Failed to store object. Error details: {:?}", + err.as_service_error() + ); + Err(StorageError::StoreError(err.to_string())) + } + } + } + + async fn upload_multipart_from_file( + &self, + key: &str, + file_path: &Path, + file_size: usize, + ) -> Result<(), StorageError> { + use std::io::{ + Read, + Seek, + SeekFrom, + }; + + const CHUNK_SIZE: usize = 100 * 1024 * 1024; // 100MB chunks + + // Create multipart upload + let create_multipart = self + .client + .create_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .send() + .await + .map_err(|e| { + StorageError::StoreError(format!( + "Failed to create multipart upload: {:?}", + e.as_service_error() + )) + })?; + + let upload_id = create_multipart.upload_id().ok_or_else(|| { + StorageError::StoreError("Failed to get upload ID".to_string()) + })?; + + let mut completed_parts = Vec::new(); + let total_chunks = file_size.div_ceil(CHUNK_SIZE); + + let mut file = std::fs::File::open(file_path).map_err(|e| { + StorageError::StoreError(format!("Failed to open file: {}", e)) + })?; + + let mut buffer = vec![0u8; CHUNK_SIZE]; + let mut part_number = 1i32; + let mut bytes_uploaded = 0usize; + + loop { + // Seek to the current position + file.seek(SeekFrom::Start(bytes_uploaded as u64)) + .map_err(|e| { + StorageError::StoreError(format!("Failed to seek file: {}", e)) + })?; + + // Read a chunk + let bytes_read = file.read(&mut buffer).map_err(|e| { + StorageError::StoreError(format!("Failed to read file: {}", e)) + })?; + + if bytes_read == 0 { + break; + } + + // Upload the chunk + let response = match self + .client + .upload_part() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .body(buffer[..bytes_read].to_vec().into()) + .part_number(part_number) + .send() + .await + { + Ok(response) => response, + Err(err) => { + // Abort the multipart upload if a part fails + let _ = self + .client + .abort_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .send() + .await; + + return Err(StorageError::StoreError(format!( + "Failed to upload part: {:?}", + err.as_service_error() + ))); + } + }; + + // ETag is required to complete multipart upload. If missing, abort to prevent + // silent data corruption from incomplete uploads. + let Some(e_tag) = response.e_tag() else { + let _ = self + .client + .abort_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .send() + .await; + + return Err(StorageError::StoreError(format!( + "Upload part {} succeeded but returned no ETag", + part_number + ))); + }; + + completed_parts.push( + aws_sdk_s3::types::CompletedPart::builder() + .e_tag(e_tag) + .part_number(part_number) + .build(), + ); + + tracing::debug!( + "Uploaded part {}/{} for key={}", + part_number, + total_chunks, + key + ); + + bytes_uploaded += bytes_read; + part_number += 1; + } + + // Complete multipart upload + self.client + .complete_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .multipart_upload( + aws_sdk_s3::types::CompletedMultipartUpload::builder() + .set_parts(Some(completed_parts)) + .build(), + ) + .send() + .await + .map_err(|e| { + StorageError::StoreError(format!( + "Failed to complete multipart upload: {:?}", + e.as_service_error() + )) + })?; + + Ok(()) + } } #[cfg(test)] diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs index 2b746063..9c92cecb 100644 --- a/services/dune/src/service.rs +++ b/services/dune/src/service.rs @@ -1,10 +1,19 @@ use crate::{ DuneError, + alloc_counter, + block_buffer::{ + DiskBuffer, + FinalizedBatchFiles, + }, processor::{ Processor, StorageTypeConfig, }, s3::S3TableName, + tracked::{ + TrackedFetcher, + TrackedStream, + }, }; use fuel_core_client::client::FuelClient; use fuel_core_services::{ @@ -13,10 +22,7 @@ use fuel_core_services::{ ServiceRunner, StateWatcher, TaskNextAction, - stream::{ - BoxStream, - IntoBoxStream, - }, + stream::IntoBoxStream, }; use fuel_core_types::{ fuel_tx::AssetId, @@ -36,7 +42,6 @@ use fuel_streams_domains::{ transactions::Transaction, }; use futures::StreamExt; -use itertools::Itertools; use std::{ cmp::Ordering, num::NonZeroUsize, @@ -56,18 +61,24 @@ pub struct Config { pub struct UninitializedTask { config: Config, - fetcher: GraphqlFetcher, + fetcher_factory: Arc GraphqlFetcher + Send + Sync>, shared: SharedState, } pub struct Task { height: watch::Sender, - fetcher: GraphqlFetcher, - blocks_stream: BoxStream>, - pending_events: Vec, + /// Factory function to create new GraphqlFetcher instances on reconnection. + /// We recreate the fetcher on each reconnection to avoid memory leaks from + /// accumulated background tasks and channels in the external library. + fetcher_factory: Arc GraphqlFetcher + Send + Sync>, + blocks_stream: TrackedStream, + /// Disk-based block buffer that writes directly to Avro files + buffer: DiskBuffer, processor: Processor, base_asset_id: AssetId, batch_size: usize, + /// Counter for periodic alloc_counter logging + run_iterations: u64, } #[derive(Clone)] @@ -115,7 +126,7 @@ impl RunnableService for UninitializedTask { ) -> anyhow::Result { let Self { config, - fetcher, + fetcher_factory, shared, } = self; @@ -134,14 +145,19 @@ impl RunnableService for UninitializedTask { .unwrap_or(config.starting_height); shared.block_height.send_replace(current_height); + // Create disk buffer for block accumulation + let buffer = DiskBuffer::new()?; + tracing::info!("Using disk buffer for block accumulation"); + let mut task = Task { - blocks_stream: futures::stream::pending().into_boxed(), + blocks_stream: TrackedStream::new(futures::stream::pending().into_boxed()), height: shared.block_height, - fetcher, - pending_events: vec![], + fetcher_factory, + buffer, processor, base_asset_id, batch_size: config.batch_size, + run_iterations: 0, }; task.connect_block_stream().await?; @@ -152,15 +168,21 @@ impl RunnableService for UninitializedTask { impl RunnableTask for Task { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { - match self.pending_events.len().cmp(&self.batch_size) { + // Log allocation counters periodically (every 100 iterations) + self.run_iterations += 1; + if self.run_iterations % 100 == 0 { + alloc_counter::log_all(); + } + + match self.buffer.len().cmp(&self.batch_size) { Ordering::Less => {} Ordering::Equal => { return TaskNextAction::always_continue(self.post_blocks().await) } Ordering::Greater => { tracing::error!( - "Pending events exceeded batch size: {} > {}", - self.pending_events.len(), + "Batch size exceeded: {} > {}", + self.buffer.len(), self.batch_size ); return TaskNextAction::Stop; @@ -177,8 +199,10 @@ impl RunnableTask for Task { block = self.blocks_stream.next() => { match block { Some(Ok(event)) => { - let current_height = if let Some(block) = self.pending_events.last() { - *block.header.height() + // Get the current height, converting from fuel_streams_types::BlockHeight + // to fuel_core_types::fuel_types::BlockHeight if needed + let current_height: BlockHeight = if let Some(last_height) = self.buffer.last_height() { + (*last_height).into() } else { *self.height.borrow() }; @@ -203,9 +227,14 @@ impl RunnableTask for Task { } } - self.pending_events.push(event); - - TaskNextAction::Continue + // Convert event to block and transactions, then buffer + match self.append_event_to_buffer(&event) { + Ok(_) => TaskNextAction::Continue, + Err(e) => { + tracing::error!("Failed to buffer block: {e}"); + TaskNextAction::Stop + } + } } Some(Err(e)) => { tracing::error!("Error receiving block event: {e}; reconnecting stream"); @@ -239,14 +268,27 @@ impl RunnableTask for Task { impl Task { async fn connect_block_stream(&mut self) -> anyhow::Result<()> { - self.pending_events.clear(); + // Reset the buffer when reconnecting + self.buffer.reset()?; let height = *self.height.borrow(); let next_height = height.succ().ok_or_else(|| { anyhow::anyhow!("Block height overflowed when connecting block stream") })?; - let stream = self - .fetcher + + // Create a fresh GraphqlFetcher for each connection. + // This is critical to avoid memory leaks: the external library spawns + // background tasks and creates large-capacity channels on each + // blocks_stream_starting_from() call. By creating a new fetcher, + // we ensure the old tasks/channels are properly abandoned. + // + // Wrapped in TrackedFetcher to observe via alloc counters whether + // Drop actually fires at the end of this scope. If the counter + // doesn't decrement, something is holding the fetcher alive. + let fetcher = TrackedFetcher::new((self.fetcher_factory)()); + tracing::debug!("Created new GraphqlFetcher for stream connection"); + + let stream = fetcher .blocks_stream_starting_from(next_height) .await? .map(|result| { @@ -258,52 +300,72 @@ impl Task { }) }) .into_boxed(); - self.blocks_stream = stream; + + self.blocks_stream = TrackedStream::new(stream); + // `fetcher` drops here — TrackedFetcher::drop decrements the counter. + // If GRAPHQL_FETCHER trends upward, something inside the stream + // is keeping the fetcher (or its spawned tasks) alive. Ok(()) } - async fn post_blocks(&mut self) -> anyhow::Result<()> { - if self.pending_events.is_empty() { - return Ok(()) - } - - let last_height = *self - .pending_events - .last() - .expect("We have pending events; qed") - .header - .height(); + /// Converts a block event to domain types and adds to the buffer + fn append_event_to_buffer(&mut self, event: &BlockEvent) -> anyhow::Result<()> { + let block = Block::new( + &event.header, + event.consensus.clone(), + event.transactions.len(), + )?; - let blocks_and_txs: Vec<_> = self - .pending_events + let transactions: Vec<_> = event + .transactions .iter() - .map(|event| { - let block = Block::new( - &event.header, - event.consensus.clone(), - event.transactions.len(), - )?; - let transaction = event - .transactions - .iter() - .zip(event.statuses.iter()) - .map(|(tx, status)| { - Transaction::new( - &status.id.into(), - tx.as_ref(), - &status.into(), - &self.base_asset_id, - status.result.receipts(), - ) - }) - .collect(); - - Ok::<_, anyhow::Error>((block, transaction)) + .zip(event.statuses.iter()) + .map(|(tx, status)| { + Transaction::new( + &status.id.into(), + tx.as_ref(), + &status.into(), + &self.base_asset_id, + status.result.receipts(), + ) }) - .try_collect()?; + .collect(); + + // Add to disk buffer (writes directly to Avro files) + self.buffer.append(&block, &transactions)?; - process_batch(&self.processor, &blocks_and_txs) + Ok(()) + } + + async fn post_blocks(&mut self) -> anyhow::Result<()> { + if self.buffer.is_empty() { + return Ok(()) + } + + // Finalize the buffer and get the file paths for upload. + // Note: finalize() consumes the writers and FinalizedBatchFiles owns the temp files. + let finalized = self + .buffer + .finalize() + .map_err(|err| anyhow::anyhow!("Failed to finalize buffer: {err}"))?; + + // Convert from fuel_streams_types::BlockHeight to fuel_core_types::fuel_types::BlockHeight + let last_height_u32: u32 = *finalized.last_height; + let last_height: BlockHeight = last_height_u32.into(); + + // IMPORTANT: Reset buffer immediately after finalize() succeeds. + // finalize() consumes the internal writers, leaving the buffer in an inconsistent + // state where block_count > 0 but writers are None. If we don't reset here and + // the upload fails, the next run() iteration would see len() == batch_size and + // try to call post_blocks() again, which would fail on finalize() with + // "blocks_writer already taken" - creating an infinite error loop. + // By resetting here, the buffer is always in a consistent state. If upload fails, + // the service reconnects and re-buffers blocks from the last saved height. + self.buffer.reset()?; + + // Upload the Avro files to storage + process_finalized_batch(&self.processor, finalized) .await .map_err(|err| { anyhow::anyhow!( @@ -311,29 +373,46 @@ impl Task { ) })?; + // Only after successful upload do we update the persisted height self.processor.save_latest_height(last_height).await?; self.height.send_replace(last_height); - self.pending_events.clear(); - Ok(()) } } pub fn new_service(config: Config) -> anyhow::Result> { - let graphql_config = GraphqlEventAdapterConfig { - client: Arc::new(FuelClient::new(&config.url)?), - heartbeat_capacity: NonZeroUsize::new(100_000).expect("Is not zero; qed"), - event_capacity: NonZeroUsize::new(100_000).expect("Is not zero; qed"), - blocks_request_batch_size: config.blocks_request_batch_size, - blocks_request_concurrency: config.blocks_request_concurrency, - pending_blocks_limit: config.pending_blocks, - }; - let fetcher = create_graphql_event_adapter(graphql_config); + // Create a shared client that will be reused across all fetcher instances + let client = Arc::new(FuelClient::new(&config.url)?); + + // Capture config values for the factory closure + let blocks_request_batch_size = config.blocks_request_batch_size; + let blocks_request_concurrency = config.blocks_request_concurrency; + let pending_blocks_limit = config.pending_blocks; + + // Create a factory that produces fresh GraphqlFetcher instances. + // Each fetcher is created with reduced channel capacities to limit memory usage. + // The external library spawns background tasks on each stream creation, + // so we need fresh fetchers on reconnection to avoid task/memory accumulation. + let fetcher_factory: Arc GraphqlFetcher + Send + Sync> = + Arc::new(move || { + let graphql_config = GraphqlEventAdapterConfig { + client: client.clone(), + // The external library creates broadcast channels with this capacity + // that persist until background tasks terminate. + heartbeat_capacity: NonZeroUsize::new(10_000).expect("Is not zero; qed"), + event_capacity: NonZeroUsize::new(10_000).expect("Is not zero; qed"), + blocks_request_batch_size, + blocks_request_concurrency, + pending_blocks_limit, + }; + create_graphql_event_adapter(graphql_config) + }); + let (height, _) = watch::channel(config.starting_height); let task = UninitializedTask { config, - fetcher, + fetcher_factory, shared: SharedState { block_height: height, }, @@ -342,6 +421,66 @@ pub fn new_service(config: Config) -> anyhow::Result anyhow::Result<()> { + let first_height = files.first_height; + let last_height = files.last_height; + + // Upload sequentially to minimize memory usage + // Each upload streams from disk to S3 without loading into memory + tracing::info!( + "Uploading blocks from file: {}", + files.blocks_path.display() + ); + processor + .process_data_from_file( + first_height, + last_height, + &files.blocks_path, + S3TableName::Blocks, + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to upload blocks: {}", e))?; + + tracing::info!( + "Uploading transactions from file: {}", + files.transactions_path.display() + ); + processor + .process_data_from_file( + first_height, + last_height, + &files.transactions_path, + S3TableName::Transactions, + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to upload transactions: {}", e))?; + + tracing::info!( + "Uploading receipts from file: {}", + files.receipts_path.display() + ); + processor + .process_data_from_file( + first_height, + last_height, + &files.receipts_path, + S3TableName::Receipts, + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to upload receipts: {}", e))?; + + // FinalizedBatchFiles::drop() will clean up the temp directory + Ok(()) +} + +/// Process a batch of blocks and transactions (legacy in-memory method) +/// Kept for backwards compatibility with tests pub async fn process_batch( processor: &Processor, blocks_and_txs: &[(Block, Vec)], diff --git a/services/dune/src/tracked.rs b/services/dune/src/tracked.rs new file mode 100644 index 00000000..df8abe14 --- /dev/null +++ b/services/dune/src/tracked.rs @@ -0,0 +1,84 @@ +//! Newtype wrappers that provide `Drop`-based allocation tracking +//! for types we don't own. + +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use fuel_core_services::stream::BoxStream; +use fuel_indexer_types::events::BlockEvent; +use fuel_receipts_manager::adapters::graphql_event_adapter::GraphqlFetcher; +use futures::Stream; + +use crate::alloc_counter; + +// --------------------------------------------------------------------------- +// TrackedStream +// --------------------------------------------------------------------------- + +/// A `BoxStream` wrapper whose `Drop` proves the stream was actually +/// deallocated, not just replaced. +pub struct TrackedStream { + inner: BoxStream>, +} + +impl TrackedStream { + pub fn new(inner: BoxStream>) -> Self { + alloc_counter::inc(&alloc_counter::BLOCK_STREAM); + Self { inner } + } +} + +impl Stream for TrackedStream { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +impl Drop for TrackedStream { + fn drop(&mut self) { + alloc_counter::dec(&alloc_counter::BLOCK_STREAM); + } +} + +// --------------------------------------------------------------------------- +// TrackedFetcher +// --------------------------------------------------------------------------- + +/// A `GraphqlFetcher` wrapper whose `Drop` proves the fetcher (and any +/// resources it holds) was actually deallocated. +pub struct TrackedFetcher { + inner: GraphqlFetcher, +} + +impl TrackedFetcher { + pub fn new(inner: GraphqlFetcher) -> Self { + alloc_counter::inc(&alloc_counter::GRAPHQL_FETCHER); + Self { inner } + } + + /// Delegate to the inner fetcher. + pub fn inner(&self) -> &GraphqlFetcher { + &self.inner + } +} + +impl std::ops::Deref for TrackedFetcher { + type Target = GraphqlFetcher; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl Drop for TrackedFetcher { + fn drop(&mut self) { + alloc_counter::dec(&alloc_counter::GRAPHQL_FETCHER); + } +}