From 8e5d762c37e5843b6f7a74600a13b3bd84d4f5d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Mon, 20 Apr 2026 20:17:09 +0300 Subject: [PATCH 1/3] fix(vectorized): correct filter operator to exhaust all child batches VectorizedFilterOperator::next_batch() was returning true after copying matches from the first batch, instead of continuing to consume all child batches until EOF. --- include/executor/vectorized_operator.hpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 0e1e098d..6d742ae7 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -130,12 +130,11 @@ class VectorizedFilterOperator : public VectorizedOperator { } } out_batch.set_row_count(out_batch.row_count() + selection.size()); - input_batch_->clear(); - return true; } input_batch_->clear(); } - return false; + // Return true if we accumulated any rows, false if no matches found + return out_batch.row_count() > 0; } }; From 215ddd56f31f1cb13f321c4fb25b62dd3dbef16b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Mon, 20 Apr 2026 20:43:50 +0300 Subject: [PATCH 2/3] fix(vectorized): enable pipelined filter by processing one child batch at a time Previously, VectorizedFilterOperator::next_batch() would drain the entire child operator in one call, causing unbounded out_batch growth and breaking pipelining. Now each call processes exactly one child batch, enabling pipelined execution downstream. --- include/executor/vectorized_operator.hpp | 53 ++++++++++++++---------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 6d742ae7..847cfd04 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -102,37 +102,46 @@ class VectorizedFilterOperator : public VectorizedOperator { } bool next_batch(VectorBatch& out_batch) override { - out_batch.clear(); + // If we already have accumulated rows from a previous call, return them + if (out_batch.row_count() > 0) { + return true; + } + + // Ensure output batch is structured for current schema if (out_batch.column_count() == 0) { out_batch.init_from_schema(output_schema_); } - while (child_->next_batch(*input_batch_)) { - selection_mask_->clear(); - condition_->evaluate_vectorized(*input_batch_, child_->output_schema(), - *selection_mask_); - - std::vector selection; - for (size_t r = 0; r < input_batch_->row_count(); ++r) { - common::Value val = selection_mask_->get(r); - if (!val.is_null() && val.as_bool()) { - selection.push_back(r); - } + // Process only one child batch at a time to enable pipelining + if (!child_->next_batch(*input_batch_)) { + return false; // No more input + } + + selection_mask_->clear(); + condition_->evaluate_vectorized(*input_batch_, child_->output_schema(), + *selection_mask_); + + std::vector selection; + for (size_t r = 0; r < input_batch_->row_count(); ++r) { + common::Value val = selection_mask_->get(r); + if (!val.is_null() && val.as_bool()) { + selection.push_back(r); } + } - if (!selection.empty()) { - // Batch-level append optimization: iterate columns once - for (size_t c = 0; c < input_batch_->column_count(); ++c) { - auto& src_col = input_batch_->get_column(c); - auto& dest_col = out_batch.get_column(c); - for (size_t r : selection) { - dest_col.append(src_col.get(r)); - } + if (!selection.empty()) { + // Batch-level append optimization: iterate columns once + for (size_t c = 0; c < input_batch_->column_count(); ++c) { + auto& src_col = input_batch_->get_column(c); + auto& dest_col = out_batch.get_column(c); + for (size_t r : selection) { + dest_col.append(src_col.get(r)); } - out_batch.set_row_count(out_batch.row_count() + selection.size()); } - input_batch_->clear(); + out_batch.set_row_count(out_batch.row_count() + selection.size()); } + input_batch_->clear(); + // Return true if we accumulated any rows, false if no matches found return out_batch.row_count() > 0; } From 0996e660982aeb098da7a129ac886c2155b1b2f9 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Mon, 20 Apr 2026 17:45:32 +0000 Subject: [PATCH 3/3] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 847cfd04..df03e551 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -118,8 +118,7 @@ class VectorizedFilterOperator : public VectorizedOperator { } selection_mask_->clear(); - condition_->evaluate_vectorized(*input_batch_, child_->output_schema(), - *selection_mask_); + condition_->evaluate_vectorized(*input_batch_, child_->output_schema(), *selection_mask_); std::vector selection; for (size_t r = 0; r < input_batch_->row_count(); ++r) {