diff --git a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs index 6438949..659e944 100644 --- a/sequila/sequila-core/src/physical_planner/joins/interval_join.rs +++ b/sequila/sequila-core/src/physical_planner/joins/interval_join.rs @@ -95,7 +95,7 @@ pub struct IntervalJoinExec { pub projection: Option>, /// Information of index and left / right placement of columns column_indices: Vec, - /// Null matching behavior: If `null_equals_null` is true, rows that have + /// If null_equals_null is true, rows that have /// `null`s in both left and right equijoin columns will be matched. /// Otherwise, rows that have `null`s in the join columns will not be /// matched and thus will not appear in the output. @@ -104,6 +104,7 @@ pub struct IntervalJoinExec { cache: PlanProperties, algorithm: Algorithm, + low_memory: bool, } impl IntervalJoinExec { @@ -119,6 +120,7 @@ impl IntervalJoinExec { partition_mode: PartitionMode, null_equals_null: bool, algorithm: Algorithm, + low_memory: bool, ) -> Result { let left_schema = left.schema(); let right_schema = right.schema(); @@ -165,6 +167,7 @@ impl IntervalJoinExec { null_equals_null, cache, algorithm, + low_memory, }) } @@ -247,6 +250,7 @@ impl IntervalJoinExec { self.mode, self.null_equals_null, self.algorithm, + self.low_memory, ) } @@ -436,6 +440,7 @@ impl ExecutionPlan for IntervalJoinExec { self.mode, self.null_equals_null, self.algorithm, + self.low_memory, )?)) } @@ -535,6 +540,17 @@ impl ExecutionPlan for IntervalJoinExec { state: IntervalJoinStreamState::WaitBuildSide, build_side: None, hashes_buffer: vec![], + low_memory: self.low_memory, + // Initialize memory pool optimization buffers (used by streaming path) + reusable_match_buffer: Vec::with_capacity(256), + reusable_rle_buffer: Vec::with_capacity(1024), + reusable_index_buffer: Vec::with_capacity(2048), + // Default to 100K rows per output batch to prevent memory explosion + // Can be overridden by env var SEQUILA_MAX_OUTPUT_BATCH_SIZE + max_output_batch_size: std::env::var("SEQUILA_MAX_OUTPUT_BATCH_SIZE") + .unwrap_or_else(|_| "100000".to_string()) + .parse() + .unwrap_or(100_000), })) } @@ -1045,11 +1061,28 @@ struct IntervalJoinStream { state: IntervalJoinStreamState, build_side: Option>, hashes_buffer: Vec, + /// When true, use capped, streaming-friendly emission; otherwise process fully. + low_memory: bool, + // === Streaming optimizations (from 57d595b0) === + /// Buffer reused for temporary matches + reusable_match_buffer: Vec, + /// Buffer reused for right-side run-length encoding data + reusable_rle_buffer: Vec, + /// Buffer reused for building right-side index arrays + reusable_index_buffer: Vec, + /// Maximum output batch size to emit when streaming + max_output_batch_size: usize, } struct ProcessProbeBatchState { /// Current probe-side batch batch: RecordBatch, + /// Current probe row index being processed (streaming mode) + probe_row_idx: usize, + /// Accumulated matches for streaming output (left indices; u32::MAX marks null) + accumulated_left_matches: Vec, + /// Accumulated right indices for streaming output + accumulated_right_indices: Vec, } enum IntervalJoinStreamState { @@ -1059,6 +1092,8 @@ enum IntervalJoinStreamState { FetchProbeBatch, /// Indicates that non-empty batch has been fetched from probe-side, and is ready to be processed ProcessProbeBatch(ProcessProbeBatchState), + /// Emit accumulated matches before continuing (streaming mode) + EmitAccumulatedMatches(ProcessProbeBatchState), /// Indicates that probe-side has been fully processed ExhaustedProbeSide, } @@ -1088,7 +1123,14 @@ impl IntervalJoinStream { handle_state!(ready!(self.fetch_probe_batch(cx))) } IntervalJoinStreamState::ProcessProbeBatch(_) => { - handle_state!(self.process_probe_batch()) + if self.low_memory { + handle_state!(self.process_probe_batch_streaming()) + } else { + handle_state!(self.process_probe_batch()) + } + } + IntervalJoinStreamState::EmitAccumulatedMatches(_) => { + handle_state!(self.emit_accumulated_matches()) } IntervalJoinStreamState::ExhaustedProbeSide => { log::info!("{:?} finished execution, total processed batches: {:?}, total join time: {:?} ms", @@ -1156,8 +1198,12 @@ impl IntervalJoinStream { self.join_metrics.input_batches.value() ); - self.state = - IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch }); + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch, + probe_row_idx: 0, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); } Some(Err(err)) => return Poll::Ready(Err(err)), }; @@ -1165,7 +1211,10 @@ impl IntervalJoinStream { Poll::Ready(Ok(StatefulStreamResult::Continue)) } - fn process_probe_batch(&mut self) -> Result>> { + // Streaming low-memory implementation (from 57d595b0) + fn process_probe_batch_streaming( + &mut self, + ) -> Result>> { let state = self.state.try_as_process_probe_batch_mut()?; let build_side = match self.build_side.as_ref() { Some(build_side) => Ok(build_side), @@ -1177,51 +1226,143 @@ impl IntervalJoinStream { let start = evaluate_as_i32(self.right_interval.start(), &state.batch)?; let end = evaluate_as_i32(self.right_interval.end(), &state.batch)?; - let mut builder_left = PrimitiveBuilder::::new(); + let batch_size = state.batch.num_rows(); + let start_row_idx = state.probe_row_idx; + + // Limit rows processed per call to avoid long stalls; approx 1% of max rows + let chunk_end = std::cmp::min(start_row_idx + self.max_output_batch_size / 100, batch_size); + + let mut temp_matches = Vec::with_capacity(64); + let mut total_output_rows = state.accumulated_left_matches.len(); + + for i in start_row_idx..chunk_end { + temp_matches.clear(); - let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); - let mut pos_vect: Vec = Vec::with_capacity(100); - for (i, hash_val) in self.hashes_buffer.iter().enumerate() { build_side .hash_map - .get(*hash_val, start.value(i), end.value(i), |pos| { - pos_vect.push(pos as u32); + .get(self.hashes_buffer[i], start.value(i), end.value(i), |pos| { + temp_matches.push(pos as u32); }); + match &build_side.hash_map { - IntervalJoinAlgorithm::CoitreesNearest(_t) => { - // even if there is no hit we need to preserve the right side - rle_right.push(1); - if pos_vect.len() == 0 { - builder_left.append_null(); - } else { - builder_left.append_slice(&pos_vect); - } - } - IntervalJoinAlgorithm::CoitreesCountOverlaps(_t) => { - rle_right.push(1); - if pos_vect.len() == 0 { - builder_left.append_null(); + IntervalJoinAlgorithm::CoitreesNearest(_) + | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { + if !temp_matches.is_empty() { + state.accumulated_left_matches.push(temp_matches[0]); + state.accumulated_right_indices.push(i as u32); } else { - builder_left.append_slice(&pos_vect); + // Indicate null on left with u32::MAX marker + state.accumulated_left_matches.push(u32::MAX); + state.accumulated_right_indices.push(i as u32); } + total_output_rows += 1; } _ => { - rle_right.push(pos_vect.len() as u32); - builder_left.append_slice(&pos_vect); + state + .accumulated_left_matches + .extend_from_slice(&temp_matches); + for _ in 0..temp_matches.len() { + state.accumulated_right_indices.push(i as u32); + } + total_output_rows += temp_matches.len(); } } - // builder_left.append_slice(&pos_vect); - pos_vect.clear(); + if total_output_rows >= self.max_output_batch_size { + state.probe_row_idx = i + 1; + self.state = + IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: std::mem::take( + &mut state.accumulated_left_matches, + ), + accumulated_right_indices: std::mem::take( + &mut state.accumulated_right_indices, + ), + }); + timer.done(); + return self.emit_accumulated_matches(); + } + } + + state.probe_row_idx = chunk_end; + + if chunk_end >= batch_size { + if !state.accumulated_left_matches.is_empty() { + self.state = + IntervalJoinStreamState::EmitAccumulatedMatches(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: batch_size, + accumulated_left_matches: std::mem::take( + &mut state.accumulated_left_matches, + ), + accumulated_right_indices: std::mem::take( + &mut state.accumulated_right_indices, + ), + }); + timer.done(); + return self.emit_accumulated_matches(); + } else { + self.state = IntervalJoinStreamState::FetchProbeBatch; + timer.done(); + return Ok(StatefulStreamResult::Continue); + } + } + + timer.done(); + Ok(StatefulStreamResult::Continue) + } + + fn emit_accumulated_matches(&mut self) -> Result>> { + let state = match &mut self.state { + IntervalJoinStreamState::EmitAccumulatedMatches(state) => state, + _ => return internal_err!("Expected EmitAccumulatedMatches state"), + }; + + let build_side = match self.build_side.as_ref() { + Some(build_side) => Ok(build_side), + None => internal_err!("Expected build side in ready state"), + }?; + + if state.accumulated_left_matches.is_empty() { + if state.probe_row_idx >= state.batch.num_rows() { + self.state = IntervalJoinStreamState::FetchProbeBatch; + } else { + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); + } + return Ok(StatefulStreamResult::Continue); } - let left_indexes = builder_left.finish(); - let mut index_right = Vec::with_capacity(left_indexes.len()); - for i in 0..rle_right.len() { - for _ in 0..rle_right[i] { - index_right.push(i as u32); + + // Build left indexes, handling null markers + let mut left_indices_with_nulls = Vec::with_capacity(state.accumulated_left_matches.len()); + let mut validity = Vec::with_capacity(state.accumulated_left_matches.len()); + + for &idx in &state.accumulated_left_matches { + if idx == u32::MAX { + left_indices_with_nulls.push(0u32); + validity.push(false); + } else { + left_indices_with_nulls.push(idx); + validity.push(true); } } - let right_indexes = PrimitiveArray::from(index_right); + + let left_indexes = if validity.iter().all(|&v| v) { + PrimitiveArray::::from(left_indices_with_nulls) + } else { + use datafusion::arrow::buffer::NullBuffer; + let null_buffer = NullBuffer::from(validity); + PrimitiveArray::::new(left_indices_with_nulls.into(), Some(null_buffer)) + }; + + let right_indexes = + PrimitiveArray::::from(state.accumulated_right_indices.clone()); let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); @@ -1242,19 +1383,241 @@ impl IntervalJoinStream { self.join_metrics.output_batches.add(1); self.join_metrics.output_rows.add(result.num_rows()); - timer.done(); - - log::debug!( - "{:?} is done processing batch {:?} with {:?} output rows", - std::thread::current().id(), - self.join_metrics.output_batches.value(), - result.num_rows() - ); - self.state = IntervalJoinStreamState::FetchProbeBatch; + if state.probe_row_idx >= state.batch.num_rows() { + self.state = IntervalJoinStreamState::FetchProbeBatch; + } else { + self.state = IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: state.batch.clone(), + probe_row_idx: state.probe_row_idx, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); + } Ok(StatefulStreamResult::Ready(Some(result))) } + fn process_probe_batch(&mut self) -> Result>> { + let state = self.state.try_as_process_probe_batch_mut()?; + let build_side = match self.build_side.as_ref() { + Some(build_side) => Ok(build_side), + None => internal_err!("Expected build side in ready state"), + }?; + + let timer = self.join_metrics.join_time.timer(); + + let start = evaluate_as_i32(self.right_interval.start(), &state.batch)?; + let end = evaluate_as_i32(self.right_interval.end(), &state.batch)?; + // Two modes: low-memory (streaming/capped) vs. full-batch (pre-streaming behavior) + if self.low_memory { + // Output-size limited processing to prevent memory explosion + let mut builder_left = PrimitiveBuilder::::new(); + let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); + let mut pos_vect: Vec = Vec::with_capacity(100); + + const MAX_OUTPUT_ROWS: usize = 1_000_000; // 1M rows max per output batch + let mut total_output_rows = 0; + let mut processed_input_rows = 0; + + for (i, hash_val) in self.hashes_buffer.iter().enumerate() { + build_side + .hash_map + .get(*hash_val, start.value(i), end.value(i), |pos| { + pos_vect.push(pos as u32); + }); + + let matches_for_this_row = pos_vect.len(); + + match &build_side.hash_map { + IntervalJoinAlgorithm::CoitreesNearest(_) + | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { + if total_output_rows >= MAX_OUTPUT_ROWS { + break; + } + rle_right.push(1); + if pos_vect.is_empty() { + builder_left.append_null(); + } else { + builder_left.append_slice(&pos_vect); + } + total_output_rows += 1; + } + _ => { + if total_output_rows + matches_for_this_row > MAX_OUTPUT_ROWS + && total_output_rows > 0 + { + break; + } + rle_right.push(pos_vect.len() as u32); + builder_left.append_slice(&pos_vect); + total_output_rows += matches_for_this_row; + } + } + pos_vect.clear(); + processed_input_rows += 1; + } + + if processed_input_rows < self.hashes_buffer.len() { + let remaining_batch = state.batch.slice( + processed_input_rows, + state.batch.num_rows() - processed_input_rows, + ); + let remaining_hashes = self.hashes_buffer[processed_input_rows..].to_vec(); + + let continuation_data = Some((remaining_batch, remaining_hashes)); + + let (left_indexes, index_right, _should_continue_processing, continuation) = { + let left_indexes = builder_left.finish(); + let mut index_right = Vec::with_capacity(left_indexes.len()); + for i in 0..rle_right.len() { + for _ in 0..rle_right[i] { + index_right.push(i as u32); + } + } + (left_indexes, index_right, true, continuation_data) + }; + + // Build right index array + let right_indexes = PrimitiveArray::from(index_right); + + // Build result columns + let mut columns: Vec> = + Vec::with_capacity(self.schema.fields().len()); + for column_index in &self.column_indices { + let array: Arc = if column_index.side == JoinSide::Left { + let array = build_side.batch.column(column_index.index); + compute::take(array, &left_indexes, None)? + } else if column_index.side == JoinSide::Right { + let array = state.batch.column(column_index.index); + compute::take(array, &right_indexes, None)? + } else { + panic!("Unsupported join_side {:?}", column_index.side); + }; + columns.push(array); + } + + let result = RecordBatch::try_new(self.schema.clone(), columns)?; + + // Update state for continuation + if let Some((remaining_batch, remaining_hashes)) = continuation { + self.state = + IntervalJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { + batch: remaining_batch, + probe_row_idx: 0, + accumulated_left_matches: Vec::new(), + accumulated_right_indices: Vec::new(), + }); + self.hashes_buffer = remaining_hashes; + } + + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(result.num_rows()); + timer.done(); + + return Ok(StatefulStreamResult::Ready(Some(result))); + } + + let left_indexes = builder_left.finish(); + let mut index_right = Vec::with_capacity(left_indexes.len()); + for i in 0..rle_right.len() { + for _ in 0..rle_right[i] { + index_right.push(i as u32); + } + } + let right_indexes = PrimitiveArray::from(index_right); + + let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); + + for column_index in &self.column_indices { + let array: Arc = if column_index.side == JoinSide::Left { + let array = build_side.batch.column(column_index.index); + compute::take(array, &left_indexes, None)? + } else if column_index.side == JoinSide::Right { + let array = state.batch.column(column_index.index); + compute::take(array, &right_indexes, None)? + } else { + panic!("Unsupported join_side {:?}", column_index.side); + }; + columns.push(array); + } + + let result = RecordBatch::try_new(self.schema.clone(), columns)?; + + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(result.num_rows()); + timer.done(); + + log::debug!( + "{:?} is done processing batch {:?} with {:?} output rows", + std::thread::current().id(), + self.join_metrics.output_batches.value(), + result.num_rows() + ); + + self.state = IntervalJoinStreamState::FetchProbeBatch; + Ok(StatefulStreamResult::Ready(Some(result))) + } else { + // Full processing mode: process entire batch without capping or continuation + let mut builder_left = PrimitiveBuilder::::new(); + let mut rle_right: Vec = Vec::with_capacity(self.hashes_buffer.len()); + let mut pos_vect: Vec = Vec::with_capacity(256); + + for (i, hash_val) in self.hashes_buffer.iter().enumerate() { + build_side + .hash_map + .get(*hash_val, start.value(i), end.value(i), |pos| { + pos_vect.push(pos as u32); + }); + + match &build_side.hash_map { + IntervalJoinAlgorithm::CoitreesNearest(_) + | IntervalJoinAlgorithm::CoitreesCountOverlaps(_) => { + rle_right.push(1); + if pos_vect.is_empty() { + builder_left.append_null(); + } else { + builder_left.append_slice(&pos_vect); + } + } + _ => { + rle_right.push(pos_vect.len() as u32); + builder_left.append_slice(&pos_vect); + } + } + pos_vect.clear(); + } + + let left_indexes = builder_left.finish(); + let mut index_right = Vec::with_capacity(left_indexes.len()); + for i in 0..rle_right.len() { + for _ in 0..rle_right[i] { + index_right.push(i as u32); + } + } + let right_indexes = PrimitiveArray::from(index_right); + + let mut columns: Vec> = Vec::with_capacity(self.schema.fields().len()); + for column_index in &self.column_indices { + let array: Arc = if column_index.side == JoinSide::Left { + let array = build_side.batch.column(column_index.index); + compute::take(array, &left_indexes, None)? + } else if column_index.side == JoinSide::Right { + let array = state.batch.column(column_index.index); + compute::take(array, &right_indexes, None)? + } else { + panic!("Unsupported join_side {:?}", column_index.side); + }; + columns.push(array); + } + + let result = RecordBatch::try_new(self.schema.clone(), columns)?; + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(result.num_rows()); + timer.done(); + self.state = IntervalJoinStreamState::FetchProbeBatch; + Ok(StatefulStreamResult::Ready(Some(result))) + } + } } impl RecordBatchStream for IntervalJoinStream { @@ -1349,6 +1712,7 @@ mod tests { let sequila_config = SequilaConfig { prefer_interval_join: algorithm.is_some(), interval_join_algorithm: algorithm.unwrap_or_default(), + ..Default::default() }; let config = SessionConfig::from(options) diff --git a/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs b/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs index bf3293d..48b542d 100644 --- a/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs +++ b/sequila/sequila-core/src/physical_planner/sequila_physical_planner.rs @@ -40,6 +40,8 @@ impl PhysicalOptimizerRule for IntervalJoinPhysicalOptimizationRule { let algorithm = sequila_config.interval_join_algorithm; + let low_memory = sequila_config.interval_join_low_memory; + plan.transform_up(|plan| { match plan.as_any().downcast_ref::() { Some(join_exec) => { @@ -50,6 +52,7 @@ impl PhysicalOptimizerRule for IntervalJoinPhysicalOptimizationRule { join_exec, intervals, algorithm, + low_memory, )?; Ok(Transformed::yes(new_plan)) } else { @@ -70,6 +73,7 @@ impl PhysicalOptimizerRule for IntervalJoinPhysicalOptimizationRule { join_exec, intervals, algorithm, + low_memory, )?; Ok(Transformed::yes(new_plan)) } else { @@ -100,7 +104,10 @@ fn from_hash_join( join_exec: &HashJoinExec, intervals: ColIntervals, algorithm: Algorithm, + low_memory: bool, ) -> Result> { + // Determine low-memory mode from session extensions if available + // (passed through in IntervalJoinExec below) let new_plan = IntervalJoinExec::try_new( join_exec.left().clone(), join_exec.right().clone(), @@ -112,6 +119,7 @@ fn from_hash_join( *join_exec.partition_mode(), join_exec.null_equals_null, algorithm, + low_memory, )?; Ok(Arc::new(new_plan)) } @@ -120,6 +128,7 @@ fn from_nested_loop_join( join_exec: &NestedLoopJoinExec, intervals: ColIntervals, algorithm: Algorithm, + low_memory: bool, ) -> Result> { let new_plan = IntervalJoinExec::try_new( join_exec.left().clone(), @@ -132,6 +141,7 @@ fn from_nested_loop_join( PartitionMode::CollectLeft, true, algorithm, + low_memory, )?; Ok(Arc::new(new_plan)) diff --git a/sequila/sequila-core/src/session_context.rs b/sequila/sequila-core/src/session_context.rs index 9dab7d1..172f6bb 100644 --- a/sequila/sequila-core/src/session_context.rs +++ b/sequila/sequila-core/src/session_context.rs @@ -51,6 +51,7 @@ extensions_options! { pub struct SequilaConfig { pub prefer_interval_join: bool, default = true pub interval_join_algorithm: Algorithm, default = Algorithm::default() + pub interval_join_low_memory: bool, default = false } }