Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/phases/PHASE_8_ANALYTICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@ Developed SIMD-friendly contiguous memory buffers for batch processing.

### 3. Vectorized Execution Engine (`include/executor/vectorized_operator.hpp`)
Built a batch-at-a-time physical execution model.
- **Vectorized Operators**: Implemented `Scan`, `Filter`, `Project`, and `Aggregate` operators designed for chunk-based execution.
- **Vectorized Operators**: Implemented `Scan`, `Filter`, `Project`, `Aggregate`, and `GroupBy` operators designed for chunk-based execution.
- **Batch-at-a-Time Interface**: Operators pass entire `VectorBatch` objects between themselves, minimizing virtual function call overhead.

### 4. High-Performance Aggregation
Optimized global analytical queries (`COUNT`, `SUM`).
- **Vectorized Global Aggregate**: Aggregates entire batches of data with minimal branching and high cache locality.
- **Type-Specific Aggregation**: Leverages C++ templates to generate highly efficient aggregation logic for different data types.

### 5. Vectorized GROUP BY
Added `VectorizedGroupByOperator` for hash-based grouped aggregation.
- **Hash-Based Grouping**: Uses `unordered_map` for efficient group key lookup with collision-safe key encoding.
- **Two-Phase Processing**: Input phase builds hash table from batches; Output phase serves grouped results.
- **Supported Aggregates**: COUNT(*), SUM, MIN, and MAX with INT64/FLOAT64 columns.
- **Type-Specific Accumulators**: SUM uses separate `sums_int64` and `sums_float64` accumulators to preserve precision for large INT64 values.
- **Collision-Safe Key Encoding**: Group keys use length-prefixed encoding with dedicated NULL markers, preventing key collisions from string concatenation ambiguities.
- **Pre-resolved Column Indices**: Group-by column indices computed once in constructor to avoid repeated lookups.

Comment thread
coderabbitai[bot] marked this conversation as resolved.
## Recent Improvements (Engine Benchmarking)
As of our latest sprint, we have established a high-performance baseline for the engine's core scanning logic:
- **Baseline Speed**: 181M rows/s (Sequential Scan).
Expand Down
250 changes: 250 additions & 0 deletions include/executor/vectorized_operator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

#include "executor/types.hpp"
Expand Down Expand Up @@ -287,6 +288,255 @@ class VectorizedAggregateOperator : public VectorizedOperator {
}
};

/**
 * @brief Per-group accumulator storage for the vectorized GROUP BY operator.
 *
 * Holds one slot per aggregate expression. SUM keeps separate INT64 and
 * FLOAT64 accumulators so large integer sums are not rounded through double.
 */
struct VectorizedGroupState {
  std::vector<int64_t> counts;
  std::vector<int64_t> sums_int64;  // Separate accumulators to avoid precision loss
  std::vector<double> sums_float64;
  std::vector<bool> has_float_value_;  // True once any float64 value was accumulated
  std::vector<common::Value> mins;
  std::vector<common::Value> maxes;

  VectorizedGroupState() = default;

  /// Sizes every accumulator for @p agg_count aggregates with neutral values.
  explicit VectorizedGroupState(size_t agg_count)
      : counts(agg_count, 0),
        sums_int64(agg_count, 0),
        sums_float64(agg_count, 0.0),
        has_float_value_(agg_count, false),
        mins(agg_count, common::Value::make_null()),
        maxes(agg_count, common::Value::make_null()) {}
};

/**
 * @brief Vectorized GROUP BY operator with hash-based grouped aggregation.
 *
 * Two-phase operator: the Input phase drains the child operator and builds a
 * hash table keyed by a collision-safe encoding of the group-by values; the
 * Output phase streams the grouped results in first-seen order, up to
 * BATCH_SIZE groups per output batch.
 *
 * Supported aggregates: COUNT(*) and COUNT(col), SUM (with type-specific
 * INT64/FLOAT64 accumulators to avoid precision loss), MIN and MAX.
 */
class VectorizedGroupByOperator : public VectorizedOperator {
 private:
  std::unique_ptr<VectorizedOperator> child_;
  std::vector<std::unique_ptr<parser::Expression>> group_by_;
  std::vector<VectorizedAggregateInfo> aggregates_;

  // Pre-resolved column indices for the group-by expressions, computed once in
  // the constructor; size_t(-1) marks a column missing from the child schema.
  std::vector<size_t> group_by_col_indices_;

  // Hash table: encoded group key -> per-group accumulator state.
  std::unordered_map<std::string, VectorizedGroupState> groups_;
  // Insertion-ordered access to the group states. unordered_map never moves
  // its mapped values (even on rehash), so these raw pointers stay valid.
  std::vector<VectorizedGroupState*> group_states_;
  // Raw group-by key values per group, in the same insertion order.
  std::vector<std::vector<common::Value>> group_values_;

