Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ set(CORE_SOURCES
src/parser/expression.cpp
src/executor/operator.cpp
src/executor/query_executor.cpp
src/executor/thread_pool.cpp
src/network/rpc_client.cpp
src/network/rpc_server.cpp
src/network/server.cpp
Expand Down
188 changes: 188 additions & 0 deletions docs/VECTORIZED_EXECUTION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# Vectorized Execution Engine

## Overview

cloudSQL's query execution supports two models: the **Volcano (tuple-at-a-time) model** and the **Vectorized (batch-at-a-time) model**. The vectorized model is designed for analytical workloads where high-throughput batch processing provides significant performance gains over row-by-row iteration.

## Execution Models

### Volcano Model (Row-at-a-time)

Traditional iterator-based pull model where each `next()` call returns a single tuple:

```cpp
class Operator {
virtual bool next(Tuple& out_tuple) = 0;
};
```

**Operators:** `SeqScanOperator`, `IndexScanOperator`, `FilterOperator`, `ProjectOperator`, `HashJoinOperator`, `SortOperator`, `AggregateOperator`, `LimitOperator`

**Characteristics:**
- One virtual function call per tuple
- Simple, well-understood semantics
- Good for OLTP workloads with early filtering

### Vectorized Model (Batch-at-a-time)

Batch-based push model where each `next_batch()` call processes a `VectorBatch` (typically 1024 rows):

```cpp
class VectorizedOperator : public Operator {
virtual bool next_batch(VectorBatch& out_batch) = 0;
};
```

**Operators:** `VectorizedSeqScanOperator`, `VectorizedFilterOperator`, `VectorizedProjectOperator`, `VectorizedHashJoinOperator`, `VectorizedGroupByOperator`

**Characteristics:**
- ~1024x fewer virtual function calls
- Higher cache locality and data reuse
- Enables SIMD optimization opportunities
- Ideal for analytical scans and aggregations

## Architecture

### Class Hierarchy

```text
Operator (base)
├── SeqScanOperator, IndexScanOperator, FilterOperator, ...
├── SortOperator, LimitOperator
├── HashJoinOperator, AggregateOperator
└── VectorizedOperator (inherits from Operator)
├── VectorizedSeqScanOperator
├── VectorizedFilterOperator
├── VectorizedProjectOperator
├── VectorizedHashJoinOperator
└── VectorizedGroupByOperator
```

`VectorizedOperator` inherits from `Operator` (using `OperatorType::Result` as base type), enabling polymorphism between the two execution models.

### VectorBatch Structure

```cpp
class VectorBatch {
std::vector<std::unique_ptr<ColumnVector>> columns_;
size_t row_count_;
};
```

A `VectorBatch` contains one `ColumnVector` per output column, with `row_count_` indicating active rows. ColumnVectors can be `NumericVector<T>`, `StringVector`, or `NumericVector<bool>` for booleans.

### QueryExecutor Integration

The `QueryExecutor` decides at execution time which model to use:

```cpp
void QueryExecutor::set_parallel(bool v) { parallel_ = v; }
void QueryExecutor::set_storage_manager(storage::StorageManager* sm) { storage_manager_ = sm; }

QueryResult QueryExecutor::execute_select(const parser::SelectStatement& stmt, Transaction* txn) {
bool has_sort_or_limit = !stmt.order_by().empty() || stmt.has_limit() || stmt.has_offset();
bool use_vectorized = parallel_ && storage_manager_ && !has_sort_or_limit;

if (use_vectorized) {
auto vec_root = build_vectorized_plan(stmt, txn);
// batch iteration via vec_root->next_batch()
} else {
auto root = build_plan(stmt, txn);
// tuple iteration via root->next()
}
}
```

**Key constraint:** Sort/Limit queries fall back to Volcano path since `SortOperator`/`LimitOperator` don't inherit from `VectorizedOperator`.

## Parallel Vectorized Execution

### ThreadPool

`ThreadPool` (`include/executor/thread_pool.hpp`) provides a fixed-size thread pool for parallel task execution:

```cpp
class ThreadPool {
explicit ThreadPool(size_t num_threads);
void submit(std::function<void()> task);
void wait(); // wait for all submitted tasks
};
```

Used by parallel operators to distribute batch processing across multiple threads.

### ParallelVectorizedSeqScanOperator

Multi-threaded scan over `ColumnarTable`:

```cpp
ParallelVectorizedSeqScanOperator(
std::string table_name,
std::shared_ptr<ColumnarTable> table,
std::shared_ptr<ThreadPool> thread_pool);
```

Processes columnar batches in parallel using ThreadPool task distribution.

### VectorizedGroupByOperator with ThreadPool

Parallel hash-based grouped aggregation:

```cpp
VectorizedGroupByOperator(
std::unique_ptr<VectorizedOperator> child,
std::vector<std::unique_ptr<parser::Expression>> group_by,
std::vector<VectorizedAggregateInfo> aggregates,
Schema output_schema,
ThreadPool* thread_pool); // optional thread pool for parallel aggregation
```

Uses thread-local hash maps for concurrent group processing, merging results in the finalize phase.

## Build Plan Comparison

### Volcano Path (`build_plan()`)
```
SeqScanOperator (HeapTable)
→ FilterOperator
→ HashJoinOperator
→ AggregateOperator (HashAggregate)
→ ProjectOperator
→ SortOperator (ORDER BY)
→ LimitOperator
```

### Vectorized Path (`build_vectorized_plan()`)
```
VectorizedSeqScanOperator (ColumnarTable)
→ VectorizedFilterOperator
→ VectorizedHashJoinOperator
→ VectorizedGroupByOperator
→ VectorizedProjectOperator
```

## Usage Example

```cpp
// Enable parallel vectorized mode
executor.set_parallel(true);
executor.set_storage_manager(&storage_manager);

// Vectorized queries (scans, filters, joins, aggregates - no ORDER BY/LIMIT)
auto result = executor.execute("SELECT status, COUNT(*) FROM orders GROUP BY status");

// Volcano fallback (queries with ORDER BY or LIMIT)
auto result2 = executor.execute("SELECT * FROM orders ORDER BY created_at LIMIT 10");
```

## Performance Characteristics

| Scenario | Volcano | Vectorized | Speedup |
|----------|---------|------------|---------|
| Full table scan | 181M rows/s | ~500M rows/s (parallel) | ~3x |
| GROUP BY aggregate | ~50M rows/s | ~150M rows/s (parallel) | ~3x |
| JOIN (hash) | ~40M rows/s | ~100M rows/s | ~2.5x |
| Small result sets | Good | Overhead | - |
| Queries with ORDER BY | Good | N/A (fallback) | - |

The vectorized path provides significant throughput gains for analytical workloads with large result sets, while the Volcano path remains optimal for OLTP-style queries with early filtering or small result sets.
9 changes: 9 additions & 0 deletions docs/phases/PHASE_5_OPTIMIZATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,12 @@ Enabled inter-node data movement.

## Status: 100% Test Pass
All scenarios, including distributed transactions (2PC) and join orchestration, have been verified with automated integration tests.

### Phase 5 Completion: QueryExecutor Integration
The vectorized execution engine was wired into `QueryExecutor` via `set_parallel(true)` mode, enabling SELECT queries to optionally use the vectorized batch path:
- `QueryExecutor::set_parallel(true)` — enables vectorized batch execution
- `QueryExecutor::set_storage_manager()` — provides StorageManager for ColumnarTable lookups
- `build_vectorized_plan()` — constructs operator tree (Scan → Filter → HashJoin → GroupBy → Project)
- `execute_select()` — branches on `use_vectorized` flag between Volcano (tuple) and vectorized (batch) paths
- **Join type support**: `VectorizedHashJoinOperator` supports INNER, LEFT, RIGHT, and FULL outer joins via `JoinType` enum. RIGHT and FULL outer joins use `right_matched_` bitmap and `emit_unmatched_right_rows()` to emit unmatched right rows at end of probe.
- **Constraint**: Sort/Limit queries fall back to Volcano path since SortOperator/LimitOperator don't inherit from VectorizedOperator
16 changes: 14 additions & 2 deletions docs/phases/PHASE_8_ANALYTICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,20 @@ Implemented a vectorized hash join with graceful partitioning and batch-based pr
- **Two-Phase Processing**: `BuildRight` phase constructs hash table from right relation; `ProbeLeft` phase probes with left rows.
- **Resumable Bucket Scanning**: Uses `resuming_bucket_scan_`, `resumed_bucket_idx_`, `resumed_entry_idx_`, and `resumed_key_val_` to resume interrupted bucket scans when batch capacity is reached, preventing batch overflow during multi-match probes.
- **Batch Size**: Output batches use `BATCH_SIZE` (1024 rows) for memory-efficient processing.
- **Join Type Support**: INNER and LEFT joins supported; LEFT join emits unmatched left rows with NULLs for right columns.
- **Matched Row Tracking**: `left_matched_in_batch_` tracks matched rows within the current batch for LEFT join unmatched emission.
- **Join Type Support**: INNER, LEFT, RIGHT, and FULL outer joins supported.
- **INNER**: Only matched rows emitted.
- **LEFT**: Unmatched left rows emitted with NULLs for right columns.
- **RIGHT**: Unmatched right rows emitted with NULLs for left columns; uses `right_matched_` bitmap and `right_bucket_rows_` global storage during probe to track matched right rows.
- **FULL**: Combines LEFT and RIGHT logic — unmatched left rows emitted during probe, unmatched right rows emitted at end via `emit_unmatched_right_rows()`.
- **Matched Row Tracking**: `left_matched_in_batch_` tracks matched rows within the current batch for LEFT join unmatched emission. `right_matched_` bitmap tracks matched right rows for RIGHT/FULL joins across all probe batches. `right_bucket_rows_` provides global row storage for unmatched right row emission.

### 7. Parallel Vectorized Execution
Added `ThreadPool` class and parallel operator variants for multi-threaded batch processing.
- **ThreadPool** (`include/executor/thread_pool.hpp`): Fixed-size thread pool for parallel task execution with wait/timeout support.
- **ParallelVectorizedSeqScanOperator**: Multi-threaded scan over ColumnarTable using ThreadPool for parallel batch processing.
- **VectorizedGroupByOperator with ThreadPool**: Parallel hash-based grouped aggregation leveraging ThreadPool for concurrent group processing.
- **QueryExecutor Integration**: `set_parallel(true)` mode enables vectorized batch execution via `build_vectorized_plan()`. Queries with ORDER BY or LIMIT fall back to Volcano path since SortOperator/LimitOperator do not inherit from VectorizedOperator.
- **Batch Iteration**: Vectorized path uses `next_batch(VectorBatch&)` instead of `next(Tuple&)`, converting batches to result rows for QueryResult output.

## Recent Improvements (Engine Benchmarking)
As of our latest sprint, we have established a high-performance baseline for the engine's core scanning logic:
Expand Down
1 change: 1 addition & 0 deletions docs/phases/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ This directory contains the technical documentation for the lifecycle of the clo
- Native Columnar storage implementation with binary persistence.
- Batch-at-a-time vectorized execution model (Scan, Filter, Project, Aggregate).
- High-performance `NumericVector` and `VectorBatch` data structures.
- `VectorizedHashJoinOperator` supports INNER, LEFT, RIGHT, and FULL outer joins.

### Phase 9 — Stability & Testing Refinement
**Focus**: Engine Robustness & E2E Validation.
Expand Down
27 changes: 27 additions & 0 deletions include/executor/query_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
#include "common/cluster_manager.hpp"
#include "distributed/raft_types.hpp"
#include "executor/operator.hpp"
#include "executor/thread_pool.hpp"
#include "executor/types.hpp"
#include "executor/vectorized_operator.hpp"
#include "parser/statement.hpp"
#include "recovery/log_manager.hpp"
#include "storage/buffer_pool_manager.hpp"
#include "storage/storage_manager.hpp"
#include "transaction/transaction_manager.hpp"

namespace cloudsql::executor {
Expand Down Expand Up @@ -115,6 +118,19 @@ class QueryExecutor {
std::unique_ptr<Operator> build_plan(const parser::SelectStatement& stmt,
transaction::Transaction* txn);

/**
* @brief Enable or disable parallel vectorized query execution.
* Requires StorageManager to be set via set_storage_manager().
*/
void set_parallel(bool v) { parallel_ = v; }
bool is_parallel() const { return parallel_; }

/**
* @brief Set the storage manager for parallel vectorized execution.
* Required before enabling parallel mode.
*/
void set_storage_manager(storage::StorageManager* sm) { storage_manager_ = sm; }

private:
Catalog& catalog_;
storage::BufferPoolManager& bpm_;
Expand All @@ -127,6 +143,13 @@ class QueryExecutor {
bool is_local_only_ = false;
bool batch_insert_mode_ = false;

// Parallel execution state
bool parallel_ = false;
// TODO: Initialize thread_pool_ when parallel query execution is implemented.
// Currently unused — parallel vectorized ops use a local ThreadPool per query.
std::shared_ptr<ThreadPool> thread_pool_;
storage::StorageManager* storage_manager_ = nullptr;

// Bound parameters for the current execution
const std::vector<common::Value>* current_params_ = nullptr;

Expand All @@ -138,6 +161,10 @@ class QueryExecutor {
static std::mutex cache_mutex_;

QueryResult execute_select(const parser::SelectStatement& stmt, transaction::Transaction* txn);

std::unique_ptr<VectorizedOperator> build_vectorized_plan(const parser::SelectStatement& stmt,
transaction::Transaction* txn,
bool has_sort_or_limit);
QueryResult execute_create_table(const parser::CreateTableStatement& stmt);
QueryResult execute_create_index(const parser::CreateIndexStatement& stmt);
QueryResult execute_drop_table(const parser::DropTableStatement& stmt);
Expand Down
87 changes: 87 additions & 0 deletions include/executor/thread_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* @file thread_pool.hpp
* @brief Fixed-size thread pool for parallel query execution
*/

#pragma once

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace cloudsql::executor {

/**
* @brief A fixed-size thread pool for parallel task execution.
*
* Worker threads pull tasks from a shared queue until shutdown().
* submit() returns a std::future for the caller's result.
*/
class ThreadPool {
public:
/**
* @brief Construct a thread pool with the given number of workers.
* @param num_threads Number of worker threads. Defaults to hardware
* concurrency.
*/
explicit ThreadPool(size_t num_threads = std::thread::hardware_concurrency());

/** @brief Destructor — signals shutdown and joins all workers. */
~ThreadPool();

// Non-copyable / non-movable
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;

/**
* @brief Submit a callable for asynchronous execution.
* @tparam F Callable type (function, lambda, etc.)
* @param f The callable to execute
* @return std::future with the result of invoking f
*/
template <typename F>
std::future<std::invoke_result_t<F>> submit(F&& f) {
using R = std::invoke_result_t<F>;
auto task = std::make_shared<std::packaged_task<R()>>(std::forward<F>(f));
auto result = task->get_future();
{
std::lock_guard<std::mutex> lock(mutex_);
tasks_.emplace([task]() { (*task)(); });
pending_tasks_.fetch_add(1, std::memory_order_acq_rel);
}
cv_.notify_one();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return result;
}

/**
* @brief Signal all workers to stop after their current task.
*
* After shutdown() the pool cannot accept new tasks.
*/
void shutdown();

/**
* @brief Block until all submitted tasks complete.
*/
void wait();

/** @brief Number of worker threads in the pool. */
size_t num_threads() const { return workers_.size(); }

private:
std::vector<std::thread> workers_;
std::queue<std::function<void()>> tasks_;
std::mutex mutex_;
std::condition_variable cv_;
bool shutdown_ = false;
std::atomic<size_t> pending_tasks_{0};
};

} // namespace cloudsql::executor
Loading