diff --git a/docs/phases/PHASE_8_ANALYTICS.md b/docs/phases/PHASE_8_ANALYTICS.md
index e5de28a..16ceb9a 100644
--- a/docs/phases/PHASE_8_ANALYTICS.md
+++ b/docs/phases/PHASE_8_ANALYTICS.md
@@ -19,7 +19,7 @@ Developed SIMD-friendly contiguous memory buffers for batch processing.
 
 ### 3. Vectorized Execution Engine (`include/executor/vectorized_operator.hpp`)
 Built a batch-at-a-time physical execution model.
-- **Vectorized Operators**: Implemented `Scan`, `Filter`, `Project`, and `Aggregate` operators designed for chunk-based execution.
+- **Vectorized Operators**: Implemented `Scan`, `Filter`, `Project`, `Aggregate`, and `GroupBy` operators designed for chunk-based execution.
 - **Batch-at-a-Time Interface**: Operators pass entire `VectorBatch` objects between themselves, minimizing virtual function call overhead.
 
 ### 4. High-Performance Aggregation
@@ -27,6 +27,15 @@ Optimized global analytical queries (`COUNT`, `SUM`).
 - **Vectorized Global Aggregate**: Aggregates entire batches of data with minimal branching and high cache locality.
 - **Type-Specific Aggregation**: Leverages C++ templates to generate highly efficient aggregation logic for different data types.
 
