diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 0e1e098..df03e55 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -102,40 +102,47 @@ class VectorizedFilterOperator : public VectorizedOperator { } bool next_batch(VectorBatch& out_batch) override { - out_batch.clear(); + // If we already have accumulated rows from a previous call, return them + if (out_batch.row_count() > 0) { + return true; + } + + // Ensure output batch is structured for current schema if (out_batch.column_count() == 0) { out_batch.init_from_schema(output_schema_); } - while (child_->next_batch(*input_batch_)) { - selection_mask_->clear(); - condition_->evaluate_vectorized(*input_batch_, child_->output_schema(), - *selection_mask_); - - std::vector selection; - for (size_t r = 0; r < input_batch_->row_count(); ++r) { - common::Value val = selection_mask_->get(r); - if (!val.is_null() && val.as_bool()) { - selection.push_back(r); - } + // Process only one child batch at a time to enable pipelining + if (!child_->next_batch(*input_batch_)) { + return false; // No more input + } + + selection_mask_->clear(); + condition_->evaluate_vectorized(*input_batch_, child_->output_schema(), *selection_mask_); + + std::vector selection; + for (size_t r = 0; r < input_batch_->row_count(); ++r) { + common::Value val = selection_mask_->get(r); + if (!val.is_null() && val.as_bool()) { + selection.push_back(r); } + } - if (!selection.empty()) { - // Batch-level append optimization: iterate columns once - for (size_t c = 0; c < input_batch_->column_count(); ++c) { - auto& src_col = input_batch_->get_column(c); - auto& dest_col = out_batch.get_column(c); - for (size_t r : selection) { - dest_col.append(src_col.get(r)); - } + if (!selection.empty()) { + // Batch-level append optimization: iterate columns once + for (size_t c = 0; c < input_batch_->column_count(); ++c) { + auto& src_col = input_batch_->get_column(c); + auto& dest_col = out_batch.get_column(c); + for (size_t r : selection) { + dest_col.append(src_col.get(r)); } - out_batch.set_row_count(out_batch.row_count() + selection.size()); - input_batch_->clear(); - return true; } - input_batch_->clear(); + out_batch.set_row_count(out_batch.row_count() + selection.size()); } - return false; + input_batch_->clear(); + + // Return true if we accumulated any rows, false if no matches found + return out_batch.row_count() > 0; } };