  // Next group to emit during the Output phase.
  size_t current_group_idx_ = 0;

  // Input: still draining the child; Output: serving grouped results.
  enum class ProcessPhase { Input, Output };
  ProcessPhase process_phase_ = ProcessPhase::Input;

  // Reusable batch for pulling child output.
  std::unique_ptr<VectorBatch> input_batch_;

 public:
  /**
   * @brief Constructs the operator and pre-resolves group-by column indices.
   * @param child         Upstream operator supplying input batches.
   * @param group_by      Expressions whose values form the group key.
   * @param aggregates    Aggregate descriptors (type + input column index).
   * @param output_schema Output layout: group-by columns first, then one
   *                      column per aggregate, in declaration order.
   */
  VectorizedGroupByOperator(std::unique_ptr<VectorizedOperator> child,
                            std::vector<std::unique_ptr<parser::Expression>> group_by,
                            std::vector<VectorizedAggregateInfo> aggregates, Schema output_schema)
      : VectorizedOperator(std::move(output_schema)),
        child_(std::move(child)),
        group_by_(std::move(group_by)),
        aggregates_(std::move(aggregates)) {
    input_batch_ = VectorBatch::create(child_->output_schema());

    // Pre-resolve column indices once so the per-row hot loop never performs
    // name lookups. Missing columns are reported when input arrives.
    const auto& schema = child_->output_schema();
    group_by_col_indices_.reserve(group_by_.size());
    for (const auto& expr : group_by_) {
      group_by_col_indices_.push_back(schema.find_column(expr->to_string()));
    }
  }

  bool next_batch(VectorBatch& out_batch) override {
    if (process_phase_ == ProcessPhase::Input) {
      // Phase 1: consume all input batches and populate the hash table.
      while (child_->next_batch(*input_batch_)) {
        if (!process_input_batch(*input_batch_)) {
          break;  // Unrecoverable error (e.g. missing group-by column).
        }
      }
      process_phase_ = ProcessPhase::Output;
    }

    // Phase 2: produce grouped output batches.
    return produce_output_batch(out_batch);
  }

 private:
  /// Folds one input batch into the hash table. Returns false on error.
  bool process_input_batch(VectorBatch& batch) {
    // Validate the pre-resolved indices once per batch (not once per row and
    // column as before); any missing column is a hard error.
    for (size_t i = 0; i < group_by_col_indices_.size(); ++i) {
      if (group_by_col_indices_[i] == static_cast<size_t>(-1)) {
        set_error("GROUP BY: column not found in input schema: " +
                  group_by_[i]->to_string());
        return false;
      }
    }

    std::string key;  // Reused across rows to avoid per-row allocations.
    for (size_t r = 0; r < batch.row_count(); ++r) {
      // Build a collision-safe key: each value is either a dedicated NULL
      // marker or a non-NULL marker + 4-byte length prefix + payload, so
      // concatenated values can never be confused across column boundaries.
      key.clear();
      for (size_t col_idx : group_by_col_indices_) {
        const auto& val = batch.get_column(col_idx).get(r);
        if (val.is_null()) {
          key.append("\1NULL\0", 6);  // Dedicated marker for NULL values
        } else {
          std::string val_str = val.to_string();
          key.push_back('\0');  // non-NULL marker
          uint32_t len = static_cast<uint32_t>(val_str.size());
          key.append(reinterpret_cast<const char*>(&len), 4);
          key.append(val_str);
        }
      }

      // Get or create the group state for this key.
      auto it = groups_.find(key);
      if (it == groups_.end()) {
        it = groups_.emplace(key, VectorizedGroupState(aggregates_.size())).first;

        // Record the raw key values and a state pointer in insertion order
        // for the output phase.
        std::vector<common::Value> key_vals;
        key_vals.reserve(group_by_col_indices_.size());
        for (size_t col_idx : group_by_col_indices_) {
          key_vals.push_back(batch.get_column(col_idx).get(r));
        }
        group_values_.push_back(std::move(key_vals));
        group_states_.push_back(&it->second);
      }

      // Update accumulators for this row.
      update_accumulators(it->second, batch, r);
    }
    input_batch_->clear();
    return true;
  }

