From 57d595b0ada1edbce5f170de68f00978af5d367a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wieiw=C3=B3rka?= Date: Sat, 13 Sep 2025 15:39:45 +0200 Subject: [PATCH 1/6] Fix streaming output --- .../physical_planner/joins/interval_join.rs | 242 +++++++++++++++--- 1 file changed, 202 insertions(+), 40 deletions(-) diff --git a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs index 6438949..f60e961 100644 --- a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs +++ b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs @@ -6,7 +6,7 @@ use crate::physical_planner::joins::utils::{ use crate::session_context::Algorithm; use ahash::RandomState; use bio::data_structures::interval_tree as rust_bio; -use datafusion::arrow::array::{Array, AsArray, PrimitiveArray, PrimitiveBuilder, RecordBatch}; +use datafusion::arrow::array::{Array, AsArray, PrimitiveArray, RecordBatch}; use datafusion::arrow::compute; use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef, UInt32Type}; use datafusion::common::hash_utils::create_hashes; @@ -535,6 +535,16 @@ impl ExecutionPlan for IntervalJoinExec { state: IntervalJoinStreamState::WaitBuildSide, build_side: None, hashes_buffer: vec![], + // Initialize memory pool optimization buffers + reusable_match_buffer: Vec::with_capacity(256), + reusable_rle_buffer: Vec::with_capacity(1024), + reusable_index_buffer: Vec::with_capacity(2048), + // Default to 100K rows per output batch to prevent memory explosion + // This can be configured via environment variable SEQUILA_MAX_OUTPUT_BATCH_SIZE + max_output_batch_size: std::env::var("SEQUILA_MAX_OUTPUT_BATCH_SIZE") + .unwrap_or_else(|_| "100000".to_string()) + .parse() + .unwrap_or(100_000), })) } @@ -1045,11 +1055,28 @@ struct IntervalJoinStream { state: IntervalJoinStreamState, build_side: Option>, hashes_buffer: Vec, + + // === DataFusion Interval Join Optimizations === + // Memory pool optimization: Persistent reusable buffers to eliminate allocation overhead + /// Buffer for storing interval match positions, reused across batches + reusable_match_buffer: Vec, + /// Buffer for storing right-side run-length encoding data + reusable_rle_buffer: Vec, + /// Buffer for building right-side index arrays + reusable_index_buffer: Vec, + /// Maximum output batch size for streaming (prevents memory explosion) + max_output_batch_size: usize, } struct ProcessProbeBatchState { /// Current probe-side batch batch: RecordBatch, + /// Current probe row index being processed + probe_row_idx: usize, + /// Accumulated matches for streaming output + accumulated_left_matches: Vec, + /// Accumulated right indices for streaming output + accumulated_right_indices: Vec, } enum IntervalJoinStreamState { @@ -1059,6 +1086,8 @@ enum IntervalJoinStreamState { FetchProbeBatch, /// Indicates that non-empty batch has been fetched from probe-side, and is ready to be processed ProcessProbeBatch(ProcessProbeBatchState), + /// Emit accumulated matches as output batch before continuing + EmitAccumulatedMatches(ProcessProbeBatchState), /// Indicates that probe-side has been fully processed ExhaustedProbeSide, } @@ -1088,7 +1117,10 @@ impl IntervalJoinStream { handle_state!(ready!(self.fetch_probe_batch(cx))) } IntervalJoinStreamState::ProcessProbeBatch(_) => { - handle_state!(self.process_probe_batch()) + handle_state!(self.process_probe_batch_streaming()) + } + IntervalJoinStreamState::EmitAccumulatedMatches(_) => { + handle_state!(self.emit_accumulated_matches()) } IntervalJoinStreamState::ExhaustedProbeSide => { log::info!("{:?} finished execution, total processed batches: {:?}, total join time: {:?} ms", @@ -1156,8 +1188,12 @@ impl IntervalJoinStream { self.join_metrics.input_batches.value() ); - self.state = - IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch }); + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch, + probe_row_idx: 0, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); } Some(Err(err)) => return Poll::Ready(Err(err)), }; @@ -1165,7 +1201,9 @@ impl IntervalJoinStream { Poll::Ready(Ok(StatefulStreamResult::Continue)) } - fn process_probe_batch(&mut self) -> Result>> { + fn process_probe_batch_streaming( + &mut self, + ) -> Result>> { let state = self.state.try_as_process_probe_batch_mut()?; let build_side = match self.build_side.as_ref() { Some(build_side) => Ok(build_side), @@ -1177,51 +1215,164 @@ impl IntervalJoinStream { let start = evaluate_as_i32(self.right_interval.start(), &state.batch)?; let end = evaluate_as_i32(self.right_interval.end(), &state.batch)?; - let mut builder_left = PrimitiveBuilder::::new(); + // Reuse pre-calculated hash values from stream initialization + + // Process probe rows incrementally with streaming output + let batch_size = state.batch.num_rows(); + let start_row_idx = state.probe_row_idx; + + // Process rows in chunks to prevent memory explosion + let chunk_end = std::cmp::min(start_row_idx + self.max_output_batch_size / 100, batch_size); + + let mut temp_matches = Vec::with_capacity(64); + let mut total_output_rows = state.accumulated_left_matches.len(); + + // Process chunk of probe rows + for i in start_row_idx..chunk_end { + temp_matches.clear(); - let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); - let mut pos_vect: Vec = Vec::with_capacity(100); - for (i, hash_val) in self.hashes_buffer.iter().enumerate() { build_side .hash_map - .get(*hash_val, start.value(i), end.value(i), |pos| { - pos_vect.push(pos as u32); + .get(self.hashes_buffer[i], start.value(i), end.value(i), |pos| { + temp_matches.push(pos as u32); }); + + // Add matches to accumulation buffers match &build_side.hash_map { - IntervalJoinAlgorithm::CoitreesNearest(_t) => { - // even if there is no hit we need to preserve the right side - rle_right.push(1); - if pos_vect.len() == 0 { - builder_left.append_null(); - } else { - builder_left.append_slice(&pos_vect); - } - } - IntervalJoinAlgorithm::CoitreesCountOverlaps(_t) => { - rle_right.push(1); - if pos_vect.len() == 0 { - builder_left.append_null(); + IntervalJoinAlgorithm::CoitreesNearest(_) + | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { + // These algorithms always produce exactly one output row per probe row + // (either a match or null for the left side) + if !temp_matches.is_empty() { + // Found matches - add first match only for nearest algorithms + state.accumulated_left_matches.push(temp_matches[0]); + state.accumulated_right_indices.push(i as u32); } else { - builder_left.append_slice(&pos_vect); + // No matches found - add null entry for left side + // This preserves the behavior expected by nearest/count algorithms + state.accumulated_left_matches.push(u32::MAX); // Use MAX as null marker + state.accumulated_right_indices.push(i as u32); } + total_output_rows += 1; // Always exactly one output row } _ => { - rle_right.push(pos_vect.len() as u32); - builder_left.append_slice(&pos_vect); + // Regular algorithms: add all matches, skip if no matches + state + .accumulated_left_matches + .extend_from_slice(&temp_matches); + // Add right side indices (one per match) + for _ in 0..temp_matches.len() { + state.accumulated_right_indices.push(i as u32); + } + total_output_rows += temp_matches.len(); } } - // builder_left.append_slice(&pos_vect); - pos_vect.clear(); + // Check if we should emit output batch to prevent memory explosion + if total_output_rows >= self.max_output_batch_size { + state.probe_row_idx = i + 1; + self.state = + IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: std::mem::take( + &mut state.accumulated_left_matches, + ), + accumulated_right_indices: std::mem::take( + &mut state.accumulated_right_indices, + ), + }); + timer.done(); + return self.emit_accumulated_matches(); + } + } + + // Update probe row index + state.probe_row_idx = chunk_end; + + // If we've processed all rows in the batch, emit accumulated matches and move to next batch + if chunk_end >= batch_size { + if !state.accumulated_left_matches.is_empty() { + self.state = + IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: batch_size, + accumulated_left_matches: std::mem::take( + &mut state.accumulated_left_matches, + ), + accumulated_right_indices: std::mem::take( + &mut state.accumulated_right_indices, + ), + }); + timer.done(); + return self.emit_accumulated_matches(); + } else { + // No matches, move to next batch + self.state = IntervalJoinStreamState::FetchProbeBatch; + timer.done(); + return Ok(StatefulStreamResult::Continue); + } + } + + // Continue processing current batch + timer.done(); + Ok(StatefulStreamResult::Continue) + } + + fn emit_accumulated_matches(&mut self) -> Result>> { + let state = match &mut self.state { + IntervalJoinStreamState::EmitAccumulatedMatches(state) => state, + _ => return internal_err!("Expected EmitAccumulatedMatches state"), + }; + + let build_side = match self.build_side.as_ref() { + Some(build_side) => Ok(build_side), + None => internal_err!("Expected build side in ready state"), + }?; + + if state.accumulated_left_matches.is_empty() { + // No matches to emit, determine next state + if state.probe_row_idx >= state.batch.num_rows() { + // Done with this batch + self.state = IntervalJoinStreamState::FetchProbeBatch; + } else { + // More rows to process in current batch + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); + } + return Ok(StatefulStreamResult::Continue); } - let left_indexes = builder_left.finish(); - let mut index_right = Vec::with_capacity(left_indexes.len()); - for i in 0..rle_right.len() { - for _ in 0..rle_right[i] { - index_right.push(i as u32); + + // Handle null markers for left side indices (u32::MAX indicates null) + let mut left_indices_with_nulls = Vec::with_capacity(state.accumulated_left_matches.len()); + let mut validity = Vec::with_capacity(state.accumulated_left_matches.len()); + + for &idx in &state.accumulated_left_matches { + if idx == u32::MAX { + left_indices_with_nulls.push(0u32); // Use 0 as dummy index + validity.push(false); // Mark as null + } else { + left_indices_with_nulls.push(idx); + validity.push(true); // Mark as valid } } - let right_indexes = PrimitiveArray::from(index_right); + + let left_indexes = if validity.iter().all(|&v| v) { + // No nulls, use regular array + PrimitiveArray::::from(left_indices_with_nulls) + } else { + // Some nulls, use array with validity buffer + use datafusion::arrow::buffer::NullBuffer; + let null_buffer = NullBuffer::from(validity); + PrimitiveArray::::new(left_indices_with_nulls.into(), Some(null_buffer)) + }; + + let right_indexes = + PrimitiveArray::::from(state.accumulated_right_indices.clone()); let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); @@ -1242,16 +1393,27 @@ impl IntervalJoinStream { self.join_metrics.output_batches.add(1); self.join_metrics.output_rows.add(result.num_rows()); - timer.done(); log::debug!( - "{:?} is done processing batch {:?} with {:?} output rows", + "{:?} emitted streaming batch with {:?} rows, total output rows: {:?}", std::thread::current().id(), - self.join_metrics.output_batches.value(), - result.num_rows() + result.num_rows(), + self.join_metrics.output_rows.value() ); - self.state = IntervalJoinStreamState::FetchProbeBatch; + // Determine next state + if state.probe_row_idx >= state.batch.num_rows() { + // Done with this batch + self.state = IntervalJoinStreamState::FetchProbeBatch; + } else { + // More rows to process in current batch + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); + } Ok(StatefulStreamResult::Ready(Some(result))) } From 7d39a0a4fcadcf69c42dee8a64f9608604ca2c60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wieiw=C3=B3rka?= Date: Sat, 13 Sep 2025 16:28:33 +0200 Subject: [PATCH 2/6] Optimizing large streaming] --- .../physical_planner/joins/interval_join.rs | 134 ++++++++++++------ 1 file changed, 87 insertions(+), 47 deletions(-) diff --git a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs index f60e961..66d3685 100644 --- a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs +++ b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs @@ -539,12 +539,12 @@ impl ExecutionPlan for IntervalJoinExec { reusable_match_buffer: Vec::with_capacity(256), reusable_rle_buffer: Vec::with_capacity(1024), reusable_index_buffer: Vec::with_capacity(2048), - // Default to 100K rows per output batch to prevent memory explosion + // Default to 50K rows per output batch to balance memory and performance // This can be configured via environment variable SEQUILA_MAX_OUTPUT_BATCH_SIZE max_output_batch_size: std::env::var("SEQUILA_MAX_OUTPUT_BATCH_SIZE") - .unwrap_or_else(|_| "100000".to_string()) + .unwrap_or_else(|_| "50000".to_string()) .parse() - .unwrap_or(100_000), + .unwrap_or(50_000), })) } @@ -1221,10 +1221,11 @@ impl IntervalJoinStream { let batch_size = state.batch.num_rows(); let start_row_idx = state.probe_row_idx; - // Process rows in chunks to prevent memory explosion - let chunk_end = std::cmp::min(start_row_idx + self.max_output_batch_size / 100, batch_size); + // Process rows in larger chunks to reduce overhead while preventing memory explosion + let chunk_size = std::cmp::max(self.max_output_batch_size / 10, 1000); + let chunk_end = std::cmp::min(start_row_idx + chunk_size, batch_size); - let mut temp_matches = Vec::with_capacity(64); + let mut temp_matches = Vec::with_capacity(256); let mut total_output_rows = state.accumulated_left_matches.len(); // Process chunk of probe rows @@ -1260,10 +1261,12 @@ impl IntervalJoinStream { state .accumulated_left_matches .extend_from_slice(&temp_matches); - // Add right side indices (one per match) - for _ in 0..temp_matches.len() { - state.accumulated_right_indices.push(i as u32); - } + // Add right side indices (one per match) - optimized batch append + let match_count = temp_matches.len(); + state.accumulated_right_indices.resize( + state.accumulated_right_indices.len() + match_count, + i as u32, + ); total_output_rows += temp_matches.len(); } } @@ -1271,6 +1274,9 @@ impl IntervalJoinStream { // Check if we should emit output batch to prevent memory explosion if total_output_rows >= self.max_output_batch_size { state.probe_row_idx = i + 1; + timer.done(); + + // Emit matches by transitioning to emit state self.state = IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { batch: state.batch.clone(), @@ -1282,7 +1288,6 @@ impl IntervalJoinStream { &mut state.accumulated_right_indices, ), }); - timer.done(); return self.emit_accumulated_matches(); } } @@ -1293,6 +1298,7 @@ impl IntervalJoinStream { // If we've processed all rows in the batch, emit accumulated matches and move to next batch if chunk_end >= batch_size { if !state.accumulated_left_matches.is_empty() { + timer.done(); self.state = IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { batch: state.batch.clone(), @@ -1304,7 +1310,6 @@ impl IntervalJoinStream { &mut state.accumulated_right_indices, ), }); - timer.done(); return self.emit_accumulated_matches(); } else { // No matches, move to next batch @@ -1319,39 +1324,26 @@ impl IntervalJoinStream { Ok(StatefulStreamResult::Continue) } - fn emit_accumulated_matches(&mut self) -> Result>> { - let state = match &mut self.state { - IntervalJoinStreamState::EmitAccumulatedMatches(state) => state, - _ => return internal_err!("Expected EmitAccumulatedMatches state"), - }; - + fn create_output_batch_from_data( + &self, + accumulated_left_matches: &[u32], + accumulated_right_indices: &[u32], + batch: &RecordBatch, + ) -> Result { let build_side = match self.build_side.as_ref() { Some(build_side) => Ok(build_side), None => internal_err!("Expected build side in ready state"), }?; - if state.accumulated_left_matches.is_empty() { - // No matches to emit, determine next state - if state.probe_row_idx >= state.batch.num_rows() { - // Done with this batch - self.state = IntervalJoinStreamState::FetchProbeBatch; - } else { - // More rows to process in current batch - self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { - batch: state.batch.clone(), - probe_row_idx: state.probe_row_idx, - accumulated_left_matches: Vec::new(), - accumulated_right_indices: Vec::new(), - }); - } - return Ok(StatefulStreamResult::Continue); + if accumulated_left_matches.is_empty() { + return internal_err!("Cannot create output batch with no matches"); } // Handle null markers for left side indices (u32::MAX indicates null) - let mut left_indices_with_nulls = Vec::with_capacity(state.accumulated_left_matches.len()); - let mut validity = Vec::with_capacity(state.accumulated_left_matches.len()); + let mut left_indices_with_nulls = Vec::with_capacity(accumulated_left_matches.len()); + let mut validity = Vec::with_capacity(accumulated_left_matches.len()); - for &idx in &state.accumulated_left_matches { + for &idx in accumulated_left_matches { if idx == u32::MAX { left_indices_with_nulls.push(0u32); // Use 0 as dummy index validity.push(false); // Mark as null @@ -1371,8 +1363,7 @@ impl IntervalJoinStream { PrimitiveArray::::new(left_indices_with_nulls.into(), Some(null_buffer)) }; - let right_indexes = - PrimitiveArray::::from(state.accumulated_right_indices.clone()); + let right_indexes = PrimitiveArray::::from(accumulated_right_indices.to_vec()); let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); @@ -1381,7 +1372,7 @@ impl IntervalJoinStream { let array = build_side.batch.column(column_index.index); compute::take(array, &left_indexes, None)? } else if column_index.side == JoinSide::Right { - let array = state.batch.column(column_index.index); + let array = batch.column(column_index.index); compute::take(array, &right_indexes, None)? } else { panic!("Unsupported join_side {:?}", column_index.side); @@ -1401,19 +1392,68 @@ impl IntervalJoinStream { self.join_metrics.output_rows.value() ); - // Determine next state - if state.probe_row_idx >= state.batch.num_rows() { + Ok(result) + } + + fn emit_accumulated_matches(&mut self) -> Result>> { + // Extract all data first to avoid borrowing conflicts + let (accumulated_left_matches, accumulated_right_indices, batch, probe_row_idx) = + match &mut self.state { + IntervalJoinStreamState::EmitAccumulatedMatches(state) => { + if state.accumulated_left_matches.is_empty() { + // No matches to emit, determine next state + let next_state = if state.probe_row_idx >= state.batch.num_rows() { + IntervalJoinStreamState::FetchProbeBatch + } else { + IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }) + }; + self.state = next_state; + return Ok(StatefulStreamResult::Continue); + } + + // Extract the data + let accumulated_left_matches = + std::mem::take(&mut state.accumulated_left_matches); + let accumulated_right_indices = + std::mem::take(&mut state.accumulated_right_indices); + let batch = state.batch.clone(); + let probe_row_idx = state.probe_row_idx; + + ( + accumulated_left_matches, + accumulated_right_indices, + batch, + probe_row_idx, + ) + } + _ => return internal_err!("Expected EmitAccumulatedMatches state"), + }; + + // Now we can safely call the method without borrowing conflicts + let result = self.create_output_batch_from_data( + &accumulated_left_matches, + &accumulated_right_indices, + &batch, + )?; + + // Set next state + self.state = if probe_row_idx >= batch.num_rows() { // Done with this batch - self.state = IntervalJoinStreamState::FetchProbeBatch; + IntervalJoinStreamState::FetchProbeBatch } else { // More rows to process in current batch - self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { - batch: state.batch.clone(), - probe_row_idx: state.probe_row_idx, + IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch, + probe_row_idx, accumulated_left_matches: Vec::new(), accumulated_right_indices: Vec::new(), - }); - } + }) + }; Ok(StatefulStreamResult::Ready(Some(result))) } From 50c54d10d04ab1006e7fa832f7ba6bc2b2f99b95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wieiw=C3=B3rka?= Date: Sat, 13 Sep 2025 16:40:13 +0200 Subject: [PATCH 3/6] Hybrid approach --- .../physical_planner/joins/interval_join.rs | 286 +++--------------- 1 file changed, 42 insertions(+), 244 deletions(-) diff --git a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs index 66d3685..e6481ef 100644 --- a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs +++ b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs @@ -6,7 +6,7 @@ use crate::physical_planner::joins::utils::{ use crate::session_context::Algorithm; use ahash::RandomState; use bio::data_structures::interval_tree as rust_bio; -use datafusion::arrow::array::{Array, AsArray, PrimitiveArray, RecordBatch}; +use datafusion::arrow::array::{Array, AsArray, PrimitiveArray, PrimitiveBuilder, RecordBatch}; use datafusion::arrow::compute; use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef, UInt32Type}; use datafusion::common::hash_utils::create_hashes; @@ -95,7 +95,7 @@ pub struct IntervalJoinExec { pub projection: Option>, /// Information of index and left / right placement of columns column_indices: Vec, - /// Null matching behavior: If `null_equals_null` is true, rows that have + /// If null_equals_null is true, rows that have /// `null`s in both left and right equijoin columns will be matched. /// Otherwise, rows that have `null`s in the join columns will not be /// matched and thus will not appear in the output. @@ -535,16 +535,6 @@ impl ExecutionPlan for IntervalJoinExec { state: IntervalJoinStreamState::WaitBuildSide, build_side: None, hashes_buffer: vec![], - // Initialize memory pool optimization buffers - reusable_match_buffer: Vec::with_capacity(256), - reusable_rle_buffer: Vec::with_capacity(1024), - reusable_index_buffer: Vec::with_capacity(2048), - // Default to 50K rows per output batch to balance memory and performance - // This can be configured via environment variable SEQUILA_MAX_OUTPUT_BATCH_SIZE - max_output_batch_size: std::env::var("SEQUILA_MAX_OUTPUT_BATCH_SIZE") - .unwrap_or_else(|_| "50000".to_string()) - .parse() - .unwrap_or(50_000), })) } @@ -1055,28 +1045,11 @@ struct IntervalJoinStream { state: IntervalJoinStreamState, build_side: Option>, hashes_buffer: Vec, - - // === DataFusion Interval Join Optimizations === - // Memory pool optimization: Persistent reusable buffers to eliminate allocation overhead - /// Buffer for storing interval match positions, reused across batches - reusable_match_buffer: Vec, - /// Buffer for storing right-side run-length encoding data - reusable_rle_buffer: Vec, - /// Buffer for building right-side index arrays - reusable_index_buffer: Vec, - /// Maximum output batch size for streaming (prevents memory explosion) - max_output_batch_size: usize, } struct ProcessProbeBatchState { /// Current probe-side batch batch: RecordBatch, - /// Current probe row index being processed - probe_row_idx: usize, - /// Accumulated matches for streaming output - accumulated_left_matches: Vec, - /// Accumulated right indices for streaming output - accumulated_right_indices: Vec, } enum IntervalJoinStreamState { @@ -1086,8 +1059,6 @@ enum IntervalJoinStreamState { FetchProbeBatch, /// Indicates that non-empty batch has been fetched from probe-side, and is ready to be processed ProcessProbeBatch(ProcessProbeBatchState), - /// Emit accumulated matches as output batch before continuing - EmitAccumulatedMatches(ProcessProbeBatchState), /// Indicates that probe-side has been fully processed ExhaustedProbeSide, } @@ -1117,10 +1088,7 @@ impl IntervalJoinStream { handle_state!(ready!(self.fetch_probe_batch(cx))) } IntervalJoinStreamState::ProcessProbeBatch(_) => { - handle_state!(self.process_probe_batch_streaming()) - } - IntervalJoinStreamState::EmitAccumulatedMatches(_) => { - handle_state!(self.emit_accumulated_matches()) + handle_state!(self.process_probe_batch()) } IntervalJoinStreamState::ExhaustedProbeSide => { log::info!("{:?} finished execution, total processed batches: {:?}, total join time: {:?} ms", @@ -1188,12 +1156,8 @@ impl IntervalJoinStream { self.join_metrics.input_batches.value() ); - self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { - batch, - probe_row_idx: 0, - accumulated_left_matches: Vec::new(), - accumulated_right_indices: Vec::new(), - }); + self.state = + IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch }); } Some(Err(err)) => return Poll::Ready(Err(err)), }; @@ -1201,9 +1165,7 @@ impl IntervalJoinStream { Poll::Ready(Ok(StatefulStreamResult::Continue)) } - fn process_probe_batch_streaming( - &mut self, - ) -> Result>> { + fn process_probe_batch(&mut self) -> Result>> { let state = self.state.try_as_process_probe_batch_mut()?; let build_side = match self.build_side.as_ref() { Some(build_side) => Ok(build_side), @@ -1215,155 +1177,51 @@ impl IntervalJoinStream { let start = evaluate_as_i32(self.right_interval.start(), &state.batch)?; let end = evaluate_as_i32(self.right_interval.end(), &state.batch)?; - // Reuse pre-calculated hash values from stream initialization - - // Process probe rows incrementally with streaming output - let batch_size = state.batch.num_rows(); - let start_row_idx = state.probe_row_idx; - - // Process rows in larger chunks to reduce overhead while preventing memory explosion - let chunk_size = std::cmp::max(self.max_output_batch_size / 10, 1000); - let chunk_end = std::cmp::min(start_row_idx + chunk_size, batch_size); - - let mut temp_matches = Vec::with_capacity(256); - let mut total_output_rows = state.accumulated_left_matches.len(); - - // Process chunk of probe rows - for i in start_row_idx..chunk_end { - temp_matches.clear(); + // Use simple batch processing (like original) for optimal performance + let mut builder_left = PrimitiveBuilder::::new(); + let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); + let mut pos_vect: Vec = Vec::with_capacity(100); + for (i, hash_val) in self.hashes_buffer.iter().enumerate() { build_side .hash_map - .get(self.hashes_buffer[i], start.value(i), end.value(i), |pos| { - temp_matches.push(pos as u32); + .get(*hash_val, start.value(i), end.value(i), |pos| { + pos_vect.push(pos as u32); }); - - // Add matches to accumulation buffers match &build_side.hash_map { - IntervalJoinAlgorithm::CoitreesNearest(_) - | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { - // These algorithms always produce exactly one output row per probe row - // (either a match or null for the left side) - if !temp_matches.is_empty() { - // Found matches - add first match only for nearest algorithms - state.accumulated_left_matches.push(temp_matches[0]); - state.accumulated_right_indices.push(i as u32); + IntervalJoinAlgorithm::CoitreesNearest(_t) => { + // even if there is no hit we need to preserve the right side + rle_right.push(1); + if pos_vect.is_empty() { + builder_left.append_null(); } else { - // No matches found - add null entry for left side - // This preserves the behavior expected by nearest/count algorithms - state.accumulated_left_matches.push(u32::MAX); // Use MAX as null marker - state.accumulated_right_indices.push(i as u32); + builder_left.append_slice(&pos_vect); + } + } + IntervalJoinAlgorithm::CoitreesCountOverlaps(_t) => { + rle_right.push(1); + if pos_vect.is_empty() { + builder_left.append_null(); + } else { + builder_left.append_slice(&pos_vect); } - total_output_rows += 1; // Always exactly one output row } _ => { - // Regular algorithms: add all matches, skip if no matches - state - .accumulated_left_matches - .extend_from_slice(&temp_matches); - // Add right side indices (one per match) - optimized batch append - let match_count = temp_matches.len(); - state.accumulated_right_indices.resize( - state.accumulated_right_indices.len() + match_count, - i as u32, - ); - total_output_rows += temp_matches.len(); + rle_right.push(pos_vect.len() as u32); + builder_left.append_slice(&pos_vect); } } - - // Check if we should emit output batch to prevent memory explosion - if total_output_rows >= self.max_output_batch_size { - state.probe_row_idx = i + 1; - timer.done(); - - // Emit matches by transitioning to emit state - self.state = - IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { - batch: state.batch.clone(), - probe_row_idx: state.probe_row_idx, - accumulated_left_matches: std::mem::take( - &mut state.accumulated_left_matches, - ), - accumulated_right_indices: std::mem::take( - &mut state.accumulated_right_indices, - ), - }); - return self.emit_accumulated_matches(); - } + pos_vect.clear(); } - // Update probe row index - state.probe_row_idx = chunk_end; - - // If we've processed all rows in the batch, emit accumulated matches and move to next batch - if chunk_end >= batch_size { - if !state.accumulated_left_matches.is_empty() { - timer.done(); - self.state = - IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { - batch: state.batch.clone(), - probe_row_idx: batch_size, - accumulated_left_matches: std::mem::take( - &mut state.accumulated_left_matches, - ), - accumulated_right_indices: std::mem::take( - &mut state.accumulated_right_indices, - ), - }); - return self.emit_accumulated_matches(); - } else { - // No matches, move to next batch - self.state = IntervalJoinStreamState::FetchProbeBatch; - timer.done(); - return Ok(StatefulStreamResult::Continue); + let left_indexes = builder_left.finish(); + let mut index_right = Vec::with_capacity(left_indexes.len()); + for i in 0..rle_right.len() { + for _ in 0..rle_right[i] { + index_right.push(i as u32); } } - - // Continue processing current batch - timer.done(); - Ok(StatefulStreamResult::Continue) - } - - fn create_output_batch_from_data( - &self, - accumulated_left_matches: &[u32], - accumulated_right_indices: &[u32], - batch: &RecordBatch, - ) -> Result { - let build_side = match self.build_side.as_ref() { - Some(build_side) => Ok(build_side), - None => internal_err!("Expected build side in ready state"), - }?; - - if accumulated_left_matches.is_empty() { - return internal_err!("Cannot create output batch with no matches"); - } - - // Handle null markers for left side indices (u32::MAX indicates null) - let mut left_indices_with_nulls = Vec::with_capacity(accumulated_left_matches.len()); - let mut validity = Vec::with_capacity(accumulated_left_matches.len()); - - for &idx in accumulated_left_matches { - if idx == u32::MAX { - left_indices_with_nulls.push(0u32); // Use 0 as dummy index - validity.push(false); // Mark as null - } else { - left_indices_with_nulls.push(idx); - validity.push(true); // Mark as valid - } - } - - let left_indexes = if validity.iter().all(|&v| v) { - // No nulls, use regular array - PrimitiveArray::::from(left_indices_with_nulls) - } else { - // Some nulls, use array with validity buffer - use datafusion::arrow::buffer::NullBuffer; - let null_buffer = NullBuffer::from(validity); - PrimitiveArray::::new(left_indices_with_nulls.into(), Some(null_buffer)) - }; - - let right_indexes = PrimitiveArray::::from(accumulated_right_indices.to_vec()); + let right_indexes = PrimitiveArray::from(index_right); let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); @@ -1372,7 +1230,7 @@ impl IntervalJoinStream { let array = build_side.batch.column(column_index.index); compute::take(array, &left_indexes, None)? } else if column_index.side == JoinSide::Right { - let array = batch.column(column_index.index); + let array = state.batch.column(column_index.index); compute::take(array, &right_indexes, None)? } else { panic!("Unsupported join_side {:?}", column_index.side); @@ -1384,76 +1242,16 @@ impl IntervalJoinStream { self.join_metrics.output_batches.add(1); self.join_metrics.output_rows.add(result.num_rows()); + timer.done(); log::debug!( - "{:?} emitted streaming batch with {:?} rows, total output rows: {:?}", + "{:?} is done processing batch {:?} with {:?} output rows", std::thread::current().id(), - result.num_rows(), - self.join_metrics.output_rows.value() + self.join_metrics.output_batches.value(), + result.num_rows() ); - Ok(result) - } - - fn emit_accumulated_matches(&mut self) -> Result>> { - // Extract all data first to avoid borrowing conflicts - let (accumulated_left_matches, accumulated_right_indices, batch, probe_row_idx) = - match &mut self.state { - IntervalJoinStreamState::EmitAccumulatedMatches(state) => { - if state.accumulated_left_matches.is_empty() { - // No matches to emit, determine next state - let next_state = if state.probe_row_idx >= state.batch.num_rows() { - IntervalJoinStreamState::FetchProbeBatch - } else { - IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { - batch: state.batch.clone(), - probe_row_idx: state.probe_row_idx, - accumulated_left_matches: Vec::new(), - accumulated_right_indices: Vec::new(), - }) - }; - self.state = next_state; - return Ok(StatefulStreamResult::Continue); - } - - // Extract the data - let accumulated_left_matches = - std::mem::take(&mut state.accumulated_left_matches); - let accumulated_right_indices = - std::mem::take(&mut state.accumulated_right_indices); - let batch = state.batch.clone(); - let probe_row_idx = state.probe_row_idx; - - ( - accumulated_left_matches, - accumulated_right_indices, - batch, - probe_row_idx, - ) - } - _ => return internal_err!("Expected EmitAccumulatedMatches state"), - }; - - // Now we can safely call the method without borrowing conflicts - let result = self.create_output_batch_from_data( - &accumulated_left_matches, - &accumulated_right_indices, - &batch, - )?; - - // Set next state - self.state = if probe_row_idx >= batch.num_rows() { - // Done with this batch - IntervalJoinStreamState::FetchProbeBatch - } else { - // More rows to process in current batch - IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { - batch, - probe_row_idx, - accumulated_left_matches: Vec::new(), - accumulated_right_indices: Vec::new(), - }) - }; + self.state = IntervalJoinStreamState::FetchProbeBatch; Ok(StatefulStreamResult::Ready(Some(result))) } From 30f31d0e63285adc5f087d3abfd4756b29f9f708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wieiw=C3=B3rka?= Date: Sat, 13 Sep 2025 16:47:06 +0200 Subject: [PATCH 4/6] One batch emission --- .../sequila-core/src/physical_planner/joins/interval_join.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs index e6481ef..b66c551 100644 --- a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs +++ b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs @@ -1177,7 +1177,7 @@ impl IntervalJoinStream { let start = evaluate_as_i32(self.right_interval.start(), &state.batch)?; let end = evaluate_as_i32(self.right_interval.end(), &state.batch)?; - // Use simple batch processing (like original) for optimal performance + // Simple batch processing with memory-aware result creation let mut builder_left = PrimitiveBuilder::::new(); let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); let mut pos_vect: Vec = Vec::with_capacity(100); @@ -1251,6 +1251,7 @@ impl IntervalJoinStream { result.num_rows() ); + // Move to next probe batch after processing current one self.state = IntervalJoinStreamState::FetchProbeBatch; Ok(StatefulStreamResult::Ready(Some(result))) From 86938a2057bcc766de85dbc85a3a47b1e0a7cd72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wieiw=C3=B3rka?= Date: Sat, 13 Sep 2025 17:02:35 +0200 Subject: [PATCH 5/6] A new low-memory interval join --- .../physical_planner/joins/interval_join.rs | 260 +++++++++++++----- .../sequila_physical_planner.rs | 10 + sequila/sequila-core/src/session_context.rs | 1 + 3 files changed, 209 insertions(+), 62 deletions(-) diff --git a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs index b66c551..6ddc74d 100644 --- a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs +++ b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs @@ -104,6 +104,7 @@ pub struct IntervalJoinExec { cache: PlanProperties, algorithm: Algorithm, + low_memory: bool, } impl IntervalJoinExec { @@ -119,6 +120,7 @@ impl IntervalJoinExec { partition_mode: PartitionMode, null_equals_null: bool, algorithm: Algorithm, + low_memory: bool, ) -> Result { let left_schema = left.schema(); let right_schema = right.schema(); @@ -165,6 +167,7 @@ impl IntervalJoinExec { null_equals_null, cache, algorithm, + low_memory, }) } @@ -247,6 +250,7 @@ impl IntervalJoinExec { self.mode, self.null_equals_null, self.algorithm, + self.low_memory, ) } @@ -436,6 +440,7 @@ impl ExecutionPlan for IntervalJoinExec { self.mode, self.null_equals_null, self.algorithm, + self.low_memory, )?)) } @@ -535,6 +540,7 @@ impl ExecutionPlan for IntervalJoinExec { state: IntervalJoinStreamState::WaitBuildSide, build_side: None, hashes_buffer: vec![], + low_memory: self.low_memory, })) } @@ -1045,6 +1051,8 @@ struct IntervalJoinStream { state: IntervalJoinStreamState, build_side: Option>, hashes_buffer: Vec, + /// When true, use capped, streaming-friendly emission with continuation; otherwise process fully. + low_memory: bool, } struct ProcessProbeBatchState { @@ -1176,85 +1184,212 @@ impl IntervalJoinStream { let start = evaluate_as_i32(self.right_interval.start(), &state.batch)?; let end = evaluate_as_i32(self.right_interval.end(), &state.batch)?; + // Two modes: low-memory (streaming/capped) vs. full-batch (pre-streaming behavior) + if self.low_memory { + // Output-size limited processing to prevent memory explosion + let mut builder_left = PrimitiveBuilder::::new(); + let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); + let mut pos_vect: Vec = Vec::with_capacity(100); + + const MAX_OUTPUT_ROWS: usize = 1_000_000; // 1M rows max per output batch + let mut total_output_rows = 0; + let mut processed_input_rows = 0; + + for (i, hash_val) in self.hashes_buffer.iter().enumerate() { + build_side + .hash_map + .get(*hash_val, start.value(i), end.value(i), |pos| { + pos_vect.push(pos as u32); + }); - // Simple batch processing with memory-aware result creation - let mut builder_left = PrimitiveBuilder::::new(); - let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); - let mut pos_vect: Vec = Vec::with_capacity(100); - - for (i, hash_val) in self.hashes_buffer.iter().enumerate() { - build_side - .hash_map - .get(*hash_val, start.value(i), end.value(i), |pos| { - pos_vect.push(pos as u32); - }); - match &build_side.hash_map { - IntervalJoinAlgorithm::CoitreesNearest(_t) => { - // even if there is no hit we need to preserve the right side - rle_right.push(1); - if pos_vect.is_empty() { - builder_left.append_null(); - } else { + let matches_for_this_row = pos_vect.len(); + + match &build_side.hash_map { + IntervalJoinAlgorithm::CoitreesNearest(_) + | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { + if total_output_rows >= MAX_OUTPUT_ROWS { + break; + } + rle_right.push(1); + if pos_vect.is_empty() { + builder_left.append_null(); + } else { + builder_left.append_slice(&pos_vect); + } + total_output_rows += 1; + } + _ => { + if total_output_rows + matches_for_this_row > MAX_OUTPUT_ROWS + && total_output_rows > 0 + { + break; + } + rle_right.push(pos_vect.len() as u32); builder_left.append_slice(&pos_vect); + total_output_rows += matches_for_this_row; } } - IntervalJoinAlgorithm::CoitreesCountOverlaps(_t) => { - rle_right.push(1); - if pos_vect.is_empty() { - builder_left.append_null(); - } else { - builder_left.append_slice(&pos_vect); + pos_vect.clear(); + processed_input_rows += 1; + } + + if processed_input_rows < self.hashes_buffer.len() { + let remaining_batch = state.batch.slice( + processed_input_rows, + state.batch.num_rows() - processed_input_rows, + ); + let remaining_hashes = self.hashes_buffer[processed_input_rows..].to_vec(); + + let continuation_data = Some((remaining_batch, remaining_hashes)); + + let (left_indexes, index_right, _should_continue_processing, continuation) = { + let left_indexes = builder_left.finish(); + let mut index_right = Vec::with_capacity(left_indexes.len()); + for i in 0..rle_right.len() { + for _ in 0..rle_right[i] { + index_right.push(i as u32); + } } + (left_indexes, index_right, true, continuation_data) + }; + + // Build right index array + let right_indexes = PrimitiveArray::from(index_right); + + // Build result columns + let mut columns: Vec> = + Vec::with_capacity(self.schema.fields().len()); + for column_index in &self.column_indices { + let array: Arc = if column_index.side == JoinSide::Left { + let array = build_side.batch.column(column_index.index); + compute::take(array, &left_indexes, None)? + } else if column_index.side == JoinSide::Right { + let array = state.batch.column(column_index.index); + compute::take(array, &right_indexes, None)? + } else { + panic!("Unsupported join_side {:?}", column_index.side); + }; + columns.push(array); } - _ => { - rle_right.push(pos_vect.len() as u32); - builder_left.append_slice(&pos_vect); + + let result = RecordBatch::try_new(self.schema.clone(), columns)?; + + // Update state for continuation + if let Some((remaining_batch, remaining_hashes)) = continuation { + self.state = + IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: remaining_batch, + }); + self.hashes_buffer = remaining_hashes; } + + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(result.num_rows()); + timer.done(); + + return Ok(StatefulStreamResult::Ready(Some(result))); } - pos_vect.clear(); - } - let left_indexes = builder_left.finish(); - let mut index_right = Vec::with_capacity(left_indexes.len()); - for i in 0..rle_right.len() { - for _ in 0..rle_right[i] { - index_right.push(i as u32); + let left_indexes = builder_left.finish(); + let mut index_right = Vec::with_capacity(left_indexes.len()); + for i in 0..rle_right.len() { + for _ in 0..rle_right[i] { + index_right.push(i as u32); + } } - } - let right_indexes = PrimitiveArray::from(index_right); + let right_indexes = PrimitiveArray::from(index_right); - let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); + let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); - for column_index in &self.column_indices { - let array: Arc = if column_index.side == JoinSide::Left { - let array = build_side.batch.column(column_index.index); - compute::take(array, &left_indexes, None)? - } else if column_index.side == JoinSide::Right { - let array = state.batch.column(column_index.index); - compute::take(array, &right_indexes, None)? - } else { - panic!("Unsupported join_side {:?}", column_index.side); - }; - columns.push(array); - } + for column_index in &self.column_indices { + let array: Arc = if column_index.side == JoinSide::Left { + let array = build_side.batch.column(column_index.index); + compute::take(array, &left_indexes, None)? + } else if column_index.side == JoinSide::Right { + let array = state.batch.column(column_index.index); + compute::take(array, &right_indexes, None)? + } else { + panic!("Unsupported join_side {:?}", column_index.side); + }; + columns.push(array); + } - let result = RecordBatch::try_new(self.schema.clone(), columns)?; + let result = RecordBatch::try_new(self.schema.clone(), columns)?; - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(result.num_rows()); - timer.done(); + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(result.num_rows()); + timer.done(); - log::debug!( - "{:?} is done processing batch {:?} with {:?} output rows", - std::thread::current().id(), - self.join_metrics.output_batches.value(), - result.num_rows() - ); + log::debug!( + "{:?} is done processing batch {:?} with {:?} output rows", + std::thread::current().id(), + self.join_metrics.output_batches.value(), + result.num_rows() + ); - // Move to next probe batch after processing current one - self.state = IntervalJoinStreamState::FetchProbeBatch; + self.state = IntervalJoinStreamState::FetchProbeBatch; + Ok(StatefulStreamResult::Ready(Some(result))) + } else { + // Full processing mode: process entire batch without capping or continuation + let mut builder_left = PrimitiveBuilder::::new(); + let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); + let mut pos_vect: Vec = Vec::with_capacity(256); + + for (i, hash_val) in self.hashes_buffer.iter().enumerate() { + build_side + .hash_map + .get(*hash_val, start.value(i), end.value(i), |pos| { + pos_vect.push(pos as u32); + }); - Ok(StatefulStreamResult::Ready(Some(result))) + match &build_side.hash_map { + IntervalJoinAlgorithm::CoitreesNearest(_) + | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { + rle_right.push(1); + if pos_vect.is_empty() { + builder_left.append_null(); + } else { + builder_left.append_slice(&pos_vect); + } + } + _ => { + rle_right.push(pos_vect.len() as u32); + builder_left.append_slice(&pos_vect); + } + } + pos_vect.clear(); + } + + let left_indexes = builder_left.finish(); + let mut index_right = Vec::with_capacity(left_indexes.len()); + for i in 0..rle_right.len() { + for _ in 0..rle_right[i] { + index_right.push(i as u32); + } + } + let right_indexes = PrimitiveArray::from(index_right); + + let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); + for column_index in &self.column_indices { + let array: Arc = if column_index.side == JoinSide::Left { + let array = build_side.batch.column(column_index.index); + compute::take(array, &left_indexes, None)? + } else if column_index.side == JoinSide::Right { + let array = state.batch.column(column_index.index); + compute::take(array, &right_indexes, None)? + } else { + panic!("Unsupported join_side {:?}", column_index.side); + }; + columns.push(array); + } + + let result = RecordBatch::try_new(self.schema.clone(), columns)?; + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(result.num_rows()); + timer.done(); + self.state = IntervalJoinStreamState::FetchProbeBatch; + Ok(StatefulStreamResult::Ready(Some(result))) + } } } @@ -1350,6 +1485,7 @@ mod tests { let sequila_config = SequilaConfig { prefer_interval_join: algorithm.is_some(), interval_join_algorithm: algorithm.unwrap_or_default(), + ..Default::default() }; let config = SessionConfig::from(options) diff --git a/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs b/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs index bf3293d..48b542d 100644 --- a/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs +++ b/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs @@ -40,6 +40,8 @@ impl PhysicalOptimizerRule for IntervalJoinPhysicalOptimizationRule { let algorithm = sequila_config.interval_join_algorithm; + let low_memory = sequila_config.interval_join_low_memory; + plan.transform_up(|plan| { match plan.as_any().downcast_ref::() { Some(join_exec) => { @@ -50,6 +52,7 @@ impl PhysicalOptimizerRule for IntervalJoinPhysicalOptimizationRule { join_exec, intervals, algorithm, + low_memory, )?; Ok(Transformed::yes(new_plan)) } else { @@ -70,6 +73,7 @@ impl PhysicalOptimizerRule for IntervalJoinPhysicalOptimizationRule { join_exec, intervals, algorithm, + low_memory, )?; Ok(Transformed::yes(new_plan)) } else { @@ -100,7 +104,10 @@ fn from_hash_join( join_exec: &HashJoinExec, intervals: ColIntervals, algorithm: Algorithm, + low_memory: bool, ) -> Result> { + // Determine low-memory mode from session extensions if available + // (passed through in IntervalJoinExec below) let new_plan = IntervalJoinExec::try_new( join_exec.left().clone(), join_exec.right().clone(), @@ -112,6 +119,7 @@ fn from_hash_join( *join_exec.partition_mode(), join_exec.null_equals_null, algorithm, + low_memory, )?; Ok(Arc::new(new_plan)) } @@ -120,6 +128,7 @@ fn from_nested_loop_join( join_exec: &NestedLoopJoinExec, intervals: ColIntervals, algorithm: Algorithm, + low_memory: bool, ) -> Result> { let new_plan = IntervalJoinExec::try_new( join_exec.left().clone(), @@ -132,6 +141,7 @@ fn from_nested_loop_join( PartitionMode::CollectLeft, true, algorithm, + low_memory, )?; Ok(Arc::new(new_plan)) diff --git a/sequila/sequila-core/src/session_context.rs b/sequila/sequila-core/src/session_context.rs index 9dab7d1..172f6bb 100644 --- a/sequila/sequila-core/src/session_context.rs +++ b/sequila/sequila-core/src/session_context.rs @@ -51,6 +51,7 @@ extensions_options! { pub struct SequilaConfig { pub prefer_interval_join: bool, default = true pub interval_join_algorithm: Algorithm, default = Algorithm::default() + pub interval_join_low_memory: bool, default = false } } From 3e6652f524543fee19aa49f9feec087250d06de7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Wieiw=C3=B3rka?= Date: Sat, 13 Sep 2025 19:03:55 +0200 Subject: [PATCH 6/6] Streaming refactored --- .../physical_planner/joins/interval_join.rs | 235 +++++++++++++++++- 1 file changed, 231 insertions(+), 4 deletions(-) diff --git a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs index 6ddc74d..659e944 100644 --- a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs +++ b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs @@ -541,6 +541,16 @@ impl ExecutionPlan for IntervalJoinExec { build_side: None, hashes_buffer: vec![], low_memory: self.low_memory, + // Initialize memory pool optimization buffers (used by streaming path) + reusable_match_buffer: Vec::with_capacity(256), + reusable_rle_buffer: Vec::with_capacity(1024), + reusable_index_buffer: Vec::with_capacity(2048), + // Default to 100K rows per output batch to prevent memory explosion + // Can be overridden by env var SEQUILA_MAX_OUTPUT_BATCH_SIZE + max_output_batch_size: std::env::var("SEQUILA_MAX_OUTPUT_BATCH_SIZE") + .unwrap_or_else(|_| "100000".to_string()) + .parse() + .unwrap_or(100_000), })) } @@ -1051,13 +1061,28 @@ struct IntervalJoinStream { state: IntervalJoinStreamState, build_side: Option>, hashes_buffer: Vec, - /// When true, use capped, streaming-friendly emission with continuation; otherwise process fully. + /// When true, use capped, streaming-friendly emission; otherwise process fully. low_memory: bool, + // === Streaming optimizations (from 57d595b0) === + /// Buffer reused for temporary matches + reusable_match_buffer: Vec, + /// Buffer reused for right-side run-length encoding data + reusable_rle_buffer: Vec, + /// Buffer reused for building right-side index arrays + reusable_index_buffer: Vec, + /// Maximum output batch size to emit when streaming + max_output_batch_size: usize, } struct ProcessProbeBatchState { /// Current probe-side batch batch: RecordBatch, + /// Current probe row index being processed (streaming mode) + probe_row_idx: usize, + /// Accumulated matches for streaming output (left indices; u32::MAX marks null) + accumulated_left_matches: Vec, + /// Accumulated right indices for streaming output + accumulated_right_indices: Vec, } enum IntervalJoinStreamState { @@ -1067,6 +1092,8 @@ enum IntervalJoinStreamState { FetchProbeBatch, /// Indicates that non-empty batch has been fetched from probe-side, and is ready to be processed ProcessProbeBatch(ProcessProbeBatchState), + /// Emit accumulated matches before continuing (streaming mode) + EmitAccumulatedMatches(ProcessProbeBatchState), /// Indicates that probe-side has been fully processed ExhaustedProbeSide, } @@ -1096,7 +1123,14 @@ impl IntervalJoinStream { handle_state!(ready!(self.fetch_probe_batch(cx))) } IntervalJoinStreamState::ProcessProbeBatch(_) => { - handle_state!(self.process_probe_batch()) + if self.low_memory { + handle_state!(self.process_probe_batch_streaming()) + } else { + handle_state!(self.process_probe_batch()) + } + } + IntervalJoinStreamState::EmitAccumulatedMatches(_) => { + handle_state!(self.emit_accumulated_matches()) } IntervalJoinStreamState::ExhaustedProbeSide => { log::info!("{:?} finished execution, total processed batches: {:?}, total join time: {:?} ms", @@ -1164,8 +1198,12 @@ impl IntervalJoinStream { self.join_metrics.input_batches.value() ); - self.state = - IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch }); + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch, + probe_row_idx: 0, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); } Some(Err(err)) => return Poll::Ready(Err(err)), }; @@ -1173,6 +1211,192 @@ impl IntervalJoinStream { Poll::Ready(Ok(StatefulStreamResult::Continue)) } + // Streaming low-memory implementation (from 57d595b0) + fn process_probe_batch_streaming( + &mut self, + ) -> Result>> { + let state = self.state.try_as_process_probe_batch_mut()?; + let build_side = match self.build_side.as_ref() { + Some(build_side) => Ok(build_side), + None => internal_err!("Expected build side in ready state"), + }?; + + let timer = self.join_metrics.join_time.timer(); + + let start = evaluate_as_i32(self.right_interval.start(), &state.batch)?; + let end = evaluate_as_i32(self.right_interval.end(), &state.batch)?; + + let batch_size = state.batch.num_rows(); + let start_row_idx = state.probe_row_idx; + + // Limit rows processed per call to avoid long stalls; approx 1% of max rows + let chunk_end = std::cmp::min(start_row_idx + self.max_output_batch_size / 100, batch_size); + + let mut temp_matches = Vec::with_capacity(64); + let mut total_output_rows = state.accumulated_left_matches.len(); + + for i in start_row_idx..chunk_end { + temp_matches.clear(); + + build_side + .hash_map + .get(self.hashes_buffer[i], start.value(i), end.value(i), |pos| { + temp_matches.push(pos as u32); + }); + + match &build_side.hash_map { + IntervalJoinAlgorithm::CoitreesNearest(_) + | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { + if !temp_matches.is_empty() { + state.accumulated_left_matches.push(temp_matches[0]); + state.accumulated_right_indices.push(i as u32); + } else { + // Indicate null on left with u32::MAX marker + state.accumulated_left_matches.push(u32::MAX); + state.accumulated_right_indices.push(i as u32); + } + total_output_rows += 1; + } + _ => { + state + .accumulated_left_matches + .extend_from_slice(&temp_matches); + for _ in 0..temp_matches.len() { + state.accumulated_right_indices.push(i as u32); + } + total_output_rows += temp_matches.len(); + } + } + + if total_output_rows >= self.max_output_batch_size { + state.probe_row_idx = i + 1; + self.state = + IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: std::mem::take( + &mut state.accumulated_left_matches, + ), + accumulated_right_indices: std::mem::take( + &mut state.accumulated_right_indices, + ), + }); + timer.done(); + return self.emit_accumulated_matches(); + } + } + + state.probe_row_idx = chunk_end; + + if chunk_end >= batch_size { + if !state.accumulated_left_matches.is_empty() { + self.state = + IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: batch_size, + accumulated_left_matches: std::mem::take( + &mut state.accumulated_left_matches, + ), + accumulated_right_indices: std::mem::take( + &mut state.accumulated_right_indices, + ), + }); + timer.done(); + return self.emit_accumulated_matches(); + } else { + self.state = IntervalJoinStreamState::FetchProbeBatch; + timer.done(); + return Ok(StatefulStreamResult::Continue); + } + } + + timer.done(); + Ok(StatefulStreamResult::Continue) + } + + fn emit_accumulated_matches(&mut self) -> Result>> { + let state = match &mut self.state { + IntervalJoinStreamState::EmitAccumulatedMatches(state) => state, + _ => return internal_err!("Expected EmitAccumulatedMatches state"), + }; + + let build_side = match self.build_side.as_ref() { + Some(build_side) => Ok(build_side), + None => internal_err!("Expected build side in ready state"), + }?; + + if state.accumulated_left_matches.is_empty() { + if state.probe_row_idx >= state.batch.num_rows() { + self.state = IntervalJoinStreamState::FetchProbeBatch; + } else { + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); + } + return Ok(StatefulStreamResult::Continue); + } + + // Build left indexes, handling null markers + let mut left_indices_with_nulls = Vec::with_capacity(state.accumulated_left_matches.len()); + let mut validity = Vec::with_capacity(state.accumulated_left_matches.len()); + + for &idx in &state.accumulated_left_matches { + if idx == u32::MAX { + left_indices_with_nulls.push(0u32); + validity.push(false); + } else { + left_indices_with_nulls.push(idx); + validity.push(true); + } + } + + let left_indexes = if validity.iter().all(|&v| v) { + PrimitiveArray::::from(left_indices_with_nulls) + } else { + use datafusion::arrow::buffer::NullBuffer; + let null_buffer = NullBuffer::from(validity); + PrimitiveArray::::new(left_indices_with_nulls.into(), Some(null_buffer)) + }; + + let right_indexes = + PrimitiveArray::::from(state.accumulated_right_indices.clone()); + + let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); + + for column_index in &self.column_indices { + let array: Arc = if column_index.side == JoinSide::Left { + let array = build_side.batch.column(column_index.index); + compute::take(array, &left_indexes, None)? + } else if column_index.side == JoinSide::Right { + let array = state.batch.column(column_index.index); + compute::take(array, &right_indexes, None)? + } else { + panic!("Unsupported join_side {:?}", column_index.side); + }; + columns.push(array); + } + + let result = RecordBatch::try_new(self.schema.clone(), columns)?; + + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(result.num_rows()); + + if state.probe_row_idx >= state.batch.num_rows() { + self.state = IntervalJoinStreamState::FetchProbeBatch; + } else { + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); + } + + Ok(StatefulStreamResult::Ready(Some(result))) + } fn process_probe_batch(&mut self) -> Result>> { let state = self.state.try_as_process_probe_batch_mut()?; let build_side = match self.build_side.as_ref() { @@ -1279,6 +1503,9 @@ impl IntervalJoinStream { self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch: remaining_batch, + probe_row_idx: 0, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), }); self.hashes_buffer = remaining_hashes; }