From af6b3692d531327c69c2374af74406bb49c10517 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 21 Apr 2026 13:51:04 +0300 Subject: [PATCH 1/7] feat(vectorized): add VectorizedGroupByOperator class Add VectorizedGroupByOperator for batch-at-a-time grouped aggregation. - Hash-based grouping using unordered_map - Supports COUNT and SUM aggregates - Two-phase processing: Input (populate hash table) then Output (serve groups) --- include/executor/vectorized_operator.hpp | 182 +++++++++++++++++++++++ 1 file changed, 182 insertions(+) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 15fa879..9f9b009 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -287,6 +287,188 @@ class VectorizedAggregateOperator : public VectorizedOperator { } }; +/** + * @brief Group state for vectorized GROUP BY - accumulator data per group + */ +struct VectorizedGroupState { + std::vector counts; + std::vector sums; + std::vector mins; + std::vector maxes; + + VectorizedGroupState() = default; + explicit VectorizedGroupState(size_t agg_count) { + counts.assign(agg_count, 0); + sums.assign(agg_count, 0.0); + mins.assign(agg_count, common::Value::make_null()); + maxes.assign(agg_count, common::Value::make_null()); + } +}; + +/** + * @brief Vectorized GROUP BY operator with hash-based aggregation + */ +class VectorizedGroupByOperator : public VectorizedOperator { + private: + std::unique_ptr child_; + std::vector> group_by_; + std::vector aggregates_; + + // Hash table: group key string -> group state + std::unordered_map groups_; + // Ordered group keys for output iteration + std::vector group_keys_; + // Group key values for each group + std::vector> group_values_; + + // Current output position + size_t current_group_idx_ = 0; + + // Processing state + enum class ProcessState { Input, Output }; + ProcessState state_ = ProcessState::Input; + + // Reusable batch objects + std::unique_ptr input_batch_; + std::unique_ptr group_key_batch_; + + public: + VectorizedGroupByOperator(std::unique_ptr child, + std::vector> group_by, + std::vector aggregates, + Schema output_schema) + : VectorizedOperator(std::move(output_schema)), + child_(std::move(child)), + group_by_(std::move(group_by)), + aggregates_(std::move(aggregates)) { + input_batch_ = VectorBatch::create(child_->output_schema()); + // Create schema for group key evaluation + Schema key_schema; + for (size_t i = 0; i < group_by_.size(); ++i) { + key_schema.add_column("gb_key_" + std::to_string(i), common::ValueType::TYPE_TEXT); + } + group_key_batch_ = VectorBatch::create(key_schema); + } + + bool next_batch(VectorBatch& out_batch) override { + if (state_ == ProcessState::Input) { + // Phase 1: Consume all input batches and populate hash table + while (child_->next_batch(*input_batch_)) { + process_input_batch(*input_batch_); + } + state_ = ProcessState::Output; + } + + // Phase 2: Produce grouped output batches + return produce_output_batch(out_batch); + } + + private: + void process_input_batch(VectorBatch& batch) { + // Evaluate group-by expressions into group_key_batch_ + for (size_t i = 0; i < group_by_.size(); ++i) { + group_by_[i]->evaluate_vectorized(batch, child_->output_schema(), + group_key_batch_->get_column(i)); + } + + // For each row, compute hash key and update/insert group + for (size_t r = 0; r < batch.row_count(); ++r) { + // Build key from group-by values + std::string key; + for (size_t i = 0; i < group_by_.size(); ++i) { + key += group_key_batch_->get_column(i).get(r).to_string(); + key += "|"; + } + + // Get or create group state + auto it = groups_.find(key); + if (it == groups_.end()) { + // Store group key values for output + std::vector key_vals; + for (size_t i = 0; i < group_by_.size(); ++i) { + key_vals.push_back(group_key_batch_->get_column(i).get(r)); + } + auto result = groups_.emplace(key, VectorizedGroupState(aggregates_.size())); + it = result.first; + group_keys_.push_back(key); + group_values_.push_back(std::move(key_vals)); + } + + // Update accumulators for this row + update_accumulators(it->second, batch, r); + } + input_batch_->clear(); + } + + void update_accumulators(VectorizedGroupState& state, VectorBatch& batch, size_t row_idx) { + for (size_t i = 0; i < aggregates_.size(); ++i) { + const auto& agg = aggregates_[i]; + + if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { + // COUNT(*) - always increment + state.counts[i]++; + } else if (agg.type == AggregateType::Sum && agg.input_col_idx >= 0) { + auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(row_idx)) { + if (col.type() == common::ValueType::TYPE_INT64) { + auto& num_col = dynamic_cast&>(col); + state.sums[i] += num_col.raw_data()[row_idx]; + } else if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = dynamic_cast&>(col); + state.sums[i] += num_col.raw_data()[row_idx]; + } + } + } + } + } + + bool produce_output_batch(VectorBatch& out_batch) { + if (current_group_idx_ >= group_keys_.size()) { + return false; // EOF + } + + out_batch.clear(); + if (out_batch.column_count() == 0) { + out_batch.init_from_schema(output_schema_); + } + + constexpr size_t BATCH_SIZE = 1024; + size_t output_count = 0; + + while (current_group_idx_ < group_keys_.size() && output_count < BATCH_SIZE) { + // Append group key columns + const auto& key_vals = group_values_[current_group_idx_]; + for (size_t i = 0; i < key_vals.size(); ++i) { + out_batch.get_column(i).append(key_vals[i]); + } + + // Append aggregate result columns + const auto& state = groups_.at(group_keys_[current_group_idx_]); + for (size_t i = 0; i < aggregates_.size(); ++i) { + size_t col_idx = group_by_.size() + i; + switch (aggregates_[i].type) { + case AggregateType::Count: + out_batch.get_column(col_idx).append( + common::Value::make_int64(state.counts[i])); + break; + case AggregateType::Sum: + out_batch.get_column(col_idx).append( + common::Value::make_float64(state.sums[i])); + break; + default: + out_batch.get_column(col_idx).append(common::Value::make_null()); + break; + } + } + output_count++; + current_group_idx_++; + } + + out_batch.set_row_count(output_count); + return true; + } +}; + } // namespace cloudsql::executor #endif // CLOUDSQL_EXECUTOR_VECTORIZED_OPERATOR_HPP From 13feea1c4346066963ec546dc0317d8c135591e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 21 Apr 2026 14:01:31 +0300 Subject: [PATCH 2/7] feat(vectorized): add VectorizedGroupByOperator and tests Add VectorizedGroupByOperator for batch-at-a-time grouped aggregation: - Hash-based grouping using unordered_map - Supports COUNT and SUM aggregates - Two-phase processing: Input (populate hash table) then Output - Store original Value types in group keys Add 4 tests: - SingleGroup: GROUP BY with 1 group - MultipleGroups: GROUP BY with 3 groups - EmptyInput: GROUP BY on empty table - MultiBatchGroups: 2500 rows with 10 groups across batches --- include/executor/vectorized_operator.hpp | 26 ++-- tests/vectorized_operator_tests.cpp | 178 +++++++++++++++++++++++ 2 files changed, 194 insertions(+), 10 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 9f9b009..749b817 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -365,19 +365,20 @@ class VectorizedGroupByOperator : public VectorizedOperator { private: void process_input_batch(VectorBatch& batch) { - // Evaluate group-by expressions into group_key_batch_ - for (size_t i = 0; i < group_by_.size(); ++i) { - group_by_[i]->evaluate_vectorized(batch, child_->output_schema(), - group_key_batch_->get_column(i)); - } - - // For each row, compute hash key and update/insert group + // For each row, compute hash key directly from source columns for (size_t r = 0; r < batch.row_count(); ++r) { // Build key from group-by values std::string key; for (size_t i = 0; i < group_by_.size(); ++i) { - key += group_key_batch_->get_column(i).get(r).to_string(); - key += "|"; + // Find column index for group-by expression + const auto& schema = child_->output_schema(); + size_t col_idx = schema.find_column(group_by_[i]->to_string()); + if (col_idx == static_cast(-1)) { + key += "NULL|"; + } else { + key += batch.get_column(col_idx).get(r).to_string(); + key += "|"; + } } // Get or create group state @@ -386,7 +387,12 @@ class VectorizedGroupByOperator : public VectorizedOperator { // Store group key values for output std::vector key_vals; for (size_t i = 0; i < group_by_.size(); ++i) { - key_vals.push_back(group_key_batch_->get_column(i).get(r)); + size_t col_idx = child_->output_schema().find_column(group_by_[i]->to_string()); + if (col_idx == static_cast(-1)) { + key_vals.push_back(common::Value::make_null()); + } else { + key_vals.push_back(batch.get_column(col_idx).get(r)); + } } auto result = groups_.emplace(key, VectorizedGroupState(aggregates_.size())); it = result.first; diff --git a/tests/vectorized_operator_tests.cpp b/tests/vectorized_operator_tests.cpp index 54d5956..c289e2e 100644 --- a/tests/vectorized_operator_tests.cpp +++ b/tests/vectorized_operator_tests.cpp @@ -446,4 +446,182 @@ TEST_F(VectorizedFilterTests, PipelinedBatches) { EXPECT_EQ(total, 1000); } +class VectorizedGroupByTests : public ::testing::Test { + protected: + void SetUp() override { + storage_ = std::make_unique("./test_groupby"); + } + void TearDown() override { storage_.reset(); } + + std::unique_ptr storage_; +}; + +TEST_F(VectorizedGroupByTests, SingleGroup) { + // GROUP BY with single group (same as global aggregation) + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("single_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert 10 rows all with same category + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 10; ++i) { + batch->append_tuple(Tuple({common::Value::make_text("A"), + common::Value::make_int64(i + 1)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + // SELECT cat, COUNT(*), SUM(val) FROM t GROUP BY cat + // Expected: 1 group (cat=A, count=10, sum=55) + auto scan = std::make_unique("single_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = { + {AggregateType::Count, -1}, // COUNT(*) + {AggregateType::Sum, 1} // SUM(val) + }; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 1); + EXPECT_EQ(result->get_column(0).get(0).as_text(), "A"); + EXPECT_EQ(result->get_column(1).get(0).as_int64(), 10); // COUNT(*) + EXPECT_EQ(result->get_column(2).get(0).to_float64(), 55.0); // SUM(1..10) + + EXPECT_FALSE(groupby.next_batch(*result)); // EOF +} + +TEST_F(VectorizedGroupByTests, MultipleGroups) { + // GROUP BY with 3 distinct groups + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multi_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert 9 rows: 3 each of A, B, C + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 3; ++i) { + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(20)})); + batch->append_tuple(Tuple({common::Value::make_text("C"), common::Value::make_int64(30)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multi_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 3); // 3 groups + + // Verify all 3 groups have count=3 + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(result->get_column(1).get(i).as_int64(), 3); + } +} + +TEST_F(VectorizedGroupByTests, EmptyInput) { + // GROUP BY on empty table should return 0 groups + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("empty_groupby", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + auto scan = std::make_unique("empty_groupby", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + EXPECT_FALSE(groupby.next_batch(*result)); // No groups from empty input +} + +TEST_F(VectorizedGroupByTests, MultiBatchGroups) { + // 2500 rows with 10 groups using TEXT keys, verify groups span multiple input batches + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multibatch_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // 2500 rows: cat = "cat_" + (i % 10) (10 groups, ~250 rows each) + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 2500; ++i) { + std::string cat = "cat_" + std::to_string(i % 10); + batch->append_tuple(Tuple({common::Value::make_text(cat), + common::Value::make_int64(i)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multibatch_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = { + {AggregateType::Count, -1}, + {AggregateType::Sum, 1} + }; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + EXPECT_EQ(group_count, 10); // 10 groups +} + } // namespace \ No newline at end of file From c1d3f91995cf82fb58bcc93d5fd637308e246686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 21 Apr 2026 14:06:28 +0300 Subject: [PATCH 3/7] docs: update PHASE_8_ANALYTICS.md with VectorizedGroupByOperator - Added VectorizedGroupByOperator to vectorized operators list - Added new section 5 documenting Vectorized GROUP BY feature - Mentioned supported aggregates (COUNT, SUM) --- docs/phases/PHASE_8_ANALYTICS.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/phases/PHASE_8_ANALYTICS.md b/docs/phases/PHASE_8_ANALYTICS.md index e5de28a..3727fdc 100644 --- a/docs/phases/PHASE_8_ANALYTICS.md +++ b/docs/phases/PHASE_8_ANALYTICS.md @@ -19,7 +19,7 @@ Developed SIMD-friendly contiguous memory buffers for batch processing. ### 3. Vectorized Execution Engine (`include/executor/vectorized_operator.hpp`) Built a batch-at-a-time physical execution model. -- **Vectorized Operators**: Implemented `Scan`, `Filter`, `Project`, and `Aggregate` operators designed for chunk-based execution. +- **Vectorized Operators**: Implemented `Scan`, `Filter`, `Project`, `Aggregate`, and `GroupBy` operators designed for chunk-based execution. - **Batch-at-a-Time Interface**: Operators pass entire `VectorBatch` objects between themselves, minimizing virtual function call overhead. ### 4. High-Performance Aggregation @@ -27,6 +27,12 @@ Optimized global analytical queries (`COUNT`, `SUM`). - **Vectorized Global Aggregate**: Aggregates entire batches of data with minimal branching and high cache locality. - **Type-Specific Aggregation**: Leverages C++ templates to generate highly efficient aggregation logic for different data types. +### 5. Vectorized GROUP BY +Added `VectorizedGroupByOperator` for hash-based grouped aggregation. +- **Hash-Based Grouping**: Uses `unordered_map` for efficient group key lookup. +- **Two-Phase Processing**: Input phase builds hash table from batches; Output phase serves grouped results. +- **Supported Aggregates**: COUNT(*) and SUM with INT64/FLOAT64 columns. + ## Recent Improvements (Engine Benchmarking) As of our latest sprint, we have established a high-performance baseline for the engine's core scanning logic: - **Baseline Speed**: 181M rows/s (Sequential Scan). From c7b454d222b87b4d175c6855a7a10cbb93434ebc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Tue, 21 Apr 2026 14:15:05 +0300 Subject: [PATCH 4/7] test(vectorized): add MIN/MAX support and extended GROUP BY tests - Add MIN/MAX aggregate handling in VectorizedGroupByOperator - Update update_accumulators to track min/max per group - Update produce_output_batch to output MIN/MAX values - Add MultipleColumnGroupBy test (2-column GROUP BY) - Add MinMaxAggregates test - Add NullGroupKeys test - Add VerifyGroupKeyValues test --- include/executor/vectorized_operator.hpp | 20 +++ tests/vectorized_operator_tests.cpp | 211 +++++++++++++++++++++++ 2 files changed, 231 insertions(+) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 749b817..7e4f1f6 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -424,6 +424,20 @@ class VectorizedGroupByOperator : public VectorizedOperator { state.sums[i] += num_col.raw_data()[row_idx]; } } + } else if (agg.type == AggregateType::Min && agg.input_col_idx >= 0) { + auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(row_idx)) { + if (state.mins[i].is_null() || col.get(row_idx) < state.mins[i]) { + state.mins[i] = col.get(row_idx); + } + } + } else if (agg.type == AggregateType::Max && agg.input_col_idx >= 0) { + auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(row_idx)) { + if (state.maxes[i].is_null() || state.maxes[i] < col.get(row_idx)) { + state.maxes[i] = col.get(row_idx); + } + } } } } @@ -461,6 +475,12 @@ class VectorizedGroupByOperator : public VectorizedOperator { out_batch.get_column(col_idx).append( common::Value::make_float64(state.sums[i])); break; + case AggregateType::Min: + out_batch.get_column(col_idx).append(state.mins[i]); + break; + case AggregateType::Max: + out_batch.get_column(col_idx).append(state.maxes[i]); + break; default: out_batch.get_column(col_idx).append(common::Value::make_null()); break; diff --git a/tests/vectorized_operator_tests.cpp b/tests/vectorized_operator_tests.cpp index c289e2e..0a5d393 100644 --- a/tests/vectorized_operator_tests.cpp +++ b/tests/vectorized_operator_tests.cpp @@ -624,4 +624,215 @@ TEST_F(VectorizedGroupByTests, MultiBatchGroups) { EXPECT_EQ(group_count, 10); // 10 groups } +TEST_F(VectorizedGroupByTests, MultipleColumnGroupBy) { + // GROUP BY on two columns + Schema schema; + schema.add_column("cat1", common::ValueType::TYPE_TEXT); + schema.add_column("cat2", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multi_col_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // 20 rows: (cat1 = i%4, cat2 = i%5) -> 20 unique pairs + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 20; ++i) { + std::string c1 = "A" + std::to_string(i % 4); + std::string c2 = "B" + std::to_string(i % 5); + batch->append_tuple(Tuple({common::Value::make_text(c1), + common::Value::make_text(c2), + common::Value::make_int64(i)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multi_col_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat1", common::ValueType::TYPE_TEXT); + out_schema.add_column("cat2", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat1")); + group_by.push_back(std::make_unique("cat2")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + // 4 * 5 = 20 unique pairs + EXPECT_EQ(group_count, 20); +} + +TEST_F(VectorizedGroupByTests, MinMaxAggregates) { + // Test MIN and MAX aggregates + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("minmax_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert rows: A->{5,10}, B->{3,7} + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(5)})); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(7)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("minmax_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + out_schema.add_column("min", common::ValueType::TYPE_INT64); + out_schema.add_column("max", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = { + {AggregateType::Count, -1}, + {AggregateType::Sum, 1}, + {AggregateType::Min, 1}, + {AggregateType::Max, 1} + }; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 2); + + // Verify values for both groups + for (size_t i = 0; i < result->row_count(); ++i) { + std::string cat = result->get_column(0).get(i).as_text(); + int64_t cnt = result->get_column(1).get(i).as_int64(); + int64_t sum = static_cast(result->get_column(2).get(i).to_float64()); + int64_t min_val = result->get_column(3).get(i).as_int64(); + int64_t max_val = result->get_column(4).get(i).as_int64(); + + EXPECT_EQ(cnt, 2); + if (cat == "A") { + EXPECT_EQ(sum, 15); // 5 + 10 + EXPECT_EQ(min_val, 5); + EXPECT_EQ(max_val, 10); + } else if (cat == "B") { + EXPECT_EQ(sum, 10); // 3 + 7 + EXPECT_EQ(min_val, 3); + EXPECT_EQ(max_val, 7); + } + } +} + +TEST_F(VectorizedGroupByTests, NullGroupKeys) { + // GROUP BY with NULL keys + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("null_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert: A, NULL, B, NULL + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(1)})); + batch->append_tuple(Tuple({common::Value::make_null(), common::Value::make_int64(2)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); + batch->append_tuple(Tuple({common::Value::make_null(), common::Value::make_int64(4)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("null_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + // 3 groups: "A", "B", and NULL + EXPECT_EQ(group_count, 3); +} + +TEST_F(VectorizedGroupByTests, VerifyGroupKeyValues) { + // Verify actual group key values in output + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("verify_keys", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(20)})); + batch->append_tuple(Tuple({common::Value::make_text("Y"), common::Value::make_int64(5)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("verify_keys", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), + std::move(aggs), std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 2); + + // Find X and Y groups and verify + std::string found_x, found_y; + int64_t cnt_x = 0, cnt_y = 0; + for (size_t i = 0; i < result->row_count(); ++i) { + std::string cat = result->get_column(0).get(i).as_text(); + int64_t cnt = result->get_column(1).get(i).as_int64(); + if (cat == "X") { + found_x = cat; + cnt_x = cnt; + } else if (cat == "Y") { + found_y = cat; + cnt_y = cnt; + } + } + EXPECT_EQ(found_x, "X"); + EXPECT_EQ(cnt_x, 2); + EXPECT_EQ(found_y, "Y"); + EXPECT_EQ(cnt_y, 1); +} + } // namespace \ No newline at end of file From 7706a7d2cf1e6e550bf0cbaa60ad6e4d37a4936f Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Tue, 21 Apr 2026 11:16:33 +0000 Subject: [PATCH 5/7] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 3 +- tests/vectorized_operator_tests.cpp | 682 +++++++++++------------ 2 files changed, 338 insertions(+), 347 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 7e4f1f6..6cd9216 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -335,8 +335,7 @@ class VectorizedGroupByOperator : public VectorizedOperator { public: VectorizedGroupByOperator(std::unique_ptr child, std::vector> group_by, - std::vector aggregates, - Schema output_schema) + std::vector aggregates, Schema output_schema) : VectorizedOperator(std::move(output_schema)), child_(std::move(child)), group_by_(std::move(group_by)), diff --git a/tests/vectorized_operator_tests.cpp b/tests/vectorized_operator_tests.cpp index 0a5d393..8faa1f2 100644 --- a/tests/vectorized_operator_tests.cpp +++ b/tests/vectorized_operator_tests.cpp @@ -447,392 +447,384 @@ TEST_F(VectorizedFilterTests, PipelinedBatches) { } class VectorizedGroupByTests : public ::testing::Test { - protected: - void SetUp() override { - storage_ = std::make_unique("./test_groupby"); - } - void TearDown() override { storage_.reset(); } + protected: + void SetUp() override { storage_ = std::make_unique("./test_groupby"); } + void TearDown() override { storage_.reset(); } - std::unique_ptr storage_; + std::unique_ptr storage_; }; TEST_F(VectorizedGroupByTests, SingleGroup) { - // GROUP BY with single group (same as global aggregation) - Schema schema; - schema.add_column("cat", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); - - ColumnarTable table("single_group", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); - - // Insert 10 rows all with same category - auto batch = VectorBatch::create(schema); - for (int64_t i = 0; i < 10; ++i) { - batch->append_tuple(Tuple({common::Value::make_text("A"), - common::Value::make_int64(i + 1)})); - } - ASSERT_TRUE(table.append_batch(*batch)); - - // SELECT cat, COUNT(*), SUM(val) FROM t GROUP BY cat - // Expected: 1 group (cat=A, count=10, sum=55) - auto scan = std::make_unique("single_group", - std::make_shared(table)); - - Schema out_schema; - out_schema.add_column("cat", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); - - std::vector> group_by; - group_by.push_back(std::make_unique("cat")); - - std::vector aggs = { - {AggregateType::Count, -1}, // COUNT(*) - {AggregateType::Sum, 1} // SUM(val) - }; - - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); - - auto result = VectorBatch::create(groupby.output_schema()); - ASSERT_TRUE(groupby.next_batch(*result)); - EXPECT_EQ(result->row_count(), 1); - EXPECT_EQ(result->get_column(0).get(0).as_text(), "A"); - EXPECT_EQ(result->get_column(1).get(0).as_int64(), 10); // COUNT(*) - EXPECT_EQ(result->get_column(2).get(0).to_float64(), 55.0); // SUM(1..10) - - EXPECT_FALSE(groupby.next_batch(*result)); // EOF + // GROUP BY with single group (same as global aggregation) + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("single_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert 10 rows all with same category + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 10; ++i) { + batch->append_tuple( + Tuple({common::Value::make_text("A"), common::Value::make_int64(i + 1)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + // SELECT cat, COUNT(*), SUM(val) FROM t GROUP BY cat + // Expected: 1 group (cat=A, count=10, sum=55) + auto scan = std::make_unique("single_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = { + {AggregateType::Count, -1}, // COUNT(*) + {AggregateType::Sum, 1} // SUM(val) + }; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 1); + EXPECT_EQ(result->get_column(0).get(0).as_text(), "A"); + EXPECT_EQ(result->get_column(1).get(0).as_int64(), 10); // COUNT(*) + EXPECT_EQ(result->get_column(2).get(0).to_float64(), 55.0); // SUM(1..10) + + EXPECT_FALSE(groupby.next_batch(*result)); // EOF } TEST_F(VectorizedGroupByTests, MultipleGroups) { - // GROUP BY with 3 distinct groups - Schema schema; - schema.add_column("cat", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); - - ColumnarTable table("multi_group", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); - - // Insert 9 rows: 3 each of A, B, C - auto batch = VectorBatch::create(schema); - for (int64_t i = 0; i < 3; ++i) { - batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); - batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(20)})); - batch->append_tuple(Tuple({common::Value::make_text("C"), common::Value::make_int64(30)})); - } - ASSERT_TRUE(table.append_batch(*batch)); + // GROUP BY with 3 distinct groups + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multi_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert 9 rows: 3 each of A, B, C + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 3; ++i) { + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(20)})); + batch->append_tuple(Tuple({common::Value::make_text("C"), common::Value::make_int64(30)})); + } + ASSERT_TRUE(table.append_batch(*batch)); - auto scan = std::make_unique("multi_group", - std::make_shared(table)); + auto scan = std::make_unique("multi_group", + std::make_shared(table)); - Schema out_schema; - out_schema.add_column("cat", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - std::vector> group_by; - group_by.push_back(std::make_unique("cat")); + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); - std::vector aggs = {{AggregateType::Count, -1}}; + std::vector aggs = {{AggregateType::Count, -1}}; - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); - auto result = VectorBatch::create(groupby.output_schema()); - ASSERT_TRUE(groupby.next_batch(*result)); - EXPECT_EQ(result->row_count(), 3); // 3 groups + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 3); // 3 groups - // Verify all 3 groups have count=3 - for (size_t i = 0; i < 3; ++i) { - EXPECT_EQ(result->get_column(1).get(i).as_int64(), 3); - } + // Verify all 3 groups have count=3 + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(result->get_column(1).get(i).as_int64(), 3); + } } TEST_F(VectorizedGroupByTests, EmptyInput) { - // GROUP BY on empty table should return 0 groups - Schema schema; - schema.add_column("cat", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); + // GROUP BY on empty table should return 0 groups + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); - ColumnarTable table("empty_groupby", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); + ColumnarTable table("empty_groupby", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); - auto scan = std::make_unique("empty_groupby", - std::make_shared(table)); + auto scan = std::make_unique("empty_groupby", + std::make_shared(table)); - Schema out_schema; - out_schema.add_column("cat", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - std::vector> group_by; - group_by.push_back(std::make_unique("cat")); + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); - std::vector aggs = {{AggregateType::Count, -1}}; + std::vector aggs = {{AggregateType::Count, -1}}; - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); - auto result = VectorBatch::create(groupby.output_schema()); - EXPECT_FALSE(groupby.next_batch(*result)); // No groups from empty input + auto result = VectorBatch::create(groupby.output_schema()); + EXPECT_FALSE(groupby.next_batch(*result)); // No groups from empty input } TEST_F(VectorizedGroupByTests, MultiBatchGroups) { - // 2500 rows with 10 groups using TEXT keys, verify groups span multiple input batches - Schema schema; - schema.add_column("cat", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); - - ColumnarTable table("multibatch_group", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); - - // 2500 rows: cat = "cat_" + (i % 10) (10 groups, ~250 rows each) - auto batch = VectorBatch::create(schema); - for (int64_t i = 0; i < 2500; ++i) { - std::string cat = "cat_" + std::to_string(i % 10); - batch->append_tuple(Tuple({common::Value::make_text(cat), - common::Value::make_int64(i)})); - } - ASSERT_TRUE(table.append_batch(*batch)); - - auto scan = std::make_unique("multibatch_group", - std::make_shared(table)); - - Schema out_schema; - out_schema.add_column("cat", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); - - std::vector> group_by; - group_by.push_back(std::make_unique("cat")); - - std::vector aggs = { - {AggregateType::Count, -1}, - {AggregateType::Sum, 1} - }; - - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); - - auto result = VectorBatch::create(groupby.output_schema()); - int group_count = 0; - while (groupby.next_batch(*result)) { - group_count += result->row_count(); - } - EXPECT_EQ(group_count, 10); // 10 groups + // 2500 rows with 10 groups using TEXT keys, verify groups span multiple input batches + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multibatch_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // 2500 rows: cat = "cat_" + (i % 10) (10 groups, ~250 rows each) + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 2500; ++i) { + std::string cat = "cat_" + std::to_string(i % 10); + batch->append_tuple(Tuple({common::Value::make_text(cat), common::Value::make_int64(i)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multibatch_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}, + {AggregateType::Sum, 1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + EXPECT_EQ(group_count, 10); // 10 groups } TEST_F(VectorizedGroupByTests, MultipleColumnGroupBy) { - // GROUP BY on two columns - Schema schema; - schema.add_column("cat1", common::ValueType::TYPE_TEXT); - schema.add_column("cat2", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); - - ColumnarTable table("multi_col_group", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); - - // 20 rows: (cat1 = i%4, cat2 = i%5) -> 20 unique pairs - auto batch = VectorBatch::create(schema); - for (int64_t i = 0; i < 20; ++i) { - std::string c1 = "A" + std::to_string(i % 4); - std::string c2 = "B" + std::to_string(i % 5); - batch->append_tuple(Tuple({common::Value::make_text(c1), - common::Value::make_text(c2), - common::Value::make_int64(i)})); - } - ASSERT_TRUE(table.append_batch(*batch)); - - auto scan = std::make_unique("multi_col_group", - std::make_shared(table)); - - Schema out_schema; - out_schema.add_column("cat1", common::ValueType::TYPE_TEXT); - out_schema.add_column("cat2", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - - std::vector> group_by; - group_by.push_back(std::make_unique("cat1")); - group_by.push_back(std::make_unique("cat2")); - - std::vector aggs = {{AggregateType::Count, -1}}; - - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); - - auto result = VectorBatch::create(groupby.output_schema()); - int group_count = 0; - while (groupby.next_batch(*result)) { - group_count += result->row_count(); - } - // 4 * 5 = 20 unique pairs - EXPECT_EQ(group_count, 20); + // GROUP BY on two columns + Schema schema; + schema.add_column("cat1", common::ValueType::TYPE_TEXT); + schema.add_column("cat2", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multi_col_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // 20 rows: (cat1 = i%4, cat2 = i%5) -> 20 unique pairs + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 20; ++i) { + std::string c1 = "A" + std::to_string(i % 4); + std::string c2 = "B" + std::to_string(i % 5); + batch->append_tuple(Tuple({common::Value::make_text(c1), common::Value::make_text(c2), + common::Value::make_int64(i)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multi_col_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat1", common::ValueType::TYPE_TEXT); + out_schema.add_column("cat2", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat1")); + group_by.push_back(std::make_unique("cat2")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + // 4 * 5 = 20 unique pairs + EXPECT_EQ(group_count, 20); } TEST_F(VectorizedGroupByTests, MinMaxAggregates) { - // Test MIN and MAX aggregates - Schema schema; - schema.add_column("cat", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); - - ColumnarTable table("minmax_group", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); - - // Insert rows: A->{5,10}, B->{3,7} - auto batch = VectorBatch::create(schema); - batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(5)})); - batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); - batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); - batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(7)})); - ASSERT_TRUE(table.append_batch(*batch)); - - auto scan = std::make_unique("minmax_group", - std::make_shared(table)); - - Schema out_schema; - out_schema.add_column("cat", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); - out_schema.add_column("min", common::ValueType::TYPE_INT64); - out_schema.add_column("max", common::ValueType::TYPE_INT64); - - std::vector> group_by; - group_by.push_back(std::make_unique("cat")); - - std::vector aggs = { - {AggregateType::Count, -1}, - {AggregateType::Sum, 1}, - {AggregateType::Min, 1}, - {AggregateType::Max, 1} - }; - - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); - - auto result = VectorBatch::create(groupby.output_schema()); - ASSERT_TRUE(groupby.next_batch(*result)); - EXPECT_EQ(result->row_count(), 2); - - // Verify values for both groups - for (size_t i = 0; i < result->row_count(); ++i) { - std::string cat = result->get_column(0).get(i).as_text(); - int64_t cnt = result->get_column(1).get(i).as_int64(); - int64_t sum = static_cast(result->get_column(2).get(i).to_float64()); - int64_t min_val = result->get_column(3).get(i).as_int64(); - int64_t max_val = result->get_column(4).get(i).as_int64(); - - EXPECT_EQ(cnt, 2); - if (cat == "A") { - EXPECT_EQ(sum, 15); // 5 + 10 - EXPECT_EQ(min_val, 5); - EXPECT_EQ(max_val, 10); - } else if (cat == "B") { - EXPECT_EQ(sum, 10); // 3 + 7 - EXPECT_EQ(min_val, 3); - EXPECT_EQ(max_val, 7); + // Test MIN and MAX aggregates + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("minmax_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert rows: A->{5,10}, B->{3,7} + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(5)})); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(7)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("minmax_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + out_schema.add_column("min", common::ValueType::TYPE_INT64); + out_schema.add_column("max", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}, + {AggregateType::Sum, 1}, + {AggregateType::Min, 1}, + {AggregateType::Max, 1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 2); + + // Verify values for both groups + for (size_t i = 0; i < result->row_count(); ++i) { + std::string cat = result->get_column(0).get(i).as_text(); + int64_t cnt = result->get_column(1).get(i).as_int64(); + int64_t sum = static_cast(result->get_column(2).get(i).to_float64()); + int64_t min_val = result->get_column(3).get(i).as_int64(); + int64_t max_val = result->get_column(4).get(i).as_int64(); + + EXPECT_EQ(cnt, 2); + if (cat == "A") { + EXPECT_EQ(sum, 15); // 5 + 10 + EXPECT_EQ(min_val, 5); + EXPECT_EQ(max_val, 10); + } else if (cat == "B") { + EXPECT_EQ(sum, 10); // 3 + 7 + EXPECT_EQ(min_val, 3); + EXPECT_EQ(max_val, 7); + } } - } } TEST_F(VectorizedGroupByTests, NullGroupKeys) { - // GROUP BY with NULL keys - Schema schema; - schema.add_column("cat", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); - - ColumnarTable table("null_group", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); - - // Insert: A, NULL, B, NULL - auto batch = VectorBatch::create(schema); - batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(1)})); - batch->append_tuple(Tuple({common::Value::make_null(), common::Value::make_int64(2)})); - batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); - batch->append_tuple(Tuple({common::Value::make_null(), common::Value::make_int64(4)})); - ASSERT_TRUE(table.append_batch(*batch)); - - auto scan = std::make_unique("null_group", - std::make_shared(table)); - - Schema out_schema; - out_schema.add_column("cat", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - - std::vector> group_by; - group_by.push_back(std::make_unique("cat")); - - std::vector aggs = {{AggregateType::Count, -1}}; - - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); - - auto result = VectorBatch::create(groupby.output_schema()); - int group_count = 0; - while (groupby.next_batch(*result)) { - group_count += result->row_count(); - } - // 3 groups: "A", "B", and NULL - EXPECT_EQ(group_count, 3); + // GROUP BY with NULL keys + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("null_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert: A, NULL, B, NULL + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(1)})); + batch->append_tuple(Tuple({common::Value::make_null(), common::Value::make_int64(2)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); + batch->append_tuple(Tuple({common::Value::make_null(), common::Value::make_int64(4)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("null_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + // 3 groups: "A", "B", and NULL + EXPECT_EQ(group_count, 3); } TEST_F(VectorizedGroupByTests, VerifyGroupKeyValues) { - // Verify actual group key values in output - Schema schema; - schema.add_column("cat", common::ValueType::TYPE_TEXT); - schema.add_column("val", common::ValueType::TYPE_INT64); - - ColumnarTable table("verify_keys", *storage_, schema); - ASSERT_TRUE(table.create()); - ASSERT_TRUE(table.open()); - - auto batch = VectorBatch::create(schema); - batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(10)})); - batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(20)})); - batch->append_tuple(Tuple({common::Value::make_text("Y"), common::Value::make_int64(5)})); - ASSERT_TRUE(table.append_batch(*batch)); - - auto scan = std::make_unique("verify_keys", - std::make_shared(table)); - - Schema out_schema; - out_schema.add_column("cat", common::ValueType::TYPE_TEXT); - out_schema.add_column("cnt", common::ValueType::TYPE_INT64); - - std::vector> group_by; - group_by.push_back(std::make_unique("cat")); - - std::vector aggs = {{AggregateType::Count, -1}}; - - VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), - std::move(aggs), std::move(out_schema)); - - auto result = VectorBatch::create(groupby.output_schema()); - ASSERT_TRUE(groupby.next_batch(*result)); - EXPECT_EQ(result->row_count(), 2); - - // Find X and Y groups and verify - std::string found_x, found_y; - int64_t cnt_x = 0, cnt_y = 0; - for (size_t i = 0; i < result->row_count(); ++i) { - std::string cat = result->get_column(0).get(i).as_text(); - int64_t cnt = result->get_column(1).get(i).as_int64(); - if (cat == "X") { - found_x = cat; - cnt_x = cnt; - } else if (cat == "Y") { - found_y = cat; - cnt_y = cnt; + // Verify actual group key values in output + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("verify_keys", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(20)})); + batch->append_tuple(Tuple({common::Value::make_text("Y"), common::Value::make_int64(5)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("verify_keys", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 2); + + // Find X and Y groups and verify + std::string found_x, found_y; + int64_t cnt_x = 0, cnt_y = 0; + for (size_t i = 0; i < result->row_count(); ++i) { + std::string cat = result->get_column(0).get(i).as_text(); + int64_t cnt = result->get_column(1).get(i).as_int64(); + if (cat == "X") { + found_x = cat; + cnt_x = cnt; + } else if (cat == "Y") { + found_y = cat; + cnt_y = cnt; + } } - } - EXPECT_EQ(found_x, "X"); - EXPECT_EQ(cnt_x, 2); - EXPECT_EQ(found_y, "Y"); - EXPECT_EQ(cnt_y, 1); + EXPECT_EQ(found_x, "X"); + EXPECT_EQ(cnt_x, 2); + EXPECT_EQ(found_y, "Y"); + EXPECT_EQ(cnt_y, 1); } } // namespace \ No newline at end of file From 8b58ad5ab6f954be7666d3603ed1da38179c4114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Wed, 22 Apr 2026 23:23:54 +0300 Subject: [PATCH 6/7] fix(vectorized): resolve review comments on VectorizedGroupByOperator - Add explicit #include - Rename ProcessState/state_ to ProcessPhase/process_phase_ to avoid shadowing base class - Use separate sums_int64/sums_float64 accumulators with has_float_value_ tracking - Branch in produce_output_batch to emit based on output column type - Add collision-safe key encoding with length-prefixed values and dedicated NULL marker - Pre-resolve group_by column indices in constructor (group_by_col_indices_) - Fail fast on unresolved group-by expressions instead of silently emitting "NULL|" - Update PHASE_8_ANALYTICS.md to document MIN/MAX support and implementation details --- docs/phases/PHASE_8_ANALYTICS.md | 7 +- include/executor/vectorized_operator.hpp | 83 ++++++++++++++++++------ 2 files changed, 67 insertions(+), 23 deletions(-) diff --git a/docs/phases/PHASE_8_ANALYTICS.md b/docs/phases/PHASE_8_ANALYTICS.md index 3727fdc..16ceb9a 100644 --- a/docs/phases/PHASE_8_ANALYTICS.md +++ b/docs/phases/PHASE_8_ANALYTICS.md @@ -29,9 +29,12 @@ Optimized global analytical queries (`COUNT`, `SUM`). ### 5. Vectorized GROUP BY Added `VectorizedGroupByOperator` for hash-based grouped aggregation. -- **Hash-Based Grouping**: Uses `unordered_map` for efficient group key lookup. +- **Hash-Based Grouping**: Uses `unordered_map` for efficient group key lookup with collision-safe key encoding. - **Two-Phase Processing**: Input phase builds hash table from batches; Output phase serves grouped results. -- **Supported Aggregates**: COUNT(*) and SUM with INT64/FLOAT64 columns. +- **Supported Aggregates**: COUNT(*), SUM, MIN, and MAX with INT64/FLOAT64 columns. +- **Type-Specific Accumulators**: SUM uses separate `sums_int64` and `sums_float64` accumulators to preserve precision for large INT64 values. +- **Collision-Safe Key Encoding**: Group keys use length-prefixed encoding with dedicated NULL markers, preventing key collisions from string concatenation ambiguities. +- **Pre-resolved Column Indices**: Group-by column indices computed once in constructor to avoid repeated lookups. ## Recent Improvements (Engine Benchmarking) As of our latest sprint, we have established a high-performance baseline for the engine's core scanning logic: diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 6cd9216..c626f42 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include "executor/types.hpp" @@ -292,14 +293,18 @@ class VectorizedAggregateOperator : public VectorizedOperator { */ struct VectorizedGroupState { std::vector counts; - std::vector sums; + std::vector sums_int64; // Separate accumulators to avoid precision loss + std::vector sums_float64; + std::vector has_float_value_; // Tracks whether any float64 values were accumulated std::vector mins; std::vector maxes; VectorizedGroupState() = default; explicit VectorizedGroupState(size_t agg_count) { counts.assign(agg_count, 0); - sums.assign(agg_count, 0.0); + sums_int64.assign(agg_count, 0); + sums_float64.assign(agg_count, 0.0); + has_float_value_.assign(agg_count, false); mins.assign(agg_count, common::Value::make_null()); maxes.assign(agg_count, common::Value::make_null()); } @@ -314,6 +319,9 @@ class VectorizedGroupByOperator : public VectorizedOperator { std::vector> group_by_; std::vector aggregates_; + // Pre-resolved column indices for group-by expressions (computed once in ctor) + std::vector group_by_col_indices_; + // Hash table: group key string -> group state std::unordered_map groups_; // Ordered group keys for output iteration @@ -325,8 +333,8 @@ class VectorizedGroupByOperator : public VectorizedOperator { size_t current_group_idx_ = 0; // Processing state - enum class ProcessState { Input, Output }; - ProcessState state_ = ProcessState::Input; + enum class ProcessPhase { Input, Output }; + ProcessPhase process_phase_ = ProcessPhase::Input; // Reusable batch objects std::unique_ptr input_batch_; @@ -341,6 +349,14 @@ class VectorizedGroupByOperator : public VectorizedOperator { group_by_(std::move(group_by)), aggregates_(std::move(aggregates)) { input_batch_ = VectorBatch::create(child_->output_schema()); + + // Pre-resolve column indices once in constructor + const auto& schema = child_->output_schema(); + for (size_t i = 0; i < group_by_.size(); ++i) { + size_t col_idx = schema.find_column(group_by_[i]->to_string()); + group_by_col_indices_.push_back(col_idx); + } + // Create schema for group key evaluation Schema key_schema; for (size_t i = 0; i < group_by_.size(); ++i) { @@ -350,12 +366,12 @@ class VectorizedGroupByOperator : public VectorizedOperator { } bool next_batch(VectorBatch& out_batch) override { - if (state_ == ProcessState::Input) { + if (process_phase_ == ProcessPhase::Input) { // Phase 1: Consume all input batches and populate hash table while (child_->next_batch(*input_batch_)) { process_input_batch(*input_batch_); } - state_ = ProcessState::Output; + process_phase_ = ProcessPhase::Output; } // Phase 2: Produce grouped output batches @@ -364,19 +380,30 @@ class VectorizedGroupByOperator : public VectorizedOperator { private: void process_input_batch(VectorBatch& batch) { - // For each row, compute hash key directly from source columns + // For each row, compute hash key using collision-safe encoding for (size_t r = 0; r < batch.row_count(); ++r) { - // Build key from group-by values + // Build key using length-prefixed, type-tagged encoding std::string key; - for (size_t i = 0; i < group_by_.size(); ++i) { - // Find column index for group-by expression - const auto& schema = child_->output_schema(); - size_t col_idx = schema.find_column(group_by_[i]->to_string()); + for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { + size_t col_idx = group_by_col_indices_[i]; if (col_idx == static_cast(-1)) { - key += "NULL|"; + // Column not found in schema - fail fast + set_error("GROUP BY: column not found in input schema: " + + group_by_[i]->to_string()); + return; + } + + const auto& val = batch.get_column(col_idx).get(r); + if (val.is_null()) { + // Use a dedicated NULL marker for null values + key.append("\1NULL\0", 6); } else { - key += batch.get_column(col_idx).get(r).to_string(); - key += "|"; + // Length-prefixed value: marker + length (4 bytes) + data + std::string val_str = val.to_string(); + key.push_back('\0'); // non-NULL marker + uint32_t len = static_cast(val_str.size()); + key.append(reinterpret_cast(&len), 4); + key.append(val_str); } } @@ -385,8 +412,8 @@ class VectorizedGroupByOperator : public VectorizedOperator { if (it == groups_.end()) { // Store group key values for output std::vector key_vals; - for (size_t i = 0; i < group_by_.size(); ++i) { - size_t col_idx = child_->output_schema().find_column(group_by_[i]->to_string()); + for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { + size_t col_idx = group_by_col_indices_[i]; if (col_idx == static_cast(-1)) { key_vals.push_back(common::Value::make_null()); } else { @@ -417,10 +444,11 @@ class VectorizedGroupByOperator : public VectorizedOperator { if (!col.is_null(row_idx)) { if (col.type() == common::ValueType::TYPE_INT64) { auto& num_col = dynamic_cast&>(col); - state.sums[i] += num_col.raw_data()[row_idx]; + state.sums_int64[i] += num_col.raw_data()[row_idx]; } else if (col.type() == common::ValueType::TYPE_FLOAT64) { auto& num_col = dynamic_cast&>(col); - state.sums[i] += num_col.raw_data()[row_idx]; + state.sums_float64[i] += num_col.raw_data()[row_idx]; + state.has_float_value_[i] = true; } } } else if (agg.type == AggregateType::Min && agg.input_col_idx >= 0) { @@ -471,8 +499,21 @@ class VectorizedGroupByOperator : public VectorizedOperator { common::Value::make_int64(state.counts[i])); break; case AggregateType::Sum: - out_batch.get_column(col_idx).append( - common::Value::make_float64(state.sums[i])); + // Emit based on output column type to preserve precision + if (output_schema_.get_column(col_idx).type() == common::ValueType::TYPE_INT64) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(state.sums_int64[i])); + } else if (output_schema_.get_column(col_idx).type() == common::ValueType::TYPE_FLOAT64) { + // If we saw any float64 values, use the float64 accumulator + // Otherwise convert from int64 accumulator + double float_val = state.has_float_value_[i] + ? state.sums_float64[i] + : static_cast(state.sums_int64[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(float_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } break; case AggregateType::Min: out_batch.get_column(col_idx).append(state.mins[i]); From 2f5bb4a9236cef3fb696c0e2f2d9deb63efe305b Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Wed, 22 Apr 2026 20:31:23 +0000 Subject: [PATCH 7/7] style: automated clang-format fixes --- include/executor/vectorized_operator.hpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index c626f42..1841f39 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -500,15 +500,17 @@ class VectorizedGroupByOperator : public VectorizedOperator { break; case AggregateType::Sum: // Emit based on output column type to preserve precision - if (output_schema_.get_column(col_idx).type() == common::ValueType::TYPE_INT64) { + if (output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_INT64) { out_batch.get_column(col_idx).append( common::Value::make_int64(state.sums_int64[i])); - } else if (output_schema_.get_column(col_idx).type() == common::ValueType::TYPE_FLOAT64) { + } else if (output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_FLOAT64) { // If we saw any float64 values, use the float64 accumulator // Otherwise convert from int64 accumulator double float_val = state.has_float_value_[i] - ? state.sums_float64[i] - : static_cast(state.sums_int64[i]); + ? state.sums_float64[i] + : static_cast(state.sums_int64[i]); out_batch.get_column(col_idx).append( common::Value::make_float64(float_val)); } else {