  /// Applies row @p row_idx of @p batch to every aggregate accumulator.
  void update_accumulators(VectorizedGroupState& state, VectorBatch& batch, size_t row_idx) {
    for (size_t i = 0; i < aggregates_.size(); ++i) {
      const auto& agg = aggregates_[i];

      if (agg.type == AggregateType::Count) {
        if (agg.input_col_idx < 0) {
          state.counts[i]++;  // COUNT(*): every row counts.
        } else if (!batch.get_column(agg.input_col_idx).is_null(row_idx)) {
          state.counts[i]++;  // COUNT(col): only non-NULL values count.
        }
      } else if (agg.type == AggregateType::Sum && agg.input_col_idx >= 0) {
        auto& col = batch.get_column(agg.input_col_idx);
        if (!col.is_null(row_idx)) {
          if (col.type() == common::ValueType::TYPE_INT64) {
            // Accumulate in int64 to keep full precision for large integers.
            auto& num_col = dynamic_cast<NumericVector<int64_t>&>(col);
            state.sums_int64[i] += num_col.raw_data()[row_idx];
          } else if (col.type() == common::ValueType::TYPE_FLOAT64) {
            auto& num_col = dynamic_cast<NumericVector<double>&>(col);
            state.sums_float64[i] += num_col.raw_data()[row_idx];
            state.has_float_value_[i] = true;
          }
        }
      } else if (agg.type == AggregateType::Min && agg.input_col_idx >= 0) {
        auto& col = batch.get_column(agg.input_col_idx);
        if (!col.is_null(row_idx)) {
          // NULLs are ignored; the first non-NULL value seeds the minimum.
          if (state.mins[i].is_null() || col.get(row_idx) < state.mins[i]) {
            state.mins[i] = col.get(row_idx);
          }
        }
      } else if (agg.type == AggregateType::Max && agg.input_col_idx >= 0) {
        auto& col = batch.get_column(agg.input_col_idx);
        if (!col.is_null(row_idx)) {
          if (state.maxes[i].is_null() || state.maxes[i] < col.get(row_idx)) {
            state.maxes[i] = col.get(row_idx);
          }
        }
      }
    }
  }

  /// Emits up to BATCH_SIZE grouped rows per call; returns false at EOF.
  bool produce_output_batch(VectorBatch& out_batch) {
    if (current_group_idx_ >= group_states_.size()) {
      return false;  // EOF: every group has been emitted.
    }

    out_batch.clear();
    if (out_batch.column_count() == 0) {
      out_batch.init_from_schema(output_schema_);
    }

    constexpr size_t BATCH_SIZE = 1024;
    size_t output_count = 0;

    while (current_group_idx_ < group_states_.size() && output_count < BATCH_SIZE) {
      // Group-by key columns come first in the output schema.
      const auto& key_vals = group_values_[current_group_idx_];
      for (size_t i = 0; i < key_vals.size(); ++i) {
        out_batch.get_column(i).append(key_vals[i]);
      }

      // Aggregate result columns follow, in declaration order. The stored
      // state pointer avoids a redundant hash lookup per group.
      const auto& state = *group_states_[current_group_idx_];
      for (size_t i = 0; i < aggregates_.size(); ++i) {
        append_aggregate_value(out_batch, group_by_.size() + i, state, i);
      }
      output_count++;
      current_group_idx_++;
    }

    out_batch.set_row_count(output_count);
    return true;
  }

  /// Appends aggregate @p agg_idx of @p state to output column @p col_idx.
  void append_aggregate_value(VectorBatch& out_batch, size_t col_idx,
                              const VectorizedGroupState& state, size_t agg_idx) {
    auto& out_col = out_batch.get_column(col_idx);
    switch (aggregates_[agg_idx].type) {
      case AggregateType::Count:
        out_col.append(common::Value::make_int64(state.counts[agg_idx]));
        break;
      case AggregateType::Sum: {
        // Emit according to the planned output type so INT64 sums keep full
        // precision; for FLOAT64 output, use the float accumulator when any
        // float values were seen, otherwise convert the int64 accumulator.
        const auto out_type = output_schema_.get_column(col_idx).type();
        if (out_type == common::ValueType::TYPE_INT64) {
          out_col.append(common::Value::make_int64(state.sums_int64[agg_idx]));
        } else if (out_type == common::ValueType::TYPE_FLOAT64) {
          double float_val = state.has_float_value_[agg_idx]
                                 ? state.sums_float64[agg_idx]
                                 : static_cast<double>(state.sums_int64[agg_idx]);
          out_col.append(common::Value::make_float64(float_val));
        } else {
          out_col.append(common::Value::make_null());
        }
        break;
      }
      case AggregateType::Min:
        out_col.append(state.mins[agg_idx]);
        break;
      case AggregateType::Max:
        out_col.append(state.maxes[agg_idx]);
        break;
      default:
        out_col.append(common::Value::make_null());
        break;
    }
  }
};

} // namespace cloudsql::executor

#endif // CLOUDSQL_EXECUTOR_VECTORIZED_OPERATOR_HPP
Loading
Loading