From c89c5b500191efad50d444eb555781f42f60b8c0 Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 10:58:04 -0800
Subject: [PATCH 01/14] feat: add disk based event buffering

---
 Cargo.lock                        |   2 +-
 Cargo.toml                        |   4 -
 services/dune/Cargo.toml          |   2 +-
 services/dune/src/block_buffer.rs | 568 ++++++++++++++++++++++++++++++
 services/dune/src/cli.rs          |  12 +-
 services/dune/src/lib.rs          |   2 +
 services/dune/src/main.rs         |   1 +
 services/dune/src/processor.rs    |  17 +
 services/dune/src/service.rs      | 176 ++++++---
 9 files changed, 728 insertions(+), 56 deletions(-)
 create mode 100644 services/dune/src/block_buffer.rs

diff --git a/Cargo.lock b/Cargo.lock
index 51c3fd3d..50e13ff7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7931,12 +7931,12 @@ dependencies = [
 "fuel-streams-types",
 "fuel-web-utils",
 "futures",
- "itertools 0.14.0",
 "pretty_assertions",
 "rand 0.9.2",
 "serde",
 "serde_bytes",
 "serde_json",
+ "tempfile",
 "thiserror 2.0.17",
 "tokio",
 "tracing",
diff --git a/Cargo.toml b/Cargo.toml
index 0464b5f7..57495dd6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,7 +19,6 @@ version = "0.0.34"
 anyhow = "1.0.97"
 apache-avro = { version = "0.17.0", features = ["derive"] }
 async-trait = "0.1.88"
-axum = { version = "0.8.3", features = ["json", "ws", "query"] }
 chrono = { version = "0.4.40", features = ["serde"] }
 clap = { version = "4.5.35", features = ["derive", "env"] }
 derive_more = { version = "2.0.1", features = [
@@ -44,11 +43,9 @@ fuel-data-parser = { version = "0.0.34", path = "crates/data-parser" }
 fuel-streams-domains = { version = "0.0.34", path = "crates/domains" }
 fuel-streams-types = { version = "0.0.34", path = "crates/types" }
 fuel-web-utils = { version = "0.0.34", path = "crates/web-utils" }
-futures = "0.3.31"
 hex = "0.4.3"
 pretty_assertions = "1.4.1"
 rand = "0.9.0"
-rayon = "1.10.0"
 serde = { version = "1.0.219", features = ["derive"] }
 serde_json = "1.0.140"
 serde_urlencoded = "0.7.1"
@@ -61,7 +58,6 @@ sqlx = { version = "0.8.3", default-features = false, features = [
 ] }
 strum = { version = "0.27.1", features = ["derive"] }
 strum_macros = "0.27.1"
-test-case = "3.3.1"
 thiserror = "2.0.12"
 tokio = { version = "1.44.1", features = [
 "io-util",
diff --git a/services/dune/Cargo.toml b/services/dune/Cargo.toml
index d7c26a19..5e995b56 100644
--- a/services/dune/Cargo.toml
+++ b/services/dune/Cargo.toml
@@ -34,7 +34,6 @@ fuel-streams-domains = { workspace = true, features = ["test-helpers"] }
 fuel-streams-types = { workspace = true, features = ["test-helpers"] }
 fuel-web-utils = { workspace = true, features = ["test-helpers"] }
 futures = "0.3.31"
-itertools = "0.14.0"
 rand.workspace = true
 serde.workspace = true
 serde_bytes = "0.11.17"
@@ -46,6 +45,7 @@ url = "2.5.7"

 [dev-dependencies]
 pretty_assertions.workspace = true
+tempfile = "3"
 tokio = { workspace = true, features = [
     "rt-multi-thread",
     "macros",
diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs
new file mode 100644
index 00000000..d683efbb
--- /dev/null
+++ b/services/dune/src/block_buffer.rs
@@ -0,0 +1,568 @@
+use std::{
+    fs::{self, File},
+    io::{BufReader, BufWriter, Write},
+    path::{Path, PathBuf},
+};
+
+use fuel_streams_types::BlockHeight;
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    DuneError,
+    DuneResult,
+    helpers::AvroParser,
+    schemas::{AvroBlock, AvroReceipt, AvroTransaction, ReceiptMetadata},
+};
+use fuel_streams_domains::{blocks::Block, transactions::Transaction};
+
+/// The result of finalizing a batch, containing Avro-encoded data ready for upload
+pub struct FinalizedBatch {
+    pub first_height: BlockHeight,
+    pub last_height: BlockHeight,
+    pub blocks_data: Vec<u8>,
+    pub transactions_data: Vec<u8>,
+    pub receipts_data: Vec<u8>,
+}
+
+/// Configuration for the buffer type
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BufferType {
+    /// Store blocks in memory (faster but uses more RAM)
+    Memory,
+    /// Store blocks on disk (slower but uses less RAM)
+    Disk,
+}
+
+impl std::fmt::Display for BufferType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            BufferType::Memory => write!(f, "Memory"),
+            BufferType::Disk => write!(f, "Disk"),
+        }
+    }
+}
+
+impl std::str::FromStr for BufferType {
+    type Err = anyhow::Error;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_lowercase().as_str() {
+            "memory" | "mem" => Ok(BufferType::Memory),
+            "disk" | "file" => Ok(BufferType::Disk),
+            _ => Err(anyhow::anyhow!("Unknown buffer type: {}", s)),
+        }
+    }
+}
+
+/// Trait for block buffering implementations.
+/// This allows swapping between memory and disk-based buffering.
+pub trait BlockBuffer: Send {
+    /// Returns the number of blocks in the buffer
+    fn len(&self) -> usize;
+
+    /// Returns true if the buffer is empty
+    fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+
+    /// Returns the first block height in the buffer
+    fn first_height(&self) -> Option<BlockHeight>;
+
+    /// Returns the last block height in the buffer
+    fn last_height(&self) -> Option<BlockHeight>;
+
+    /// Appends a block and its transactions to the buffer
+    fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()>;
+
+    /// Finalizes the buffer, converting all data to Avro format for upload.
+    /// This consumes the buffer.
+    fn finalize(self: Box<Self>) -> DuneResult<FinalizedBatch>;
+
+    /// Resets the buffer for reuse, clearing all data
+    fn reset(&mut self) -> DuneResult<()>;
+}
+
+/// Creates a new block buffer of the specified type
+pub fn create_buffer(buffer_type: BufferType) -> DuneResult<Box<dyn BlockBuffer>> {
+    match buffer_type {
+        BufferType::Memory => Ok(Box::new(MemoryBuffer::new())),
+        BufferType::Disk => Ok(Box::new(DiskBuffer::new()?)),
+    }
+}
+
+// ============================================================================
+// Memory-based buffer implementation
+// ============================================================================
+
+/// Intermediate representation of a block for buffering
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct BufferedBlock {
+    pub block: AvroBlock,
+    pub transactions: Vec<AvroTransaction>,
+    pub receipts: Vec<AvroReceipt>,
+}
+
+impl BufferedBlock {
+    /// Creates a new buffered block from domain types
+    pub fn from_domain(block: &Block, transactions: &[Transaction]) -> Self {
+        let avro_block = AvroBlock::new(block);
+
+        let avro_transactions: Vec<_> = transactions
+            .iter()
+            .map(|tx| {
+                AvroTransaction::new(
+                    tx,
+                    Some(block.height.into()),
+                    Some(block.header.get_timestamp_utc().timestamp()),
+                    Some(block.id.as_ref().to_vec().into()),
+                    Some(block.version.to_string()),
+                    Some(block.producer.as_ref().to_vec().into()),
+                )
+            })
+            .collect();
+
+        let avro_receipts: Vec<_> = transactions
+            .iter()
+            .flat_map(|tx| {
+                let receipt_metadata = ReceiptMetadata {
+                    block_time: Some(block.header.get_timestamp_utc().timestamp()),
+                    block_height: Some(block.height.0 as i64),
+                    block_version: Some(block.version.to_string()),
+                    block_producer: Some(block.producer.clone().into()),
+                    transaction_id: Some(tx.id.clone().into()),
+                };
+                tx.receipts
+                    .iter()
+                    .map(move |receipt| AvroReceipt::new(receipt, &receipt_metadata))
+            })
+            .collect();
+
+        Self {
+            block: avro_block,
+            transactions: avro_transactions,
+            receipts: avro_receipts,
+        }
+    }
+}
+
+/// Memory-based block buffer that stores all blocks in a Vec.
+/// Faster than disk-based buffering but uses more memory.
+pub struct MemoryBuffer {
+    blocks: Vec<BufferedBlock>,
+    first_height: Option<BlockHeight>,
+    last_height: Option<BlockHeight>,
+}
+
+impl MemoryBuffer {
+    pub fn new() -> Self {
+        Self {
+            blocks: Vec::new(),
+            first_height: None,
+            last_height: None,
+        }
+    }
+
+    /// Converts buffered blocks to Avro format
+    fn to_avro(&self) -> DuneResult<(Vec<u8>, Vec<u8>, Vec<u8>)> {
+        let parser = AvroParser::default();
+
+        let mut blocks_writer = parser
+            .writer_with_schema::<AvroBlock>()
+            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e)))?;
+        let mut transactions_writer = parser
+            .writer_with_schema::<AvroTransaction>()
+            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create transactions writer: {}", e)))?;
+        let mut receipts_writer = parser
+            .writer_with_schema::<AvroReceipt>()
+            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create receipts writer: {}", e)))?;
+
+        for buffered in &self.blocks {
+            blocks_writer.append(&buffered.block)?;
+            for tx in &buffered.transactions {
+                transactions_writer.append(tx)?;
+            }
+            for receipt in &buffered.receipts {
+                receipts_writer.append(receipt)?;
+            }
+        }
+
+        let blocks_data = blocks_writer.into_inner()?;
+        let transactions_data = transactions_writer.into_inner()?;
+        let receipts_data = receipts_writer.into_inner()?;
+
+        Ok((blocks_data, transactions_data, receipts_data))
+    }
+}
+
+impl Default for MemoryBuffer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl BlockBuffer for MemoryBuffer {
+    fn len(&self) -> usize {
+        self.blocks.len()
+    }
+
+    fn first_height(&self) -> Option<BlockHeight> {
+        self.first_height
+    }
+
+    fn last_height(&self) -> Option<BlockHeight> {
+        self.last_height
+    }
+
+    fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()> {
+        let height = block.height;
+
+        if self.first_height.is_none() {
+            self.first_height = Some(height);
+        }
+        self.last_height = Some(height);
+
+        let buffered = BufferedBlock::from_domain(block, transactions);
+        self.blocks.push(buffered);
+
+        Ok(())
+    }
+
+    fn finalize(self: Box<Self>) -> DuneResult<FinalizedBatch> {
+        let first_height = self.first_height.ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer"))
+        })?;
+        let last_height = self.last_height.ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer"))
+        })?;
+
+        let (blocks_data, transactions_data, receipts_data) = self.to_avro()?;
+
+        Ok(FinalizedBatch {
+            first_height,
+            last_height,
+            blocks_data,
+            transactions_data,
+            receipts_data,
+        })
+    }
+
+    fn reset(&mut self) -> DuneResult<()> {
+        self.blocks.clear();
+        self.first_height = None;
+        self.last_height = None;
+        Ok(())
+    }
+}
+
+// ============================================================================
+// Disk-based buffer implementation
+// ============================================================================
+
+/// Disk-based block buffer that writes blocks to temporary files.
+/// Uses less memory than the memory-based buffer but is slower.
+/// Blocks are stored in JSON Lines format and converted to Avro on finalize.
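+///
+/// A minimal usage sketch (not from the original patch; the `sv_dune` crate
+/// path and the `block`/`txs` inputs are assumed for illustration):
+///
+/// ```ignore
+/// use sv_dune::block_buffer::{BlockBuffer, DiskBuffer};
+///
+/// let mut buffer: Box<dyn BlockBuffer> = Box::new(DiskBuffer::new()?);
+/// buffer.append(&block, &txs)?;           // appends one JSON line per block
+/// let batch = buffer.finalize()?;          // consumes the buffer, yields Avro bytes
+/// assert!(!batch.blocks_data.is_empty());
+/// ```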
+pub struct DiskBuffer {
+    temp_dir: PathBuf,
+    data_file: PathBuf,
+    writer: Option<BufWriter<File>>,
+    first_height: Option<BlockHeight>,
+    last_height: Option<BlockHeight>,
+    block_count: usize,
+}
+
+impl DiskBuffer {
+    pub fn new() -> DuneResult<Self> {
+        let temp_dir = std::env::temp_dir().join(format!(
+            "dune-buffer-{}-{}",
+            std::process::id(),
+            std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_nanos()
+        ));
+        Self::with_dir(&temp_dir)
+    }
+
+    pub fn with_dir(dir: impl AsRef<Path>) -> DuneResult<Self> {
+        let temp_dir = dir.as_ref().to_path_buf();
+        fs::create_dir_all(&temp_dir)?;
+
+        let data_file = temp_dir.join("blocks.jsonl");
+        let writer = BufWriter::new(File::create(&data_file)?);
+
+        Ok(Self {
+            temp_dir,
+            data_file,
+            writer: Some(writer),
+            first_height: None,
+            last_height: None,
+            block_count: 0,
+        })
+    }
+
+    fn read_and_convert_to_avro(&self) -> DuneResult<(Vec<u8>, Vec<u8>, Vec<u8>)> {
+        let file = File::open(&self.data_file)?;
+        let reader = BufReader::new(file);
+
+        let parser = AvroParser::default();
+        let mut blocks_writer = parser
+            .writer_with_schema::<AvroBlock>()
+            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e)))?;
+        let mut transactions_writer = parser
+            .writer_with_schema::<AvroTransaction>()
+            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create transactions writer: {}", e)))?;
+        let mut receipts_writer = parser
+            .writer_with_schema::<AvroReceipt>()
+            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create receipts writer: {}", e)))?;
+
+        for line in std::io::BufRead::lines(reader) {
+            let line = line?;
+            if line.is_empty() {
+                continue;
+            }
+
+            let buffered: BufferedBlock = serde_json::from_str(&line).map_err(|e| {
+                DuneError::Other(anyhow::anyhow!("Failed to deserialize block: {}", e))
+            })?;
+
+            blocks_writer.append(&buffered.block)?;
+            for tx in buffered.transactions {
+                transactions_writer.append(&tx)?;
+            }
+            for receipt in buffered.receipts {
+                receipts_writer.append(&receipt)?;
+            }
+        }
+
+        let blocks_data = blocks_writer.into_inner()?;
+        let transactions_data = transactions_writer.into_inner()?;
+        let receipts_data = receipts_writer.into_inner()?;
+
+        Ok((blocks_data, transactions_data, receipts_data))
+    }
+}
+
+impl BlockBuffer for DiskBuffer {
+    fn len(&self) -> usize {
+        self.block_count
+    }
+
+    fn first_height(&self) -> Option<BlockHeight> {
+        self.first_height
+    }
+
+    fn last_height(&self) -> Option<BlockHeight> {
+        self.last_height
+    }
+
+    fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()> {
+        let height = block.height;
+
+        if self.first_height.is_none() {
+            self.first_height = Some(height);
+        }
+        self.last_height = Some(height);
+
+        let buffered = BufferedBlock::from_domain(block, transactions);
+
+        let writer = self.writer.as_mut().ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("Writer not available"))
+        })?;
+
+        serde_json::to_writer(&mut *writer, &buffered).map_err(|e| {
+            DuneError::Other(anyhow::anyhow!("Failed to serialize block: {}", e))
+        })?;
+        writer.write_all(b"\n")?;
+
+        self.block_count += 1;
+
+        Ok(())
+    }
+
+    fn finalize(mut self: Box<Self>) -> DuneResult<FinalizedBatch> {
+        let first_height = self.first_height.ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer"))
+        })?;
+        let last_height = self.last_height.ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer"))
+        })?;
+
+        // Flush and close the writer
+        if let Some(mut writer) = self.writer.take() {
+            writer.flush()?;
+        }
+
+        let (blocks_data, transactions_data, receipts_data) = self.read_and_convert_to_avro()?;
+
+        Ok(FinalizedBatch {
+            first_height,
+            last_height,
+            blocks_data,
+            transactions_data,
+            receipts_data,
+        })
+    }
+
+    fn reset(&mut self) -> DuneResult<()> {
+        // Close current writer
+        let _ = self.writer.take();
+
+        // Remove old file
+        let _ = fs::remove_file(&self.data_file);
+
+        // Create new writer
+        self.writer = Some(BufWriter::new(File::create(&self.data_file)?));
+
+        self.first_height = None;
+        self.last_height = None;
+        self.block_count = 0;
+
+        Ok(())
+    }
+}
+
+impl Drop for DiskBuffer {
+    fn drop(&mut self) {
+        let _ = self.writer.take();
+        let _ = fs::remove_dir_all(&self.temp_dir);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use fuel_streams_domains::mocks::{MockBlock, MockReceipt, MockTransaction};
+    use tempfile::tempdir;
+
+    fn test_buffer_basic_operations(mut buffer: Box<dyn BlockBuffer>) -> DuneResult<()> {
+        assert!(buffer.is_empty());
+        assert_eq!(buffer.len(), 0);
+
+        // Add some blocks
+        for i in 1..=10 {
+            let mut block = MockBlock::random();
+            block.height = BlockHeight::from(i);
+            let txs = vec![MockTransaction::script(vec![], vec![], vec![])];
+            buffer.append(&block, &txs)?;
+        }
+
+        assert_eq!(buffer.len(), 10);
+        assert_eq!(buffer.first_height(), Some(BlockHeight::from(1)));
+        assert_eq!(buffer.last_height(), Some(BlockHeight::from(10)));
+
+        // Finalize and verify
+        let finalized = buffer.finalize()?;
+        assert_eq!(*finalized.first_height, 1);
+        assert_eq!(*finalized.last_height, 10);
+        assert!(!finalized.blocks_data.is_empty());
+        assert!(!finalized.transactions_data.is_empty());
+
+        Ok(())
+    }
+
+    fn test_buffer_reset(mut buffer: Box<dyn BlockBuffer>) -> DuneResult<()> {
+        // Add blocks
+        for i in 1..=5 {
+            let mut block = MockBlock::random();
+            block.height = BlockHeight::from(i);
+            let txs = vec![MockTransaction::script(vec![], vec![], vec![])];
+            buffer.append(&block, &txs)?;
+        }
+
+        assert_eq!(buffer.len(), 5);
+
+        // Reset
+        buffer.reset()?;
+
+        assert!(buffer.is_empty());
+        assert_eq!(buffer.first_height(), None);
+        assert_eq!(buffer.last_height(), None);
+
+        // Can add more blocks after reset
+        let mut block = MockBlock::random();
+        block.height = BlockHeight::from(100);
+        let txs = vec![MockTransaction::script(vec![], vec![], vec![])];
+        buffer.append(&block, &txs)?;
+
+        assert_eq!(buffer.len(), 1);
+        assert_eq!(buffer.first_height(), Some(BlockHeight::from(100)));
+
+        Ok(())
+    }
+
+    fn test_buffer_with_receipts(mut buffer: Box<dyn BlockBuffer>) -> DuneResult<()> {
+        for i in 1..=3 {
+            let mut block = MockBlock::random();
+            block.height = BlockHeight::from(i);
+            let txs = vec![MockTransaction::script(vec![], vec![], MockReceipt::all())];
+            buffer.append(&block, &txs)?;
+        }
+
+        let finalized = buffer.finalize()?;
+        assert!(!finalized.receipts_data.is_empty());
+
+        Ok(())
+    }
+
+    // Memory buffer tests
+    #[test]
+    fn test_memory_buffer_basic() -> DuneResult<()> {
+        test_buffer_basic_operations(Box::new(MemoryBuffer::new()))
+    }
+
+    #[test]
+    fn test_memory_buffer_reset() -> DuneResult<()> {
+        test_buffer_reset(Box::new(MemoryBuffer::new()))
+    }
+
+    #[test]
+    fn test_memory_buffer_with_receipts() -> DuneResult<()> {
+        test_buffer_with_receipts(Box::new(MemoryBuffer::new()))
+    }
+
+    // Disk buffer tests
+    #[test]
+    fn test_disk_buffer_basic() -> DuneResult<()> {
+        let dir = tempdir().unwrap();
+        test_buffer_basic_operations(Box::new(DiskBuffer::with_dir(dir.path())?))
+    }
+
+    #[test]
+    fn test_disk_buffer_reset() -> DuneResult<()> {
+        let dir = tempdir().unwrap();
+        test_buffer_reset(Box::new(DiskBuffer::with_dir(dir.path())?))
+    }
+
+    #[test]
+    fn test_disk_buffer_with_receipts() -> DuneResult<()> {
+        let dir = tempdir().unwrap();
+        test_buffer_with_receipts(Box::new(DiskBuffer::with_dir(dir.path())?))
+    }
+
+    // Factory function tests
+    #[test]
+    fn test_create_memory_buffer() -> DuneResult<()> {
+        let buffer = create_buffer(BufferType::Memory)?;
+        assert!(buffer.is_empty());
+        Ok(())
+    }
+
+    #[test]
+    fn test_create_disk_buffer() -> DuneResult<()> {
+        let buffer = create_buffer(BufferType::Disk)?;
+        assert!(buffer.is_empty());
+        Ok(())
+    }
+
+    #[test]
+    fn test_buffer_type_from_str() {
+        use std::str::FromStr;
+        assert_eq!(BufferType::from_str("memory").unwrap(), BufferType::Memory);
+        assert_eq!(BufferType::from_str("Memory").unwrap(), BufferType::Memory);
+        assert_eq!(BufferType::from_str("mem").unwrap(), BufferType::Memory);
+        assert_eq!(BufferType::from_str("disk").unwrap(), BufferType::Disk);
+        assert_eq!(BufferType::from_str("Disk").unwrap(), BufferType::Disk);
+        assert_eq!(BufferType::from_str("file").unwrap(), BufferType::Disk);
+        assert!(BufferType::from_str("unknown").is_err());
+    }
+}
diff --git a/services/dune/src/cli.rs b/services/dune/src/cli.rs
index 1a65fb50..3ecad8c5 100644
--- a/services/dune/src/cli.rs
+++ b/services/dune/src/cli.rs
@@ -1,4 +1,4 @@
-use crate::processor::StorageTypeConfig;
+use crate::{block_buffer::BufferType, processor::StorageTypeConfig};
 use clap::Parser;
 use url::Url;

@@ -19,6 +19,16 @@ pub struct Cli {
     )]
     pub storage_type: StorageTypeConfig,

+    #[arg(
+        long,
+        value_name = "BUFFER_TYPE",
+        env = "BUFFER_TYPE",
+        default_value = "disk",
+        help = "Type of buffer to use for accumulating blocks. Options are 'memory' or 'disk'. \
+                Memory is faster but uses more RAM. Disk uses temporary files to reduce memory usage."
+    )]
+    pub buffer_type: BufferType,
+
     #[arg(long, env, default_value = "3600")]
     pub batch_size: usize,

diff --git a/services/dune/src/lib.rs b/services/dune/src/lib.rs
index 0378aac3..5466e02e 100644
--- a/services/dune/src/lib.rs
+++ b/services/dune/src/lib.rs
@@ -4,6 +4,7 @@
 // Used in the `main.rs`
 use fuel_web_utils as _;

+pub mod block_buffer;
 mod cli;
 mod error;
 pub mod helpers;
@@ -12,5 +13,6 @@ pub mod s3;
 pub mod schemas;
 pub mod service;

+pub use block_buffer::*;
 pub use cli::*;
 pub use error::*;
diff --git a/services/dune/src/main.rs b/services/dune/src/main.rs
index e8cb2b22..5f925dfb 100644
--- a/services/dune/src/main.rs
+++ b/services/dune/src/main.rs
@@ -28,6 +28,7 @@ async fn main() -> Result<()> {
         url: cli.url,
         starting_height: cli.starting_block.into(),
         storage_type: cli.storage_type,
+        buffer_type: cli.buffer_type,
         batch_size: cli.batch_size,
         blocks_request_batch_size: cli.blocks_request_batch_size,
         blocks_request_concurrency: cli.blocks_request_concurrency,
diff --git a/services/dune/src/processor.rs b/services/dune/src/processor.rs
index 9358ef26..1b107a3f 100644
--- a/services/dune/src/processor.rs
+++ b/services/dune/src/processor.rs
@@ -205,6 +205,23 @@ impl Processor {
         Ok(file_paths)
     }

+    /// Process pre-serialized data directly (from disk buffer)
+    /// This method takes data that has already been serialized to Avro format
+    pub async fn process_data(
+        &self,
+        start_height: BlockHeight,
+        end_height: BlockHeight,
+        data: Vec<u8>,
+        table: S3TableName,
+    ) -> DuneResult<String> {
+        let network = FuelNetwork::load_from_env();
+        let key_builder = S3KeyBuilder::new(network).with_table(table);
+        let key = key_builder.build_key_from_heights(start_height, end_height);
+        let file_path = self.create_output(data, &key).await?;
+        tracing::info!("New file saved: {}", file_path);
+        Ok(file_path)
+    }
+
     fn spli_batches<
         T: serde::Serialize + serde::de::DeserializeOwned
diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs
index 2b746063..7fe25241 100644
--- a/services/dune/src/service.rs
+++ b/services/dune/src/service.rs
@@ -1,5 +1,6 @@
 use crate::{
     DuneError,
+    block_buffer::{BlockBuffer, BufferType, FinalizedBatch, create_buffer},
     processor::{
         Processor,
         StorageTypeConfig,
@@ -36,7 +37,6 @@ use fuel_streams_domains::{
     transactions::Transaction,
 };
 use futures::StreamExt;
-use itertools::Itertools;
 use std::{
     cmp::Ordering,
     num::NonZeroUsize,
@@ -48,6 +48,7 @@ pub struct Config {
     pub url: url::Url,
     pub starting_height: BlockHeight,
     pub storage_type: StorageTypeConfig,
+    pub buffer_type: BufferType,
     pub batch_size: usize,
     pub blocks_request_batch_size: usize,
     pub blocks_request_concurrency: usize,
@@ -64,7 +65,10 @@ pub struct Task {
     height: watch::Sender<BlockHeight>,
     fetcher: GraphqlFetcher,
     blocks_stream: BoxStream<anyhow::Result<BlockEvent>>,
-    pending_events: Vec<BlockEvent>,
+    /// Block buffer (can be memory or disk-based)
+    buffer: Box<dyn BlockBuffer>,
+    /// Buffer type for creating new buffers after finalization
+    buffer_type: BufferType,
     processor: Processor,
     base_asset_id: AssetId,
     batch_size: usize,
@@ -134,11 +138,16 @@ impl RunnableService for UninitializedTask {
             .unwrap_or(config.starting_height);
         shared.block_height.send_replace(current_height);

+        // Create block buffer based on configuration
+        let buffer = create_buffer(config.buffer_type)?;
+        tracing::info!("Using {} buffer for block accumulation", config.buffer_type);
+
         let mut task = Task {
             blocks_stream: futures::stream::pending().into_boxed(),
             height: shared.block_height,
             fetcher,
-            pending_events: vec![],
+            buffer,
+            buffer_type: config.buffer_type,
             processor,
             base_asset_id,
             batch_size: config.batch_size,
@@ -152,15 +161,15 @@ impl RunnableService for UninitializedTask {

 impl RunnableTask for Task {
     async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
-        match self.pending_events.len().cmp(&self.batch_size) {
+        match self.buffer.len().cmp(&self.batch_size) {
             Ordering::Less => {}
             Ordering::Equal => {
                 return TaskNextAction::always_continue(self.post_blocks().await)
             }
             Ordering::Greater => {
                 tracing::error!(
-                    "Pending events exceeded batch size: {} > {}",
-                    self.pending_events.len(),
+                    "Batch size exceeded: {} > {}",
+                    self.buffer.len(),
                     self.batch_size
                 );
                 return TaskNextAction::Stop;
@@ -177,8 +186,10 @@ impl RunnableTask for Task {
             block = self.blocks_stream.next() => {
                 match block {
                     Some(Ok(event)) => {
-                        let current_height = if let Some(block) = self.pending_events.last() {
-                            *block.header.height()
+                        // Get the current height, converting from fuel_streams_types::BlockHeight
+                        // to fuel_core_types::fuel_types::BlockHeight if needed
+                        let current_height: BlockHeight = if let Some(last_height) = self.buffer.last_height() {
+                            (*last_height).into()
                         } else {
                             *self.height.borrow()
                         };
@@ -203,9 +214,14 @@ impl RunnableTask for Task {
                             }
                         }

-                        self.pending_events.push(event);
-
-                        TaskNextAction::Continue
+                        // Convert event to block and transactions, then buffer
+                        match self.append_event_to_buffer(&event) {
+                            Ok(_) => TaskNextAction::Continue,
+                            Err(e) => {
+                                tracing::error!("Failed to buffer block: {e}");
+                                TaskNextAction::Stop
+                            }
+                        }
                     }
                     Some(Err(e)) => {
                         tracing::error!("Error receiving block event: {e}; reconnecting stream");
@@ -239,7 +255,8 @@ impl RunnableTask for Task {

 impl Task {
     async fn connect_block_stream(&mut self) -> anyhow::Result<()> {
-        self.pending_events.clear();
+        // Reset the buffer when reconnecting
+        self.buffer.reset()?;

         let height = *self.height.borrow();
        let next_height = height.succ().ok_or_else(|| {
@@ -263,47 +280,57 @@ impl Task {
         Ok(())
     }

+    /// Converts a block event to domain types and adds to the buffer
+    fn append_event_to_buffer(&mut self, event: &BlockEvent) -> anyhow::Result<()> {
+        let block = Block::new(
+            &event.header,
+            event.consensus.clone(),
+            event.transactions.len(),
+        )?;
+
+        let transactions: Vec<_> = event
+            .transactions
+            .iter()
+            .zip(event.statuses.iter())
+            .map(|(tx, status)| {
+                Transaction::new(
+                    &status.id.into(),
+                    tx.as_ref(),
+                    &status.into(),
+                    &self.base_asset_id,
+                    status.result.receipts(),
+                )
+            })
+            .collect();
+
+        // Add to buffer (memory or disk based on configuration)
+        self.buffer.append(&block, &transactions)?;
+
+        Ok(())
+    }
+
     async fn post_blocks(&mut self) -> anyhow::Result<()> {
-        if self.pending_events.is_empty() {
+        if self.buffer.is_empty() {
             return Ok(())
         }

-        let last_height = *self
-            .pending_events
-            .last()
-            .expect("We have pending events; qed")
-            .header
-            .height();
+        // Create a new buffer for the next round and swap with the current one
+        let old_buffer = std::mem::replace(
+            &mut self.buffer,
+            create_buffer(self.buffer_type)?,
+        );

-        let blocks_and_txs: Vec<_> = self
-            .pending_events
-            .iter()
-            .map(|event| {
-                let block = Block::new(
-                    &event.header,
-                    event.consensus.clone(),
-                    event.transactions.len(),
-                )?;
-                let transaction = event
-                    .transactions
-                    .iter()
-                    .zip(event.statuses.iter())
-                    .map(|(tx, status)| {
-                        Transaction::new(
-                            &status.id.into(),
-                            tx.as_ref(),
-                            &status.into(),
-                            &self.base_asset_id,
-                            status.result.receipts(),
-                        )
-                    })
-                    .collect();
-
-                Ok::<_, anyhow::Error>((block, transaction))
-            })
-            .try_collect()?;
+        // Finalize the buffer and get the data for upload
+        let finalized = old_buffer.finalize().map_err(|err| {
+            anyhow::anyhow!("Failed to finalize buffer: {err}")
+        })?;
+
+        // Convert from fuel_streams_types::BlockHeight to fuel_core_types::fuel_types::BlockHeight
+        let last_height_u32: u32 = *finalized.last_height;
+        let last_height: BlockHeight = last_height_u32.into();

-        process_batch(&self.processor, &blocks_and_txs)
+        // Upload the data
+        process_finalized_batch(&self.processor, finalized)
             .await
             .map_err(|err| {
                 anyhow::anyhow!(
@@ -314,8 +341,6 @@ impl Task {
         self.processor.save_latest_height(last_height).await?;
         self.height.send_replace(last_height);

-        self.pending_events.clear();
-
         Ok(())
     }
 }
@@ -342,6 +367,58 @@ pub fn new_service(config: Config) -> anyhow::Result
+pub async fn process_finalized_batch(
+    processor: &Processor,
+    batch: FinalizedBatch,
+) -> anyhow::Result<()> {
+    let first_height = batch.first_height;
+    let last_height = batch.last_height;
+
+    let blocks_task = async {
+        processor
+            .process_data(
+                first_height,
+                last_height,
+                batch.blocks_data,
+                S3TableName::Blocks,
+            )
+            .await?;
+        Ok::<_, DuneError>(())
+    };
+
+    let tx_task = async {
+        processor
+            .process_data(
+                first_height,
+                last_height,
+                batch.transactions_data,
+                S3TableName::Transactions,
+            )
+            .await?;
+        Ok::<_, DuneError>(())
+    };
+
+    let receipts_task = async {
+        processor
+            .process_data(
+                first_height,
+                last_height,
+                batch.receipts_data,
+                S3TableName::Receipts,
+            )
+            .await?;
+        Ok::<_, DuneError>(())
+    };
+
+    tokio::try_join!(blocks_task, tx_task, receipts_task)?;
+
+    Ok(())
+}
+
+/// Process a batch of blocks and transactions (legacy in-memory method)
+/// Kept for backwards compatibility with tests
 pub async fn process_batch(
     processor: &Processor,
     blocks_and_txs: &[(Block, Vec<Transaction>)],
@@ -395,6 +472,7 @@ mod tests {
             url: Url::parse(format!("http://{}", node.bound_address).as_str()).unwrap(),
            starting_height: 0u32.into(),
             storage_type: StorageTypeConfig::S3,
+            buffer_type: super::BufferType::Memory,
             batch_size: 1,
             blocks_request_batch_size: 10,
             blocks_request_concurrency: 100,

From a7fcecca8e0177c58f5e569171011cecd19ff87d Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 11:10:06 -0800
Subject: [PATCH 02/14] fix: clean up docker

---
 cluster/docker/sv-dune.Dockerfile | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/cluster/docker/sv-dune.Dockerfile b/cluster/docker/sv-dune.Dockerfile
index e77ec3f4..01f9b4e1 100644
--- a/cluster/docker/sv-dune.Dockerfile
+++ b/cluster/docker/sv-dune.Dockerfile
@@ -56,7 +56,7 @@ FROM debian:bookworm-slim AS run
 WORKDIR /usr/src

 RUN apt-get update -y \
-    && apt-get install -y --no-install-recommends ca-certificates curl \
+    && apt-get install -y --no-install-recommends ca-certificates curl jq less unzip zip \
     # Clean up
     && apt-get autoremove -y \
     && apt-get clean -y \
@@ -65,4 +65,7 @@ RUN apt-get update -y \
 COPY --from=builder /root/sv-dune .
 COPY --from=builder /root/sv-dune.d .

+# run app as non-root user
+USER nobody
+
 CMD ["./sv-dune"]

From d9d3dedabfedbfe94f099969ad71f1b4a30de3a4 Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 11:15:50 -0800
Subject: [PATCH 03/14] fix: lint

---
 services/dune/src/block_buffer.rs | 98 +++++++++++++++++++++++--------
 services/dune/src/cli.rs          |  5 +-
 services/dune/src/service.rs      | 19 +++---
 3 files changed, 88 insertions(+), 34 deletions(-)

diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs
index d683efbb..fca07476 100644
--- a/services/dune/src/block_buffer.rs
+++ b/services/dune/src/block_buffer.rs
@@ -1,19 +1,40 @@
 use std::{
-    fs::{self, File},
-    io::{BufReader, BufWriter, Write},
-    path::{Path, PathBuf},
+    fs::{
+        self,
+        File,
+    },
+    io::{
+        BufReader,
+        BufWriter,
+        Write,
+    },
+    path::{
+        Path,
+        PathBuf,
+    },
 };

 use fuel_streams_types::BlockHeight;
-use serde::{Deserialize, Serialize};
+use serde::{
+    Deserialize,
+    Serialize,
+};

 use crate::{
     DuneError,
     DuneResult,
     helpers::AvroParser,
-    schemas::{AvroBlock, AvroReceipt, AvroTransaction, ReceiptMetadata},
+    schemas::{
+        AvroBlock,
+        AvroReceipt,
+        AvroTransaction,
+        ReceiptMetadata,
+    },
+};
+use fuel_streams_domains::{
+    blocks::Block,
+    transactions::Transaction,
 };
-use fuel_streams_domains::{blocks::Block, transactions::Transaction};

 /// The result of finalizing a batch, containing Avro-encoded data ready for upload
 pub struct FinalizedBatch {
@@ -166,15 +187,25 @@ impl MemoryBuffer {
     fn to_avro(&self) -> DuneResult<(Vec<u8>, Vec<u8>, Vec<u8>)> {
         let parser = AvroParser::default();

-        let mut blocks_writer = parser
-            .writer_with_schema::<AvroBlock>()
-            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e)))?;
+        let mut blocks_writer =
+            parser.writer_with_schema::<AvroBlock>().map_err(|e| {
+                DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e))
+            })?;
         let mut transactions_writer = parser
             .writer_with_schema::<AvroTransaction>()
-            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create transactions writer: {}", e)))?;
+            .map_err(|e| {
+                DuneError::Other(anyhow::anyhow!(
+                    "Failed to create transactions writer: {}",
+                    e
+                ))
+            })?;
-        let mut receipts_writer = parser
-            .writer_with_schema::<AvroReceipt>()
-            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create receipts writer: {}", e)))?;
+        let mut receipts_writer =
+            parser.writer_with_schema::<AvroReceipt>().map_err(|e| {
+                DuneError::Other(anyhow::anyhow!(
+                    "Failed to create receipts writer: {}",
+                    e
+                ))
+            })?;
        for buffered in &self.blocks {
             blocks_writer.append(&buffered.block)?;
@@ -305,15 +336,25 @@ impl DiskBuffer {
         let reader = BufReader::new(file);

         let parser = AvroParser::default();
-        let mut blocks_writer = parser
-            .writer_with_schema::<AvroBlock>()
-            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e)))?;
+        let mut blocks_writer =
+            parser.writer_with_schema::<AvroBlock>().map_err(|e| {
+                DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e))
+            })?;
         let mut transactions_writer = parser
             .writer_with_schema::<AvroTransaction>()
-            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create transactions writer: {}", e)))?;
+            .map_err(|e| {
+                DuneError::Other(anyhow::anyhow!(
+                    "Failed to create transactions writer: {}",
+                    e
+                ))
+            })?;
-        let mut receipts_writer = parser
-            .writer_with_schema::<AvroReceipt>()
-            .map_err(|e| DuneError::Other(anyhow::anyhow!("Failed to create receipts writer: {}", e)))?;
+        let mut receipts_writer =
+            parser.writer_with_schema::<AvroReceipt>().map_err(|e| {
+                DuneError::Other(anyhow::anyhow!(
+                    "Failed to create receipts writer: {}",
+                    e
+                ))
+            })?;

         for line in std::io::BufRead::lines(reader) {
             let line = line?;
@@ -365,9 +406,10 @@ impl BlockBuffer for DiskBuffer {

         let buffered = BufferedBlock::from_domain(block, transactions);

-        let writer = self.writer.as_mut().ok_or_else(|| {
-            DuneError::Other(anyhow::anyhow!("Writer not available"))
-        })?;
+        let writer = self
+            .writer
+            .as_mut()
+            .ok_or_else(|| DuneError::Other(anyhow::anyhow!("Writer not available")))?;

         serde_json::to_writer(&mut *writer, &buffered).map_err(|e| {
@@ -392,7 +434,8 @@ impl BlockBuffer for DiskBuffer {
             writer.flush()?;
         }

-        let (blocks_data, transactions_data, receipts_data) = self.read_and_convert_to_avro()?;
+        let (blocks_data, transactions_data, receipts_data) =
+            self.read_and_convert_to_avro()?;

         Ok(FinalizedBatch {
@@ -431,7 +474,12 @@ impl Drop for DiskBuffer {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use fuel_streams_domains::mocks::{MockBlock, MockReceipt, MockTransaction};
+    use fuel_streams_domains::mocks::{
+        MockBlock,
+        MockReceipt,
+        MockTransaction,
+    };
+    use pretty_assertions::assert_eq;
     use tempfile::tempdir;

     fn test_buffer_basic_operations(mut buffer: Box<dyn BlockBuffer>) -> DuneResult<()> {
diff --git a/services/dune/src/cli.rs b/services/dune/src/cli.rs
index 3ecad8c5..a0861f54 100644
--- a/services/dune/src/cli.rs
+++ b/services/dune/src/cli.rs
@@ -1,4 +1,7 @@
-use crate::{block_buffer::BufferType, processor::StorageTypeConfig};
+use crate::{
+    block_buffer::BufferType,
+    processor::StorageTypeConfig,
+};
 use clap::Parser;
 use url::Url;

diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs
index 7fe25241..9dc3657a 100644
--- a/services/dune/src/service.rs
+++ b/services/dune/src/service.rs
@@ -1,6 +1,11 @@
 use crate::{
     DuneError,
-    block_buffer::{BlockBuffer, BufferType, FinalizedBatch, create_buffer},
+    block_buffer::{
+        BlockBuffer,
+        BufferType,
+        FinalizedBatch,
+        create_buffer,
+    },
     processor::{
         Processor,
         StorageTypeConfig,
@@ -315,15 +320,13 @@ impl Task {
         }

         // Create a new buffer for the next round and swap with the current one
-        let old_buffer = std::mem::replace(
-            &mut self.buffer,
-            create_buffer(self.buffer_type)?,
-        );
+        let old_buffer =
+            std::mem::replace(&mut self.buffer, create_buffer(self.buffer_type)?);

         // Finalize the buffer and get the data for upload
-        let finalized = old_buffer.finalize().map_err(|err| {
-            anyhow::anyhow!("Failed to finalize buffer: {err}")
-        })?;
+        let finalized = old_buffer
+            .finalize()
+            .map_err(|err| anyhow::anyhow!("Failed to finalize buffer: {err}"))?;

         // Convert from fuel_streams_types::BlockHeight to fuel_core_types::fuel_types::BlockHeight
         let last_height_u32: u32 = *finalized.last_height;

From ede6c42e815fb9b1b04843865a230213eed9d8d6 Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 11:18:59 -0800
Subject: [PATCH 04/14] fix: bump deps

---
 Cargo.lock | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 50e13ff7..06a33cdc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1251,9 +1251,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

 [[package]]
 name = "bytes"
-version = "1.11.0"
+version = "1.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
+checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33"
 dependencies = [
 "serde",
 ]
@@ -1967,7 +1967,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976"
 dependencies = [
 "data-encoding",
- "syn 2.0.111",
+ "syn 1.0.109",
 ]

@@ -6790,9 +6790,9 @@ dependencies = [

 [[package]]
 name = "rsa"
-version = "0.9.9"
+version = "0.9.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "40a0376c50d0358279d9d643e4bf7b7be212f1f4ff1da9070a7b54d22ef75c88"
+checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d"
 dependencies = [
 "const-oid",
 "digest 0.10.7",

From 713bf8b2b75d9a0a7e96a41ae82a41c14f936814 Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 13:08:30 -0800
Subject: [PATCH 05/14] feat: buffer avro to disk as well

---
 services/dune/src/block_buffer.rs | 150 ++++++++++++++++++++++--------
 services/dune/src/helpers/avro.rs | 120 +++++++++++++++++++++-
 2 files changed, 228 insertions(+), 42 deletions(-)

diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs
index fca07476..b2870379 100644
--- a/services/dune/src/block_buffer.rs
+++ b/services/dune/src/block_buffer.rs
@@ -23,7 +23,11 @@ use serde::{
 use crate::{
     DuneError,
     DuneResult,
-    helpers::AvroParser,
+    helpers::{
+        AvroFileWriter,
+        AvroParser,
+        AvroWriter,
+    },
     schemas::{
         AvroBlock,
         AvroReceipt,
@@ -111,6 +115,97 @@ pub fn create_buffer(buffer_type: BufferType) -> DuneResult<Box<dyn BlockBuffer>> {
     }
 }

+// ============================================================================
+// Avro file writers for disk-based finalization
+// ============================================================================
+
+/// Manages Avro file writers for blocks, transactions, and receipts.
+/// Writes directly to disk to avoid memory accumulation.
+/// Used by DiskBuffer during finalization.
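+///
+/// Typical flow (a sketch of how `DiskBuffer` drives this type, based on the
+/// methods defined below): create the writers, call `append` once per
+/// buffered block, then `finalize` to obtain the encoded bytes for blocks,
+/// transactions, and receipts.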
+struct AvroFileWriters {
+    temp_dir: PathBuf,
+    blocks_writer: AvroFileWriter<AvroBlock>,
+    transactions_writer: AvroFileWriter<AvroTransaction>,
+    receipts_writer: AvroFileWriter<AvroReceipt>,
+}
+
+impl AvroFileWriters {
+    /// Creates new Avro file writers in a temporary directory
+    fn new() -> DuneResult<Self> {
+        let temp_dir = std::env::temp_dir().join(format!(
+            "dune-avro-{}-{}",
+            std::process::id(),
+            std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_nanos()
+        ));
+        Self::with_dir(&temp_dir)
+    }
+
+    /// Creates new Avro file writers in the specified directory
+    fn with_dir(dir: impl AsRef<Path>) -> DuneResult<Self> {
+        let temp_dir = dir.as_ref().to_path_buf();
+        fs::create_dir_all(&temp_dir)?;
+
+        let parser = AvroParser::default();
+
+        let blocks_writer = parser
+            .file_writer_with_schema(temp_dir.join("blocks.avro"))
+            .map_err(|e| {
+                DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e))
+            })?;
+        let transactions_writer = parser
+            .file_writer_with_schema(temp_dir.join("transactions.avro"))
+            .map_err(|e| {
+                DuneError::Other(anyhow::anyhow!(
+                    "Failed to create transactions writer: {}",
+                    e
+                ))
+            })?;
+        let receipts_writer = parser
+            .file_writer_with_schema(temp_dir.join("receipts.avro"))
+            .map_err(|e| {
+                DuneError::Other(anyhow::anyhow!(
+                    "Failed to create receipts writer: {}",
+                    e
+                ))
+            })?;
+
+        Ok(Self {
+            temp_dir,
+            blocks_writer,
+            transactions_writer,
+            receipts_writer,
+        })
+    }
+
+    /// Appends a buffered block's data to all writers
+    fn append(&mut self, buffered: &BufferedBlock) -> DuneResult<()> {
+        self.blocks_writer.append(&buffered.block)?;
+        for tx in &buffered.transactions {
+            self.transactions_writer.append(tx)?;
+        }
+        for receipt in &buffered.receipts {
+            self.receipts_writer.append(receipt)?;
+        }
+        Ok(())
+    }
+
+    /// Finalizes all writers and returns the Avro data.
+    /// Cleans up temporary files after reading.
+    fn finalize(self) -> DuneResult<(Vec<u8>, Vec<u8>, Vec<u8>)> {
+        let blocks_data = self.blocks_writer.finalize()?;
+        let transactions_data = self.transactions_writer.finalize()?;
+        let receipts_data = self.receipts_writer.finalize()?;
+
+        // Clean up temp directory
+        let _ = fs::remove_dir_all(&self.temp_dir);
+
+        Ok((blocks_data, transactions_data, receipts_data))
+    }
+}
+
 // ============================================================================
 // Memory-based buffer implementation
 // ============================================================================
@@ -183,24 +278,24 @@ impl MemoryBuffer {
         }
     }

-    /// Converts buffered blocks to Avro format
+    /// Converts buffered blocks to Avro format in memory.
+    /// This is faster than disk-based conversion but uses more RAM.
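+    /// Every buffered row is appended to in-memory Avro writers and the
+    /// result is returned as three encoded byte buffers (blocks,
+    /// transactions, receipts).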
     fn to_avro(&self) -> DuneResult<(Vec<u8>, Vec<u8>, Vec<u8>)> {
         let parser = AvroParser::default();

-        let mut blocks_writer =
-            parser.writer_with_schema::<AvroBlock>().map_err(|e| {
-                DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e))
-            })?;
+        let mut blocks_writer: AvroWriter<AvroBlock> =
+            parser.writer_with_schema().map_err(|e| {
+                DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e))
+            })?;
-        let mut transactions_writer = parser
-            .writer_with_schema::<AvroTransaction>()
-            .map_err(|e| {
+        let mut transactions_writer: AvroWriter<AvroTransaction> =
+            parser.writer_with_schema().map_err(|e| {
                 DuneError::Other(anyhow::anyhow!(
                     "Failed to create transactions writer: {}",
                     e
                 ))
             })?;
-        let mut receipts_writer =
-            parser.writer_with_schema::<AvroReceipt>().map_err(|e| {
+        let mut receipts_writer: AvroWriter<AvroReceipt> =
+            parser.writer_with_schema().map_err(|e| {
                 DuneError::Other(anyhow::anyhow!(
                     "Failed to create receipts writer: {}",
                     e
                 ))
             })?;
@@ -331,30 +426,13 @@ impl DiskBuffer {
         })
     }

+    /// Reads buffered JSON data and converts to Avro format by writing to disk files.
+    /// This avoids building up large Avro data structures in memory.
     fn read_and_convert_to_avro(&self) -> DuneResult<(Vec<u8>, Vec<u8>, Vec<u8>)> {
         let file = File::open(&self.data_file)?;
         let reader = BufReader::new(file);

-        let parser = AvroParser::default();
-        let mut blocks_writer =
-            parser.writer_with_schema::<AvroBlock>().map_err(|e| {
-                DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e))
-            })?;
-        let mut transactions_writer = parser
-            .writer_with_schema::<AvroTransaction>()
-            .map_err(|e| {
-                DuneError::Other(anyhow::anyhow!(
-                    "Failed to create transactions writer: {}",
-                    e
-                ))
-            })?;
-        let mut receipts_writer =
-            parser.writer_with_schema::<AvroReceipt>().map_err(|e| {
-                DuneError::Other(anyhow::anyhow!(
-                    "Failed to create receipts writer: {}",
-                    e
-                ))
-            })?;
+        let mut writers = AvroFileWriters::new()?;

         for line in std::io::BufRead::lines(reader) {
             let line = line?;
@@ -366,20 +444,10 @@ impl DiskBuffer {
                 DuneError::Other(anyhow::anyhow!("Failed to deserialize block: {}", e))
             })?;

-            blocks_writer.append(&buffered.block)?;
-            for tx in buffered.transactions {
-                transactions_writer.append(&tx)?;
-            }
-            for receipt in buffered.receipts {
-                receipts_writer.append(&receipt)?;
-            }
+            writers.append(&buffered)?;
         }

-        let blocks_data = blocks_writer.into_inner()?;
-        let transactions_data = transactions_writer.into_inner()?;
-        let receipts_data = receipts_writer.into_inner()?;
-
-        Ok((blocks_data, transactions_data, receipts_data))
+        writers.finalize()
     }
 }
diff --git a/services/dune/src/helpers/avro.rs b/services/dune/src/helpers/avro.rs
index 42060f7f..21950e4a 100644
--- a/services/dune/src/helpers/avro.rs
+++ b/services/dune/src/helpers/avro.rs
@@ -1,4 +1,16 @@
-use std::sync::Arc;
+use std::{
+    fs::File,
+    io::{
+        BufWriter,
+        Read,
+        Write,
+    },
+    path::{
+        Path,
+        PathBuf,
+    },
+    sync::Arc,
+};

 use apache_avro::{
     AvroSchema,
@@ -24,6 +36,8 @@ pub enum AvroParserError {
     Avro(Box<apache_avro::Error>),
     #[error("Schema not found {0}")]
     SchemaNotFound(String),
+    #[error("IO error: {0}")]
+    Io(String),
 }

 impl From<apache_avro::Error> for AvroParserError {
@@ -64,6 +78,97 @@ where
     }
 }

+/// An Avro writer that writes directly to a file on disk.
+/// This reduces memory usage by not accumulating data in memory.
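+///
+/// A minimal sketch of the intended call sequence, using the methods defined
+/// below (`MyRecord` is a hypothetical type deriving `AvroSchema` and
+/// `Serialize`, not part of this patch):
+///
+/// ```ignore
+/// let mut writer = AvroParser::default()
+///     .file_writer_with_schema::<MyRecord>("/tmp/records.avro")?;
+/// writer.append(&record)?;
+/// let bytes = writer.finalize()?; // flushes, closes, then reads the file back
+/// ```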
+pub struct AvroFileWriter<T> {
+    writer: Writer<'static, BufWriter<File>>,
+    file_path: PathBuf,
+    _phantom: std::marker::PhantomData<T>,
+}
+
+impl<T> AvroFileWriter<T>
+where
+    T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static,
+{
+    /// Creates a new file-based Avro writer at the specified path
+    pub fn new(
+        path: impl AsRef<Path>,
+        schema: Schema,
+        codec: Codec,
+    ) -> Result<Self, AvroParserError> {
+        let file_path = path.as_ref().to_path_buf();
+        let file = File::create(&file_path)
+            .map_err(|e| AvroParserError::Io(format!("Failed to create file: {}", e)))?;
+        let buf_writer = BufWriter::new(file);
+
+        let schema_static: &'static Schema = Box::leak(Box::new(schema));
+        let writer = Writer::builder()
+            .schema(schema_static)
+            .codec(codec)
+            .writer(buf_writer)
+            .build();
+
+        Ok(Self {
+            writer,
+            file_path,
+            _phantom: std::marker::PhantomData,
+        })
+    }
+
+    /// Appends a value to the file
+    pub fn append(&mut self, value: &T) -> Result<(), AvroParserError> {
+        self.writer.append_ser(value)?;
+        Ok(())
+    }
+
+    /// Flushes any buffered data to disk
+    pub fn flush(&mut self) -> Result<(), AvroParserError> {
+        self.writer
+            .flush()
+            .map_err(|e| AvroParserError::Io(format!("Failed to flush writer: {}", e)))?;
+        Ok(())
+    }
+
+    /// Returns the path to the file
+    pub fn file_path(&self) -> &Path {
+        &self.file_path
+    }
+
+    /// Finalizes the file and returns the file contents as bytes.
+    /// This flushes all data, closes the writer, and reads the file.
+    pub fn finalize(self) -> Result<Vec<u8>, AvroParserError> {
+        let mut inner = self.writer.into_inner().map_err(|e| {
+            AvroParserError::Io(format!("Failed to finalize writer: {}", e))
+        })?;
+        inner.flush().map_err(|e| {
+            AvroParserError::Io(format!("Failed to flush final data: {}", e))
+        })?;
+        drop(inner); // Close the file
+
+        // Read the file contents
+        let mut file = File::open(&self.file_path).map_err(|e| {
+            AvroParserError::Io(format!("Failed to open file for reading: {}", e))
+        })?;
+        let mut contents = Vec::new();
+        file.read_to_end(&mut contents)
+            .map_err(|e| AvroParserError::Io(format!("Failed to read file: {}", e)))?;
+
+        Ok(contents)
+    }
+
+    /// Finalizes the file and returns just the path (for cleanup later).
+    /// Use this when you want to handle file reading separately.
+    pub fn finalize_path(self) -> Result<PathBuf, AvroParserError> {
+        let mut inner = self.writer.into_inner().map_err(|e| {
+            AvroParserError::Io(format!("Failed to finalize writer: {}", e))
+        })?;
+        inner.flush().map_err(|e| {
+            AvroParserError::Io(format!("Failed to flush final data: {}", e))
+        })?;
+        Ok(self.file_path)
+    }
+}
+
 #[derive(Clone)]
 pub struct AvroParser {
     codec: Option<Codec>,
@@ -100,6 +205,19 @@ impl AvroParser {
         ))
     }

+    /// Creates a file-based Avro writer that writes directly to disk.
+    /// Use this to avoid accumulating large amounts of data in memory.
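+    /// The writer uses this parser's configured codec, falling back to
+    /// `Codec::Deflate` when none was set.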
+    pub fn file_writer_with_schema<
+        T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static,
+    >(
+        &self,
+        path: impl AsRef<Path>,
+    ) -> Result<AvroFileWriter<T>, AvroParserError> {
+        let schema =
+            T::get_schema_in_ctxt(&mut Default::default(), &Namespace::default());
+        AvroFileWriter::new(path, schema, self.codec.unwrap_or(Codec::Deflate))
+    }
+
     pub fn reader_with_schema<
         T: AvroSchema + AvroSchemaComponent + DeserializeOwned + Send + Sync + 'static,
     >(

From f5056cefc800191efad50d444eb555781f42f60b Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 13:22:17 -0800
Subject: [PATCH 06/14] fix: handle buffer clears correctly

---
 services/dune/src/block_buffer.rs | 21 ++++++++++++++-------
 services/dune/src/service.rs      | 17 ++++++++---------
 2 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs
index b2870379..4f7c6160 100644
--- a/services/dune/src/block_buffer.rs
+++ b/services/dune/src/block_buffer.rs
@@ -100,10 +100,14 @@ pub trait BlockBuffer: Send {
     fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()>;

     /// Finalizes the buffer, converting all data to Avro format for upload.
-    /// This consumes the buffer.
-    fn finalize(self: Box<Self>) -> DuneResult<FinalizedBatch>;
-
-    /// Resets the buffer for reuse, clearing all data
+    ///
+    /// This does NOT clear the buffer - call `reset()` after successful upload
+    /// to clear the data. This design allows retry on upload failure without
+    /// losing the buffered data.
+    fn finalize(&mut self) -> DuneResult<FinalizedBatch>;
+
+    /// Resets the buffer for reuse, clearing all data.
+    /// Call this after successful upload to prepare for the next batch.
     fn reset(&mut self) -> DuneResult<()>;
 }

@@ -353,7 +357,7 @@ impl BlockBuffer for MemoryBuffer {
         Ok(())
     }

-    fn finalize(self: Box<Self>) -> DuneResult<FinalizedBatch> {
+    fn finalize(&mut self) -> DuneResult<FinalizedBatch> {
         let first_height = self.first_height.ok_or_else(|| {
@@ -489,7 +493,7 @@ impl BlockBuffer for DiskBuffer {
         Ok(())
     }

-    fn finalize(mut self: Box<Self>) -> DuneResult<FinalizedBatch> {
+    fn finalize(&mut self) -> DuneResult<FinalizedBatch> {
         let first_height = self.first_height.ok_or_else(|| {
@@ -497,7 +501,10 @@ impl BlockBuffer for DiskBuffer {
             DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer"))
         })?;

-        // Flush and close the writer
+        // Flush the writer so we can read the file.
+        // We take() the writer to close the file handle, which is required
+        // to read the complete file on some systems. The writer will be
+        // recreated on reset().
        if let Some(mut writer) = self.writer.take() {
             writer.flush()?;
         }
diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs
index 7fe25241..618715d0 100644
--- a/services/dune/src/service.rs
+++ b/services/dune/src/service.rs
@@ -72,8 +72,6 @@ pub struct Task {
     blocks_stream: BoxStream<anyhow::Result<BlockEvent>>,
     /// Block buffer (can be memory or disk-based)
     buffer: Box<dyn BlockBuffer>,
-    /// Buffer type for creating new buffers after finalization
-    buffer_type: BufferType,
     processor: Processor,
     base_asset_id: AssetId,
     batch_size: usize,
@@ -152,7 +150,6 @@ impl RunnableService for UninitializedTask {
             height: shared.block_height,
             fetcher,
             buffer,
-            buffer_type: config.buffer_type,
             processor,
             base_asset_id,
             batch_size: config.batch_size,
@@ -319,12 +316,10 @@ impl Task {
             return Ok(())
         }

-        // Create a new buffer for the next round and swap with the current one
-        let old_buffer =
-            std::mem::replace(&mut self.buffer, create_buffer(self.buffer_type)?);
-
-        // Finalize the buffer and get the data for upload
-        let finalized = old_buffer
+        // Finalize the buffer and get the data for upload.
+        // This does NOT clear the buffer - data is preserved for retry on failure.
+        let finalized = self
+            .buffer
             .finalize()
             .map_err(|err| anyhow::anyhow!("Failed to finalize buffer: {err}"))?;

@@ -341,9 +336,13 @@ impl Task {
+        // Only after successful upload do we update state and clear the buffer
         self.processor.save_latest_height(last_height).await?;
         self.height.send_replace(last_height);

+        // Clear the buffer now that upload succeeded
+        self.buffer.reset()?;
+
         Ok(())
     }
 }

From a75c47dd550fbe929324250ff06daa77c3e8aa71 Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 13:51:43 -0800
Subject: [PATCH 07/14] fix: clean up s3 uploads

---
 services/dune/src/block_buffer.rs | 218 ++++++++++++++++++++++++------
 services/dune/src/helpers/avro.rs |  50 +------
 services/dune/src/processor.rs    |  30 ++++
 services/dune/src/s3/client.rs    | 205 ++++++++++++++++++++++++++++
 services/dune/src/service.rs      |  87 +++++++++++-
 5 files changed, 494 insertions(+), 96 deletions(-)

diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs
index 4f7c6160..41cf0551 100644
--- a/services/dune/src/block_buffer.rs
+++ b/services/dune/src/block_buffer.rs
@@ -40,7 +40,8 @@ use fuel_streams_domains::{
     transactions::Transaction,
 };

-/// The result of finalizing a batch, containing Avro-encoded data ready for upload
+/// The result of finalizing a batch, containing Avro-encoded data ready for upload.
+/// WARNING: This holds all data in memory. For large batches, use FinalizedBatchFiles instead.
 pub struct FinalizedBatch {
     pub first_height: BlockHeight,
     pub last_height: BlockHeight,
@@ -49,6 +50,59 @@ pub struct FinalizedBatch {
     pub receipts_data: Vec<u8>,
 }

+/// The result of finalizing a batch to files, containing paths to Avro files.
+/// This avoids loading all data into memory at once - files are streamed directly to S3.
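+///
+/// Callers typically match on the `FinalizedData` wrapper defined below; a
+/// sketch (`upload_file` and `upload_bytes` are hypothetical helpers, not
+/// part of this patch):
+///
+/// ```ignore
+/// match buffer.finalize()? {
+///     FinalizedData::OnDisk(files) => upload_file(&files.blocks_path)?,
+///     FinalizedData::InMemory(batch) => upload_bytes(&batch.blocks_data)?,
+/// }
+/// ```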
+pub struct FinalizedBatchFiles {
+    pub first_height: BlockHeight,
+    pub last_height: BlockHeight,
+    /// Path to the blocks Avro file
+    pub blocks_path: PathBuf,
+    /// Path to the transactions Avro file
+    pub transactions_path: PathBuf,
+    /// Path to the receipts Avro file
+    pub receipts_path: PathBuf,
+    /// Temporary directory containing the files (for cleanup)
+    temp_dir: PathBuf,
+}
+
+impl FinalizedBatchFiles {
+    /// Cleans up all temporary files after upload
+    pub fn cleanup(&self) {
+        let _ = fs::remove_dir_all(&self.temp_dir);
+    }
+}
+
+impl Drop for FinalizedBatchFiles {
+    fn drop(&mut self) {
+        self.cleanup();
+    }
+}
+
+/// Result of finalizing a buffer - either in-memory data or file paths.
+/// This allows the service to handle both memory and disk buffers uniformly.
+pub enum FinalizedData {
+    /// In-memory Avro data (from MemoryBuffer)
+    InMemory(FinalizedBatch),
+    /// File paths to Avro files (from DiskBuffer)
+    OnDisk(FinalizedBatchFiles),
+}
+
+impl FinalizedData {
+    pub fn first_height(&self) -> BlockHeight {
+        match self {
+            FinalizedData::InMemory(batch) => batch.first_height,
+            FinalizedData::OnDisk(files) => files.first_height,
+        }
+    }
+
+    pub fn last_height(&self) -> BlockHeight {
+        match self {
+            FinalizedData::InMemory(batch) => batch.last_height,
+            FinalizedData::OnDisk(files) => files.last_height,
+        }
+    }
+}
+
 /// Configuration for the buffer type
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum BufferType {
@@ -101,10 +155,14 @@ pub trait BlockBuffer: Send {
     /// Finalizes the buffer, converting all data to Avro format for upload.
     ///
+    /// Returns either:
+    /// - `FinalizedData::InMemory` for MemoryBuffer (data in `Vec<u8>`)
+    /// - `FinalizedData::OnDisk` for DiskBuffer (paths to temporary files)
+    ///
     /// This does NOT clear the buffer - call `reset()` after successful upload
     /// to clear the data. This design allows retry on upload failure without
     /// losing the buffered data.
-    fn finalize(&mut self) -> DuneResult<FinalizedBatch>;
+    fn finalize(&mut self) -> DuneResult<FinalizedData>;

     /// Resets the buffer for reuse, clearing all data.
     /// Call this after successful upload to prepare for the next batch.
@@ -123,14 +181,38 @@ pub fn create_buffer(buffer_type: BufferType) -> DuneResult<Box<dyn BlockBuffer>> {
 // Avro file writers for disk-based finalization
 // ============================================================================

+/// Paths to finalized Avro files ready for upload.
+/// Used internally by AvroFileWriters.
+/// Note: Does NOT implement Drop - ownership of temp_dir is transferred to FinalizedBatchFiles.
+struct FinalizedAvroFiles {
+    temp_dir: PathBuf,
+    blocks_path: PathBuf,
+    transactions_path: PathBuf,
+    receipts_path: PathBuf,
+}
+
 /// Manages Avro file writers for blocks, transactions, and receipts.
 /// Writes directly to disk to avoid memory accumulation.
 /// Used by DiskBuffer during finalization.
+///
+/// Implements Drop to clean up temp directory on error. On success,
+/// ownership is transferred via `finalize_to_paths()` and Drop becomes a no-op.
 struct AvroFileWriters {
-    temp_dir: PathBuf,
-    blocks_writer: AvroFileWriter<AvroBlock>,
-    transactions_writer: AvroFileWriter<AvroTransaction>,
-    receipts_writer: AvroFileWriter<AvroReceipt>,
+    /// Temp directory path. Set to None after successful finalize_to_paths()
+    /// to transfer ownership and prevent cleanup on drop.
+    temp_dir: Option<PathBuf>,
+    blocks_writer: Option<AvroFileWriter<AvroBlock>>,
+    transactions_writer: Option<AvroFileWriter<AvroTransaction>>,
+    receipts_writer: Option<AvroFileWriter<AvroReceipt>>,
+}
+
+impl Drop for AvroFileWriters {
+    fn drop(&mut self) {
+        // Clean up temp directory if we still own it (i.e., finalize_to_paths wasn't called)
+        if let Some(ref temp_dir) = self.temp_dir {
+            let _ = fs::remove_dir_all(temp_dir);
+        }
+    }
+}
+
 impl AvroFileWriters {
     /// Creates new Avro file writers in a temporary directory
@@ -177,36 +259,68 @@ impl AvroFileWriters {
             })?;

         Ok(Self {
-            temp_dir,
-            blocks_writer,
-            transactions_writer,
-            receipts_writer,
+            temp_dir: Some(temp_dir),
+            blocks_writer: Some(blocks_writer),
+            transactions_writer: Some(transactions_writer),
+            receipts_writer: Some(receipts_writer),
         })
     }

     /// Appends a buffered block's data to all writers
     fn append(&mut self, buffered: &BufferedBlock) -> DuneResult<()> {
-        self.blocks_writer.append(&buffered.block)?;
+        let blocks_writer = self.blocks_writer.as_mut().ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("blocks_writer not available"))
+        })?;
+        let transactions_writer = self.transactions_writer.as_mut().ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("transactions_writer not available"))
+        })?;
+        let receipts_writer = self.receipts_writer.as_mut().ok_or_else(|| {
+            DuneError::Other(anyhow::anyhow!("receipts_writer not available"))
+        })?;
+
+        blocks_writer.append(&buffered.block)?;
         for tx in &buffered.transactions {
-            self.transactions_writer.append(tx)?;
+            transactions_writer.append(tx)?;
         }
         for receipt in &buffered.receipts {
-            self.receipts_writer.append(receipt)?;
+            receipts_writer.append(receipt)?;
         }
         Ok(())
     }

-    /// Finalizes all writers and returns the Avro data.
-    /// Cleans up temporary files after reading.
-    fn finalize(self) -> DuneResult<(Vec<u8>, Vec<u8>, Vec<u8>)> {
-        let blocks_data = self.blocks_writer.finalize()?;
-        let transactions_data = self.transactions_writer.finalize()?;
-        let receipts_data = self.receipts_writer.finalize()?;
-
-        // Clean up temp directory
-        let _ = fs::remove_dir_all(&self.temp_dir);
-
-        Ok((blocks_data, transactions_data, receipts_data))
-    }
+    /// Finalizes all writers and returns paths to the Avro files.
+    /// Does NOT load files into memory - use this for large batches.
+    ///
+    /// On success, ownership of temp_dir is transferred to FinalizedAvroFiles,
+    /// and this struct's Drop will not clean up the directory.
+    /// On error, Drop will clean up the temp directory.
+ fn finalize_to_paths(&mut self) -> DuneResult { + let blocks_writer = self.blocks_writer.take().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("blocks_writer already taken")) + })?; + let transactions_writer = self.transactions_writer.take().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("transactions_writer already taken")) + })?; + let receipts_writer = self.receipts_writer.take().ok_or_else(|| { + DuneError::Other(anyhow::anyhow!("receipts_writer already taken")) + })?; - // Clean up temp directory - let _ = fs::remove_dir_all(&self.temp_dir); + let blocks_path = blocks_writer.finalize_path()?; + let transactions_path = transactions_writer.finalize_path()?; + let receipts_path = receipts_writer.finalize_path()?; - Ok((blocks_data, transactions_data, receipts_data)) + // Take ownership of temp_dir so Drop won't clean it up + let temp_dir = self + .temp_dir + .take() + .ok_or_else(|| DuneError::Other(anyhow::anyhow!("temp_dir already taken")))?; + + Ok(FinalizedAvroFiles { + temp_dir, + blocks_path, + transactions_path, + receipts_path, + }) } } @@ -357,7 +471,7 @@ impl BlockBuffer for MemoryBuffer { Ok(()) } - fn finalize(&mut self) -> DuneResult { + fn finalize(&mut self) -> DuneResult { let first_height = self.first_height.ok_or_else(|| { DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) })?; @@ -367,13 +481,13 @@ impl BlockBuffer for MemoryBuffer { let (blocks_data, transactions_data, receipts_data) = self.to_avro()?; - Ok(FinalizedBatch { + Ok(FinalizedData::InMemory(FinalizedBatch { first_height, last_height, blocks_data, transactions_data, receipts_data, - }) + })) } fn reset(&mut self) -> DuneResult<()> { @@ -430,9 +544,9 @@ impl DiskBuffer { }) } - /// Reads buffered JSON data and converts to Avro format by writing to disk files. - /// This avoids building up large Avro data structures in memory. - fn read_and_convert_to_avro(&self) -> DuneResult<(Vec, Vec, Vec)> { + /// Reads buffered JSON data and converts to Avro files on disk. + /// Returns paths to the Avro files without loading them into memory. 
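
For orientation, the JSON Lines layout consumed below can be replayed with constant memory, since each line is a complete document. A minimal sketch of such a read loop (generic over any `DeserializeOwned` record; serde_json and anyhow are already dependencies of this crate):

use std::{
    fs::File,
    io::{BufRead, BufReader},
    path::Path,
};

fn replay_jsonl<T: serde::de::DeserializeOwned>(
    path: &Path,
    mut on_record: impl FnMut(T),
) -> anyhow::Result<()> {
    let reader = BufReader::new(File::open(path)?);
    // Only one record is ever deserialized in memory at a time.
    for line in reader.lines() {
        let line = line?;
        if line.is_empty() {
            continue;
        }
        on_record(serde_json::from_str(&line)?);
    }
    Ok(())
}
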
+ fn read_and_convert_to_avro_files(&self) -> DuneResult { let file = File::open(&self.data_file)?; let reader = BufReader::new(file); @@ -451,7 +565,7 @@ impl DiskBuffer { writers.append(&buffered)?; } - writers.finalize() + writers.finalize_to_paths() } } @@ -493,7 +607,7 @@ impl BlockBuffer for DiskBuffer { Ok(()) } - fn finalize(&mut self) -> DuneResult { + fn finalize(&mut self) -> DuneResult { let first_height = self.first_height.ok_or_else(|| { DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) })?; @@ -509,16 +623,17 @@ impl BlockBuffer for DiskBuffer { writer.flush()?; } - let (blocks_data, transactions_data, receipts_data) = - self.read_and_convert_to_avro()?; + // Convert to Avro files on disk (does not load into memory) + let avro_files = self.read_and_convert_to_avro_files()?; - Ok(FinalizedBatch { + Ok(FinalizedData::OnDisk(FinalizedBatchFiles { first_height, last_height, - blocks_data, - transactions_data, - receipts_data, - }) + blocks_path: avro_files.blocks_path, + transactions_path: avro_files.transactions_path, + receipts_path: avro_files.receipts_path, + temp_dir: avro_files.temp_dir, + })) } fn reset(&mut self) -> DuneResult<()> { @@ -575,10 +690,20 @@ mod tests { // Finalize and verify let finalized = buffer.finalize()?; - assert_eq!(*finalized.first_height, 1); - assert_eq!(*finalized.last_height, 10); - assert!(!finalized.blocks_data.is_empty()); - assert!(!finalized.transactions_data.is_empty()); + assert_eq!(*finalized.first_height(), 1); + assert_eq!(*finalized.last_height(), 10); + + // Verify data is present (different validation for in-memory vs on-disk) + match finalized { + FinalizedData::InMemory(batch) => { + assert!(!batch.blocks_data.is_empty()); + assert!(!batch.transactions_data.is_empty()); + } + FinalizedData::OnDisk(files) => { + assert!(files.blocks_path.exists()); + assert!(files.transactions_path.exists()); + } + } Ok(()) } @@ -622,7 +747,16 @@ mod tests { } let finalized = buffer.finalize()?; - assert!(!finalized.receipts_data.is_empty()); + + // Verify receipts data is present + match finalized { + FinalizedData::InMemory(batch) => { + assert!(!batch.receipts_data.is_empty()); + } + FinalizedData::OnDisk(files) => { + assert!(files.receipts_path.exists()); + } + } Ok(()) } diff --git a/services/dune/src/helpers/avro.rs b/services/dune/src/helpers/avro.rs index 21950e4a..2a4dde91 100644 --- a/services/dune/src/helpers/avro.rs +++ b/services/dune/src/helpers/avro.rs @@ -2,14 +2,12 @@ use std::{ fs::File, io::{ BufWriter, - Read, Write, }, path::{ Path, PathBuf, }, - sync::Arc, }; use apache_avro::{ @@ -121,43 +119,8 @@ where Ok(()) } - /// Flushes any buffered data to disk - pub fn flush(&mut self) -> Result<(), AvroParserError> { - self.writer - .flush() - .map_err(|e| AvroParserError::Io(format!("Failed to flush writer: {}", e)))?; - Ok(()) - } - - /// Returns the path to the file - pub fn file_path(&self) -> &Path { - &self.file_path - } - - /// Finalizes the file and returns the file contents as bytes. - /// This flushes all data, closes the writer, and reads the file. 
- pub fn finalize(self) -> Result, AvroParserError> { - let mut inner = self.writer.into_inner().map_err(|e| { - AvroParserError::Io(format!("Failed to finalize writer: {}", e)) - })?; - inner.flush().map_err(|e| { - AvroParserError::Io(format!("Failed to flush final data: {}", e)) - })?; - drop(inner); // Close the file - - // Read the file contents - let mut file = File::open(&self.file_path).map_err(|e| { - AvroParserError::Io(format!("Failed to open file for reading: {}", e)) - })?; - let mut contents = Vec::new(); - file.read_to_end(&mut contents) - .map_err(|e| AvroParserError::Io(format!("Failed to read file: {}", e)))?; - - Ok(contents) - } - - /// Finalizes the file and returns just the path (for cleanup later). - /// Use this when you want to handle file reading separately. + /// Finalizes the file and returns just the path. + /// The file is flushed and closed, ready for streaming to its destination. pub fn finalize_path(self) -> Result { let mut inner = self.writer.into_inner().map_err(|e| { AvroParserError::Io(format!("Failed to finalize writer: {}", e)) @@ -183,15 +146,6 @@ impl Default for AvroParser { } impl AvroParser { - pub fn arc(self) -> Arc { - Arc::new(self) - } - - pub fn with_codec(&mut self, codec: Codec) -> &mut Self { - self.codec = Some(codec); - self - } - pub fn writer_with_schema< T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static, >( diff --git a/services/dune/src/processor.rs b/services/dune/src/processor.rs index 1b107a3f..8bb9d9af 100644 --- a/services/dune/src/processor.rs +++ b/services/dune/src/processor.rs @@ -222,6 +222,36 @@ impl Processor { Ok(file_path) } + /// Process data from a file path, streaming directly to S3. + /// This avoids loading the entire file into memory - ideal for large batches. + pub async fn process_data_from_file( + &self, + start_height: BlockHeight, + end_height: BlockHeight, + file_path: impl AsRef, + table: S3TableName, + ) -> DuneResult { + let network = FuelNetwork::load_from_env(); + let key_builder = S3KeyBuilder::new(network).with_table(table); + let key = key_builder.build_key_from_heights(start_height, end_height); + + match &self.storage_type { + StorageType::File => { + // For file storage, read and write locally + let data = std::fs::read(file_path.as_ref())?; + let output_path = self.create_output(data, &key).await?; + tracing::info!("New file saved: {}", output_path); + Ok(output_path) + } + StorageType::S3(s3_storage) => { + // Stream directly from file to S3 + s3_storage.store_from_file(&key, file_path).await?; + tracing::info!("New file uploaded to S3: {}", key); + Ok(key) + } + } + } + fn spli_batches< T: serde::Serialize + serde::de::DeserializeOwned diff --git a/services/dune/src/s3/client.rs b/services/dune/src/s3/client.rs index 7d6231f2..f13e79cb 100644 --- a/services/dune/src/s3/client.rs +++ b/services/dune/src/s3/client.rs @@ -19,7 +19,9 @@ use aws_sdk_s3::{ config::retry::RetryConfig as S3RetryConfig, error::SdkError, operation::get_object::GetObjectError, + primitives::ByteStream, }; +use std::path::Path; #[derive(Debug, Clone)] pub struct S3Storage { @@ -409,6 +411,209 @@ impl S3Storage { self.retry_config = config; self } + + /// Stores a file by streaming directly from disk to S3. + /// This avoids loading the entire file into memory, making it suitable + /// for large files that would otherwise cause OOM errors. 
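
A caller-side sketch of `process_data_from_file`, assuming its return type is the storage key as a `String`; the heights and the path here are illustrative values only:

async fn upload_blocks_file(processor: &Processor) -> DuneResult<String> {
    // Hypothetical finalized Avro file produced by the disk buffer.
    let path = "/tmp/dune-avro/blocks.avro";
    let key = processor
        .process_data_from_file(
            BlockHeight::from(1u32),
            BlockHeight::from(3600u32),
            path,
            S3TableName::Blocks,
        )
        .await?;
    tracing::info!("batch stored under key: {key}");
    Ok(key)
}
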
+ pub async fn store_from_file( + &self, + key: &str, + file_path: impl AsRef, + ) -> Result<(), StorageError> { + let file_path = file_path.as_ref(); + let file_size = std::fs::metadata(file_path) + .map_err(|e| { + StorageError::StoreError(format!("Failed to get file metadata: {}", e)) + })? + .len() as usize; + + #[allow(clippy::identity_op)] + const LARGE_FILE_THRESHOLD: usize = 100 * 1024 * 1024; // 100MB + + if file_size >= LARGE_FILE_THRESHOLD { + tracing::debug!( + "Uploading file {} to S3 using multipart streaming (size: {} bytes)", + file_path.display(), + file_size + ); + self.upload_multipart_from_file(key, file_path, file_size) + .await + } else { + tracing::debug!( + "Uploading file {} to S3 using streaming put_object (size: {} bytes)", + file_path.display(), + file_size + ); + self.put_object_from_file(key, file_path).await + } + } + + async fn put_object_from_file( + &self, + key: &str, + file_path: &Path, + ) -> Result<(), StorageError> { + let body = ByteStream::from_path(file_path).await.map_err(|e| { + StorageError::StoreError(format!("Failed to create byte stream: {}", e)) + })?; + + let result = self + .client + .put_object() + .bucket(self.config.bucket()) + .key(key) + .body(body) + .send() + .await; + + match result { + Ok(_) => { + tracing::info!("Successfully stored object with key: {}", key); + Ok(()) + } + Err(err) => { + tracing::error!( + "Failed to store object. Error details: {:?}", + err.as_service_error() + ); + Err(StorageError::StoreError(err.to_string())) + } + } + } + + async fn upload_multipart_from_file( + &self, + key: &str, + file_path: &Path, + file_size: usize, + ) -> Result<(), StorageError> { + use std::io::{ + Read, + Seek, + SeekFrom, + }; + + const CHUNK_SIZE: usize = 100 * 1024 * 1024; // 100MB chunks + + // Create multipart upload + let create_multipart = self + .client + .create_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .send() + .await + .map_err(|e| { + StorageError::StoreError(format!( + "Failed to create multipart upload: {:?}", + e.as_service_error() + )) + })?; + + let upload_id = create_multipart.upload_id().ok_or_else(|| { + StorageError::StoreError("Failed to get upload ID".to_string()) + })?; + + let mut completed_parts = Vec::new(); + let total_chunks = file_size.div_ceil(CHUNK_SIZE); + + let mut file = std::fs::File::open(file_path).map_err(|e| { + StorageError::StoreError(format!("Failed to open file: {}", e)) + })?; + + let mut buffer = vec![0u8; CHUNK_SIZE]; + let mut part_number = 1i32; + let mut bytes_uploaded = 0usize; + + loop { + // Seek to the current position + file.seek(SeekFrom::Start(bytes_uploaded as u64)) + .map_err(|e| { + StorageError::StoreError(format!("Failed to seek file: {}", e)) + })?; + + // Read a chunk + let bytes_read = file.read(&mut buffer).map_err(|e| { + StorageError::StoreError(format!("Failed to read file: {}", e)) + })?; + + if bytes_read == 0 { + break; + } + + // Upload the chunk + match self + .client + .upload_part() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .body(buffer[..bytes_read].to_vec().into()) + .part_number(part_number) + .send() + .await + { + Ok(response) => { + if let Some(e_tag) = response.e_tag() { + completed_parts.push( + aws_sdk_s3::types::CompletedPart::builder() + .e_tag(e_tag) + .part_number(part_number) + .build(), + ); + } + } + Err(err) => { + // Abort the multipart upload if a part fails + let _ = self + .client + .abort_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + 
.send() + .await; + + return Err(StorageError::StoreError(format!( + "Failed to upload part: {:?}", + err.as_service_error() + ))); + } + } + + tracing::debug!( + "Uploaded part {}/{} for key={}", + part_number, + total_chunks, + key + ); + + bytes_uploaded += bytes_read; + part_number += 1; + } + + // Complete multipart upload + self.client + .complete_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .multipart_upload( + aws_sdk_s3::types::CompletedMultipartUpload::builder() + .set_parts(Some(completed_parts)) + .build(), + ) + .send() + .await + .map_err(|e| { + StorageError::StoreError(format!( + "Failed to complete multipart upload: {:?}", + e.as_service_error() + )) + })?; + + Ok(()) + } } #[cfg(test)] diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs index 618715d0..72d8bd51 100644 --- a/services/dune/src/service.rs +++ b/services/dune/src/service.rs @@ -4,6 +4,8 @@ use crate::{ BlockBuffer, BufferType, FinalizedBatch, + FinalizedBatchFiles, + FinalizedData, create_buffer, }, processor::{ @@ -324,11 +326,11 @@ impl Task { .map_err(|err| anyhow::anyhow!("Failed to finalize buffer: {err}"))?; // Convert from fuel_streams_types::BlockHeight to fuel_core_types::fuel_types::BlockHeight - let last_height_u32: u32 = *finalized.last_height; + let last_height_u32: u32 = *finalized.last_height(); let last_height: BlockHeight = last_height_u32.into(); - // Upload the data - process_finalized_batch(&self.processor, finalized) + // Upload the data (handles both in-memory and file-based data) + process_finalized_data(&self.processor, finalized) .await .map_err(|err| { anyhow::anyhow!( @@ -369,9 +371,24 @@ pub fn new_service(config: Config) -> anyhow::Result anyhow::Result<()> { + match data { + FinalizedData::InMemory(batch) => { + process_finalized_batch_in_memory(processor, batch).await + } + FinalizedData::OnDisk(files) => { + process_finalized_batch_from_files(processor, files).await + } + } +} + +/// Process in-memory batch data (from MemoryBuffer) +/// Uploads all three data types in parallel since they're already in memory +async fn process_finalized_batch_in_memory( processor: &Processor, batch: FinalizedBatch, ) -> anyhow::Result<()> { @@ -419,6 +436,64 @@ pub async fn process_finalized_batch( Ok(()) } +/// Process file-based batch data (from DiskBuffer) +/// Uploads sequentially to avoid loading multiple large files into memory at once. +/// Each file is streamed directly to S3 without loading into memory. 
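
One detail of the multipart path above is worth a worked example: `div_ceil` rounds up, so a trailing partial chunk still becomes its own part, while an exact multiple adds none. A small self-contained sketch of the part accounting:

const CHUNK_SIZE: usize = 100 * 1024 * 1024; // 100MB, matching upload_multipart_from_file

fn part_count(file_size: usize) -> usize {
    file_size.div_ceil(CHUNK_SIZE)
}

#[cfg(test)]
mod part_math {
    use super::*;

    #[test]
    fn rounds_up_only_when_needed() {
        // 250MB splits into 100 + 100 + 50 => 3 parts.
        assert_eq!(part_count(250 * 1024 * 1024), 3);
        // An exact multiple needs no extra part.
        assert_eq!(part_count(200 * 1024 * 1024), 2);
        // Tiny files still produce a single part.
        assert_eq!(part_count(1), 1);
    }
}
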
+async fn process_finalized_batch_from_files(
+    processor: &Processor,
+    files: FinalizedBatchFiles,
+) -> anyhow::Result<()> {
+    let first_height = files.first_height;
+    let last_height = files.last_height;
+
+    // Upload sequentially to minimize memory usage
+    // Each upload streams from disk to S3 without loading into memory
+    tracing::info!(
+        "Uploading blocks from file: {}",
+        files.blocks_path.display()
+    );
+    processor
+        .process_data_from_file(
+            first_height,
+            last_height,
+            &files.blocks_path,
+            S3TableName::Blocks,
+        )
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to upload blocks: {}", e))?;
+
+    tracing::info!(
+        "Uploading transactions from file: {}",
+        files.transactions_path.display()
+    );
+    processor
+        .process_data_from_file(
+            first_height,
+            last_height,
+            &files.transactions_path,
+            S3TableName::Transactions,
+        )
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to upload transactions: {}", e))?;
+
+    tracing::info!(
+        "Uploading receipts from file: {}",
+        files.receipts_path.display()
+    );
+    processor
+        .process_data_from_file(
+            first_height,
+            last_height,
+            &files.receipts_path,
+            S3TableName::Receipts,
+        )
+        .await
+        .map_err(|e| anyhow::anyhow!("Failed to upload receipts: {}", e))?;
+
+    // FinalizedBatchFiles::drop() will clean up the temp directory
+    Ok(())
+}
+
 /// Process a batch of blocks and transactions (legacy in-memory method)
 /// Kept for backwards compatibility with tests
 pub async fn process_batch(

From 64e6de2afb4044686b7de8ae4817317113e4fa9c Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Wed, 4 Feb 2026 14:41:28 -0800
Subject: [PATCH 08/14] fix: memory leak

---
 services/dune/src/helpers/avro.rs | 68 ++++++++++++++++++++++---------
 1 file changed, 48 insertions(+), 20 deletions(-)

diff --git a/services/dune/src/helpers/avro.rs b/services/dune/src/helpers/avro.rs
index 2a4dde91..d7444294 100644
--- a/services/dune/src/helpers/avro.rs
+++ b/services/dune/src/helpers/avro.rs
@@ -1,4 +1,6 @@
 use std::{
+    any::TypeId,
+    collections::HashMap,
     fs::File,
     io::{
         BufWriter,
@@ -8,6 +10,7 @@ use std::{
         Path,
         PathBuf,
     },
+    sync::RwLock,
 };
 
 use apache_avro::{
@@ -27,6 +30,42 @@ use serde::{
     de::DeserializeOwned,
 };
 
+/// Global cache for Avro schemas, keyed by TypeId.
+/// Schemas are immutable and type-specific, so we cache them to avoid
+/// repeated allocations and memory leaks from Box::leak.
+static SCHEMA_CACHE: RwLock<Option<HashMap<TypeId, &'static Schema>>> = RwLock::new(None);
+
+/// Gets or creates a cached static schema for type T.
+/// The schema is created once per type and cached globally.
+fn get_cached_schema<T: AvroSchemaComponent + 'static>() -> &'static Schema {
+    let type_id = TypeId::of::<T>();
+
+    // Fast path: try to read from cache
+    {
+        let cache = SCHEMA_CACHE.read().unwrap();
+        if let Some(ref map) = *cache
+            && let Some(&schema) = map.get(&type_id)
+        {
+            return schema;
+        }
+    }
+
+    // Slow path: create schema and cache it
+    let mut cache = SCHEMA_CACHE.write().unwrap();
+    let map = cache.get_or_insert_with(HashMap::new);
+
+    // Double-check after acquiring write lock
+    if let Some(&schema) = map.get(&type_id) {
+        return schema;
+    }
+
+    // Create and cache the schema
+    let schema = T::get_schema_in_ctxt(&mut Default::default(), &Namespace::default());
+    let schema_static: &'static Schema = Box::leak(Box::new(schema));
+    map.insert(type_id, schema_static);
+    schema_static
+}
+
 /// Data parser error types.
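
A subtlety behind this cache: a plain `static` inside a generic function is shared by every instantiation of `T`, which is why the map is keyed by `TypeId`. The invariant that makes the remaining `Box::leak` harmless (at most one leaked `Schema` per type) can be checked directly; a sketch, using one of this crate's schema types:

#[cfg(test)]
mod schema_cache_invariants {
    use super::*;
    use crate::schemas::AvroBlock;

    #[test]
    fn one_leaked_schema_per_type() {
        // Repeated lookups return the same leaked allocation, so the
        // Box::leak cost is bounded at one Schema per Rust type.
        let first = get_cached_schema::<AvroBlock>();
        let second = get_cached_schema::<AvroBlock>();
        assert!(std::ptr::eq(first, second));
    }
}
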
#[derive(Debug, thiserror::Error)] pub enum AvroParserError { @@ -45,7 +84,7 @@ impl From for AvroParserError { } pub struct AvroWriter { - writer: Writer<'static, Vec>, // We'll adjust this + writer: Writer<'static, Vec>, _phantom: std::marker::PhantomData, } @@ -53,10 +92,10 @@ impl AvroWriter where T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static, { - pub fn new(schema: Schema, codec: Codec) -> Self { - let schema_static: &'static Schema = Box::leak(Box::new(schema)); + pub fn new(codec: Codec) -> Self { + let schema = get_cached_schema::(); let writer = Writer::builder() - .schema(schema_static) + .schema(schema) .codec(codec) .writer(Vec::new()) .build(); @@ -89,19 +128,15 @@ where T: AvroSchema + AvroSchemaComponent + Serialize + Send + Sync + 'static, { /// Creates a new file-based Avro writer at the specified path - pub fn new( - path: impl AsRef, - schema: Schema, - codec: Codec, - ) -> Result { + pub fn new(path: impl AsRef, codec: Codec) -> Result { let file_path = path.as_ref().to_path_buf(); let file = File::create(&file_path) .map_err(|e| AvroParserError::Io(format!("Failed to create file: {}", e)))?; let buf_writer = BufWriter::new(file); - let schema_static: &'static Schema = Box::leak(Box::new(schema)); + let schema = get_cached_schema::(); let writer = Writer::builder() - .schema(schema_static) + .schema(schema) .codec(codec) .writer(buf_writer) .build(); @@ -151,12 +186,7 @@ impl AvroParser { >( &self, ) -> Result, AvroParserError> { - let schema = - T::get_schema_in_ctxt(&mut Default::default(), &Namespace::default()); - Ok(AvroWriter::new( - schema, - self.codec.unwrap_or(Codec::Deflate), - )) + Ok(AvroWriter::new(self.codec.unwrap_or(Codec::Deflate))) } /// Creates a file-based Avro writer that writes directly to disk. @@ -167,9 +197,7 @@ impl AvroParser { &self, path: impl AsRef, ) -> Result, AvroParserError> { - let schema = - T::get_schema_in_ctxt(&mut Default::default(), &Namespace::default()); - AvroFileWriter::new(path, schema, self.codec.unwrap_or(Codec::Deflate)) + AvroFileWriter::new(path, self.codec.unwrap_or(Codec::Deflate)) } pub fn reader_with_schema< From b1bd1950113e91bfb41e4287524235ad73d29b92 Mon Sep 17 00:00:00 2001 From: Mike Christopher Date: Thu, 5 Feb 2026 08:28:26 -0800 Subject: [PATCH 09/14] feat: no intermediate json --- services/dune/src/block_buffer.rs | 616 +++++++----------------------- services/dune/src/cli.rs | 15 +- services/dune/src/lib.rs | 5 +- services/dune/src/main.rs | 1 - services/dune/src/s3/client.rs | 50 ++- services/dune/src/service.rs | 101 +---- 6 files changed, 183 insertions(+), 605 deletions(-) diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs index 41cf0551..16f9b9af 100644 --- a/services/dune/src/block_buffer.rs +++ b/services/dune/src/block_buffer.rs @@ -1,13 +1,5 @@ use std::{ - fs::{ - self, - File, - }, - io::{ - BufReader, - BufWriter, - Write, - }, + fs, path::{ Path, PathBuf, @@ -15,10 +7,6 @@ use std::{ }; use fuel_streams_types::BlockHeight; -use serde::{ - Deserialize, - Serialize, -}; use crate::{ DuneError, @@ -26,7 +14,6 @@ use crate::{ helpers::{ AvroFileWriter, AvroParser, - AvroWriter, }, schemas::{ AvroBlock, @@ -40,18 +27,8 @@ use fuel_streams_domains::{ transactions::Transaction, }; -/// The result of finalizing a batch, containing Avro-encoded data ready for upload. -/// WARNING: This holds all data in memory. For large batches, use FinalizedBatchFiles instead. 
-pub struct FinalizedBatch { - pub first_height: BlockHeight, - pub last_height: BlockHeight, - pub blocks_data: Vec, - pub transactions_data: Vec, - pub receipts_data: Vec, -} - /// The result of finalizing a batch to files, containing paths to Avro files. -/// This avoids loading all data into memory at once - files are streamed directly to S3. +/// Files are streamed directly to S3 without loading into memory. pub struct FinalizedBatchFiles { pub first_height: BlockHeight, pub last_height: BlockHeight, @@ -78,111 +55,11 @@ impl Drop for FinalizedBatchFiles { } } -/// Result of finalizing a buffer - either in-memory data or file paths. -/// This allows the service to handle both memory and disk buffers uniformly. -pub enum FinalizedData { - /// In-memory Avro data (from MemoryBuffer) - InMemory(FinalizedBatch), - /// File paths to Avro files (from DiskBuffer) - OnDisk(FinalizedBatchFiles), -} - -impl FinalizedData { - pub fn first_height(&self) -> BlockHeight { - match self { - FinalizedData::InMemory(batch) => batch.first_height, - FinalizedData::OnDisk(files) => files.first_height, - } - } - - pub fn last_height(&self) -> BlockHeight { - match self { - FinalizedData::InMemory(batch) => batch.last_height, - FinalizedData::OnDisk(files) => files.last_height, - } - } -} - -/// Configuration for the buffer type -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum BufferType { - /// Store blocks in memory (faster but uses more RAM) - Memory, - /// Store blocks on disk (slower but uses less RAM) - Disk, -} - -impl std::fmt::Display for BufferType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - BufferType::Memory => write!(f, "Memory"), - BufferType::Disk => write!(f, "Disk"), - } - } -} - -impl std::str::FromStr for BufferType { - type Err = anyhow::Error; - - fn from_str(s: &str) -> Result { - match s.to_lowercase().as_str() { - "memory" | "mem" => Ok(BufferType::Memory), - "disk" | "file" => Ok(BufferType::Disk), - _ => Err(anyhow::anyhow!("Unknown buffer type: {}", s)), - } - } -} - -/// Trait for block buffering implementations. -/// This allows swapping between memory and disk-based buffering. -pub trait BlockBuffer: Send { - /// Returns the number of blocks in the buffer - fn len(&self) -> usize; - - /// Returns true if the buffer is empty - fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Returns the first block height in the buffer - fn first_height(&self) -> Option; - - /// Returns the last block height in the buffer - fn last_height(&self) -> Option; - - /// Appends a block and its transactions to the buffer - fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()>; - - /// Finalizes the buffer, converting all data to Avro format for upload. - /// - /// Returns either: - /// - `FinalizedData::InMemory` for MemoryBuffer (data in Vec) - /// - `FinalizedData::OnDisk` for DiskBuffer (paths to temporary files) - /// - /// This does NOT clear the buffer - call `reset()` after successful upload - /// to clear the data. This design allows retry on upload failure without - /// losing the buffered data. - fn finalize(&mut self) -> DuneResult; - - /// Resets the buffer for reuse, clearing all data. - /// Call this after successful upload to prepare for the next batch. 
- fn reset(&mut self) -> DuneResult<()>; -} - -/// Creates a new block buffer of the specified type -pub fn create_buffer(buffer_type: BufferType) -> DuneResult> { - match buffer_type { - BufferType::Memory => Ok(Box::new(MemoryBuffer::new())), - BufferType::Disk => Ok(Box::new(DiskBuffer::new()?)), - } -} - // ============================================================================ -// Avro file writers for disk-based finalization +// Avro file writers for disk-based buffering // ============================================================================ /// Paths to finalized Avro files ready for upload. -/// Used internally by AvroFileWriters. /// Note: Does NOT implement Drop - ownership of temp_dir is transferred to FinalizedBatchFiles. struct FinalizedAvroFiles { temp_dir: PathBuf, @@ -193,7 +70,6 @@ struct FinalizedAvroFiles { /// Manages Avro file writers for blocks, transactions, and receipts. /// Writes directly to disk to avoid memory accumulation. -/// Used by DiskBuffer during finalization. /// /// Implements Drop to clean up temp directory on error. On success, /// ownership is transferred via `finalize_to_paths()` and Drop becomes a no-op. @@ -229,11 +105,25 @@ impl AvroFileWriters { Self::with_dir(&temp_dir) } - /// Creates new Avro file writers in the specified directory + /// Creates new Avro file writers in the specified directory. + /// Cleans up the directory if any writer creation fails. fn with_dir(dir: impl AsRef) -> DuneResult { let temp_dir = dir.as_ref().to_path_buf(); fs::create_dir_all(&temp_dir)?; + // Use inner function to enable cleanup on any failure after dir creation + let result = Self::create_writers(&temp_dir); + + if result.is_err() { + // Clean up the directory we created + let _ = fs::remove_dir_all(&temp_dir); + } + + result + } + + /// Helper to create all writers. Called by with_dir(). 
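
The create-then-clean-up-on-failure shape used by `with_dir` generalizes; a standalone sketch of the same guarantee (not part of the patch), so callers never observe a half-initialized directory:

use std::{fs, path::Path};

fn init_dir_or_cleanup<T, E>(
    dir: &Path,
    init: impl FnOnce(&Path) -> Result<T, E>,
) -> Result<T, E>
where
    E: From<std::io::Error>,
{
    fs::create_dir_all(dir)?;
    let result = init(dir);
    if result.is_err() {
        // Remove the directory we just created so no empty temp dirs leak.
        let _ = fs::remove_dir_all(dir);
    }
    result
}
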
+ fn create_writers(temp_dir: &Path) -> DuneResult { let parser = AvroParser::default(); let blocks_writer = parser @@ -259,15 +149,15 @@ impl AvroFileWriters { })?; Ok(Self { - temp_dir: Some(temp_dir), + temp_dir: Some(temp_dir.to_path_buf()), blocks_writer: Some(blocks_writer), transactions_writer: Some(transactions_writer), receipts_writer: Some(receipts_writer), }) } - /// Appends a buffered block's data to all writers - fn append(&mut self, buffered: &BufferedBlock) -> DuneResult<()> { + /// Appends block data directly to the Avro writers + fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()> { let blocks_writer = self.blocks_writer.as_mut().ok_or_else(|| { DuneError::Other(anyhow::anyhow!("blocks_writer not available")) })?; @@ -278,13 +168,38 @@ impl AvroFileWriters { DuneError::Other(anyhow::anyhow!("receipts_writer not available")) })?; - blocks_writer.append(&buffered.block)?; - for tx in &buffered.transactions { - transactions_writer.append(tx)?; + // Convert and write block + let avro_block = AvroBlock::new(block); + blocks_writer.append(&avro_block)?; + + // Convert and write transactions + for tx in transactions { + let avro_tx = AvroTransaction::new( + tx, + Some(block.height.into()), + Some(block.header.get_timestamp_utc().timestamp()), + Some(block.id.as_ref().to_vec().into()), + Some(block.version.to_string()), + Some(block.producer.as_ref().to_vec().into()), + ); + transactions_writer.append(&avro_tx)?; } - for receipt in &buffered.receipts { - receipts_writer.append(receipt)?; + + // Convert and write receipts + for tx in transactions { + let receipt_metadata = ReceiptMetadata { + block_time: Some(block.header.get_timestamp_utc().timestamp()), + block_height: Some(block.height.0 as i64), + block_version: Some(block.version.to_string()), + block_producer: Some(block.producer.clone().into()), + transaction_id: Some(tx.id.clone().into()), + }; + for receipt in &tx.receipts { + let avro_receipt = AvroReceipt::new(receipt, &receipt_metadata); + receipts_writer.append(&avro_receipt)?; + } } + Ok(()) } @@ -324,191 +239,14 @@ impl AvroFileWriters { } } -// ============================================================================ -// Memory-based buffer implementation -// ============================================================================ - -/// Intermediate representation of a block for buffering -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BufferedBlock { - pub block: AvroBlock, - pub transactions: Vec, - pub receipts: Vec, -} - -impl BufferedBlock { - /// Creates a new buffered block from domain types - pub fn from_domain(block: &Block, transactions: &[Transaction]) -> Self { - let avro_block = AvroBlock::new(block); - - let avro_transactions: Vec<_> = transactions - .iter() - .map(|tx| { - AvroTransaction::new( - tx, - Some(block.height.into()), - Some(block.header.get_timestamp_utc().timestamp()), - Some(block.id.as_ref().to_vec().into()), - Some(block.version.to_string()), - Some(block.producer.as_ref().to_vec().into()), - ) - }) - .collect(); - - let avro_receipts: Vec<_> = transactions - .iter() - .flat_map(|tx| { - let receipt_metadata = ReceiptMetadata { - block_time: Some(block.header.get_timestamp_utc().timestamp()), - block_height: Some(block.height.0 as i64), - block_version: Some(block.version.to_string()), - block_producer: Some(block.producer.clone().into()), - transaction_id: Some(tx.id.clone().into()), - }; - tx.receipts - .iter() - .map(move |receipt| AvroReceipt::new(receipt, &receipt_metadata)) - 
}) - .collect(); - - Self { - block: avro_block, - transactions: avro_transactions, - receipts: avro_receipts, - } - } -} - -/// Memory-based block buffer that stores all blocks in a Vec. -/// Faster than disk-based buffering but uses more memory. -pub struct MemoryBuffer { - blocks: Vec, - first_height: Option, - last_height: Option, -} - -impl MemoryBuffer { - pub fn new() -> Self { - Self { - blocks: Vec::new(), - first_height: None, - last_height: None, - } - } - - /// Converts buffered blocks to Avro format in memory. - /// This is faster than disk-based conversion but uses more RAM. - fn to_avro(&self) -> DuneResult<(Vec, Vec, Vec)> { - let parser = AvroParser::default(); - - let mut blocks_writer: AvroWriter = - parser.writer_with_schema().map_err(|e| { - DuneError::Other(anyhow::anyhow!("Failed to create blocks writer: {}", e)) - })?; - let mut transactions_writer: AvroWriter = - parser.writer_with_schema().map_err(|e| { - DuneError::Other(anyhow::anyhow!( - "Failed to create transactions writer: {}", - e - )) - })?; - let mut receipts_writer: AvroWriter = - parser.writer_with_schema().map_err(|e| { - DuneError::Other(anyhow::anyhow!( - "Failed to create receipts writer: {}", - e - )) - })?; - - for buffered in &self.blocks { - blocks_writer.append(&buffered.block)?; - for tx in &buffered.transactions { - transactions_writer.append(tx)?; - } - for receipt in &buffered.receipts { - receipts_writer.append(receipt)?; - } - } - - let blocks_data = blocks_writer.into_inner()?; - let transactions_data = transactions_writer.into_inner()?; - let receipts_data = receipts_writer.into_inner()?; - - Ok((blocks_data, transactions_data, receipts_data)) - } -} - -impl Default for MemoryBuffer { - fn default() -> Self { - Self::new() - } -} - -impl BlockBuffer for MemoryBuffer { - fn len(&self) -> usize { - self.blocks.len() - } - - fn first_height(&self) -> Option { - self.first_height - } - - fn last_height(&self) -> Option { - self.last_height - } - - fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()> { - let height = block.height; - - if self.first_height.is_none() { - self.first_height = Some(height); - } - self.last_height = Some(height); - - let buffered = BufferedBlock::from_domain(block, transactions); - self.blocks.push(buffered); - - Ok(()) - } - - fn finalize(&mut self) -> DuneResult { - let first_height = self.first_height.ok_or_else(|| { - DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) - })?; - let last_height = self.last_height.ok_or_else(|| { - DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) - })?; - - let (blocks_data, transactions_data, receipts_data) = self.to_avro()?; - - Ok(FinalizedData::InMemory(FinalizedBatch { - first_height, - last_height, - blocks_data, - transactions_data, - receipts_data, - })) - } - - fn reset(&mut self) -> DuneResult<()> { - self.blocks.clear(); - self.first_height = None; - self.last_height = None; - Ok(()) - } -} - // ============================================================================ // Disk-based buffer implementation // ============================================================================ -/// Disk-based block buffer that writes blocks to temporary files. -/// Uses less memory than the memory-based buffer but is slower. -/// Blocks are stored in JSON Lines format and converted to Avro on finalize. +/// Disk-based block buffer that writes blocks directly to Avro files. +/// Uses minimal memory by streaming data directly to disk. 
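
Downstream, the buffer's lifecycle is append, finalize, upload, then reset. A condensed sketch of one cycle, where `upload` stands in for `service::process_finalized_batch`:

fn flush_batch(
    buffer: &mut DiskBuffer,
    upload: impl FnOnce(FinalizedBatchFiles) -> DuneResult<()>,
) -> DuneResult<()> {
    if buffer.is_empty() {
        return Ok(());
    }
    let files = buffer.finalize()?;
    // The temp files are deleted when `files` is dropped inside `upload`.
    upload(files)?;
    // Reset only after a successful upload so the next batch starts clean.
    buffer.reset()
}
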
pub struct DiskBuffer { - temp_dir: PathBuf, - data_file: PathBuf, - writer: Option>, + writers: Option, first_height: Option, last_height: Option, block_count: usize, @@ -516,73 +254,53 @@ pub struct DiskBuffer { impl DiskBuffer { pub fn new() -> DuneResult { - let temp_dir = std::env::temp_dir().join(format!( - "dune-buffer-{}-{}", - std::process::id(), - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() - )); - Self::with_dir(&temp_dir) + let writers = AvroFileWriters::new()?; + Ok(Self { + writers: Some(writers), + first_height: None, + last_height: None, + block_count: 0, + }) } + #[cfg(test)] pub fn with_dir(dir: impl AsRef) -> DuneResult { - let temp_dir = dir.as_ref().to_path_buf(); - fs::create_dir_all(&temp_dir)?; - - let data_file = temp_dir.join("blocks.jsonl"); - let writer = BufWriter::new(File::create(&data_file)?); - + let writers = AvroFileWriters::with_dir(dir)?; Ok(Self { - temp_dir, - data_file, - writer: Some(writer), + writers: Some(writers), first_height: None, last_height: None, block_count: 0, }) } - /// Reads buffered JSON data and converts to Avro files on disk. - /// Returns paths to the Avro files without loading them into memory. - fn read_and_convert_to_avro_files(&self) -> DuneResult { - let file = File::open(&self.data_file)?; - let reader = BufReader::new(file); - - let mut writers = AvroFileWriters::new()?; - - for line in std::io::BufRead::lines(reader) { - let line = line?; - if line.is_empty() { - continue; - } - - let buffered: BufferedBlock = serde_json::from_str(&line).map_err(|e| { - DuneError::Other(anyhow::anyhow!("Failed to deserialize block: {}", e)) - })?; - - writers.append(&buffered)?; - } - - writers.finalize_to_paths() + /// Returns the number of blocks in the buffer + pub fn len(&self) -> usize { + self.block_count } -} -impl BlockBuffer for DiskBuffer { - fn len(&self) -> usize { - self.block_count + /// Returns true if the buffer is empty + pub fn is_empty(&self) -> bool { + self.block_count == 0 } - fn first_height(&self) -> Option { + /// Returns the first block height in the buffer + pub fn first_height(&self) -> Option { self.first_height } - fn last_height(&self) -> Option { + /// Returns the last block height in the buffer + pub fn last_height(&self) -> Option { self.last_height } - fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()> { + /// Appends a block and its transactions to the buffer. + /// Data is written directly to Avro files on disk. + pub fn append( + &mut self, + block: &Block, + transactions: &[Transaction], + ) -> DuneResult<()> { let height = block.height; if self.first_height.is_none() { @@ -590,24 +308,23 @@ impl BlockBuffer for DiskBuffer { } self.last_height = Some(height); - let buffered = BufferedBlock::from_domain(block, transactions); - - let writer = self - .writer + let writers = self + .writers .as_mut() - .ok_or_else(|| DuneError::Other(anyhow::anyhow!("Writer not available")))?; - - serde_json::to_writer(&mut *writer, &buffered).map_err(|e| { - DuneError::Other(anyhow::anyhow!("Failed to serialize block: {}", e)) - })?; - writer.write_all(b"\n")?; + .ok_or_else(|| DuneError::Other(anyhow::anyhow!("Writers not available")))?; + writers.append(block, transactions)?; self.block_count += 1; Ok(()) } - fn finalize(&mut self) -> DuneResult { + /// Finalizes the buffer, returning paths to the Avro files for upload. 
+ /// + /// This does NOT clear the buffer - call `reset()` after successful upload + /// to clear the data. This design allows retry on upload failure without + /// losing the buffered data. + pub fn finalize(&mut self) -> DuneResult { let first_height = self.first_height.ok_or_else(|| { DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) })?; @@ -615,36 +332,31 @@ impl BlockBuffer for DiskBuffer { DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) })?; - // Flush the writer so we can read the file. - // We take() the writer to close the file handle, which is required - // to read the complete file on some systems. The writer will be - // recreated on reset(). - if let Some(mut writer) = self.writer.take() { - writer.flush()?; - } + let writers = self + .writers + .as_mut() + .ok_or_else(|| DuneError::Other(anyhow::anyhow!("Writers not available")))?; - // Convert to Avro files on disk (does not load into memory) - let avro_files = self.read_and_convert_to_avro_files()?; + let avro_files = writers.finalize_to_paths()?; - Ok(FinalizedData::OnDisk(FinalizedBatchFiles { + Ok(FinalizedBatchFiles { first_height, last_height, blocks_path: avro_files.blocks_path, transactions_path: avro_files.transactions_path, receipts_path: avro_files.receipts_path, temp_dir: avro_files.temp_dir, - })) + }) } - fn reset(&mut self) -> DuneResult<()> { - // Close current writer - let _ = self.writer.take(); - - // Remove old file - let _ = fs::remove_file(&self.data_file); + /// Resets the buffer for reuse, clearing all data. + /// Call this after successful upload to prepare for the next batch. + pub fn reset(&mut self) -> DuneResult<()> { + // Drop old writers (this cleans up the temp directory) + let _ = self.writers.take(); - // Create new writer - self.writer = Some(BufWriter::new(File::create(&self.data_file)?)); + // Create new writers + self.writers = Some(AvroFileWriters::new()?); self.first_height = None; self.last_height = None; @@ -654,13 +366,6 @@ impl BlockBuffer for DiskBuffer { } } -impl Drop for DiskBuffer { - fn drop(&mut self) { - let _ = self.writer.take(); - let _ = fs::remove_dir_all(&self.temp_dir); - } -} - #[cfg(test)] mod tests { use super::*; @@ -672,7 +377,11 @@ mod tests { use pretty_assertions::assert_eq; use tempfile::tempdir; - fn test_buffer_basic_operations(mut buffer: Box) -> DuneResult<()> { + #[test] + fn test_disk_buffer_basic_operations() -> DuneResult<()> { + let dir = tempdir().unwrap(); + let mut buffer = DiskBuffer::with_dir(dir.path())?; + assert!(buffer.is_empty()); assert_eq!(buffer.len(), 0); @@ -690,25 +399,22 @@ mod tests { // Finalize and verify let finalized = buffer.finalize()?; - assert_eq!(*finalized.first_height(), 1); - assert_eq!(*finalized.last_height(), 10); - - // Verify data is present (different validation for in-memory vs on-disk) - match finalized { - FinalizedData::InMemory(batch) => { - assert!(!batch.blocks_data.is_empty()); - assert!(!batch.transactions_data.is_empty()); - } - FinalizedData::OnDisk(files) => { - assert!(files.blocks_path.exists()); - assert!(files.transactions_path.exists()); - } - } + assert_eq!(*finalized.first_height, 1); + assert_eq!(*finalized.last_height, 10); + + // Verify files exist + assert!(finalized.blocks_path.exists()); + assert!(finalized.transactions_path.exists()); + assert!(finalized.receipts_path.exists()); Ok(()) } - fn test_buffer_reset(mut buffer: Box) -> DuneResult<()> { + #[test] + fn test_disk_buffer_reset() -> DuneResult<()> { + let dir = tempdir().unwrap(); + let mut buffer 
= DiskBuffer::with_dir(dir.path())?; + // Add blocks for i in 1..=5 { let mut block = MockBlock::random(); @@ -738,7 +444,11 @@ mod tests { Ok(()) } - fn test_buffer_with_receipts(mut buffer: Box) -> DuneResult<()> { + #[test] + fn test_disk_buffer_with_receipts() -> DuneResult<()> { + let dir = tempdir().unwrap(); + let mut buffer = DiskBuffer::with_dir(dir.path())?; + for i in 1..=3 { let mut block = MockBlock::random(); block.height = BlockHeight::from(i); @@ -748,78 +458,18 @@ mod tests { let finalized = buffer.finalize()?; - // Verify receipts data is present - match finalized { - FinalizedData::InMemory(batch) => { - assert!(!batch.receipts_data.is_empty()); - } - FinalizedData::OnDisk(files) => { - assert!(files.receipts_path.exists()); - } - } + // Verify receipts file exists and has content + assert!(finalized.receipts_path.exists()); + let receipts_size = std::fs::metadata(&finalized.receipts_path)?.len(); + assert!(receipts_size > 0, "Receipts file should not be empty"); Ok(()) } - // Memory buffer tests #[test] - fn test_memory_buffer_basic() -> DuneResult<()> { - test_buffer_basic_operations(Box::new(MemoryBuffer::new())) - } - - #[test] - fn test_memory_buffer_reset() -> DuneResult<()> { - test_buffer_reset(Box::new(MemoryBuffer::new())) - } - - #[test] - fn test_memory_buffer_with_receipts() -> DuneResult<()> { - test_buffer_with_receipts(Box::new(MemoryBuffer::new())) - } - - // Disk buffer tests - #[test] - fn test_disk_buffer_basic() -> DuneResult<()> { - let dir = tempdir().unwrap(); - test_buffer_basic_operations(Box::new(DiskBuffer::with_dir(dir.path())?)) - } - - #[test] - fn test_disk_buffer_reset() -> DuneResult<()> { - let dir = tempdir().unwrap(); - test_buffer_reset(Box::new(DiskBuffer::with_dir(dir.path())?)) - } - - #[test] - fn test_disk_buffer_with_receipts() -> DuneResult<()> { - let dir = tempdir().unwrap(); - test_buffer_with_receipts(Box::new(DiskBuffer::with_dir(dir.path())?)) - } - - // Factory function tests - #[test] - fn test_create_memory_buffer() -> DuneResult<()> { - let buffer = create_buffer(BufferType::Memory)?; + fn test_disk_buffer_new() -> DuneResult<()> { + let buffer = DiskBuffer::new()?; assert!(buffer.is_empty()); Ok(()) } - - #[test] - fn test_create_disk_buffer() -> DuneResult<()> { - let buffer = create_buffer(BufferType::Disk)?; - assert!(buffer.is_empty()); - Ok(()) - } - - #[test] - fn test_buffer_type_from_str() { - use std::str::FromStr; - assert_eq!(BufferType::from_str("memory").unwrap(), BufferType::Memory); - assert_eq!(BufferType::from_str("Memory").unwrap(), BufferType::Memory); - assert_eq!(BufferType::from_str("mem").unwrap(), BufferType::Memory); - assert_eq!(BufferType::from_str("disk").unwrap(), BufferType::Disk); - assert_eq!(BufferType::from_str("Disk").unwrap(), BufferType::Disk); - assert_eq!(BufferType::from_str("file").unwrap(), BufferType::Disk); - assert!(BufferType::from_str("unknown").is_err()); - } } diff --git a/services/dune/src/cli.rs b/services/dune/src/cli.rs index a0861f54..1a65fb50 100644 --- a/services/dune/src/cli.rs +++ b/services/dune/src/cli.rs @@ -1,7 +1,4 @@ -use crate::{ - block_buffer::BufferType, - processor::StorageTypeConfig, -}; +use crate::processor::StorageTypeConfig; use clap::Parser; use url::Url; @@ -22,16 +19,6 @@ pub struct Cli { )] pub storage_type: StorageTypeConfig, - #[arg( - long, - value_name = "BUFFER_TYPE", - env = "BUFFER_TYPE", - default_value = "disk", - help = "Type of buffer to use for accumulating blocks. Options are 'memory' or 'disk'. 
\ - Memory is faster but uses more RAM. Disk uses temporary files to reduce memory usage." - )] - pub buffer_type: BufferType, - #[arg(long, env, default_value = "3600")] pub batch_size: usize, diff --git a/services/dune/src/lib.rs b/services/dune/src/lib.rs index 5466e02e..33b481ec 100644 --- a/services/dune/src/lib.rs +++ b/services/dune/src/lib.rs @@ -13,6 +13,9 @@ pub mod s3; pub mod schemas; pub mod service; -pub use block_buffer::*; +pub use block_buffer::{ + DiskBuffer, + FinalizedBatchFiles, +}; pub use cli::*; pub use error::*; diff --git a/services/dune/src/main.rs b/services/dune/src/main.rs index 5f925dfb..e8cb2b22 100644 --- a/services/dune/src/main.rs +++ b/services/dune/src/main.rs @@ -28,7 +28,6 @@ async fn main() -> Result<()> { url: cli.url, starting_height: cli.starting_block.into(), storage_type: cli.storage_type, - buffer_type: cli.buffer_type, batch_size: cli.batch_size, blocks_request_batch_size: cli.blocks_request_batch_size, blocks_request_concurrency: cli.blocks_request_concurrency, diff --git a/services/dune/src/s3/client.rs b/services/dune/src/s3/client.rs index f13e79cb..52204b19 100644 --- a/services/dune/src/s3/client.rs +++ b/services/dune/src/s3/client.rs @@ -420,32 +420,42 @@ impl S3Storage { key: &str, file_path: impl AsRef, ) -> Result<(), StorageError> { - let file_path = file_path.as_ref(); - let file_size = std::fs::metadata(file_path) + let file_path = file_path.as_ref().to_path_buf(); + let file_size = std::fs::metadata(&file_path) .map_err(|e| { StorageError::StoreError(format!("Failed to get file metadata: {}", e)) })? .len() as usize; - #[allow(clippy::identity_op)] - const LARGE_FILE_THRESHOLD: usize = 100 * 1024 * 1024; // 100MB + with_retry(&self.retry_config, "store_from_file", || { + let file_path = file_path.clone(); + async move { + #[allow(clippy::identity_op)] + const LARGE_FILE_THRESHOLD: usize = 100 * 1024 * 1024; // 100MB - if file_size >= LARGE_FILE_THRESHOLD { - tracing::debug!( - "Uploading file {} to S3 using multipart streaming (size: {} bytes)", - file_path.display(), - file_size - ); - self.upload_multipart_from_file(key, file_path, file_size) - .await - } else { - tracing::debug!( - "Uploading file {} to S3 using streaming put_object (size: {} bytes)", - file_path.display(), - file_size - ); - self.put_object_from_file(key, file_path).await - } + let result = if file_size >= LARGE_FILE_THRESHOLD { + tracing::debug!( + "Uploading file {} to S3 using multipart streaming (size: {} bytes)", + file_path.display(), + file_size + ); + self.upload_multipart_from_file(key, &file_path, file_size) + .await + } else { + tracing::debug!( + "Uploading file {} to S3 using streaming put_object (size: {} bytes)", + file_path.display(), + file_size + ); + self.put_object_from_file(key, &file_path).await + }; + if let Err(ref e) = result { + tracing::error!("Storage error: {:?}", e); + } + result + } + }) + .await } async fn put_object_from_file( diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs index 72d8bd51..ba6a28a0 100644 --- a/services/dune/src/service.rs +++ b/services/dune/src/service.rs @@ -1,12 +1,8 @@ use crate::{ DuneError, block_buffer::{ - BlockBuffer, - BufferType, - FinalizedBatch, + DiskBuffer, FinalizedBatchFiles, - FinalizedData, - create_buffer, }, processor::{ Processor, @@ -55,7 +51,6 @@ pub struct Config { pub url: url::Url, pub starting_height: BlockHeight, pub storage_type: StorageTypeConfig, - pub buffer_type: BufferType, pub batch_size: usize, pub blocks_request_batch_size: usize, pub 
blocks_request_concurrency: usize, @@ -72,8 +67,8 @@ pub struct Task { height: watch::Sender, fetcher: GraphqlFetcher, blocks_stream: BoxStream>, - /// Block buffer (can be memory or disk-based) - buffer: Box, + /// Disk-based block buffer that writes directly to Avro files + buffer: DiskBuffer, processor: Processor, base_asset_id: AssetId, batch_size: usize, @@ -143,9 +138,9 @@ impl RunnableService for UninitializedTask { .unwrap_or(config.starting_height); shared.block_height.send_replace(current_height); - // Create block buffer based on configuration - let buffer = create_buffer(config.buffer_type)?; - tracing::info!("Using {} buffer for block accumulation", config.buffer_type); + // Create disk buffer for block accumulation + let buffer = DiskBuffer::new()?; + tracing::info!("Using disk buffer for block accumulation"); let mut task = Task { blocks_stream: futures::stream::pending().into_boxed(), @@ -307,7 +302,7 @@ impl Task { }) .collect(); - // Add to buffer (memory or disk based on configuration) + // Add to disk buffer (writes directly to Avro files) self.buffer.append(&block, &transactions)?; Ok(()) @@ -318,7 +313,7 @@ impl Task { return Ok(()) } - // Finalize the buffer and get the data for upload. + // Finalize the buffer and get the file paths for upload. // This does NOT clear the buffer - data is preserved for retry on failure. let finalized = self .buffer @@ -326,11 +321,11 @@ impl Task { .map_err(|err| anyhow::anyhow!("Failed to finalize buffer: {err}"))?; // Convert from fuel_streams_types::BlockHeight to fuel_core_types::fuel_types::BlockHeight - let last_height_u32: u32 = *finalized.last_height(); + let last_height_u32: u32 = *finalized.last_height; let last_height: BlockHeight = last_height_u32.into(); - // Upload the data (handles both in-memory and file-based data) - process_finalized_data(&self.processor, finalized) + // Upload the Avro files to storage + process_finalized_batch(&self.processor, finalized) .await .map_err(|err| { anyhow::anyhow!( @@ -371,75 +366,10 @@ pub fn new_service(config: Config) -> anyhow::Result anyhow::Result<()> { - match data { - FinalizedData::InMemory(batch) => { - process_finalized_batch_in_memory(processor, batch).await - } - FinalizedData::OnDisk(files) => { - process_finalized_batch_from_files(processor, files).await - } - } -} - -/// Process in-memory batch data (from MemoryBuffer) -/// Uploads all three data types in parallel since they're already in memory -async fn process_finalized_batch_in_memory( - processor: &Processor, - batch: FinalizedBatch, -) -> anyhow::Result<()> { - let first_height = batch.first_height; - let last_height = batch.last_height; - - let blocks_task = async { - processor - .process_data( - first_height, - last_height, - batch.blocks_data, - S3TableName::Blocks, - ) - .await?; - Ok::<_, DuneError>(()) - }; - - let tx_task = async { - processor - .process_data( - first_height, - last_height, - batch.transactions_data, - S3TableName::Transactions, - ) - .await?; - Ok::<_, DuneError>(()) - }; - - let receipts_task = async { - processor - .process_data( - first_height, - last_height, - batch.receipts_data, - S3TableName::Receipts, - ) - .await?; - Ok::<_, DuneError>(()) - }; - - tokio::try_join!(blocks_task, tx_task, receipts_task)?; - - Ok(()) -} - -/// Process file-based batch data (from DiskBuffer) -/// Uploads sequentially to avoid loading multiple large files into memory at once. -/// Each file is streamed directly to S3 without loading into memory. 
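
The retry wrapper introduced around `store_from_file` has one non-obvious requirement: the closure is invoked once per attempt and must return a fresh future each time, so everything the future needs has to be owned (hence the `PathBuf` clone). A reduced sketch of that shape (the method name is illustrative):

impl S3Storage {
    // Mirrors store_from_file's retry structure in miniature.
    async fn retried_put(
        &self,
        key: &str,
        path: std::path::PathBuf,
    ) -> Result<(), StorageError> {
        with_retry(&self.retry_config, "retried_put", || {
            // Each attempt gets its own owned copy of the path.
            let path = path.clone();
            async move { self.put_object_from_file(key, &path).await }
        })
        .await
    }
}
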
-async fn process_finalized_batch_from_files( +/// Process finalized batch files by uploading to storage. +/// Uploads sequentially to minimize memory usage - each file is streamed +/// directly to S3 without loading into memory. +pub async fn process_finalized_batch( processor: &Processor, files: FinalizedBatchFiles, ) -> anyhow::Result<()> { @@ -549,7 +479,6 @@ mod tests { url: Url::parse(format!("http://{}", node.bound_address).as_str()).unwrap(), starting_height: 0u32.into(), storage_type: StorageTypeConfig::S3, - buffer_type: super::BufferType::Memory, batch_size: 1, blocks_request_batch_size: 10, blocks_request_concurrency: 100, From a4194d5376983959938cf4da3dc41eb5a8f21d98 Mon Sep 17 00:00:00 2001 From: Mike Christopher Date: Thu, 5 Feb 2026 08:32:09 -0800 Subject: [PATCH 10/14] fix: clean up lib import --- services/dune/src/lib.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/services/dune/src/lib.rs b/services/dune/src/lib.rs index 33b481ec..5466e02e 100644 --- a/services/dune/src/lib.rs +++ b/services/dune/src/lib.rs @@ -13,9 +13,6 @@ pub mod s3; pub mod schemas; pub mod service; -pub use block_buffer::{ - DiskBuffer, - FinalizedBatchFiles, -}; +pub use block_buffer::*; pub use cli::*; pub use error::*; From d02b8177c6de9402c4b7a62c5637e9243ee70740 Mon Sep 17 00:00:00 2001 From: Mike Christopher Date: Thu, 5 Feb 2026 08:44:14 -0800 Subject: [PATCH 11/14] feat: add ps to docker image --- cluster/docker/sv-dune.Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster/docker/sv-dune.Dockerfile b/cluster/docker/sv-dune.Dockerfile index 01f9b4e1..14437c4a 100644 --- a/cluster/docker/sv-dune.Dockerfile +++ b/cluster/docker/sv-dune.Dockerfile @@ -56,7 +56,7 @@ FROM debian:bookworm-slim AS run WORKDIR /usr/src RUN apt-get update -y \ - && apt-get install -y --no-install-recommends ca-certificates curl jq less unzip zip \ + && apt-get install -y --no-install-recommends ca-certificates curl jq less unzip zip procps \ # Clean up && apt-get autoremove -y \ && apt-get clean -y \ From 15c10e3a880d4ac5f8e9cfe56b64405f16b8a51f Mon Sep 17 00:00:00 2001 From: Mike Christopher Date: Thu, 5 Feb 2026 09:55:17 -0800 Subject: [PATCH 12/14] fix: memory leak --- services/dune/src/block_buffer.rs | 11 +++- services/dune/src/s3/client.rs | 91 ++++++++++++++++++++----------- services/dune/src/service.rs | 65 ++++++++++++++++------ 3 files changed, 115 insertions(+), 52 deletions(-) diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs index 16f9b9af..9beeaea1 100644 --- a/services/dune/src/block_buffer.rs +++ b/services/dune/src/block_buffer.rs @@ -321,9 +321,14 @@ impl DiskBuffer { /// Finalizes the buffer, returning paths to the Avro files for upload. /// - /// This does NOT clear the buffer - call `reset()` after successful upload - /// to clear the data. This design allows retry on upload failure without - /// losing the buffered data. + /// WARNING: This method consumes the internal writers and transfers ownership + /// of the temp directory to `FinalizedBatchFiles`. Once called: + /// - The writers are no longer available (subsequent calls will error) + /// - When `FinalizedBatchFiles` is dropped, the temp files are deleted + /// + /// This means retry is NOT possible after calling finalize(). If upload fails, + /// call `reset()` to create new writers and re-buffer the data from scratch, + /// or let the service reconnect the stream to resume from the last saved height. 
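
Put together, the recovery contract described above looks roughly like this; `reconnect_stream` is a hypothetical stand-in for the service's stream reconnection logic:

async fn upload_or_recover(
    processor: &Processor,
    buffer: &mut DiskBuffer,
    last_saved_height: BlockHeight,
) -> anyhow::Result<()> {
    let finalized = buffer.finalize()?;
    match process_finalized_batch(processor, finalized).await {
        Ok(()) => Ok(buffer.reset()?),
        Err(err) => {
            tracing::warn!("upload failed, resuming from {}: {err}", *last_saved_height);
            // Fresh writers; the failed batch's temp files were already dropped.
            buffer.reset()?;
            reconnect_stream(last_saved_height).await // hypothetical helper
        }
    }
}
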
pub fn finalize(&mut self) -> DuneResult { let first_height = self.first_height.ok_or_else(|| { DuneError::Other(anyhow::anyhow!("Cannot finalize empty buffer")) diff --git a/services/dune/src/s3/client.rs b/services/dune/src/s3/client.rs index 52204b19..5b967d4c 100644 --- a/services/dune/src/s3/client.rs +++ b/services/dune/src/s3/client.rs @@ -241,7 +241,7 @@ impl S3Storage { for (i, chunk) in chunks.enumerate() { let part_number = (i + 1) as i32; - match self + let response = match self .client .upload_part() .bucket(self.config.bucket()) @@ -252,38 +252,49 @@ impl S3Storage { .send() .await { - Ok(response) => { - if let Some(e_tag) = response.e_tag() { - completed_parts.push( - aws_sdk_s3::types::CompletedPart::builder() - .e_tag(e_tag) - .part_number(part_number) - .build(), - ); - } - } + Ok(response) => response, Err(err) => { // Abort the multipart upload if a part fails - self.client + let _ = self + .client .abort_multipart_upload() .bucket(self.config.bucket()) .key(key) .upload_id(upload_id) .send() - .await - .map_err(|e| { - StorageError::StoreError(format!( - "Failed to abort multipart upload: {:?}", - e.as_service_error() - )) - })?; + .await; return Err(StorageError::StoreError(format!( "Failed to upload part: {:?}", err.as_service_error() ))); } - } + }; + + // ETag is required to complete multipart upload. If missing, abort to prevent + // silent data corruption from incomplete uploads. + let Some(e_tag) = response.e_tag() else { + let _ = self + .client + .abort_multipart_upload() + .bucket(self.config.bucket()) + .key(key) + .upload_id(upload_id) + .send() + .await; + + return Err(StorageError::StoreError(format!( + "Upload part {} succeeded but returned no ETag", + part_number + ))); + }; + + completed_parts.push( + aws_sdk_s3::types::CompletedPart::builder() + .e_tag(e_tag) + .part_number(part_number) + .build(), + ); tracing::debug!( "Uploaded part {}/{} for key={}", @@ -552,7 +563,7 @@ impl S3Storage { } // Upload the chunk - match self + let response = match self .client .upload_part() .bucket(self.config.bucket()) @@ -563,16 +574,7 @@ impl S3Storage { .send() .await { - Ok(response) => { - if let Some(e_tag) = response.e_tag() { - completed_parts.push( - aws_sdk_s3::types::CompletedPart::builder() - .e_tag(e_tag) - .part_number(part_number) - .build(), - ); - } - } + Ok(response) => response, Err(err) => { // Abort the multipart upload if a part fails let _ = self @@ -589,7 +591,32 @@ impl S3Storage { err.as_service_error() ))); } - } + }; + + // ETag is required to complete multipart upload. If missing, abort to prevent + // silent data corruption from incomplete uploads. 
+            let Some(e_tag) = response.e_tag() else {
+                let _ = self
+                    .client
+                    .abort_multipart_upload()
+                    .bucket(self.config.bucket())
+                    .key(key)
+                    .upload_id(upload_id)
+                    .send()
+                    .await;
+
+                return Err(StorageError::StoreError(format!(
+                    "Upload part {} succeeded but returned no ETag",
+                    part_number
+                )));
+            };
+
+            completed_parts.push(
+                aws_sdk_s3::types::CompletedPart::builder()
+                    .e_tag(e_tag)
+                    .part_number(part_number)
+                    .build(),
+            );
 
             tracing::debug!(
                 "Uploaded part {}/{} for key={}",
diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs
index ba6a28a0..d1a8e1a9 100644
--- a/services/dune/src/service.rs
+++ b/services/dune/src/service.rs
@@ -59,13 +59,16 @@ pub struct Config {
 
 pub struct UninitializedTask {
     config: Config,
-    fetcher: GraphqlFetcher,
+    fetcher_factory: Arc<dyn Fn() -> GraphqlFetcher + Send + Sync>,
     shared: SharedState,
 }
 
 pub struct Task {
     height: watch::Sender<BlockHeight>,
-    fetcher: GraphqlFetcher,
+    /// Factory function to create new GraphqlFetcher instances on reconnection.
+    /// We recreate the fetcher on each reconnection to avoid memory leaks from
+    /// accumulated background tasks and channels in the external library.
+    fetcher_factory: Arc<dyn Fn() -> GraphqlFetcher + Send + Sync>,
     blocks_stream: BoxStream<anyhow::Result<BlockEvent>>,
     /// Disk-based block buffer that writes directly to Avro files
     buffer: DiskBuffer,
@@ -119,7 +122,7 @@ impl RunnableService for UninitializedTask {
     ) -> anyhow::Result<Self::Task> {
         let Self {
             config,
-            fetcher,
+            fetcher_factory,
             shared,
         } = self;
@@ -145,7 +148,7 @@ impl RunnableService for UninitializedTask {
         let mut task = Task {
             blocks_stream: futures::stream::pending().into_boxed(),
             height: shared.block_height,
-            fetcher,
+            fetcher_factory,
             buffer,
             processor,
             base_asset_id,
@@ -261,8 +264,16 @@ impl Task {
         let next_height = height.succ().ok_or_else(|| {
             anyhow::anyhow!("Block height overflowed when connecting block stream")
         })?;
-        let stream = self
-            .fetcher
+
+        // Create a fresh GraphqlFetcher for each connection.
+        // This is critical to avoid memory leaks: the external library spawns
+        // background tasks and creates large-capacity channels on each
+        // blocks_stream_starting_from() call. By creating a new fetcher,
+        // we ensure the old tasks/channels are properly abandoned.
+        let fetcher = (self.fetcher_factory)();
+        tracing::debug!("Created new GraphqlFetcher for stream connection");
+
+        let stream = fetcher
             .blocks_stream_starting_from(next_height)
             .await?
             .map(|result| {
@@ -314,7 +325,9 @@ impl Task {
         }
 
         // Finalize the buffer and get the file paths for upload.
-        // This does NOT clear the buffer - data is preserved for retry on failure.
+        // Note: finalize() consumes the writers and FinalizedBatchFiles owns the temp files.
+        // If upload fails, the error propagates up and the service will reconnect,
+        // resuming from the last successfully saved height.
         let finalized = self
             .buffer
             .finalize()
@@ -345,19 +358,37 @@
 }
 
 pub fn new_service(config: Config) -> anyhow::Result<ServiceRunner<UninitializedTask>> {
-    let graphql_config = GraphqlEventAdapterConfig {
-        client: Arc::new(FuelClient::new(&config.url)?),
-        heartbeat_capacity: NonZeroUsize::new(100_000).expect("Is not zero; qed"),
-        event_capacity: NonZeroUsize::new(100_000).expect("Is not zero; qed"),
-        blocks_request_batch_size: config.blocks_request_batch_size,
-        blocks_request_concurrency: config.blocks_request_concurrency,
-        pending_blocks_limit: config.pending_blocks,
-    };
-    let fetcher = create_graphql_event_adapter(graphql_config);
+    // Create a shared client that will be reused across all fetcher instances
+    let client = Arc::new(FuelClient::new(&config.url)?);
+
+    // Capture config values for the factory closure
+    let blocks_request_batch_size = config.blocks_request_batch_size;
+    let blocks_request_concurrency = config.blocks_request_concurrency;
+    let pending_blocks_limit = config.pending_blocks;
+
+    // Create a factory that produces fresh GraphqlFetcher instances, each with
+    // its own channels and background tasks. The external library spawns
+    // background tasks on each stream creation, so we need fresh fetchers on
+    // reconnection to avoid task/memory accumulation.
+    let fetcher_factory: Arc<dyn Fn() -> GraphqlFetcher + Send + Sync> =
+        Arc::new(move || {
+            let graphql_config = GraphqlEventAdapterConfig {
+                client: client.clone(),
+                // The external library creates broadcast channels with this capacity
+                // that persist until background tasks terminate.
+                heartbeat_capacity: NonZeroUsize::new(100_000).expect("Is not zero; qed"),
+                event_capacity: NonZeroUsize::new(100_000).expect("Is not zero; qed"),
+                blocks_request_batch_size,
+                blocks_request_concurrency,
+                pending_blocks_limit,
+            };
+            create_graphql_event_adapter(graphql_config)
+        });
+
     let (height, _) = watch::channel(config.starting_height);
     let task = UninitializedTask {
         config,
-        fetcher,
+        fetcher_factory,
         shared: SharedState {
             block_height: height,
         },

From 452d9eb565aff6ee44da71616c3f37261ac4bb9f Mon Sep 17 00:00:00 2001
From: Mike Christopher
Date: Thu, 5 Feb 2026 10:59:38 -0800
Subject: [PATCH 13/14] feat: testnet dune memory changes

---
 services/dune/src/block_buffer.rs | 13 ++++++++++++-
 services/dune/src/helpers/avro.rs | 15 ++++++++++++++-
 services/dune/src/service.rs      | 21 +++++++++++++--------
 3 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs
index 9beeaea1..c392516e 100644
--- a/services/dune/src/block_buffer.rs
+++ b/services/dune/src/block_buffer.rs
@@ -156,7 +156,11 @@ impl AvroFileWriters {
         })
     }
 
-    /// Appends block data directly to the Avro writers
+    /// Appends block data directly to the Avro writers.
+    ///
+    /// After writing all data for a block, the writers are flushed to disk
+    /// to prevent memory accumulation. Without flushing, the Avro Writer
+    /// buffers all data in memory until finalize_to_paths() is called.
     fn append(&mut self, block: &Block, transactions: &[Transaction]) -> DuneResult<()> {
         let blocks_writer = self.blocks_writer.as_mut().ok_or_else(|| {
             DuneError::Other(anyhow::anyhow!("blocks_writer not available"))
@@ -200,6 +204,13 @@ impl AvroFileWriters {
             }
         }
 
+        // Flush all writers to disk after each block to prevent memory accumulation.
+        // The Avro Writer buffers data internally for performance, but without
+        // periodic flushing this buffer grows unboundedly until finalize.
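+        // Trade-off note (based on Avro's container format): each flush writes
+        // the values buffered so far as their own data block, so flushing per
+        // chain block means more, smaller Avro blocks in exchange for bounded
+        // memory.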
+ blocks_writer.flush()?; + transactions_writer.flush()?; + receipts_writer.flush()?; + Ok(()) } diff --git a/services/dune/src/helpers/avro.rs b/services/dune/src/helpers/avro.rs index d7444294..f275af95 100644 --- a/services/dune/src/helpers/avro.rs +++ b/services/dune/src/helpers/avro.rs @@ -148,12 +148,25 @@ where }) } - /// Appends a value to the file + /// Appends a value to the file. + /// + /// Note: Data is buffered internally by the Avro Writer. Call `flush()` + /// periodically to write buffered data to disk and prevent memory accumulation. pub fn append(&mut self, value: &T) -> Result<(), AvroParserError> { self.writer.append_ser(value)?; Ok(()) } + /// Flushes buffered data to disk. + /// + /// The Avro Writer buffers data internally for performance. Without + /// periodic flushing, all data accumulates in memory until finalize_path(). + /// Call this after processing each block to bound memory usage. + pub fn flush(&mut self) -> Result<(), AvroParserError> { + self.writer.flush()?; + Ok(()) + } + /// Finalizes the file and returns just the path. /// The file is flushed and closed, ready for streaming to its destination. pub fn finalize_path(self) -> Result { diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs index d1a8e1a9..74f1ef15 100644 --- a/services/dune/src/service.rs +++ b/services/dune/src/service.rs @@ -326,8 +326,6 @@ impl Task { // Finalize the buffer and get the file paths for upload. // Note: finalize() consumes the writers and FinalizedBatchFiles owns the temp files. - // If upload fails, the error propagates up and the service will reconnect, - // resuming from the last successfully saved height. let finalized = self .buffer .finalize() @@ -337,6 +335,16 @@ impl Task { let last_height_u32: u32 = *finalized.last_height; let last_height: BlockHeight = last_height_u32.into(); + // IMPORTANT: Reset buffer immediately after finalize() succeeds. + // finalize() consumes the internal writers, leaving the buffer in an inconsistent + // state where block_count > 0 but writers are None. If we don't reset here and + // the upload fails, the next run() iteration would see len() == batch_size and + // try to call post_blocks() again, which would fail on finalize() with + // "blocks_writer already taken" - creating an infinite error loop. + // By resetting here, the buffer is always in a consistent state. If upload fails, + // the service reconnects and re-buffers blocks from the last saved height. + self.buffer.reset()?; + // Upload the Avro files to storage process_finalized_batch(&self.processor, finalized) .await @@ -346,13 +354,10 @@ impl Task { ) })?; - // Only after successful upload do we update state and clear the buffer + // Only after successful upload do we update the persisted height self.processor.save_latest_height(last_height).await?; self.height.send_replace(last_height); - // Clear the buffer now that upload succeeded - self.buffer.reset()?; - Ok(()) } } @@ -376,8 +381,8 @@ pub fn new_service(config: Config) -> anyhow::Result Date: Thu, 5 Feb 2026 14:23:46 -0800 Subject: [PATCH 14/14] feat: add Drop-based allocation counters to diagnose memory leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add instrumentation to track object lifetimes via AtomicI64 counters that only decrement from Drop impls, proving actual deallocation. Tracked types: GraphqlFetcher, BlockStream, AvroFileWriters, AvroFileWriter, FinalizedBatchFiles, plus tokio alive task count. 
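For reference, one log line looks like this (values here are illustrative;
the format comes from the define_counters! macro):

  alloc_counters: GRAPHQL_FETCHER=1 BLOCK_STREAM=1 AVRO_FILE_WRITERS=1 AVRO_FILE_WRITER=3 FINALIZED_BATCH_FILES=0 TOKIO_TASKS=14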
Counters log every 100 run() iterations — any counter trending upward over time confirms a leak of that object type. --- services/dune/src/alloc_counter.rs | 47 +++++++++++++++++ services/dune/src/block_buffer.rs | 5 ++ services/dune/src/helpers/avro.rs | 63 +++++++++++----------- services/dune/src/lib.rs | 2 + services/dune/src/service.rs | 35 ++++++++++--- services/dune/src/tracked.rs | 84 ++++++++++++++++++++++++++++++ 6 files changed, 199 insertions(+), 37 deletions(-) create mode 100644 services/dune/src/alloc_counter.rs create mode 100644 services/dune/src/tracked.rs diff --git a/services/dune/src/alloc_counter.rs b/services/dune/src/alloc_counter.rs new file mode 100644 index 00000000..8bde82b7 --- /dev/null +++ b/services/dune/src/alloc_counter.rs @@ -0,0 +1,47 @@ +//! Diagnostic allocation counters for tracking object lifetimes. +//! +//! Each counter is incremented on object creation and decremented **only** via +//! `Drop` impls, so a counter that trends upward over time is proof of a leak — +//! the object was created but never deallocated. +//! +//! Call [`log_all`] periodically to emit all live counts. + +use std::sync::atomic::{AtomicI64, Ordering}; + +macro_rules! define_counters { + ($($name:ident),+ $(,)?) => { + $(pub static $name: AtomicI64 = AtomicI64::new(0);)+ + + /// Log all counters at INFO level, including tokio task count. + pub fn log_all() { + let tokio_tasks = tokio::runtime::Handle::current() + .metrics() + .num_alive_tasks(); + tracing::info!( + concat!("alloc_counters: ", $(stringify!($name), "={} ",)+ "TOKIO_TASKS={}"), + $($name.load(Ordering::Relaxed),)+ + tokio_tasks, + ); + } + }; +} + +define_counters! { + GRAPHQL_FETCHER, + BLOCK_STREAM, + AVRO_FILE_WRITERS, + AVRO_FILE_WRITER, + FINALIZED_BATCH_FILES, +} + +/// Increment a counter (call from constructors). +#[inline] +pub fn inc(counter: &AtomicI64) { + counter.fetch_add(1, Ordering::Relaxed); +} + +/// Decrement a counter (call **only** from `Drop` impls). 
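+///
+/// For example, mirroring the `Drop` impls in `tracked.rs`:
+///
+/// ```ignore
+/// impl Drop for TrackedFetcher {
+///     fn drop(&mut self) {
+///         dec(&GRAPHQL_FETCHER);
+///     }
+/// }
+/// ```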
+#[inline]
+pub fn dec(counter: &AtomicI64) {
+    counter.fetch_sub(1, Ordering::Relaxed);
+}
diff --git a/services/dune/src/block_buffer.rs b/services/dune/src/block_buffer.rs
index c392516e..fef7320b 100644
--- a/services/dune/src/block_buffer.rs
+++ b/services/dune/src/block_buffer.rs
@@ -11,6 +11,7 @@ use fuel_streams_types::BlockHeight;
 use crate::{
     DuneError,
     DuneResult,
+    alloc_counter,
     helpers::{
         AvroFileWriter,
         AvroParser,
@@ -52,6 +53,7 @@ impl FinalizedBatchFiles {
 impl Drop for FinalizedBatchFiles {
     fn drop(&mut self) {
         self.cleanup();
+        alloc_counter::dec(&alloc_counter::FINALIZED_BATCH_FILES);
     }
 }
 
@@ -88,6 +90,7 @@ impl Drop for AvroFileWriters {
         if let Some(ref temp_dir) = self.temp_dir {
             let _ = fs::remove_dir_all(temp_dir);
         }
+        alloc_counter::dec(&alloc_counter::AVRO_FILE_WRITERS);
     }
 }
 
@@ -148,6 +151,7 @@ impl AvroFileWriters {
             ))
         })?;
 
+        alloc_counter::inc(&alloc_counter::AVRO_FILE_WRITERS);
         Ok(Self {
             temp_dir: Some(temp_dir.to_path_buf()),
             blocks_writer: Some(blocks_writer),
@@ -355,6 +359,7 @@ impl DiskBuffer {
 
         let avro_files = writers.finalize_to_paths()?;
 
+        alloc_counter::inc(&alloc_counter::FINALIZED_BATCH_FILES);
         Ok(FinalizedBatchFiles {
             first_height,
             last_height,
diff --git a/services/dune/src/helpers/avro.rs b/services/dune/src/helpers/avro.rs
index f275af95..a4511d83 100644
--- a/services/dune/src/helpers/avro.rs
+++ b/services/dune/src/helpers/avro.rs
@@ -2,33 +2,18 @@ use std::{
     any::TypeId,
     collections::HashMap,
     fs::File,
-    io::{
-        BufWriter,
-        Write,
-    },
-    path::{
-        Path,
-        PathBuf,
-    },
+    io::{BufWriter, Write},
+    path::{Path, PathBuf},
     sync::RwLock,
 };
 
+use crate::alloc_counter;
+
 use apache_avro::{
-    AvroSchema,
-    Codec,
-    Reader,
-    Schema,
-    Writer,
-    from_value,
-    schema::{
-        Namespace,
-        derive::AvroSchemaComponent,
-    },
-};
-use serde::{
-    Serialize,
-    de::DeserializeOwned,
+    AvroSchema, Codec, Reader, Schema, Writer, from_value,
+    schema::{Namespace, derive::AvroSchemaComponent},
 };
+use serde::{Serialize, de::DeserializeOwned};
 
 /// Global cache for Avro schemas, keyed by TypeId.
 /// Schemas are immutable and type-specific, so we cache them to avoid
@@ -118,7 +103,7 @@ where
 /// An Avro writer that writes directly to a file on disk.
 /// This reduces memory usage by not accumulating data in memory.
 pub struct AvroFileWriter<T> {
-    writer: Writer<'static, BufWriter<File>>,
+    writer: Option<Writer<'static, BufWriter<File>>>,
     file_path: PathBuf,
     _phantom: std::marker::PhantomData<T>,
 }
@@ -141,8 +126,9 @@ where
             .writer(buf_writer)
             .build();
 
+        alloc_counter::inc(&alloc_counter::AVRO_FILE_WRITER);
         Ok(Self {
-            writer,
+            writer: Some(writer),
             file_path,
             _phantom: std::marker::PhantomData,
         })
@@ -153,7 +139,10 @@ where
     /// Note: Data is buffered internally by the Avro Writer. Call `flush()`
     /// periodically to write buffered data to disk and prevent memory accumulation.
     pub fn append(&mut self, value: &T) -> Result<(), AvroParserError> {
-        self.writer.append_ser(value)?;
+        self.writer
+            .as_mut()
+            .ok_or_else(|| AvroParserError::Io("Writer already finalized".into()))?
+            .append_ser(value)?;
         Ok(())
     }
 
@@ -163,20 +152,36 @@ where
     /// periodic flushing, all data accumulates in memory until finalize_path().
     /// Call this after processing each block to bound memory usage.
     pub fn flush(&mut self) -> Result<(), AvroParserError> {
-        self.writer.flush()?;
+        self.writer
+            .as_mut()
+            .ok_or_else(|| AvroParserError::Io("Writer already finalized".into()))?
+            .flush()?;
         Ok(())
     }
 
     /// Finalizes the file and returns just the path.
     /// The file is flushed and closed, ready for streaming to its destination.
-    pub fn finalize_path(self) -> Result<PathBuf, AvroParserError> {
-        let mut inner = self.writer.into_inner().map_err(|e| {
+    /// The inner Writer is taken via `.take()`, consumed by `into_inner()`,
+    /// and deallocated. `self` then drops with `writer: None`, firing `Drop`
+    /// which decrements the counter.
+    pub fn finalize_path(mut self) -> Result<PathBuf, AvroParserError> {
+        let writer = self
+            .writer
+            .take()
+            .ok_or_else(|| AvroParserError::Io("Writer already finalized".into()))?;
+        let mut inner = writer.into_inner().map_err(|e| {
             AvroParserError::Io(format!("Failed to finalize writer: {}", e))
         })?;
         inner.flush().map_err(|e| {
             AvroParserError::Io(format!("Failed to flush final data: {}", e))
         })?;
-        Ok(self.file_path)
+        Ok(self.file_path.clone())
+    }
+}
+
+impl<T> Drop for AvroFileWriter<T> {
+    fn drop(&mut self) {
+        alloc_counter::dec(&alloc_counter::AVRO_FILE_WRITER);
     }
 }
diff --git a/services/dune/src/lib.rs b/services/dune/src/lib.rs
index 5466e02e..eca0da2d 100644
--- a/services/dune/src/lib.rs
+++ b/services/dune/src/lib.rs
@@ -4,6 +4,7 @@
 // Used in the `main.rs`
 use fuel_web_utils as _;
 
+pub mod alloc_counter;
 pub mod block_buffer;
 mod cli;
 mod error;
@@ -12,6 +13,7 @@ pub mod processor;
 pub mod s3;
 pub mod schemas;
 pub mod service;
+pub mod tracked;
 
 pub use block_buffer::*;
 pub use cli::*;
diff --git a/services/dune/src/service.rs b/services/dune/src/service.rs
index 74f1ef15..9c92cecb 100644
--- a/services/dune/src/service.rs
+++ b/services/dune/src/service.rs
@@ -1,5 +1,6 @@
 use crate::{
     DuneError,
+    alloc_counter,
     block_buffer::{
         DiskBuffer,
         FinalizedBatchFiles,
@@ -9,6 +10,10 @@ use crate::{
         StorageTypeConfig,
     },
     s3::S3TableName,
+    tracked::{
+        TrackedFetcher,
+        TrackedStream,
+    },
 };
 use fuel_core_client::client::FuelClient;
 use fuel_core_services::{
@@ -17,10 +22,7 @@ use fuel_core_services::{
     ServiceRunner,
     StateWatcher,
     TaskNextAction,
-    stream::{
-        BoxStream,
-        IntoBoxStream,
-    },
+    stream::IntoBoxStream,
 };
 use fuel_core_types::{
     fuel_tx::AssetId,
@@ -69,12 +71,14 @@ pub struct Task {
     /// We recreate the fetcher on each reconnection to avoid memory leaks from
     /// accumulated background tasks and channels in the external library.
     fetcher_factory: Arc<dyn Fn() -> GraphqlFetcher + Send + Sync>,
-    blocks_stream: BoxStream<anyhow::Result<BlockEvent>>,
+    blocks_stream: TrackedStream,
     /// Disk-based block buffer that writes directly to Avro files
     buffer: DiskBuffer,
     processor: Processor,
     base_asset_id: AssetId,
     batch_size: usize,
+    /// Counter for periodic alloc_counter logging
+    run_iterations: u64,
 }
 
 #[derive(Clone)]
@@ -146,13 +150,14 @@ impl RunnableService for UninitializedTask {
         tracing::info!("Using disk buffer for block accumulation");
 
         let mut task = Task {
-            blocks_stream: futures::stream::pending().into_boxed(),
+            blocks_stream: TrackedStream::new(futures::stream::pending().into_boxed()),
             height: shared.block_height,
             fetcher_factory,
             buffer,
             processor,
             base_asset_id,
             batch_size: config.batch_size,
+            run_iterations: 0,
         };
 
         task.connect_block_stream().await?;
@@ -163,6 +168,12 @@ impl RunnableTask for Task {
     async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
+        // Log allocation counters periodically (every 100 iterations)
+        self.run_iterations += 1;
+        if self.run_iterations % 100 == 0 {
+            alloc_counter::log_all();
+        }
+
         match self.buffer.len().cmp(&self.batch_size) {
             Ordering::Less => {}
             Ordering::Equal => {
@@ -270,7 +281,11 @@ impl Task {
         // background tasks and creates large-capacity channels on each
         // blocks_stream_starting_from() call. By creating a new fetcher,
         // we ensure the old tasks/channels are properly abandoned.
-        let fetcher = (self.fetcher_factory)();
+        //
+        // Wrapped in TrackedFetcher to observe via alloc counters whether
+        // Drop actually fires at the end of this scope. If the counter
+        // doesn't decrement, something is holding the fetcher alive.
+        let fetcher = TrackedFetcher::new((self.fetcher_factory)());
         tracing::debug!("Created new GraphqlFetcher for stream connection");
 
         let stream = fetcher
@@ -285,7 +300,11 @@ impl Task {
             })
             .into_boxed();
-        self.blocks_stream = stream;
+
+        self.blocks_stream = TrackedStream::new(stream);
+        // `fetcher` drops here; TrackedFetcher::drop decrements the counter.
+        // If GRAPHQL_FETCHER trends upward, something inside the stream
+        // is keeping the fetcher (or its spawned tasks) alive.
 
         Ok(())
     }
diff --git a/services/dune/src/tracked.rs b/services/dune/src/tracked.rs
new file mode 100644
index 00000000..df8abe14
--- /dev/null
+++ b/services/dune/src/tracked.rs
@@ -0,0 +1,84 @@
+//! Newtype wrappers that provide `Drop`-based allocation tracking
+//! for types we don't own.
+
+use std::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+
+use fuel_core_services::stream::BoxStream;
+use fuel_indexer_types::events::BlockEvent;
+use fuel_receipts_manager::adapters::graphql_event_adapter::GraphqlFetcher;
+use futures::Stream;
+
+use crate::alloc_counter;
+
+// ---------------------------------------------------------------------------
+// TrackedStream
+// ---------------------------------------------------------------------------
+
+/// A `BoxStream` wrapper whose `Drop` proves the stream was actually
+/// deallocated, not just replaced.
+pub struct TrackedStream {
+    inner: BoxStream<anyhow::Result<BlockEvent>>,
+}
+
+impl TrackedStream {
+    pub fn new(inner: BoxStream<anyhow::Result<BlockEvent>>) -> Self {
+        alloc_counter::inc(&alloc_counter::BLOCK_STREAM);
+        Self { inner }
+    }
+}
+
+impl Stream for TrackedStream {
+    type Item = anyhow::Result<BlockEvent>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.inner).poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+impl Drop for TrackedStream {
+    fn drop(&mut self) {
+        alloc_counter::dec(&alloc_counter::BLOCK_STREAM);
+    }
+}
+
+// ---------------------------------------------------------------------------
+// TrackedFetcher
+// ---------------------------------------------------------------------------
+
+/// A `GraphqlFetcher` wrapper whose `Drop` proves the fetcher (and any
+/// resources it holds) was actually deallocated.
+pub struct TrackedFetcher {
+    inner: GraphqlFetcher,
+}
+
+impl TrackedFetcher {
+    pub fn new(inner: GraphqlFetcher) -> Self {
+        alloc_counter::inc(&alloc_counter::GRAPHQL_FETCHER);
+        Self { inner }
+    }
+
+    /// Delegate to the inner fetcher.
+    pub fn inner(&self) -> &GraphqlFetcher {
+        &self.inner
+    }
+}
+
+impl std::ops::Deref for TrackedFetcher {
+    type Target = GraphqlFetcher;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl Drop for TrackedFetcher {
+    fn drop(&mut self) {
+        alloc_counter::dec(&alloc_counter::GRAPHQL_FETCHER);
+    }
+}
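+
+// Reading the counters: every TrackedFetcher/TrackedStream created in
+// service.rs should eventually be matched by a Drop. A sketch of the expected
+// lifecycle (illustrative only; `make_fetcher()` and `raw_stream` are
+// placeholders, not items in this crate):
+//
+//   let fetcher = TrackedFetcher::new(make_fetcher()); // GRAPHQL_FETCHER += 1
+//   let stream = TrackedStream::new(raw_stream);       // BLOCK_STREAM += 1
+//   drop(stream);                                      // BLOCK_STREAM -= 1
+//   drop(fetcher);                                     // GRAPHQL_FETCHER -= 1
+//
+// A counter that never returns to its baseline points at the leaked type.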