Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ if(BUILD_TESTS)
add_cloudsql_test(raft_protocol_tests tests/raft_protocol_tests.cpp)
add_cloudsql_test(columnar_table_tests tests/columnar_table_tests.cpp)
add_cloudsql_test(string_vector_tests tests/string_vector_tests.cpp)
add_cloudsql_test(vectorized_operator_tests tests/vectorized_operator_tests.cpp)
add_cloudsql_test(heap_table_tests tests/heap_table_tests.cpp)
add_cloudsql_test(lexer_tests tests/lexer_tests.cpp)
add_cloudsql_test(parser_tests tests/parser_tests.cpp)
Expand Down
61 changes: 32 additions & 29 deletions include/executor/vectorized_operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,47 +102,44 @@ class VectorizedFilterOperator : public VectorizedOperator {
}

bool next_batch(VectorBatch& out_batch) override {
// If we already have accumulated rows from a previous call, return them
if (out_batch.row_count() > 0) {
return true;
}
out_batch.clear();

// Ensure output batch is structured for current schema
if (out_batch.column_count() == 0) {
out_batch.init_from_schema(output_schema_);
}

// Process only one child batch at a time to enable pipelining
if (!child_->next_batch(*input_batch_)) {
return false; // No more input
}

selection_mask_->clear();
condition_->evaluate_vectorized(*input_batch_, child_->output_schema(), *selection_mask_);

std::vector<size_t> selection;
for (size_t r = 0; r < input_batch_->row_count(); ++r) {
common::Value val = selection_mask_->get(r);
if (!val.is_null() && val.as_bool()) {
selection.push_back(r);
// Process child batches until we find matches or exhaust input
while (child_->next_batch(*input_batch_)) {
selection_mask_->clear();
condition_->evaluate_vectorized(*input_batch_, child_->output_schema(),
*selection_mask_);

std::vector<size_t> selection;
for (size_t r = 0; r < input_batch_->row_count(); ++r) {
common::Value val = selection_mask_->get(r);
if (!val.is_null() && val.as_bool()) {
selection.push_back(r);
}
}
}

if (!selection.empty()) {
// Batch-level append optimization: iterate columns once
for (size_t c = 0; c < input_batch_->column_count(); ++c) {
auto& src_col = input_batch_->get_column(c);
auto& dest_col = out_batch.get_column(c);
for (size_t r : selection) {
dest_col.append(src_col.get(r));
if (!selection.empty()) {
// Batch-level append optimization: iterate columns once
for (size_t c = 0; c < input_batch_->column_count(); ++c) {
auto& src_col = input_batch_->get_column(c);
auto& dest_col = out_batch.get_column(c);
for (size_t r : selection) {
dest_col.append(src_col.get(r));
}
}
out_batch.set_row_count(selection.size());
input_batch_->clear();
return true; // Return with matches
}
out_batch.set_row_count(out_batch.row_count() + selection.size());
input_batch_->clear();
}
input_batch_->clear();

// Return true if we accumulated any rows, false if no matches found
return out_batch.row_count() > 0;
return false; // Exhausted without finding matches
}
};

Expand Down Expand Up @@ -214,6 +211,12 @@ class VectorizedAggregateOperator : public VectorizedOperator {
results_int_.assign(aggregates_.size(), 0);
results_double_.assign(aggregates_.size(), 0.0);
has_value_.assign(aggregates_.size(), false);
// COUNT aggregates always have a value (0 for empty input) per SQL spec
for (size_t i = 0; i < aggregates_.size(); ++i) {
if (aggregates_[i].type == AggregateType::Count) {
has_value_[i] = true;
}
}
input_batch_ = VectorBatch::create(child_->output_schema());
}

Expand Down
Loading
Loading