From 6bb047cc3b07787ebee9de1e2279ec27ba01c093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Poyraz=20K=C3=BC=C3=A7=C3=BCkarslan?= <83272398+PoyrazK@users.noreply.github.com> Date: Thu, 16 Apr 2026 14:26:48 +0300 Subject: [PATCH 1/2] feat: add LEFT-side infrastructure for Phase 3-5 FULL join support - Add ClusterManager storage methods for LEFT side: - set_local_left_rows / get_local_left_rows / clear_local_left_rows - set_unmatched_left_rows / get_unmatched_left_rows / clear_unmatched_left_rows - Add new RPC types for LEFT side: - UnmatchedLeftRowsReport (data node reports unmatched LEFT rows) - FetchUnmatchedLeftRows (coordinator fetches stored unmatched LEFT rows) - Add new RPC structs: - UnmatchedLeftRowsReportArgs - FetchUnmatchedLeftRowsArgs - Add handlers in main.cpp for the new RPC types This is infrastructure only - Phase 3-5 remains disabled. --- include/common/cluster_manager.hpp | 73 ++++++++++++++++++ include/network/rpc_message.hpp | 90 +++++++++++++++++++++++ src/main.cpp | 114 +++++++++++++++++++++++++++++ 3 files changed, 277 insertions(+) diff --git a/include/common/cluster_manager.hpp b/include/common/cluster_manager.hpp index b86b72fc..1c02af71 100644 --- a/include/common/cluster_manager.hpp +++ b/include/common/cluster_manager.hpp @@ -404,6 +404,73 @@ class ClusterManager { unmatched_rows_.erase(context_id); } + /** + * @brief Store local left table rows for outer join processing + * Called during Phase 2 shuffle when sending left table rows to other nodes + */ + void set_local_left_rows(const std::string& context_id, const std::string& table_name, + std::vector rows) { + const std::scoped_lock lock(mutex_); + local_left_table_rows_[context_id][table_name] = std::move(rows); + } + + /** + * @brief Get stored local left table rows + */ + [[nodiscard]] std::vector get_local_left_rows( + const std::string& context_id, const std::string& table_name) const { + const std::scoped_lock lock(mutex_); + auto ctx_it = local_left_table_rows_.find(context_id); + if (ctx_it != local_left_table_rows_.end()) { + auto table_it = ctx_it->second.find(table_name); + if (table_it != ctx_it->second.end()) { + return table_it->second; + } + } + return {}; + } + + /** + * @brief Clear local left table rows for a context + */ + void clear_local_left_rows(const std::string& context_id) { + const std::scoped_lock lock(mutex_); + local_left_table_rows_.erase(context_id); + } + + /** + * @brief Store unmatched LEFT rows for a context (used by FULL join NULL-padding) + */ + void set_unmatched_left_rows(const std::string& context_id, const std::string& table_name, + std::vector rows) { + const std::scoped_lock lock(mutex_); + unmatched_left_rows_[context_id][table_name] = std::move(rows); + } + + /** + * @brief Get stored unmatched LEFT rows for a context + */ + [[nodiscard]] std::vector get_unmatched_left_rows( + const std::string& context_id, const std::string& table_name) const { + const std::scoped_lock lock(mutex_); + auto ctx_it = unmatched_left_rows_.find(context_id); + if (ctx_it != unmatched_left_rows_.end()) { + auto table_it = ctx_it->second.find(table_name); + if (table_it != ctx_it->second.end()) { + return table_it->second; + } + } + return {}; + } + + /** + * @brief Clear unmatched LEFT rows for a context + */ + void clear_unmatched_left_rows(const std::string& context_id) { + const std::scoped_lock lock(mutex_); + unmatched_left_rows_.erase(context_id); + } + private: /** * @brief Stored bloom filter data for a context @@ -438,6 +505,12 @@ class ClusterManager { /* context_id -> table_name -> unmatched rows for outer join NULL-padding */ std::unordered_map>> unmatched_rows_; + /* context_id -> table_name -> local left table rows for outer join tracking */ + std::unordered_map>> + local_left_table_rows_; + /* context_id -> table_name -> unmatched LEFT rows for FULL join NULL-padding */ + std::unordered_map>> + unmatched_left_rows_; mutable std::mutex mutex_; }; diff --git a/include/network/rpc_message.hpp b/include/network/rpc_message.hpp index 5bffba52..1bffe34f 100644 --- a/include/network/rpc_message.hpp +++ b/include/network/rpc_message.hpp @@ -38,6 +38,9 @@ enum class RpcType : uint8_t { UnmatchedRowsReport = 13, // Data node reports unmatched right rows for outer join UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node + // LEFT-side counterparts for FULL join + UnmatchedLeftRowsReport = 16, // Data node reports unmatched LEFT rows for FULL join + FetchUnmatchedLeftRows = 17, // Coordinator fetches stored unmatched LEFT rows Error = 255 }; @@ -700,6 +703,93 @@ struct FetchUnmatchedRowsArgs { } }; +/** + * @brief Arguments for UnmatchedLeftRowsReport RPC + * @note Data node reports unmatched LEFT row keys to coordinator after local FULL join + */ +struct UnmatchedLeftRowsReportArgs { + std::string context_id; + std::string left_table; + std::string join_key_col; // Which column was the join key + std::vector unmatched_keys; // LEFT key values that had no match + uint32_t right_column_count = 0; // Number of right columns for NULL-padding + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + Serializer::serialize_string(left_table, out); + Serializer::serialize_string(join_key_col, out); + + // Serialize right column count + const uint32_t rc_count = right_column_count; + const size_t rc_off = out.size(); + out.resize(rc_off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + rc_off, &rc_count, Serializer::VAL_SIZE_32); + + // Serialize unmatched keys count + const uint32_t count = static_cast(unmatched_keys.size()); + const size_t off = out.size(); + out.resize(off + Serializer::VAL_SIZE_32); + std::memcpy(out.data() + off, &count, Serializer::VAL_SIZE_32); + + // Serialize each key + for (const auto& key : unmatched_keys) { + Serializer::serialize_string(key, out); + } + return out; + } + + static UnmatchedLeftRowsReportArgs deserialize(const std::vector& in) { + UnmatchedLeftRowsReportArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + args.left_table = Serializer::deserialize_string(in.data(), offset, in.size()); + args.join_key_col = Serializer::deserialize_string(in.data(), offset, in.size()); + + // Deserialize right column count + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&args.right_column_count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + uint32_t count = 0; + if (offset + Serializer::VAL_SIZE_32 <= in.size()) { + std::memcpy(&count, in.data() + offset, Serializer::VAL_SIZE_32); + offset += Serializer::VAL_SIZE_32; + } + + for (uint32_t i = 0; i < count; ++i) { + args.unmatched_keys.push_back( + Serializer::deserialize_string(in.data(), offset, in.size())); + } + return args; + } +}; + +/** + * @brief Arguments for FetchUnmatchedLeftRows RPC + * @note Coordinator fetches stored unmatched LEFT rows from a data node + */ +struct FetchUnmatchedLeftRowsArgs { + std::string context_id; + std::string table_name; + + [[nodiscard]] std::vector serialize() const { + std::vector out; + Serializer::serialize_string(context_id, out); + Serializer::serialize_string(table_name, out); + return out; + } + + static FetchUnmatchedLeftRowsArgs deserialize(const std::vector& in) { + FetchUnmatchedLeftRowsArgs args; + size_t offset = 0; + args.context_id = Serializer::deserialize_string(in.data(), offset, in.size()); + args.table_name = Serializer::deserialize_string(in.data(), offset, in.size()); + return args; + } +}; + /** * @brief Arguments for TxnPrepare/Commit/Abort RPC */ diff --git a/src/main.cpp b/src/main.cpp index cd1fee33..d2237981 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -837,6 +837,120 @@ int main(int argc, char* argv[]) { args.context_id, args.table_name); } + auto resp_p = reply.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + + // Handler for reporting unmatched LEFT rows after join execution + // For FULL outer joins, each node identifies rows from its local left table + // partition that had no matching right row during the distributed join + rpc_server->set_handler( + cloudsql::network::RpcType::UnmatchedLeftRowsReport, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = cloudsql::network::UnmatchedLeftRowsReportArgs::deserialize(p); + cloudsql::network::UnmatchedLeftRowsReportArgs reply; + reply.context_id = args.context_id; + reply.left_table = args.left_table; + reply.join_key_col = args.join_key_col; + + // args.unmatched_keys contains MATCHED keys from coordinator + // We need to return rows that are NOT in this set + std::unordered_set matched_keys_set( + args.unmatched_keys.begin(), args.unmatched_keys.end()); + + try { + // Scan local left table and collect rows that were NOT matched + auto table_meta_opt = catalog->get_table_by_name(args.left_table); + if (table_meta_opt.has_value()) { + const auto* table_meta = table_meta_opt.value(); + cloudsql::executor::Schema schema; + for (const auto& col : table_meta->columns) { + schema.add_column(col.name, col.type); + } + cloudsql::storage::HeapTable table(args.left_table, *bpm, schema); + + const size_t key_idx = schema.find_column(args.join_key_col); + if (key_idx != static_cast(-1)) { + std::vector unmatched_tuples; + auto iter = table.scan(); + cloudsql::storage::HeapTable::TupleMeta t_meta; + while (iter.next_meta(t_meta)) { + if (t_meta.xmax == 0) { + const auto& key_val = t_meta.tuple.get(key_idx); + std::string key_str = key_val.to_string(); + // Only include if NOT in matched keys + if (matched_keys_set.find(key_str) == + matched_keys_set.end()) { + reply.unmatched_keys.push_back(key_str); + // Pad with NULLs for right columns and append left + // row + std::vector padded_values; + padded_values.reserve(t_meta.tuple.size() + + args.right_column_count); + // Append left table column values + for (size_t j = 0; j < t_meta.tuple.size(); ++j) { + padded_values.push_back(t_meta.tuple.get(j)); + } + // Append NULLs for right table columns + for (uint32_t i = 0; i < args.right_column_count; + ++i) { + padded_values.push_back( + cloudsql::common::Value::make_null()); + } + unmatched_tuples.emplace_back( + std::move(padded_values)); + } + } + } + // Store properly padded tuples in ClusterManager for + // coordinator to collect + if (cluster_manager != nullptr && !unmatched_tuples.empty()) { + cluster_manager->set_unmatched_left_rows( + args.context_id, args.left_table, + std::move(unmatched_tuples)); + } + } + } + } catch (const std::exception& /*e*/) { + // Return empty on error + } + + auto resp_p = reply.serialize(); + cloudsql::network::RpcHeader resp_h; + resp_h.type = cloudsql::network::RpcType::QueryResults; + resp_h.payload_len = static_cast(resp_p.size()); + char h_buf[cloudsql::network::RpcHeader::HEADER_SIZE]; + resp_h.encode(h_buf); + static_cast( + send(fd, h_buf, cloudsql::network::RpcHeader::HEADER_SIZE, 0)); + static_cast(send(fd, resp_p.data(), resp_p.size(), 0)); + }); + + // Handler for fetching stored unmatched LEFT rows from a data node + // Coordinator calls this after UnmatchedLeftRowsReport to get full unmatched tuples + rpc_server->set_handler( + cloudsql::network::RpcType::FetchUnmatchedLeftRows, + [&](const cloudsql::network::RpcHeader& h, const std::vector& p, + int fd) { + (void)h; + auto args = cloudsql::network::FetchUnmatchedLeftRowsArgs::deserialize(p); + cloudsql::network::UnmatchedRowsPushArgs reply; + reply.context_id = args.context_id; + + if (cluster_manager != nullptr) { + reply.unmatched_rows = cluster_manager->get_unmatched_left_rows( + args.context_id, args.table_name); + } + auto resp_p = reply.serialize(); cloudsql::network::RpcHeader resp_h; resp_h.type = cloudsql::network::RpcType::QueryResults; From 601844379fdc634cbee9cd1ea5e655e57b36d540 Mon Sep 17 00:00:00 2001 From: poyrazK <83272398+poyrazK@users.noreply.github.com> Date: Thu, 16 Apr 2026 11:28:56 +0000 Subject: [PATCH 2/2] style: automated clang-format fixes --- include/network/rpc_message.hpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/include/network/rpc_message.hpp b/include/network/rpc_message.hpp index 1bffe34f..16cfd43a 100644 --- a/include/network/rpc_message.hpp +++ b/include/network/rpc_message.hpp @@ -39,8 +39,8 @@ enum class RpcType : uint8_t { UnmatchedRowsPush = 14, // Coordinator sends unmatched rows for NULL-padding FetchUnmatchedRows = 15, // Coordinator fetches stored unmatched rows from data node // LEFT-side counterparts for FULL join - UnmatchedLeftRowsReport = 16, // Data node reports unmatched LEFT rows for FULL join - FetchUnmatchedLeftRows = 17, // Coordinator fetches stored unmatched LEFT rows + UnmatchedLeftRowsReport = 16, // Data node reports unmatched LEFT rows for FULL join + FetchUnmatchedLeftRows = 17, // Coordinator fetches stored unmatched LEFT rows Error = 255 }; @@ -710,9 +710,9 @@ struct FetchUnmatchedRowsArgs { struct UnmatchedLeftRowsReportArgs { std::string context_id; std::string left_table; - std::string join_key_col; // Which column was the join key - std::vector unmatched_keys; // LEFT key values that had no match - uint32_t right_column_count = 0; // Number of right columns for NULL-padding + std::string join_key_col; // Which column was the join key + std::vector unmatched_keys; // LEFT key values that had no match + uint32_t right_column_count = 0; // Number of right columns for NULL-padding [[nodiscard]] std::vector serialize() const { std::vector out;