+### 5. Vectorized GROUP BY
+Added `VectorizedGroupByOperator` for hash-based grouped aggregation.
+- **Hash-Based Grouping**: Uses `unordered_map` for efficient group key lookup with collision-safe key encoding.
+- **Two-Phase Processing**: Input phase builds hash table from batches; Output phase serves grouped results.
+- **Supported Aggregates**: COUNT(*), SUM, MIN, and MAX with INT64/FLOAT64 columns.
+- **Type-Specific Accumulators**: SUM uses separate `sums_int64` and `sums_float64` accumulators to preserve precision for large INT64 values.
+- **Collision-Safe Key Encoding**: Group keys use length-prefixed encoding with dedicated NULL markers, preventing key collisions from string concatenation ambiguities.
+- **Pre-resolved Column Indices**: Group-by column indices computed once in constructor to avoid repeated lookups. + ## Recent Improvements (Engine Benchmarking) As of our latest sprint, we have established a high-performance baseline for the engine's core scanning logic: - **Baseline Speed**: 181M rows/s (Sequential Scan). diff --git a/include/executor/vectorized_operator.hpp b/include/executor/vectorized_operator.hpp index 15fa879..1841f39 100644 --- a/include/executor/vectorized_operator.hpp +++ b/include/executor/vectorized_operator.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include "executor/types.hpp" @@ -287,6 +288,255 @@ class VectorizedAggregateOperator : public VectorizedOperator { } }; +/** + * @brief Group state for vectorized GROUP BY - accumulator data per group + */ +struct VectorizedGroupState { + std::vector counts; + std::vector sums_int64; // Separate accumulators to avoid precision loss + std::vector sums_float64; + std::vector has_float_value_; // Tracks whether any float64 values were accumulated + std::vector mins; + std::vector maxes; + + VectorizedGroupState() = default; + explicit VectorizedGroupState(size_t agg_count) { + counts.assign(agg_count, 0); + sums_int64.assign(agg_count, 0); + sums_float64.assign(agg_count, 0.0); + has_float_value_.assign(agg_count, false); + mins.assign(agg_count, common::Value::make_null()); + maxes.assign(agg_count, common::Value::make_null()); + } +}; + +/** + * @brief Vectorized GROUP BY operator with hash-based aggregation + */ +class VectorizedGroupByOperator : public VectorizedOperator { + private: + std::unique_ptr child_; + std::vector> group_by_; + std::vector aggregates_; + + // Pre-resolved column indices for group-by expressions (computed once in ctor) + std::vector group_by_col_indices_; + + // Hash table: group key string -> group state + std::unordered_map groups_; + // Ordered group keys for output iteration + std::vector group_keys_; + // Group key values for each 
group + std::vector> group_values_; + + // Current output position + size_t current_group_idx_ = 0; + + // Processing state + enum class ProcessPhase { Input, Output }; + ProcessPhase process_phase_ = ProcessPhase::Input; + + // Reusable batch objects + std::unique_ptr input_batch_; + std::unique_ptr group_key_batch_; + + public: + VectorizedGroupByOperator(std::unique_ptr child, + std::vector> group_by, + std::vector aggregates, Schema output_schema) + : VectorizedOperator(std::move(output_schema)), + child_(std::move(child)), + group_by_(std::move(group_by)), + aggregates_(std::move(aggregates)) { + input_batch_ = VectorBatch::create(child_->output_schema()); + + // Pre-resolve column indices once in constructor + const auto& schema = child_->output_schema(); + for (size_t i = 0; i < group_by_.size(); ++i) { + size_t col_idx = schema.find_column(group_by_[i]->to_string()); + group_by_col_indices_.push_back(col_idx); + } + + // Create schema for group key evaluation + Schema key_schema; + for (size_t i = 0; i < group_by_.size(); ++i) { + key_schema.add_column("gb_key_" + std::to_string(i), common::ValueType::TYPE_TEXT); + } + group_key_batch_ = VectorBatch::create(key_schema); + } + + bool next_batch(VectorBatch& out_batch) override { + if (process_phase_ == ProcessPhase::Input) { + // Phase 1: Consume all input batches and populate hash table + while (child_->next_batch(*input_batch_)) { + process_input_batch(*input_batch_); + } + process_phase_ = ProcessPhase::Output; + } + + // Phase 2: Produce grouped output batches + return produce_output_batch(out_batch); + } + + private: + void process_input_batch(VectorBatch& batch) { + // For each row, compute hash key using collision-safe encoding + for (size_t r = 0; r < batch.row_count(); ++r) { + // Build key using length-prefixed, type-tagged encoding + std::string key; + for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { + size_t col_idx = group_by_col_indices_[i]; + if (col_idx == static_cast(-1)) { + // 
Column not found in schema - fail fast + set_error("GROUP BY: column not found in input schema: " + + group_by_[i]->to_string()); + return; + } + + const auto& val = batch.get_column(col_idx).get(r); + if (val.is_null()) { + // Use a dedicated NULL marker for null values + key.append("\1NULL\0", 6); + } else { + // Length-prefixed value: marker + length (4 bytes) + data + std::string val_str = val.to_string(); + key.push_back('\0'); // non-NULL marker + uint32_t len = static_cast(val_str.size()); + key.append(reinterpret_cast(&len), 4); + key.append(val_str); + } + } + + // Get or create group state + auto it = groups_.find(key); + if (it == groups_.end()) { + // Store group key values for output + std::vector key_vals; + for (size_t i = 0; i < group_by_col_indices_.size(); ++i) { + size_t col_idx = group_by_col_indices_[i]; + if (col_idx == static_cast(-1)) { + key_vals.push_back(common::Value::make_null()); + } else { + key_vals.push_back(batch.get_column(col_idx).get(r)); + } + } + auto result = groups_.emplace(key, VectorizedGroupState(aggregates_.size())); + it = result.first; + group_keys_.push_back(key); + group_values_.push_back(std::move(key_vals)); + } + + // Update accumulators for this row + update_accumulators(it->second, batch, r); + } + input_batch_->clear(); + } + + void update_accumulators(VectorizedGroupState& state, VectorBatch& batch, size_t row_idx) { + for (size_t i = 0; i < aggregates_.size(); ++i) { + const auto& agg = aggregates_[i]; + + if (agg.type == AggregateType::Count && agg.input_col_idx < 0) { + // COUNT(*) - always increment + state.counts[i]++; + } else if (agg.type == AggregateType::Sum && agg.input_col_idx >= 0) { + auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(row_idx)) { + if (col.type() == common::ValueType::TYPE_INT64) { + auto& num_col = dynamic_cast&>(col); + state.sums_int64[i] += num_col.raw_data()[row_idx]; + } else if (col.type() == common::ValueType::TYPE_FLOAT64) { + auto& num_col = 
dynamic_cast&>(col); + state.sums_float64[i] += num_col.raw_data()[row_idx]; + state.has_float_value_[i] = true; + } + } + } else if (agg.type == AggregateType::Min && agg.input_col_idx >= 0) { + auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(row_idx)) { + if (state.mins[i].is_null() || col.get(row_idx) < state.mins[i]) { + state.mins[i] = col.get(row_idx); + } + } + } else if (agg.type == AggregateType::Max && agg.input_col_idx >= 0) { + auto& col = batch.get_column(agg.input_col_idx); + if (!col.is_null(row_idx)) { + if (state.maxes[i].is_null() || state.maxes[i] < col.get(row_idx)) { + state.maxes[i] = col.get(row_idx); + } + } + } + } + } + + bool produce_output_batch(VectorBatch& out_batch) { + if (current_group_idx_ >= group_keys_.size()) { + return false; // EOF + } + + out_batch.clear(); + if (out_batch.column_count() == 0) { + out_batch.init_from_schema(output_schema_); + } + + constexpr size_t BATCH_SIZE = 1024; + size_t output_count = 0; + + while (current_group_idx_ < group_keys_.size() && output_count < BATCH_SIZE) { + // Append group key columns + const auto& key_vals = group_values_[current_group_idx_]; + for (size_t i = 0; i < key_vals.size(); ++i) { + out_batch.get_column(i).append(key_vals[i]); + } + + // Append aggregate result columns + const auto& state = groups_.at(group_keys_[current_group_idx_]); + for (size_t i = 0; i < aggregates_.size(); ++i) { + size_t col_idx = group_by_.size() + i; + switch (aggregates_[i].type) { + case AggregateType::Count: + out_batch.get_column(col_idx).append( + common::Value::make_int64(state.counts[i])); + break; + case AggregateType::Sum: + // Emit based on output column type to preserve precision + if (output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_INT64) { + out_batch.get_column(col_idx).append( + common::Value::make_int64(state.sums_int64[i])); + } else if (output_schema_.get_column(col_idx).type() == + common::ValueType::TYPE_FLOAT64) { + // If we saw any float64 
values, use the float64 accumulator + // Otherwise convert from int64 accumulator + double float_val = state.has_float_value_[i] + ? state.sums_float64[i] + : static_cast(state.sums_int64[i]); + out_batch.get_column(col_idx).append( + common::Value::make_float64(float_val)); + } else { + out_batch.get_column(col_idx).append(common::Value::make_null()); + } + break; + case AggregateType::Min: + out_batch.get_column(col_idx).append(state.mins[i]); + break; + case AggregateType::Max: + out_batch.get_column(col_idx).append(state.maxes[i]); + break; + default: + out_batch.get_column(col_idx).append(common::Value::make_null()); + break; + } + } + output_count++; + current_group_idx_++; + } + + out_batch.set_row_count(output_count); + return true; + } +}; + } // namespace cloudsql::executor #endif // CLOUDSQL_EXECUTOR_VECTORIZED_OPERATOR_HPP diff --git a/tests/vectorized_operator_tests.cpp b/tests/vectorized_operator_tests.cpp index 54d5956..8faa1f2 100644 --- a/tests/vectorized_operator_tests.cpp +++ b/tests/vectorized_operator_tests.cpp @@ -446,4 +446,385 @@ TEST_F(VectorizedFilterTests, PipelinedBatches) { EXPECT_EQ(total, 1000); } +class VectorizedGroupByTests : public ::testing::Test { + protected: + void SetUp() override { storage_ = std::make_unique("./test_groupby"); } + void TearDown() override { storage_.reset(); } + + std::unique_ptr storage_; +}; + +TEST_F(VectorizedGroupByTests, SingleGroup) { + // GROUP BY with single group (same as global aggregation) + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("single_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert 10 rows all with same category + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 10; ++i) { + batch->append_tuple( + Tuple({common::Value::make_text("A"), common::Value::make_int64(i + 1)})); + } + 
ASSERT_TRUE(table.append_batch(*batch)); + + // SELECT cat, COUNT(*), SUM(val) FROM t GROUP BY cat + // Expected: 1 group (cat=A, count=10, sum=55) + auto scan = std::make_unique("single_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = { + {AggregateType::Count, -1}, // COUNT(*) + {AggregateType::Sum, 1} // SUM(val) + }; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 1); + EXPECT_EQ(result->get_column(0).get(0).as_text(), "A"); + EXPECT_EQ(result->get_column(1).get(0).as_int64(), 10); // COUNT(*) + EXPECT_EQ(result->get_column(2).get(0).to_float64(), 55.0); // SUM(1..10) + + EXPECT_FALSE(groupby.next_batch(*result)); // EOF +} + +TEST_F(VectorizedGroupByTests, MultipleGroups) { + // GROUP BY with 3 distinct groups + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multi_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert 9 rows: 3 each of A, B, C + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 3; ++i) { + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(20)})); + batch->append_tuple(Tuple({common::Value::make_text("C"), common::Value::make_int64(30)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multi_group", + 
std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 3); // 3 groups + + // Verify all 3 groups have count=3 + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(result->get_column(1).get(i).as_int64(), 3); + } +} + +TEST_F(VectorizedGroupByTests, EmptyInput) { + // GROUP BY on empty table should return 0 groups + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("empty_groupby", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + auto scan = std::make_unique("empty_groupby", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + EXPECT_FALSE(groupby.next_batch(*result)); // No groups from empty input +} + +TEST_F(VectorizedGroupByTests, MultiBatchGroups) { + // 2500 rows with 10 groups using TEXT keys, verify groups span multiple input batches + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable 
table("multibatch_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // 2500 rows: cat = "cat_" + (i % 10) (10 groups, ~250 rows each) + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 2500; ++i) { + std::string cat = "cat_" + std::to_string(i % 10); + batch->append_tuple(Tuple({common::Value::make_text(cat), common::Value::make_int64(i)})); + } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multibatch_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}, + {AggregateType::Sum, 1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + EXPECT_EQ(group_count, 10); // 10 groups +} + +TEST_F(VectorizedGroupByTests, MultipleColumnGroupBy) { + // GROUP BY on two columns + Schema schema; + schema.add_column("cat1", common::ValueType::TYPE_TEXT); + schema.add_column("cat2", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("multi_col_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // 20 rows: (cat1 = i%4, cat2 = i%5) -> 20 unique pairs + auto batch = VectorBatch::create(schema); + for (int64_t i = 0; i < 20; ++i) { + std::string c1 = "A" + std::to_string(i % 4); + std::string c2 = "B" + std::to_string(i % 5); + batch->append_tuple(Tuple({common::Value::make_text(c1), common::Value::make_text(c2), + common::Value::make_int64(i)})); 
+ } + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("multi_col_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat1", common::ValueType::TYPE_TEXT); + out_schema.add_column("cat2", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat1")); + group_by.push_back(std::make_unique("cat2")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + // 4 * 5 = 20 unique pairs + EXPECT_EQ(group_count, 20); +} + +TEST_F(VectorizedGroupByTests, MinMaxAggregates) { + // Test MIN and MAX aggregates + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("minmax_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert rows: A->{5,10}, B->{3,7} + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(5)})); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(7)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("minmax_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + out_schema.add_column("sum", common::ValueType::TYPE_FLOAT64); + 
out_schema.add_column("min", common::ValueType::TYPE_INT64); + out_schema.add_column("max", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}, + {AggregateType::Sum, 1}, + {AggregateType::Min, 1}, + {AggregateType::Max, 1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 2); + + // Verify values for both groups + for (size_t i = 0; i < result->row_count(); ++i) { + std::string cat = result->get_column(0).get(i).as_text(); + int64_t cnt = result->get_column(1).get(i).as_int64(); + int64_t sum = static_cast(result->get_column(2).get(i).to_float64()); + int64_t min_val = result->get_column(3).get(i).as_int64(); + int64_t max_val = result->get_column(4).get(i).as_int64(); + + EXPECT_EQ(cnt, 2); + if (cat == "A") { + EXPECT_EQ(sum, 15); // 5 + 10 + EXPECT_EQ(min_val, 5); + EXPECT_EQ(max_val, 10); + } else if (cat == "B") { + EXPECT_EQ(sum, 10); // 3 + 7 + EXPECT_EQ(min_val, 3); + EXPECT_EQ(max_val, 7); + } + } +} + +TEST_F(VectorizedGroupByTests, NullGroupKeys) { + // GROUP BY with NULL keys + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("null_group", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + // Insert: A, NULL, B, NULL + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("A"), common::Value::make_int64(1)})); + batch->append_tuple(Tuple({common::Value::make_null(), common::Value::make_int64(2)})); + batch->append_tuple(Tuple({common::Value::make_text("B"), common::Value::make_int64(3)})); + batch->append_tuple(Tuple({common::Value::make_null(), 
common::Value::make_int64(4)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("null_group", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = VectorBatch::create(groupby.output_schema()); + int group_count = 0; + while (groupby.next_batch(*result)) { + group_count += result->row_count(); + } + // 3 groups: "A", "B", and NULL + EXPECT_EQ(group_count, 3); +} + +TEST_F(VectorizedGroupByTests, VerifyGroupKeyValues) { + // Verify actual group key values in output + Schema schema; + schema.add_column("cat", common::ValueType::TYPE_TEXT); + schema.add_column("val", common::ValueType::TYPE_INT64); + + ColumnarTable table("verify_keys", *storage_, schema); + ASSERT_TRUE(table.create()); + ASSERT_TRUE(table.open()); + + auto batch = VectorBatch::create(schema); + batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(10)})); + batch->append_tuple(Tuple({common::Value::make_text("X"), common::Value::make_int64(20)})); + batch->append_tuple(Tuple({common::Value::make_text("Y"), common::Value::make_int64(5)})); + ASSERT_TRUE(table.append_batch(*batch)); + + auto scan = std::make_unique("verify_keys", + std::make_shared(table)); + + Schema out_schema; + out_schema.add_column("cat", common::ValueType::TYPE_TEXT); + out_schema.add_column("cnt", common::ValueType::TYPE_INT64); + + std::vector> group_by; + group_by.push_back(std::make_unique("cat")); + + std::vector aggs = {{AggregateType::Count, -1}}; + + VectorizedGroupByOperator groupby(std::move(scan), std::move(group_by), std::move(aggs), + std::move(out_schema)); + + auto result = 
VectorBatch::create(groupby.output_schema()); + ASSERT_TRUE(groupby.next_batch(*result)); + EXPECT_EQ(result->row_count(), 2); + + // Find X and Y groups and verify + std::string found_x, found_y; + int64_t cnt_x = 0, cnt_y = 0; + for (size_t i = 0; i < result->row_count(); ++i) { + std::string cat = result->get_column(0).get(i).as_text(); + int64_t cnt = result->get_column(1).get(i).as_int64(); + if (cat == "X") { + found_x = cat; + cnt_x = cnt; + } else if (cat == "Y") { + found_y = cat; + cnt_y = cnt; + } + } + EXPECT_EQ(found_x, "X"); + EXPECT_EQ(cnt_x, 2); + EXPECT_EQ(found_y, "Y"); + EXPECT_EQ(cnt_y, 1); +} + } // namespace \ No newline at end of file