Merged
9 changes: 9 additions & 0 deletions docs/phases/PHASE_8_ANALYTICS.md
@@ -36,6 +36,15 @@ Added `VectorizedGroupByOperator` for hash-based grouped aggregation.
- **Collision-Safe Key Encoding**: Group keys use length-prefixed encoding with dedicated NULL markers, preventing key collisions from string concatenation ambiguities.
- **Pre-resolved Column Indices**: Group-by column indices computed once in constructor to avoid repeated lookups.
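The length-prefixed encoding described above can be sketched as follows. This is a minimal illustration, not the operator's actual implementation: `encode_group_key` is a hypothetical name, and the real operator encodes typed column values rather than strings.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Sketch of collision-safe group-key encoding (hypothetical helper).
// Each key part gets a marker byte (NULL vs. non-NULL) and a 4-byte
// length prefix, so ("ab","c") and ("a","bc") can never produce the
// same key the way naive string concatenation would.
std::string encode_group_key(const std::vector<const std::string*>& parts) {
    std::string key;
    for (const std::string* part : parts) {
        if (part == nullptr) {
            key.push_back('\x00');  // dedicated NULL marker
            continue;
        }
        key.push_back('\x01');      // non-NULL marker
        uint32_t len = static_cast<uint32_t>(part->size());
        key.append(reinterpret_cast<const char*>(&len), sizeof(len));  // length prefix
        key.append(*part);
    }
    return key;
}
```

With naive concatenation, `("ab","c")` and `("a","bc")` would both encode to `"abc"`; the length prefixes keep them distinct, and the marker byte distinguishes a NULL from an empty string.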

### 6. Vectorized Hash Join (`VectorizedHashJoinOperator`)
Implemented a vectorized hash join with grace-style hash partitioning and batch-based processing.
- **Grace Hash Partitioning**: Right table is partitioned into 64 hash buckets (`NUM_BUCKETS`); hashing the join key guarantees that rows with equal keys always land in the same bucket.
- **Two-Phase Processing**: `BuildRight` phase constructs hash table from right relation; `ProbeLeft` phase probes with left rows.
- **Resumable Bucket Scanning**: Uses `resuming_bucket_scan_`, `resumed_bucket_idx_`, `resumed_entry_idx_`, and `resumed_key_val_` to resume interrupted bucket scans when batch capacity is reached, preventing batch overflow during multi-match probes.
- **Batch Size**: Output batches use `BATCH_SIZE` (1024 rows) for memory-efficient processing.
- **Join Type Support**: INNER and LEFT joins supported; LEFT join emits unmatched left rows with NULLs for right columns.
- **Matched Row Tracking**: `left_matched_in_batch_` tracks matched rows within the current batch for LEFT join unmatched emission.
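Assuming the hash table maps each key to the indices of all matching right rows, the build/probe flow with resumable emission can be sketched like this. `build_right`, `probe_left`, and `ProbeState` are illustrative names, not the operator's API, and LEFT-join NULL emission is omitted for brevity:

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

constexpr size_t BATCH_SIZE = 1024;  // matches the operator's batch size

// Saved probe position, analogous to the resumed_* members: when a batch
// fills up mid-match, the next call continues from this state instead of
// overflowing the batch.
struct ProbeState {
    size_t left_idx = 0;   // next left row to probe
    size_t match_idx = 0;  // resume offset within the current match list
};

// BuildRight phase: map each right key to the indices of rows bearing it.
std::unordered_map<int64_t, std::vector<size_t>>
build_right(const std::vector<int64_t>& right_keys) {
    std::unordered_map<int64_t, std::vector<size_t>> ht;
    for (size_t i = 0; i < right_keys.size(); ++i) {
        ht[right_keys[i]].push_back(i);
    }
    return ht;
}

// ProbeLeft phase: fill `out` with (left_idx, right_idx) pairs, at most
// BATCH_SIZE per call; returns false once the left side is exhausted.
bool probe_left(const std::vector<int64_t>& left_keys,
                const std::unordered_map<int64_t, std::vector<size_t>>& ht,
                ProbeState& state,
                std::vector<std::pair<size_t, size_t>>& out) {
    out.clear();
    while (state.left_idx < left_keys.size()) {
        auto it = ht.find(left_keys[state.left_idx]);
        if (it != ht.end()) {
            const std::vector<size_t>& matches = it->second;
            while (state.match_idx < matches.size()) {
                if (out.size() == BATCH_SIZE) return true;  // batch full: resume here next call
                out.emplace_back(state.left_idx, matches[state.match_idx++]);
            }
        }
        state.match_idx = 0;  // finished this left row's matches
        ++state.left_idx;
    }
    return !out.empty();
}
```

Probing 3 left rows against 1500 right rows that all share one key yields 1500 output pairs split across two batches (1024 + 476), which is exactly the multi-batch scenario the `VectorizedHashJoinLeftMultiBatch` test below exercises.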

## Recent Improvements (Engine Benchmarking)
As of our latest sprint, we have established a high-performance baseline for the engine's core scanning logic:
- **Baseline Speed**: 181M rows/s (Sequential Scan).
71 changes: 71 additions & 0 deletions tests/vectorized_operator_tests.cpp
@@ -5,6 +5,7 @@

#include <gtest/gtest.h>

#include <algorithm>
#include <memory>
#include <vector>

@@ -1339,4 +1340,74 @@ TEST_F(VectorizedGroupByTests, VectorizedHashJoinMultiBatch) {
EXPECT_EQ(total_rows, 2000);
}

TEST_F(VectorizedGroupByTests, VectorizedHashJoinLeftMultiBatch) {
  // Test LEFT join with >BATCH_SIZE (1024) right rows, requiring multiple batches.
  // Left: id=1,2,3 (3 rows) | Right: id=1 (1500 rows).
  // Left id=1 matches all 1500 right rows.
  // Left id=2,3 have NO match - should emit with NULLs.
  Schema left_schema;
  left_schema.add_column("id", common::ValueType::TYPE_INT64);

  Schema right_schema;
  right_schema.add_column("id", common::ValueType::TYPE_INT64);

  ColumnarTable left_table("hashjoin_left", *storage_, left_schema);
  ASSERT_TRUE(left_table.create());
  ASSERT_TRUE(left_table.open());
  auto left_batch = VectorBatch::create(left_schema);
  left_batch->append_tuple(Tuple({common::Value::make_int64(1)}));
  left_batch->append_tuple(Tuple({common::Value::make_int64(2)}));
  left_batch->append_tuple(Tuple({common::Value::make_int64(3)}));
  ASSERT_TRUE(left_table.append_batch(*left_batch));

  ColumnarTable right_table("hashjoin_right", *storage_, right_schema);
  ASSERT_TRUE(right_table.create());
  ASSERT_TRUE(right_table.open());
  auto right_batch = VectorBatch::create(right_schema);
  // Right: 1500 rows ALL with id=1 (forces multi-batch processing with BATCH_SIZE=1024).
  for (int i = 0; i < 1500; ++i) {
    right_batch->append_tuple(Tuple({common::Value::make_int64(1)}));
  }
  ASSERT_TRUE(right_table.append_batch(*right_batch));

  auto left_scan = std::make_unique<VectorizedSeqScanOperator>(
      "hashjoin_left", std::make_shared<ColumnarTable>(left_table));
  auto right_scan = std::make_unique<VectorizedSeqScanOperator>(
      "hashjoin_right", std::make_shared<ColumnarTable>(right_table));

  auto join = make_vectorized_hash_join(std::move(left_scan), std::move(right_scan), "id", "id",
                                        JoinType::Left);

  auto result = VectorBatch::create(join->output_schema());
  int64_t total_rows = 0;
  int64_t rows_with_nulls = 0;         // Left id=2,3 should emit with NULLs.
  std::vector<int64_t> null_left_ids;  // Track which left ids had a NULL right side.

  while (join->next_batch(*result)) {
    for (size_t i = 0; i < result->row_count(); ++i) {
      int64_t left_id = result->get_column(0).get(i).as_int64();
      if (result->get_column(1).get(i).is_null()) {
        rows_with_nulls++;
        null_left_ids.push_back(left_id);
      } else {
        int64_t right_id = result->get_column(1).get(i).as_int64();
        // Every non-NULL row should be left.id=1 matched with right.id=1.
        EXPECT_EQ(left_id, 1);
        EXPECT_EQ(right_id, 1);
      }
    }
    total_rows += result->row_count();
    result->clear();
  }

  // LEFT join: id=1 matches 1500 rows; id=2,3 emit with NULLs.
  EXPECT_EQ(total_rows, 1502);    // 1500 matches + 2 unmatched with NULLs
  EXPECT_EQ(rows_with_nulls, 2);  // id=2 and id=3 have no match
  // Verify the two NULL rows are for left ids 2 and 3.
  EXPECT_EQ(null_left_ids.size(), 2u);
  std::sort(null_left_ids.begin(), null_left_ids.end());
  EXPECT_EQ(null_left_ids[0], 2);
  EXPECT_EQ(null_left_ids[1], 3);
}

} // namespace