diff --git a/CMakeLists.txt b/CMakeLists.txt index 731d80e..014bd95 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -214,6 +214,7 @@ add_library(core src/query.cpp src/query_execution.cpp src/join.cpp + src/row.cpp src/storage.cpp src/metadata.cpp src/file_utils.cpp diff --git a/include/arrow_utils.hpp b/include/arrow_utils.hpp index f5d305f..c3071ba 100644 --- a/include/arrow_utils.hpp +++ b/include/arrow_utils.hpp @@ -2,38 +2,147 @@ #define ARROW_UTILS_HPP #include +#include #include #include +#include +#include +#include + +#include "types.hpp" + namespace tundradb { +// Forward declarations +class Schema; +class Node; +class WhereExpr; +class TemporalContext; + +/** + * @brief Extracts the set of "id" column values from an Arrow table. + * + * @param table A table containing an Int64 "id" column. + * @return A DenseSet of all row IDs, or an error if the column is missing. + */ arrow::Result> get_ids_from_table( const std::shared_ptr& table); -// Initialize Arrow Compute module - should be called once at startup +/** + * @brief Initialises the Arrow Compute function registry. + * + * Must be called once before any Arrow compute operations are used. + * + * @return True on success. + */ bool initialize_arrow_compute(); -// Arrow utility functions -arrow::Result> create_int64_array( - const int64_t value); +/** + * @brief Creates a single-element Int64 Arrow array. + * + * @param value The integer value. + * @return A one-element array. + */ +arrow::Result> create_int64_array(int64_t value); + +/** + * @brief Creates a single-element String Arrow array. + * + * @param value The string value. + * @return A one-element array. + */ arrow::Result> create_str_array( const std::string& value); + +/** + * @brief Creates a single-element null Arrow array of the given type. + * + * @param type The Arrow data type. + * @return A one-element array containing a null value. + */ arrow::Result> create_null_array( const std::shared_ptr& type); + +/** + * @brief Prepends a field to an Arrow schema. + * + * @param field The field to insert at position 0. + * @param target_schema The existing schema. + * @return A new schema with @p field followed by @p target_schema fields. + */ std::shared_ptr prepend_field( const std::shared_ptr& field, const std::shared_ptr& target_schema); -/** - * Creates a new schema by prepending an "id" field of type Int64 to the target - * schema. - * @deprecated for remove - * @param target_schema The original schema to modify - * @return A new schema with "id" field prepended - */ +/** @deprecated Scheduled for removal. Use prepend_field() directly. */ std::shared_ptr prepend_id_field( const std::shared_ptr& target_schema); +/** + * @brief Converts a Value to an Arrow Scalar. + * + * @param value The database value to convert. + * @return The corresponding Arrow scalar, or an error for unsupported types. + */ +arrow::Result> value_to_arrow_scalar( + const Value& value); + +/** + * @brief Converts a raw pointer + type tag to an Arrow Scalar. + * + * @param ptr Pointer to the raw value bytes. + * @param type The value's type tag. + * @return The corresponding Arrow scalar, or an error for unsupported types. + */ +arrow::Result> value_ptr_to_arrow_scalar( + const char* ptr, ValueType type); + +/** + * @brief Translates a WhereExpr into an Arrow compute Expression. + * + * @param condition The WHERE expression tree. + * @param strip_var If true, variable prefixes (e.g. "u.") are removed from + * field names. + * @return An Arrow Expression suitable for compute::Filter. + */ +arrow::compute::Expression where_condition_to_expression( + const WhereExpr& condition, bool strip_var); + +/** + * @brief Materialises a set of nodes into an Arrow table. + * + * Creates a table with a single "id" column followed by the schema's fields. + * + * @param schema The database schema for the nodes. + * @param nodes The nodes to serialise. + * @return The resulting table. + */ +arrow::Result> create_table_from_nodes( + const std::shared_ptr& schema, + const std::vector>& nodes); + +/** + * @brief Creates an empty Arrow table matching the given schema. + * + * @param schema The Arrow schema. + * @return A zero-row table with the correct columns and types. + */ +arrow::Result> create_empty_table( + const std::shared_ptr& schema); + +/** + * @brief Filters an Arrow table using a WhereExpr predicate. + * + * @param table The table to filter. + * @param condition The WHERE expression. + * @param strip_var If true, variable prefixes are stripped before matching. + * @return The filtered table, or an error on compute failure. + */ +arrow::Result> filter( + const std::shared_ptr& table, const WhereExpr& condition, + bool strip_var); + } // namespace tundradb #endif // ARROW_UTILS_HPP \ No newline at end of file diff --git a/include/join.hpp b/include/join.hpp index ed7ab46..6f4aa1a 100644 --- a/include/join.hpp +++ b/include/join.hpp @@ -16,42 +16,30 @@ namespace tundradb { * a JoinStrategy can decide which target (and source) IDs survive. */ struct JoinInput { - // All node IDs currently in the source schema within query state - const llvm::DenseSet& source_ids; - - // All node IDs that exist in the target table (full scan of target schema) - const llvm::DenseSet& all_target_ids; - - // Source nodes that had at least one matching edge - const llvm::DenseSet& matched_source_ids; - - // Target nodes that were reached via matching edges - const llvm::DenseSet& matched_target_ids; - - // Target IDs already accumulated from a previous traversal that shares - // the same target alias (e.g. multi-pattern queries). Empty on the first - // pass. - const llvm::DenseSet& existing_target_ids; - - // Source nodes that had NO matching edge - const llvm::DenseSet& unmatched_source_ids; - - // Whether source and target resolve to the same concrete schema - bool is_self_join; + const llvm::DenseSet& + source_ids; ///< All source IDs in query state. + const llvm::DenseSet& + all_target_ids; ///< All IDs in the target table. + const llvm::DenseSet& + matched_source_ids; ///< Sources with ≥ 1 matching edge. + const llvm::DenseSet& + matched_target_ids; ///< Targets reached via edges. + const llvm::DenseSet& + existing_target_ids; ///< Targets from a prior traversal pass. + const llvm::DenseSet& + unmatched_source_ids; ///< Sources with no matching edge. + bool is_self_join; ///< True when source == target schema. }; /** * @brief Output of join ID computation */ struct JoinOutput { - // Final set of target node IDs to store in query_state.ids[target] - llvm::DenseSet target_ids; - - // Source IDs that should be removed from query_state (INNER join pruning) - llvm::DenseSet source_ids_to_remove; - - // Whether the source table needs to be rebuilt after pruning - bool rebuild_source_table = false; + llvm::DenseSet target_ids; ///< Final target IDs for query_state. + llvm::DenseSet + source_ids_to_remove; ///< Source IDs to prune (INNER join). + bool rebuild_source_table = + false; ///< True if the source table must be rebuilt. }; /** @@ -68,12 +56,15 @@ class JoinStrategy { virtual ~JoinStrategy() = default; /** - * Compute which target/source IDs survive this join. + * @brief Computes which target and source IDs survive this join. + * + * @param input The accumulated traversal state. + * @return The join output describing surviving IDs and pruning actions. */ [[nodiscard]] virtual JoinOutput compute(const JoinInput& input) const = 0; /** - * Human-readable name for logging / debugging. + * @brief Returns a human-readable name for logging / debugging. */ [[nodiscard]] virtual const char* name() const noexcept = 0; }; @@ -158,11 +149,18 @@ class FullJoinStrategy final : public JoinStrategy { }; /** - * @brief Creates the appropriate JoinStrategy for a given TraverseType - * and join context (self-join vs. cross-schema). + * @brief Factory that selects the correct JoinStrategy for a given traverse + * type. */ class JoinStrategyFactory { public: + /** + * @brief Creates a strategy instance. + * + * @param type The join/traverse type (Inner, Left, Right, Full). + * @param is_self_join True when source and target resolve to the same schema. + * @return A unique_ptr to the selected strategy. + */ static std::unique_ptr create(TraverseType type, bool is_self_join); }; diff --git a/include/query.hpp b/include/query.hpp index 4bf4adb..c4cc92f 100644 --- a/include/query.hpp +++ b/include/query.hpp @@ -27,13 +27,18 @@ namespace tundradb { class WhereExpr; class ComparisonExpr; +/** + * @brief A reference to a schema, optionally aliased (e.g. "u:User" or just + * "User"). + * + * Carries a cached 16-bit tag for fast hash packing during BFS traversal. + */ struct SchemaRef { private: std::string schema_; std::string value_; bool declaration_; - // Cached 16-bit tag for fast path operations (e.g., BFS packing) - uint16_t schema_tag_ = 0; + uint16_t schema_tag_ = 0; ///< Cached 16-bit FNV-1a tag. public: [[nodiscard]] std::string schema() const { return schema_; } @@ -43,11 +48,17 @@ struct SchemaRef { void set_schema(const std::string& schema) { schema_ = schema; } void set_tag(uint16_t t) { schema_tag_ = t; } - // Parse a schema reference from a string format "alias:schema" - // If the string does not contain a colon, the value is assigned to the alias - // and schema + /** + * @brief Parses a schema reference from "alias:schema" format. + * + * If no colon is present, the entire string is used as both alias and schema. + * + * @param s The input string. + * @return The parsed SchemaRef. + */ static SchemaRef parse(const std::string& s); + /** @brief Returns a human-readable "alias:schema" string. */ [[nodiscard]] std::string toString() const; friend std::ostream& operator<<(std::ostream& os, const SchemaRef& obj) { @@ -56,6 +67,7 @@ struct SchemaRef { } }; +/** @brief Comparison operators supported in WHERE expressions. */ enum class CompareOp { Eq, NotEq, @@ -116,47 +128,85 @@ struct FieldRef { friend class ComparisonExpr; }; +/** @brief Base class for all query clauses (WHERE, TRAVERSE, SELECT, etc.). */ class Clause { public: virtual ~Clause() = default; enum class Type { WHERE, TRAVERSE, PROJECT, ORDER_BY, LIMIT, SELECT }; + + /** @brief Returns the clause's discriminator. */ [[nodiscard]] virtual Type type() const = 0; }; +/** @brief Wraps a Value as an Arrow compute literal expression. */ arrow::compute::Expression value_to_expression(const Value& value); + +/** + * @brief Builds an Arrow compute comparison expression. + * + * @param field The field reference expression. + * @param value The literal value expression. + * @param op The comparison operator. + * @return The composed comparison expression. + */ arrow::compute::Expression apply_comparison_op( const arrow::compute::Expression& field, const arrow::compute::Expression& value, CompareOp op); +/** @brief Logical operators for combining WHERE expressions. */ enum class LogicalOp { AND, OR }; +/** + * @brief Abstract base for WHERE expression nodes (comparisons and logical + * operators). + */ class WhereExpr { public: virtual ~WhereExpr() = default; + + /** @brief Resolves symbolic field references against the schema registry. */ virtual arrow::Result resolve_field_ref( const std::unordered_map& aliases, const SchemaRegistry* schema_registry) = 0; + /** @brief Evaluates this expression against a node. */ virtual arrow::Result matches( const std::shared_ptr& node) const = 0; + + /** @brief Returns a debug string of the expression. */ virtual std::string toString() const = 0; + + /** @brief Marks this expression as already inlined into a traversal. */ virtual void set_inlined(bool inlined) = 0; + + /** @brief Returns true if this expression has been inlined. */ virtual bool inlined() const = 0; + + /** @brief Converts this expression to an Arrow compute expression. */ virtual arrow::compute::Expression to_arrow_expression( bool strip_var) const = 0; + /** @brief Extracts sub-conditions that reference a specific variable. */ virtual std::vector> get_conditions_for_variable(const std::string& variable) const = 0; + /** @brief Returns the set of all variables referenced in this expression. */ virtual std::set get_all_variables() const = 0; + /** @brief Returns the first variable name found (useful for single-var + * conditions). */ virtual std::string extract_first_variable() const = 0; + /** @brief Returns true if this expression can be inlined for the given + * variable. */ virtual bool can_inline(const std::string& variable) const = 0; }; +/** @brief The type of graph traversal / join to perform. */ enum class TraverseType { Inner, Left, Right, Full }; +/** @brief A TRAVERSE clause specifying an edge traversal between two schemas. + */ class Traverse final : public Clause { private: SchemaRef source_; @@ -184,6 +234,7 @@ class Traverse final : public Clause { SchemaRef& mutable_target() { return target_; } }; +/** @brief A SELECT clause listing the fields to include in the output. */ struct Select final : Clause { std::vector fields_; @@ -198,6 +249,9 @@ struct Select final : Clause { } }; +/** + * @brief A leaf WHERE expression: field op value (e.g. "u.age > 30"). + */ class ComparisonExpr : public Clause, public WhereExpr { private: FieldRef field_ref_; @@ -255,6 +309,9 @@ class ComparisonExpr : public Clause, public WhereExpr { const SchemaRegistry* schema_registry) override; }; +/** + * @brief A composite WHERE expression: left AND/OR right. + */ class LogicalExpr : public Clause, public WhereExpr { private: std::shared_ptr left_; @@ -307,6 +364,7 @@ class LogicalExpr : public Clause, public WhereExpr { bool can_inline(const std::string& variable) const override; }; +/** @brief Configuration knobs for parallel query execution. */ struct ExecutionConfig { private: static size_t get_default_thread_count() { @@ -339,6 +397,12 @@ struct ExecutionConfig { } }; +/** + * @brief Immutable query descriptor built via Query::Builder. + * + * Contains the FROM schema, a list of clauses (TRAVERSE, WHERE, SELECT), + * execution configuration, and optional temporal snapshot. + */ class Query { private: SchemaRef from_; @@ -381,6 +445,7 @@ class Query { static Builder from(const std::string& schema) { return Builder(schema); } + /** @brief Fluent builder for constructing Query objects. */ class Builder { private: SchemaRef from_; @@ -394,12 +459,14 @@ class Query { explicit Builder(const std::string& schema) : from_(SchemaRef::parse(schema)) {} + /** @brief Adds a simple comparison WHERE clause. */ Builder& where(std::string field, CompareOp op, Value value) { clauses_.push_back(std::make_shared(std::move(field), op, std::move(value))); return *this; } + /** @brief Adds a TRAVERSE clause (edge traversal between two schemas). */ Builder& traverse(const std::string& source, std::string edge_type, const std::string& target, TraverseType traverse_type = TraverseType::Inner) { @@ -409,36 +476,43 @@ class Query { return *this; } + /** @brief Sets the SELECT clause. Empty means "all fields". */ Builder& select(std::vector names = {}) { select_ = std::make_shared& select, + const std::shared_ptr& table); + +/** + * @brief Collects WHERE clauses that can be pushed down into a traversal step. + * + * @param target_var The variable name of the traversal target (e.g. "c"). + * @param i The clause index to start scanning from. + * @param clauses All clauses in the query. + * @return WHERE expressions that reference only @p target_var. + */ +std::vector> get_where_to_inline( + const std::string& target_var, size_t i, + const std::vector>& clauses); + +/** + * @brief Applies inlined WHERE expressions to a table and updates query state. + * + * @param ref The schema reference whose table is being filtered. + * @param table The table to filter. + * @param query_state The execution state (IDs are updated after filtering). + * @param where_exprs The WHERE expressions to apply. + * @return The filtered table, or an error. + */ +arrow::Result> inline_where( + const SchemaRef& ref, std::shared_ptr table, + QueryState& query_state, + const std::vector>& where_exprs); + +/** + * @brief Prepares a query for execution: registers aliases, resolves fields, + * precomputes tags. + * + * @param query The query to prepare (modified in-place). + * @param query_state The execution state to populate. + * @return OK on success, or an error. + */ +arrow::Status prepare_query(Query& query, QueryState& query_state); + } // namespace tundradb #endif // QUERY_EXECUTION_HPP diff --git a/include/row.hpp b/include/row.hpp new file mode 100644 index 0000000..d6705f4 --- /dev/null +++ b/include/row.hpp @@ -0,0 +1,409 @@ +#ifndef ROW_HPP +#define ROW_HPP + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "logger.hpp" +#include "node.hpp" +#include "query.hpp" +#include "types.hpp" + +namespace tundradb { + +/** + * @brief A single segment of a BFS traversal path (schema + node ID). + * + * Carries an optional 16-bit tag for fast equality checks without + * string comparison. + */ +struct PathSegment { + uint16_t schema_tag; ///< 16-bit hash of the schema name (0 = unset). + std::string schema; ///< Human-readable schema name. + int64_t node_id; ///< Node identifier within the schema. + + PathSegment(uint16_t tag, const std::string& schema_name, int64_t id) + : schema_tag(tag), schema(schema_name), node_id(id) {} + + PathSegment(const std::string& schema_name, int64_t id) + : schema_tag(0), schema(schema_name), node_id(id) {} + + /** @brief Returns "schema:node_id" for debugging. */ + std::string toString() const { + return schema + ":" + std::to_string(node_id); + } + + /** @brief Equality: compares tags when available, else falls back to string. + */ + bool operator==(const PathSegment& other) const { + if (schema_tag != 0 && other.schema_tag != 0) { + return schema_tag == other.schema_tag && node_id == other.node_id; + } + return schema == other.schema && node_id == other.node_id; + } +}; + +/** + * @brief Checks whether @p prefix is a prefix of @p path. + * + * @param prefix The candidate prefix path. + * @param path The full path to test against. + * @return True if every element of @p prefix matches the corresponding element + * of @p path. + */ +inline bool is_prefix(const std::vector& prefix, + const std::vector& path) { + if (prefix.size() > path.size()) { + return false; + } + for (size_t i = 0; i < prefix.size(); ++i) { + if (!(prefix[i] == path[i])) return false; + } + return true; +} + +/** + * @brief Joins path segments into an arrow-delimited string (e.g. + * "users:0->companies:1"). + * + * @param schema_path The segments to join. + * @return The formatted path string. + */ +inline std::string join_schema_path( + const std::vector& schema_path) { + std::ostringstream oss; + for (size_t i = 0; i < schema_path.size(); ++i) { + if (i != 0) oss << "->"; + oss << schema_path[i].toString(); + } + return oss.str(); +} + +/** + * @brief A single denormalised result row produced during BFS traversal. + * + * Stores cell values by flat field-ID index for O(1) access, + * and tracks the BFS path that produced the row. + */ +struct Row { + int64_t id; ///< Root node ID for this row. + std::vector cells; ///< Cell values indexed by field ID. + std::vector path; ///< BFS path that produced this row. + std::unordered_map + ids; ///< Lazily-populated schema→node-ID map. + bool ids_populated = false; ///< Whether @ref ids has been built. + + /** @brief Constructs an empty row with space for @p max_field_count fields. + */ + explicit Row(size_t max_field_count = 64) : id(0), cells(max_field_count) {} + + /** + * @brief Sets a cell value at the given field index. + * + * @param field_id The zero-based field index. + * @param value_ref The value reference to store. + */ + void set_cell(int field_id, ValueRef value_ref) { + if (field_id >= 0 && field_id < static_cast(cells.size())) { + cells[field_id] = value_ref; + } + } + + /** @brief Returns true if the cell at @p field_id contains a non-null value. + */ + [[nodiscard]] bool has_value(int field_id) const { + return field_id >= 0 && field_id < static_cast(cells.size()) && + cells[field_id].data != nullptr; + } + + /** + * @brief Populates cells from a node's fields using the given index mapping. + * + * @param field_indices Maps each node field position to a row cell index. + * @param node The source node. + * @param temporal_context Temporal snapshot for versioned reads (may be + * nullptr). + */ + void set_cell_from_node(const std::vector& field_indices, + const std::shared_ptr& node, + TemporalContext* temporal_context) { + auto view = node->view(temporal_context); + + const auto& fields = node->get_schema()->fields(); + const size_t n = std::min(fields.size(), field_indices.size()); + for (size_t i = 0; i < n; ++i) { + const auto& field = fields[i]; + const int field_id = field_indices[i]; + auto value_ref_result = view.get_value_ref(field); + if (value_ref_result.ok()) { + this->set_cell(field_id, value_ref_result.ValueOrDie()); + } + } + } + + /** @brief Returns true if this row's path starts with @p prefix. */ + [[nodiscard]] bool start_with(const std::vector& prefix) const { + return is_prefix(prefix, this->path); + } + + /** + * @brief Lazily extracts a schema-name→node-ID map from the "*.id" cells. + * + * @param field_id_to_name Mapping from field index to fully-qualified name. + * @return A reference to the cached schema-ID map. + */ + const std::unordered_map& extract_schema_ids( + const llvm::SmallDenseMap& field_id_to_name) { + if (ids_populated) { + return ids; + } + for (size_t i = 0; i < cells.size(); ++i) { + const auto& value = cells[i]; + if (!value.data) continue; + const auto& field_name = field_id_to_name.at(static_cast(i)); + size_t dot_pos = field_name.find('.'); + if (dot_pos != std::string::npos) { + std::string schema = field_name.substr(0, dot_pos); + if (field_name.substr(dot_pos + 1) == "id") { + ids[schema] = value.as_int64(); + } + } + } + return ids; + } + + /** + * @brief Merges another row into this one (non-destructive). + * + * Fields present in @p other but absent in this row are copied. + * Existing values are kept. + * + * @param other The row to merge from. + * @return A new Row combining both. + */ + [[nodiscard]] std::shared_ptr merge( + const std::shared_ptr& other) const { + std::shared_ptr merged = std::make_shared(*this); + IF_DEBUG_ENABLED { + log_debug("Row::merge() - this: {}", this->ToString()); + log_debug("Row::merge() - other: {}", other->ToString()); + } + + for (size_t i = 0; i < other->cells.size(); ++i) { + if (!merged->has_value(static_cast(i))) { + IF_DEBUG_ENABLED { + log_debug("Row::merge() - adding field '{}' with value: {}", i, + cells[i].ToString()); + } + merged->cells[i] = other->cells[i]; + } else { + IF_DEBUG_ENABLED { + log_debug("Row::merge() - skipping field '{}' (already has value)", + i); + } + } + } + IF_DEBUG_ENABLED { + log_debug("Row::merge() - result: {}", merged->ToString()); + } + return merged; + } + + /** @brief Returns a debug string listing the path and all cell values. */ + [[nodiscard]] std::string ToString() const { + std::stringstream ss; + ss << "Row{"; + ss << "path='" << join_schema_path(path) << "', "; + + bool first = true; + for (size_t i = 0; i < cells.size(); i++) { + if (!first) { + ss << ", "; + } + first = false; + + ss << i << ": "; + const auto value_ref = cells[i]; + if (!value_ref.data) { + ss << "NULL"; + } else { + switch (value_ref.type) { + case ValueType::INT64: + ss << value_ref.as_int64(); + break; + case ValueType::INT32: + ss << value_ref.as_int32(); + break; + case ValueType::DOUBLE: + ss << value_ref.as_double(); + break; + case ValueType::STRING: + ss << "\"" << value_ref.as_string_ref().to_string() << "\""; + break; + case ValueType::BOOL: + ss << (value_ref.as_bool() ? "true" : "false"); + break; + default: + ss << "unknown"; + break; + } + } + } + ss << "}"; + return ss.str(); + } +}; + +/** + * @brief Creates a blank Row sized to fit the given output schema. + * + * @param final_output_schema The Arrow schema of the final query output. + * @return A Row with id = −1 and all cells null. + */ +inline Row create_empty_row_from_schema( + const std::shared_ptr& final_output_schema) { + Row new_row(final_output_schema->num_fields() + 32); + new_row.id = -1; + return new_row; +} + +/** + * @brief Collects rows whose path starts with @p parent's path (excluding @p + * parent itself). + * + * @param parent The parent row. + * @param rows All candidate rows. + * @return The subset of @p rows that are children of @p parent. + */ +inline std::vector get_child_rows(const Row& parent, + const std::vector& rows) { + std::vector child; + for (const auto& row : rows) { + if (parent.id != row.id && row.start_with(parent.path)) { + child.push_back(row); + } + } + return child; +} + +/** + * @brief Tree node used to group and merge rows during BFS result assembly. + * + * Rows with shared path prefixes are placed in the same subtree. + * The tree is then folded bottom-up via merge_rows() to produce the + * Cartesian product of child branches (denormalisation). + */ +struct RowNode { + std::optional> + row; ///< The row payload (set on leaf nodes). + int depth; ///< Depth in the tree. + PathSegment path_segment; ///< Schema + node ID for this tree level. + std::vector> children; ///< Child branches. + + RowNode() : depth(0), path_segment{"", -1} {} + + RowNode(std::optional> r, int d, + std::vector> c = {}) + : row(std::move(r)), + depth(d), + path_segment{"", -1}, + children(std::move(c)) {} + + /** @brief Returns true if this node carries a row (i.e. is a leaf). */ + bool leaf() const { return row.has_value(); } + + /** + * @brief Recursively inserts a row into the tree following its path segments. + * + * @param path_idx Current index into new_row->path. + * @param new_row The row to insert. + */ + void insert_row_dfs(size_t path_idx, const std::shared_ptr& new_row) { + if (path_idx == new_row->path.size()) { + this->row = new_row; + return; + } + + for (const auto& n : children) { + if (n->path_segment == new_row->path[path_idx]) { + n->insert_row_dfs(path_idx + 1, new_row); + return; + } + } + + auto new_node = std::make_unique(); + new_node->depth = depth + 1; + new_node->path_segment = new_row->path[path_idx]; + new_node->insert_row_dfs(path_idx + 1, new_row); + children.emplace_back(std::move(new_node)); + } + + /** @brief Inserts a row starting from the root of its path. */ + void insert_row(const std::shared_ptr& new_row) { + insert_row_dfs(0, new_row); + } + + /** + * @brief Recursively merges child rows via Cartesian product to produce + * denormalised output. + * + * @param field_id_to_name Mapping used to populate schema-ID metadata on + * merged rows. + * @return A vector of fully merged rows. + */ + std::vector> merge_rows( + const llvm::SmallDenseMap& field_id_to_name); + + /** @brief Returns a human-readable tree representation for debugging. */ + std::string toString(bool recursive = true, int indent_level = 0) const; + + friend std::ostream& operator<<(std::ostream& os, const RowNode& node) { + return os << node.toString(); + } + + /** @brief Logs the tree via log_debug. */ + void print(const bool recursive = true) const { + log_debug(toString(recursive)); + } +}; + +/** + * @brief An item in the BFS queue used by populate_rows_bfs(). + * + * Carries the current node, its traversal level, the partial row + * accumulated so far, and a per-path visited set to detect cycles. + */ +struct QueueItem { + int64_t node_id; ///< Current node being visited. + SchemaRef schema_ref; ///< Schema of the current node. + int level; ///< Traversal depth (0 = root). + std::shared_ptr row; ///< Partial row built along the path. + llvm::SmallDenseSet + path_visited_nodes; ///< Packed hashes of nodes on this path. + std::vector path; ///< Full BFS path from root. + + /** + * @brief Constructs a queue item and records the starting node in the path. + * + * @param id The node ID. + * @param schema The schema reference. + * @param l The traversal level. + * @param r The partial row accumulated so far. + */ + QueueItem(int64_t id, const SchemaRef& schema, int l, std::shared_ptr r) + : node_id(id), schema_ref(schema), level(l), row(std::move(r)) { + path.emplace_back(schema.tag(), schema.value(), id); + } +}; + +} // namespace tundradb + +#endif // ROW_HPP diff --git a/include/utils.hpp b/include/utils.hpp index d562892..6982bfb 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -22,6 +22,72 @@ namespace tundradb { +/** @brief Bitmask for extracting the 48-bit node ID from a packed hash. */ +constexpr uint64_t NODE_MASK = (1ULL << 48) - 1; + +/** + * @brief Computes a deterministic 16-bit tag from a schema alias. + * + * Uses FNV-1a 32-bit, folded to 16 bits. + * + * @param ref The schema reference whose value() is hashed. + * @return A 16-bit tag suitable for packing into hash codes. + */ +inline uint16_t compute_tag(const SchemaRef& ref) { + const std::string& s = ref.value(); + uint32_t h = 2166136261u; + for (unsigned char c : s) { + h ^= c; + h *= 16777619u; + } + h ^= (h >> 16); + return static_cast(h & 0xFFFFu); +} + +/** + * @brief Packs a schema tag and node ID into a single 64-bit hash. + * + * Layout: [schema_tag (16 bits) | node_id (48 bits)]. + * + * @param schema The schema reference (its precomputed tag is used). + * @param node_id The node identifier. + * @return A 64-bit value that is unique per (schema, node_id) pair. + */ +inline uint64_t hash_code_(const SchemaRef& schema, int64_t node_id) { + const uint16_t schema_id16 = schema.tag(); + return (static_cast(schema_id16) << 48) | + (static_cast(node_id) & NODE_MASK); +} + +/** + * @brief Joins all elements of a container into a delimited string. + * + * @tparam Container An iterable whose elements support operator<<. + * @param container The elements to join. + * @param delimiter Separator inserted between elements (default ", "). + * @return The concatenated string. + */ +template +std::string join_container(const Container& container, + std::string_view delimiter = ", ") { + std::ostringstream oss; + for (auto it = container.begin(); it != container.end(); ++it) { + oss << (it != container.begin() ? delimiter : "") << *it; + } + return oss.str(); +} + +/** + * @brief Computes the intersection of two sets into @p out. + * + * Iterates the smaller set and probes the larger one. + * + * @tparam SetA,SetB Set types supporting size(), contains(), begin/end. + * @tparam OutSet Output set type supporting clear(), reserve(), insert(). + * @param a First input set. + * @param b Second input set. + * @param[out] out Cleared and filled with elements in both @p a and @p b. + */ template void dense_intersection(const SetA& a, const SetB& b, OutSet& out) { const auto& small = a.size() < b.size() ? a : b; @@ -35,6 +101,15 @@ void dense_intersection(const SetA& a, const SetB& b, OutSet& out) { } } +/** + * @brief Computes the set difference (a − b) into @p out. + * + * @tparam SetA,SetB Set types supporting size(), contains(), begin/end. + * @tparam OutSet Output set type supporting clear(), reserve(), insert(). + * @param a The minuend set. + * @param b The subtrahend set. + * @param[out] out Cleared and filled with elements in @p a but not in @p b. + */ template void dense_difference(const SetA& a, const SetB& b, OutSet& out) { out.clear(); @@ -46,6 +121,7 @@ void dense_difference(const SetA& a, const SetB& b, OutSet& out) { } } +/** @brief Generates a lowercase RFC-4122 UUID string. */ static std::string generate_uuid() { uuid_t uuid; uuid_generate(uuid); @@ -54,6 +130,13 @@ static std::string generate_uuid() { return uuid_str; } +/** + * @brief Generates a monotonically increasing 64-bit snapshot ID. + * + * Upper 32 bits: millisecond timestamp. Lower 32 bits: atomic counter. + * + * @return A unique snapshot identifier. + */ static int64_t generate_unique_snapshot_id() { static std::atomic counter{0}; auto now = std::chrono::system_clock::now(); @@ -67,6 +150,7 @@ static int64_t generate_unique_snapshot_id() { return (timestamp_ms << 32) | (count & 0xFFFFFFFF); } +/** @brief Returns the current wall-clock time in milliseconds since epoch. */ static int64_t now_millis() { auto now = std::chrono::system_clock::now(); return std::chrono::duration_cast( @@ -74,6 +158,15 @@ static int64_t now_millis() { .count(); } +/** + * @brief Filters an Arrow table, keeping only rows whose "id" column is in @p + * filter_ids. + * + * @tparam SetType A set-like type supporting count(). + * @param table The input table (must contain an "id" column of type Int64). + * @param filter_ids The set of IDs to retain. + * @return The filtered table, or an error if the "id" column is missing. + */ template static arrow::Result> filter_table_by_id( const std::shared_ptr& table, const SetType& filter_ids) { @@ -110,6 +203,19 @@ static arrow::Result> filter_table_by_id( return filtered_table.table(); } +/** + * @brief Builds an Arrow table from a collection of nodes using chunked + * encoding. + * + * Nodes invisible at the given temporal snapshot are skipped. + * + * @param schema The database schema describing the node fields. + * @param nodes The nodes to materialise into table rows. + * @param chunk_size Number of nodes per Arrow chunk. + * @param temporal_context Temporal snapshot for versioned reads (may be + * nullptr). + * @return The constructed table, or an error on unsupported column types. + */ static arrow::Result> create_table( const std::shared_ptr& schema, const std::vector>& nodes, size_t chunk_size, @@ -269,6 +375,12 @@ static arrow::Result> create_table( return arrow::Table::Make(arrow_schema, chunked_arrays); } +/** + * @brief Prints a single row of an Arrow table to stdout (tab-separated). + * + * @param table The table to read from. + * @param row_index Zero-based row index to print. + */ static void print_row(const std::shared_ptr& table, const int64_t row_index) { for (int j = 0; j < table->num_columns(); ++j) { @@ -344,6 +456,15 @@ static void print_row(const std::shared_ptr& table, std::cout << std::endl; } +/** + * @brief Prints an Arrow table to stdout (schema, chunk info, and data). + * + * If the table has more than @p max_rows rows, the first and last halves + * are printed with an ellipsis in between. + * + * @param table The table to print. + * @param max_rows Maximum rows to display (0 = unlimited). + */ static void print_table(const std::shared_ptr& table, int64_t max_rows = 100) { if (!table) { @@ -396,6 +517,16 @@ static void print_table(const std::shared_ptr& table, } } +/** + * @brief Extracts the value from an Arrow Result, logging context on failure. + * + * @tparam T The result value type. + * @param result The Arrow Result to unwrap. + * @param context A human-readable label for the operation (used in the error + * log). + * @param location Source location (auto-captured). + * @return The contained value. + */ template T ValueOrDieWithContext( const arrow::Result& result, const std::string& context, @@ -419,9 +550,26 @@ T ValueOrDieWithContext( #define VALUE_OR_DIE_CTX(result, context) \ ValueOrDieWithContext((result), (context)) +/** + * @brief Converts a scalar value in a ChunkedArray to its string + * representation. + * + * @param column The chunked array to read from. + * @param row_idx The global row index. + * @return A human-readable string of the scalar value. + */ std::string stringify_arrow_scalar( const std::shared_ptr& column, int64_t row_idx); +/** + * @brief Extracts all non-null values from a typed column into a vector. + * + * @tparam T One of int32_t, int64_t, double, bool, or std::string. + * @param table The table to read from. + * @param column_name Name of the column. + * @return A vector of the column's non-null values, or an error if the column + * is missing. + */ template arrow::Result> get_column_values( const std::shared_ptr& table, @@ -483,6 +631,14 @@ arrow::Result> get_column_values( return values; } +/** + * @brief Concatenates all chunks of a column into a single contiguous Arrow + * Array. + * + * @param table The table containing the column. + * @param column_name Name of the column. + * @return The concatenated array, or an error if the column is missing. + */ inline arrow::Result> get_column_as_array( const std::shared_ptr& table, const std::string& column_name) { @@ -497,6 +653,13 @@ inline arrow::Result> get_column_as_array( return combined_array; } +/** + * @brief Returns the first non-null value from a typed Arrow Array. + * + * @tparam T One of int64_t, double, bool, or std::string. + * @param array The array to read. + * @return The first element, or an error if the array is empty/null/wrong type. + */ template arrow::Result get_first_value_from_array( const std::shared_ptr& array) { @@ -543,16 +706,25 @@ arrow::Result get_first_value_from_array( } } +/** @brief Convenience wrapper: extracts the first int64 from an array. */ inline arrow::Result get_first_int64( const std::shared_ptr& array) { return get_first_value_from_array(array); } +/** @brief Convenience wrapper: extracts the first string from an array. */ inline arrow::Result get_first_string( const std::shared_ptr& array) { return get_first_value_from_array(array); } +/** + * @brief Evaluates a WHERE expression against a single node. + * + * @param where_expr The condition to test. + * @param node The node to evaluate. + * @return True if the node satisfies the expression. + */ inline arrow::Result apply_where_to_node( const std::shared_ptr& where_expr, const std::shared_ptr& node) { @@ -563,6 +735,13 @@ inline arrow::Result apply_where_to_node( return where_expr->matches(node); } +/** + * @brief Filters a vector of nodes by a WHERE expression. + * + * @param nodes The candidate nodes. + * @param where_expr The predicate to apply. + * @return A vector of nodes that satisfy the expression. + */ inline arrow::Result>> filter_nodes_by_where( const std::vector>& nodes, const std::shared_ptr& where_expr) { diff --git a/src/arrow_utils.cpp b/src/arrow_utils.cpp index 0f01397..dbfafa6 100644 --- a/src/arrow_utils.cpp +++ b/src/arrow_utils.cpp @@ -1,4 +1,4 @@ -#include "../include/arrow_utils.hpp" +#include "arrow_utils.hpp" #include #include @@ -11,7 +11,10 @@ #include -#include "../include/logger.hpp" +#include "logger.hpp" +#include "node.hpp" +#include "query.hpp" +#include "schema.hpp" namespace tundradb { @@ -161,4 +164,260 @@ std::shared_ptr prepend_id_field( return prepend_field(arrow::field("id", arrow::int64()), target_schema); } +arrow::Result> value_to_arrow_scalar( + const Value& value) { + switch (value.type()) { + case ValueType::INT32: + return arrow::MakeScalar(value.as_int32()); + case ValueType::INT64: + return arrow::MakeScalar(value.as_int64()); + case ValueType::DOUBLE: + return arrow::MakeScalar(value.as_double()); + case ValueType::STRING: + return arrow::MakeScalar(value.as_string()); + case ValueType::BOOL: + return arrow::MakeScalar(value.as_bool()); + case ValueType::NA: + return arrow::MakeNullScalar(arrow::null()); + default: + return arrow::Status::NotImplemented( + "Unsupported Value type for Arrow scalar conversion: ", + tundradb::to_string(value.type())); + } +} + +arrow::Result> value_ptr_to_arrow_scalar( + const char* ptr, const ValueType type) { + switch (type) { + case ValueType::INT32: + return arrow::MakeScalar(*reinterpret_cast(ptr)); + case ValueType::INT64: + return arrow::MakeScalar(*reinterpret_cast(ptr)); + case ValueType::DOUBLE: + return arrow::MakeScalar(*reinterpret_cast(ptr)); + case ValueType::STRING: { + auto str_ref = *reinterpret_cast(ptr); + return arrow::MakeScalar(str_ref.to_string()); + } + case ValueType::BOOL: + return arrow::MakeScalar(*reinterpret_cast(ptr)); + case ValueType::NA: + return arrow::MakeNullScalar(arrow::null()); + default: + return arrow::Status::NotImplemented( + "Unsupported Value type for Arrow scalar conversion: ", + tundradb::to_string(type)); + } +} + +arrow::compute::Expression where_condition_to_expression( + const WhereExpr& condition, bool strip_var) { + return condition.to_arrow_expression(strip_var); +} + +arrow::Result> create_table_from_nodes( + const std::shared_ptr& schema, + const std::vector>& nodes) { + auto arrow_schema = schema->arrow(); + IF_DEBUG_ENABLED { + log_debug("Creating table from {} nodes with schema '{}'", nodes.size(), + arrow_schema->ToString()); + } + + // Create builders for each field + std::vector> builders; + for (const auto& field : arrow_schema->fields()) { + IF_DEBUG_ENABLED { + log_debug("Creating builder for field '{}' with type {}", field->name(), + field->type()->ToString()); + } + auto builder_result = arrow::MakeBuilder(field->type()); + if (!builder_result.ok()) { + log_error("Failed to create builder for field '{}': {}", field->name(), + builder_result.status().ToString()); + return builder_result.status(); + } + builders.push_back(std::move(builder_result.ValueOrDie())); + } + + // Populate builders with data from each node + IF_DEBUG_ENABLED { + log_debug("Adding data from {} nodes to builders", nodes.size()); + } + for (const auto& node : nodes) { + auto view = node->view(nullptr); + + for (int i = 0; i < schema->num_fields(); i++) { + auto field = schema->field(i); + const auto& field_name = field->name(); + + auto res = view.get_value_ptr(field); + if (res.ok()) { + auto value = res.ValueOrDie(); + if (value) { + auto scalar_result = value_ptr_to_arrow_scalar(value, field->type()); + if (!scalar_result.ok()) { + log_error("Failed to convert value to scalar for field '{}': {}", + field_name, scalar_result.status().ToString()); + return scalar_result.status(); + } + + const auto& scalar = scalar_result.ValueOrDie(); + auto status = builders[i]->AppendScalar(*scalar); + if (!status.ok()) { + log_error("Failed to append scalar for field '{}': {}", field_name, + status.ToString()); + return status; + } + } else { + IF_DEBUG_ENABLED { + log_debug("Null value for field '{}', appending null", field_name); + } + auto status = builders[i]->AppendNull(); + if (!status.ok()) { + log_error("Failed to append null for field '{}': {}", field_name, + status.ToString()); + return status; + } + } + } else { + IF_DEBUG_ENABLED { + log_debug("Field '{}' not found in node, appending null", field_name); + } + auto status = builders[i]->AppendNull(); + if (!status.ok()) { + log_error("Failed to append null for field '{}': {}", field_name, + status.ToString()); + return status; + } + } + } + } + + // Finish building arrays + IF_DEBUG_ENABLED { log_debug("Finalizing arrays from builders"); } + std::vector> arrays; + arrays.reserve(builders.size()); + for (auto& builder : builders) { + std::shared_ptr array; + auto status = builder->Finish(&array); + if (!status.ok()) { + log_error("Failed to finish array builder: {}", status.ToString()); + return status; + } + arrays.push_back(array); + } + + IF_DEBUG_ENABLED { + log_debug("Creating table with {} rows and {} columns", + arrays.empty() ? 0 : arrays[0]->length(), arrays.size()); + } + return arrow::Table::Make(arrow_schema, arrays); +} + +arrow::Result> create_empty_table( + const std::shared_ptr& schema) { + std::vector> empty_arrays; + + for (const auto& field : schema->fields()) { + std::shared_ptr empty_array; + + switch (field->type()->id()) { + case arrow::Type::INT64: { + arrow::Int64Builder builder; + ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); + break; + } + case arrow::Type::STRING: { + arrow::StringBuilder builder; + ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); + break; + } + case arrow::Type::DOUBLE: { + arrow::DoubleBuilder builder; + ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); + break; + } + case arrow::Type::BOOL: { + arrow::BooleanBuilder builder; + ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); + break; + } + default: + empty_array = std::make_shared(0); + } + + empty_arrays.push_back(empty_array); + } + + return arrow::Table::Make(schema, empty_arrays); +} + +arrow::Result> filter( + const std::shared_ptr& table, const WhereExpr& condition, + const bool strip_var) { + IF_DEBUG_ENABLED { + log_debug("Filtering table with WhereCondition: {}", condition.toString()); + } + + try { + const auto filter_expr = + where_condition_to_expression(condition, strip_var); + + IF_DEBUG_ENABLED { + log_debug("Creating in-memory dataset from table with {} rows", + table->num_rows()); + } + auto dataset = std::make_shared(table); + + IF_DEBUG_ENABLED { log_debug("Creating scanner builder"); } + auto scan_builder_result = dataset->NewScan(); + if (!scan_builder_result.ok()) { + log_error("Failed to create scanner builder: {}", + scan_builder_result.status().ToString()); + return scan_builder_result.status(); + } + const auto& scan_builder = scan_builder_result.ValueOrDie(); + + IF_DEBUG_ENABLED { + log_debug("Applying compound filter to scanner builder"); + } + auto filter_status = scan_builder->Filter(filter_expr); + if (!filter_status.ok()) { + log_error("Failed to apply filter: {}", filter_status.ToString()); + return filter_status; + } + + IF_DEBUG_ENABLED { log_debug("Finishing scanner"); } + auto scanner_result = scan_builder->Finish(); + if (!scanner_result.ok()) { + log_error("Failed to finish scanner: {}", + scanner_result.status().ToString()); + return scanner_result.status(); + } + const auto& scanner = scanner_result.ValueOrDie(); + + IF_DEBUG_ENABLED { log_debug("Executing scan to table"); } + auto table_result = scanner->ToTable(); + if (!table_result.ok()) { + log_error("Failed to convert scan results to table: {}", + table_result.status().ToString()); + return table_result.status(); + } + + auto result_table = table_result.ValueOrDie(); + IF_DEBUG_ENABLED { + log_debug("Filter completed: {} rows in, {} rows out", table->num_rows(), + result_table->num_rows()); + } + return result_table; + + } catch (const std::exception& e) { + log_error("Failed to convert WhereCondition to Arrow expression: {}", + e.what()); + return arrow::Status::Invalid("Failed to convert WHERE condition: ", + e.what()); + } +} + } // namespace tundradb \ No newline at end of file diff --git a/src/clock.cpp b/src/clock.cpp index 2219864..0f4d9cc 100644 --- a/src/clock.cpp +++ b/src/clock.cpp @@ -1,4 +1,4 @@ -#include "../include/clock.hpp" +#include "clock.hpp" namespace tundradb { diff --git a/src/core.cpp b/src/core.cpp index b707dd0..9f8b3e2 100644 --- a/src/core.cpp +++ b/src/core.cpp @@ -1,14 +1,5 @@ -#include "../include/core.hpp" +#include "core.hpp" -#include "../include/join.hpp" -#include "../include/temporal_context.hpp" - -using namespace tundradb; - -#include -#include -#include -#include #include #include #include @@ -18,1041 +9,27 @@ using namespace tundradb; #include #include -#include #include -#include -#include #include +#include #include #include -#include #include -#include +#include #include -#include #include -#include "container_concepts.hpp" +#include "arrow_utils.hpp" +#include "join.hpp" #include "logger.hpp" +#include "row.hpp" +#include "temporal_context.hpp" #include "utils.hpp" namespace fs = std::filesystem; namespace tundradb { -constexpr static uint64_t NODE_MASK = (1ULL << 48) - 1; - -// Deterministic 16-bit tag from alias string (SchemaRef::value()). -// https://www.ietf.org/archive/id/draft-eastlake-fnv-21.html -static uint16_t compute_tag(const SchemaRef& ref) { - // FNV-1a 32-bit, then fold to 16 bits. - const std::string& s = ref.value(); - uint32_t h = 2166136261u; - for (unsigned char c : s) { - h ^= c; - h *= 16777619u; - } - h ^= (h >> 16); - return static_cast(h & 0xFFFFu); -} - -/** - * @brief Creates a packed 64-bit hash code for schema+node_id pairs - * - * This function combines a schema identifier and node ID into a single 64-bit - * value for efficient storage and comparison in hash sets/maps. This eliminates - * the need for expensive string concatenation and hashing that was previously - * used for tracking visited nodes during graph traversal. - * - * @param schema The schema reference containing a pre-computed 16-bit tag - * @param node_id The node identifier (48-bit max) - * - * @return A 64-bit packed value with layout: - * - Bits 63-48: Schema tag (16 bits) - * - Bits 47-0: Node ID (48 bits, masked) - * - * @details - * Memory Layout: - * ``` - * 63 56 48 40 32 24 16 8 0 - * | Schema | Node ID (48 bits) | - * | (16 bit) | | - * ``` - * - * Performance Benefits: - * - Replaces string operations: "User:12345" → single uint64_t - * - Enables fast integer comparison instead of string hashing - * - Reduces memory allocations (no temporary strings) - * - Compatible with llvm::DenseSet for O(1) lookups - * - * Constraints: - * - Node IDs must fit in 48 bits (max ~281 trillion nodes) - * - Schema tags must be unique within query context - * - NODE_MASK = (1ULL << 48) - 1 = 0x0000FFFFFFFFFFFF - * - * Example: - * ```cpp - * SchemaRef user_schema = SchemaRef::parse("u:User"); - * user_schema.set_tag(0x1234); // Pre-computed schema tag - * - * uint64_t packed = hash_code_(user_schema, 98765); - * // Result: 0x1234000000018149 (schema=0x1234, node=98765) - * - * // Usage in visited tracking: - * llvm::DenseSet visited; - * visited.insert(packed); // Fast O(1) integer hash - * ``` - * - * @see SchemaRef::tag() for schema tag computation - * @see NODE_MASK constant definition - */ -static uint64_t hash_code_(const SchemaRef& schema, int64_t node_id) { - const uint16_t schema_id16 = schema.tag(); - return (static_cast(schema_id16) << 48) | - (static_cast(node_id) & NODE_MASK); -} - -// Utility function to join containers using C++23 ranges -template -std::string join_container(const Container& container, - std::string_view delimiter = ", ") { - return [&]() { - std::ostringstream oss; - for (auto it = container.begin(); it != container.end(); ++it) { - oss << (it != container.begin() ? delimiter : "") << *it; - } - return oss.str(); - }(); -} - -// Convert Value to Arrow scalar for builders -arrow::Result> value_to_arrow_scalar( - const Value& value) { - switch (value.type()) { - case ValueType::INT32: - return arrow::MakeScalar(value.as_int32()); - case ValueType::INT64: - return arrow::MakeScalar(value.as_int64()); - case ValueType::DOUBLE: - return arrow::MakeScalar(value.as_double()); - case ValueType::STRING: - return arrow::MakeScalar(value.as_string()); - case ValueType::BOOL: - return arrow::MakeScalar(value.as_bool()); - case ValueType::NA: - return arrow::MakeNullScalar(arrow::null()); - default: - return arrow::Status::NotImplemented( - "Unsupported Value type for Arrow scalar conversion: ", - tundradb::to_string(value.type())); - } -} - -arrow::Result> value_ptr_to_arrow_scalar( - const char* ptr, const ValueType type) { - switch (type) { - case ValueType::INT32: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::INT64: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::DOUBLE: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::STRING: { - auto str_ref = *reinterpret_cast(ptr); - return arrow::MakeScalar(str_ref.to_string()); - } - case ValueType::BOOL: - return arrow::MakeScalar(*reinterpret_cast(ptr)); - case ValueType::NA: - return arrow::MakeNullScalar(arrow::null()); - default: - return arrow::Status::NotImplemented( - "Unsupported Value type for Arrow scalar conversion: ", - tundradb::to_string(type)); - } -} - -// Recursively convert WhereExpr to Arrow compute expression -arrow::compute::Expression where_condition_to_expression( - const WhereExpr& condition, bool strip_var) { - // Use the unified to_arrow_expression() method from WhereExpr - return condition.to_arrow_expression(strip_var); -} - -arrow::Result> create_table_from_nodes( - const std::shared_ptr& schema, - const std::vector>& nodes) { - auto arrow_schema = schema->arrow(); - IF_DEBUG_ENABLED { - log_debug("Creating table from {} nodes with schema '{}'", nodes.size(), - arrow_schema->ToString()); - } - - // Create builders for each field - std::vector> builders; - for (const auto& field : arrow_schema->fields()) { - IF_DEBUG_ENABLED { - log_debug("Creating builder for field '{}' with type {}", field->name(), - field->type()->ToString()); - } - auto builder_result = arrow::MakeBuilder(field->type()); - if (!builder_result.ok()) { - log_error("Failed to create builder for field '{}': {}", field->name(), - builder_result.status().ToString()); - return builder_result.status(); - } - builders.push_back(std::move(builder_result.ValueOrDie())); - } - - // Populate builders with data from each node - IF_DEBUG_ENABLED { - log_debug("Adding data from {} nodes to builders", nodes.size()); - } - for (const auto& node : nodes) { - // Create temporal view (nullptr = current version) - auto view = node->view(nullptr); - - // Add each field's value to the appropriate builder - for (int i = 0; i < schema->num_fields(); i++) { - auto field = schema->field(i); - const auto& field_name = field->name(); - - // Find the value in the node's data - auto res = view.get_value_ptr(field); - if (res.ok()) { - // Convert Value to Arrow scalar and append to builder - auto value = res.ValueOrDie(); - if (value) { - auto scalar_result = value_ptr_to_arrow_scalar(value, field->type()); - if (!scalar_result.ok()) { - log_error("Failed to convert value to scalar for field '{}': {}", - field_name, scalar_result.status().ToString()); - return scalar_result.status(); - } - - const auto& scalar = scalar_result.ValueOrDie(); - auto status = builders[i]->AppendScalar(*scalar); - if (!status.ok()) { - log_error("Failed to append scalar for field '{}': {}", field_name, - status.ToString()); - return status; - } - } else { - IF_DEBUG_ENABLED { - log_debug("Null value for field '{}', appending null", field_name); - } - auto status = builders[i]->AppendNull(); - if (!status.ok()) { - log_error("Failed to append null for field '{}': {}", field_name, - status.ToString()); - return status; - } - } - } else { - IF_DEBUG_ENABLED { - log_debug("Field '{}' not found in node, appending null", field_name); - } - auto status = builders[i]->AppendNull(); - if (!status.ok()) { - log_error("Failed to append null for field '{}': {}", field_name, - status.ToString()); - return status; - } - } - } - } - - // Finish building arrays - IF_DEBUG_ENABLED { log_debug("Finalizing arrays from builders"); } - std::vector> arrays; - arrays.reserve(builders.size()); - for (auto& builder : builders) { - std::shared_ptr array; - auto status = builder->Finish(&array); - if (!status.ok()) { - log_error("Failed to finish array builder: {}", status.ToString()); - return status; - } - arrays.push_back(array); - } - - // Create table - IF_DEBUG_ENABLED { - log_debug("Creating table with {} rows and {} columns", - arrays.empty() ? 0 : arrays[0]->length(), arrays.size()); - } - return arrow::Table::Make(arrow_schema, arrays); -} - -arrow::Result> filter( - const std::shared_ptr& table, const WhereExpr& condition, - const bool strip_var) { - IF_DEBUG_ENABLED { - log_debug("Filtering table with WhereCondition: {}", condition.toString()); - } - - try { - // Convert WhereCondition to Arrow compute expression - const auto filter_expr = - where_condition_to_expression(condition, strip_var); - - IF_DEBUG_ENABLED { - log_debug("Creating in-memory dataset from table with {} rows", - table->num_rows()); - } - auto dataset = std::make_shared(table); - - // Create scanner builder - IF_DEBUG_ENABLED { log_debug("Creating scanner builder"); } - auto scan_builder_result = dataset->NewScan(); - if (!scan_builder_result.ok()) { - log_error("Failed to create scanner builder: {}", - scan_builder_result.status().ToString()); - return scan_builder_result.status(); - } - const auto& scan_builder = scan_builder_result.ValueOrDie(); - - IF_DEBUG_ENABLED { - log_debug("Applying compound filter to scanner builder"); - } - auto filter_status = scan_builder->Filter(filter_expr); - if (!filter_status.ok()) { - log_error("Failed to apply filter: {}", filter_status.ToString()); - return filter_status; - } - - IF_DEBUG_ENABLED { log_debug("Finishing scanner"); } - auto scanner_result = scan_builder->Finish(); - if (!scanner_result.ok()) { - log_error("Failed to finish scanner: {}", - scanner_result.status().ToString()); - return scanner_result.status(); - } - const auto& scanner = scanner_result.ValueOrDie(); - - IF_DEBUG_ENABLED { log_debug("Executing scan to table"); } - auto table_result = scanner->ToTable(); - if (!table_result.ok()) { - log_error("Failed to convert scan results to table: {}", - table_result.status().ToString()); - return table_result.status(); - } - - auto result_table = table_result.ValueOrDie(); - IF_DEBUG_ENABLED { - log_debug("Filter completed: {} rows in, {} rows out", table->num_rows(), - result_table->num_rows()); - } - return result_table; - - } catch (const std::exception& e) { - log_error("Failed to convert WhereCondition to Arrow expression: {}", - e.what()); - return arrow::Status::Invalid("Failed to convert WHERE condition: ", - e.what()); - } -} - -void debug_connections( - int64_t id, - const std::map>& connections, - std::vector path, std::vector& res) { - if (!connections.contains(id)) { - res.push_back(join_container(path)); - return; - } - for (const auto& conn : connections.at(id)) { - path.push_back(conn.toString()); - debug_connections(conn.target_id, connections, path, res); - } -} - -void print_paths( - const std::map>& connections) { - IF_DEBUG_ENABLED { - log_debug("Printing all paths in connection graph:"); - - if (connections.empty()) { - log_debug(" No connections found"); - return; - } - - for (const auto& [source_id, conn_list] : connections) { - if (conn_list.empty()) { - log_debug(" Node {} has no outgoing connections", source_id); - continue; - } - - for (const auto& conn : conn_list) { - log_debug(" {} -[{}]-> {}", source_id, conn.edge_type, conn.target_id); - } - } - - log_debug("Total of {} source nodes with connections", connections.size()); - } -} - -std::set get_roots( - const std::map>& connections) { - std::set roots; - std::unordered_map count; - std::vector stack; - for (const auto& id : connections | std::views::keys) { - count[id] = 0; - stack.push_back(id); - } - - while (!stack.empty()) { - auto curr = stack[stack.size() - 1]; - stack.pop_back(); - - if (connections.contains(curr)) { - for (auto const& next : connections.at(curr)) { - count[next.target_id]++; - stack.push_back(next.target_id); - } - } - } - for (const auto& [id, c] : count) { - if (c == 0) { - roots.insert(id); - } - } - return roots; -} - -arrow::Result> build_denormalized_schema( - const QueryState& query_state) { - IF_DEBUG_ENABLED { log_debug("Building schema for denormalized table"); } - - std::set processed_fields; - std::vector> fields; - std::set processed_schemas; - - // First add fields from the FROM schema - std::string from_schema = query_state.from.value(); - - IF_DEBUG_ENABLED { - log_debug("Adding fields from FROM schema '{}'", from_schema); - } - - auto schema_result = - query_state.schema_registry()->get(query_state.aliases().at(from_schema)); - if (!schema_result.ok()) { - return schema_result.status(); - } - - auto schema = schema_result.ValueOrDie(); - auto arrow_schema = schema->arrow(); - for (const auto& field : arrow_schema->fields()) { - std::string prefixed_field_name = from_schema + "." + field->name(); - processed_fields.insert(prefixed_field_name); - fields.push_back(arrow::field(prefixed_field_name, field->type())); - } - processed_schemas.insert(from_schema); - - std::vector unique_schemas; - for (const auto& traverse : query_state.traversals) { - if (processed_schemas.insert(traverse.source().value()).second) { - unique_schemas.push_back(traverse.source()); - } - if (processed_schemas.insert(traverse.target().value()).second) { - unique_schemas.push_back(traverse.target()); - } - } - - for (const auto& schema_ref : unique_schemas) { - IF_DEBUG_ENABLED { - log_debug("Adding fields from schema '{}'", schema_ref.value()); - } - - schema_result = query_state.schema_registry()->get( - query_state.aliases().at(schema_ref.value())); - if (!schema_result.ok()) { - return schema_result.status(); - } - - arrow_schema = schema_result.ValueOrDie()->arrow(); - for (const auto& field : arrow_schema->fields()) { - std::string prefixed_field_name = - schema_ref.value() + "." + field->name(); - if (processed_fields.contains(prefixed_field_name)) { - return arrow::Status::KeyError("Field '{}' already exists", - prefixed_field_name); - } - - processed_fields.insert(prefixed_field_name); - fields.push_back(arrow::field(prefixed_field_name, field->type())); - } - } - - return std::make_shared(fields); -} - -struct PathSegment { - uint16_t schema_tag; // Optimized: use 16-bit tag instead of string - std::string schema; // Keep for compatibility/debugging - int64_t node_id; - - // Constructor with both tag and schema for performance - PathSegment(uint16_t tag, const std::string& schema_name, int64_t id) - : schema_tag(tag), schema(schema_name), node_id(id) {} - - // Legacy constructor for compatibility - PathSegment(const std::string& schema_name, int64_t id) - : schema_tag(0), schema(schema_name), node_id(id) {} - - std::string toString() const { - return schema + ":" + std::to_string(node_id); - } - - bool operator==(const PathSegment& other) const { - // Fast path: compare tags first, then fallback to string comparison - if (schema_tag != 0 && other.schema_tag != 0) { - return schema_tag == other.schema_tag && node_id == other.node_id; - } - return schema == other.schema && node_id == other.node_id; - } -}; - -bool is_prefix(const std::vector& prefix, - const std::vector& path) { - if (prefix.size() > path.size()) { - return false; - } - int i = 0; - while (i < prefix.size()) { - if (prefix[i] != path[i]) return false; - i++; - } - return true; -} - -std::string join_schema_path(const std::vector& schema_path) { - std::ostringstream oss; - for (size_t i = 0; i < schema_path.size(); ++i) { - if (i != 0) oss << "->"; - oss << schema_path[i].toString(); - } - return oss.str(); -} - -struct Row { - int64_t id; - std::vector cells; // Optimized: index-based field access - std::vector path; - std::unordered_map ids; - bool ids_populated = false; - - // Optimized constructor that pre-allocates cells - explicit Row(size_t max_field_count = 64) : cells(max_field_count) {} - - // Optimized: set cell by field index - void set_cell(int field_id, ValueRef value_ref) { - if (field_id >= 0 && field_id < static_cast(cells.size())) { - cells[field_id] = value_ref; - } - } - - // Optimized: check value by field index - [[nodiscard]] bool has_value(int field_id) const { - return field_id >= 0 && field_id < static_cast(cells.size()) && - cells[field_id].data != nullptr; - } - - // Optimized: set cells from node using field indices - void set_cell_from_node(const std::vector& field_indices, - const std::shared_ptr& node, - TemporalContext* temporal_context) { - // Create temporal view (nullptr = current version) - auto view = node->view(temporal_context); - - const auto& fields = node->get_schema()->fields(); - const size_t n = std::min(fields.size(), field_indices.size()); - for (size_t i = 0; i < n; ++i) { - const auto& field = fields[i]; - const int field_id = field_indices[i]; - auto value_ref_result = view.get_value_ref(field); - if (value_ref_result.ok()) { - this->set_cell(field_id, value_ref_result.ValueOrDie()); - } - } - } - - [[nodiscard]] bool start_with(const std::vector& prefix) const { - return is_prefix(prefix, this->path); - } - - const std::unordered_map& extract_schema_ids( - const llvm::SmallDenseMap& field_id_to_name) { - if (ids_populated) { - return ids; - } - for (int i = 0; i < cells.size(); ++i) { - const auto& value = cells[i]; - if (!value.data) continue; - const auto& field_name = field_id_to_name.at(i); - // Extract schema prefix (everything before the first dot) - size_t dot_pos = field_name.find('.'); - if (dot_pos != std::string::npos) { - std::string schema = field_name.substr(0, dot_pos); - // Store ID for this schema if it's an ID field - if (field_name.substr(dot_pos + 1) == "id") { - ids[schema] = value.as_int64(); - } - } - } - return ids; - } - - // returns new Row which is result of merging this row and other - [[nodiscard]] std::shared_ptr merge( - const std::shared_ptr& other) const { - std::shared_ptr merged = - std::make_shared(*this); // Copy needed for merge result - IF_DEBUG_ENABLED { - log_debug("Row::merge() - this: {}", this->ToString()); - log_debug("Row::merge() - other: {}", other->ToString()); - } - - for (int i = 0; i < other->cells.size(); ++i) { - if (!merged->has_value(i)) { - IF_DEBUG_ENABLED { - log_debug("Row::merge() - adding field '{}' with value: {}", i, - cells[i].ToString()); - } - merged->cells[i] = other->cells[i]; - } else { - IF_DEBUG_ENABLED { - log_debug("Row::merge() - skipping field '{}' (already has value)", - i); - } - } - } - IF_DEBUG_ENABLED { - log_debug("Row::merge() - result: {}", merged->ToString()); - } - return merged; - } - - [[nodiscard]] std::string ToString() const { - std::stringstream ss; - ss << "Row{"; - ss << "path='" << join_schema_path(path) << "', "; - - bool first = true; - for (int i = 0; i < cells.size(); i++) { - if (!first) { - ss << ", "; - } - first = false; - - ss << i << ": "; - const auto value_ref = cells[i]; - if (!value_ref.data) { - ss << "NULL"; - } else { - // Handle different scalar types appropriately - switch (value_ref.type) { - case ValueType::INT64: - ss << value_ref.as_int64(); - break; - case ValueType::INT32: - ss << value_ref.as_int32(); - break; - case ValueType::DOUBLE: - ss << value_ref.as_double(); - break; - case ValueType::STRING: - ss << "\"" << value_ref.as_string_ref().to_string() << "\""; - break; - case ValueType::BOOL: - ss << (value_ref.as_bool() ? "true" : "false"); - break; - default: - ss << "unknown"; - break; - } - } - } - ss << "}"; - return ss.str(); - } -}; - -static Row create_empty_row_from_schema( - const std::shared_ptr& final_output_schema) { - Row new_row(final_output_schema->num_fields() + - 32); // Pre-allocate with some extra space - new_row.id = -1; - return new_row; -} - -struct RowNode; - -std::vector get_child_rows(const Row& parent, - const std::vector& rows) { - std::vector child; - for (const auto& row : rows) { - if (parent.id != row.id && row.start_with(parent.path)) { - child.push_back(row); - } - } - return child; -} - -struct RowNode { - std::optional> row; - int depth; - PathSegment path_segment; - std::vector> children; - - RowNode() : depth(0), path_segment{"", -1} {} - - RowNode(std::optional> r, int d, - std::vector> c = {}) - : row(std::move(r)), - depth(d), - path_segment{"", -1}, - children(std::move(c)) {} - - bool leaf() const { return row.has_value(); } - - void insert_row_dfs(size_t path_idx, const std::shared_ptr& new_row) { - if (path_idx == new_row->path.size()) { - this->row = - new_row; // Share the same Row - no copy needed in tree insertion - return; - } - - for (const auto& n : children) { - if (n->path_segment == new_row->path[path_idx]) { - n->insert_row_dfs(path_idx + 1, new_row); - return; - } - } - - auto new_node = std::make_unique(); - new_node->depth = depth + 1; - new_node->path_segment = new_row->path[path_idx]; - new_node->insert_row_dfs(path_idx + 1, new_row); - children.emplace_back(std::move(new_node)); - } - - void insert_row(const std::shared_ptr& new_row) { - insert_row_dfs(0, new_row); - } - - std::vector> merge_rows( - const llvm::SmallDenseMap& field_id_to_name) { - if (this->leaf()) { - return {this->row.value()}; - } - - // collect all records from child node and group them by schema - // Optimized: use schema tags instead of strings for faster grouping - llvm::SmallDenseMap>, 8> grouped; - for (const auto& c : children) { - auto child_rows = c->merge_rows(field_id_to_name); - IF_DEBUG_ENABLED { - log_debug("Child node {} returned {} rows", c->path_segment.toString(), - child_rows.size()); - for (size_t i = 0; i < child_rows.size(); ++i) { - log_debug(" Child row [{}]: {}", i, child_rows[i]->ToString()); - } - } - // Use schema_tag for fast integer-based grouping instead of string lookup - uint16_t tag = c->path_segment.schema_tag; - if (tag == 0) { - // Fallback: compute a simple hash of the schema string - tag = static_cast( - std::hash{}(c->path_segment.schema) & 0xFFFFu); - } - grouped[tag].insert(grouped[tag].end(), child_rows.begin(), - child_rows.end()); - } - - std::vector>> groups_for_product; - // Add this->row as its own group (that is important for cartesian product) - // if it exists and has data, - // to represent the node itself if it should be part of the product - // independently. - if (this->row.has_value()) { - std::shared_ptr node_self_row = std::make_shared( - *this->row.value()); // Create a copy like original - // Normalize path for the node's own row to ensure it combines correctly - // and doesn't carry a longer BFS path if it was a leaf of BFS. - // i.e. current node path can be a:0->b:1->c:2 - // this code sets it to 'c:2' - node_self_row->path = {this->path_segment}; - IF_DEBUG_ENABLED { - log_debug("Adding node self row: {}", node_self_row->ToString()); - } - groups_for_product.push_back({node_self_row}); - } - - for (const auto& pair : grouped) { - if (!pair.second.empty()) { - IF_DEBUG_ENABLED { - log_debug("Adding group for schema '{}' with {} rows", pair.first, - pair.second.size()); - for (size_t i = 0; i < pair.second.size(); ++i) { - log_debug(" Group row [{}]: {}", i, pair.second[i]->ToString()); - } - } - groups_for_product.push_back(pair.second); - } - } - - IF_DEBUG_ENABLED { - log_debug("Total groups for Cartesian product: {}", - groups_for_product.size()); - } - - if (groups_for_product.empty()) { - return {}; - } - // If only one group (e.g. only this->row.value() or only one child branch - // with data), no Cartesian product is needed. Just return its rows, but - // ensure paths are correct. - if (groups_for_product.size() == 1) { - std::vector> single_group_rows = - groups_for_product[0]; - // Ensure path is normalized for these rows if they came from children - // For rows that are just this->row.value(), path is already set. - // This might be too aggressive if child rows are already fully merged - // products. For now, let's assume rows from c->merge_rows() are final - // products of that child branch. - return single_group_rows; - } - - std::vector> final_merged_rows = - groups_for_product.back(); - IF_DEBUG_ENABLED { - log_debug("Starting Cartesian product with final group ({} rows)", - final_merged_rows.size()); - for (size_t i = 0; i < final_merged_rows.size(); ++i) { - log_debug(" Final group row [{}]: {}", i, - final_merged_rows[i]->ToString()); - } - } - - for (int i = static_cast(groups_for_product.size()) - 2; i >= 0; --i) { - IF_DEBUG_ENABLED { - log_debug("Processing group {} with {} rows", i, - groups_for_product[i].size()); - for (size_t j = 0; j < groups_for_product[i].size(); ++j) { - log_debug(" Current group row [{}]: {}", j, - groups_for_product[i][j]->ToString()); - } - } - - std::vector> temp_product_accumulator; - for (const auto& r1_from_current_group : groups_for_product[i]) { - for (const auto& r2_from_previous_product : final_merged_rows) { - // Check for conflicts in shared variables between rows - bool can_merge = true; - - // Get variable prefixes (schema names) from cells - std::unordered_map schema_ids_r1 = - r1_from_current_group->extract_schema_ids(field_id_to_name); - std::unordered_map schema_ids_r2 = - r2_from_previous_product->extract_schema_ids(field_id_to_name); - - // Check for conflicts - same schema name but different IDs - for (const auto& [schema, id1] : schema_ids_r1) { - if (schema_ids_r2.contains(schema) && - schema_ids_r2[schema] != id1) { - // Found a conflict - same schema but different IDs - IF_DEBUG_ENABLED { - log_debug( - "Conflict detected: Schema '{}' has different IDs: {} vs " - "{}", - schema, id1, schema_ids_r2[schema]); - } - can_merge = false; - break; - } - } - - // Additional cell-by-cell check for conflicts - if (can_merge) { - for (int field_index = 0; - field_index < r1_from_current_group->cells.size(); - ++field_index) { - if (r1_from_current_group->has_value(i) && - r2_from_previous_product->has_value(i)) { - // Both rows have this field with non-null values - check if - // they match - if (!r1_from_current_group->cells[i].equals( - r2_from_previous_product->cells[i])) { - IF_DEBUG_ENABLED { - log_debug( - "Conflict detected: Field '{}' has different values", - field_id_to_name.at(i)); - } - can_merge = false; - break; - } - } - } - } - - if (can_merge) { - std::shared_ptr merged_r = - r1_from_current_group->merge(r2_from_previous_product); - // Set the path of the newly merged row to the path of the current - // RowNode - merged_r->path = {this->path_segment}; - IF_DEBUG_ENABLED { - log_debug("Merged row: {}", merged_r->ToString()); - } - temp_product_accumulator.push_back(merged_r); - } else { - IF_DEBUG_ENABLED { - log_debug("Cannot merge rows due to conflicts"); - } - } - } - } - final_merged_rows = std::move(temp_product_accumulator); - if (final_merged_rows.empty()) { - IF_DEBUG_ENABLED { - log_debug("product_accumulator is empty. stop merge"); - } - break; - } - } - return final_merged_rows; - } - - std::string toString(bool recursive = true, int indent_level = 0) const { - // Helper to build indentation string based on level - auto get_indent = [](int level) { return std::string(level * 2, ' '); }; - - std::stringstream ss; - std::string indent = get_indent(indent_level); - - // Print basic node info - ss << indent << "RowNode [path=" << path_segment.toString() - << ", depth=" << depth << "] {\n"; - - // Print Row - if (row.has_value()) { - ss << indent << " Path: "; - if (row.value()->path.empty()) { - ss << "(empty)"; - } else { - for (size_t i = 0; i < row.value()->path.size(); ++i) { - if (i > 0) ss << " → "; - ss << row.value()->path[i].schema << ":" - << row.value()->path[i].node_id; - } - } - ss << "\n"; - - // Print key cell values (limited to avoid overwhelming output) - ss << indent << " Cells: "; - if (row.value()->cells.empty()) { - ss << "(empty)"; - } else { - size_t count = 0; - ss << "{ "; - for (int i = 0; i < row.value()->cells.size(); i++) { - if (count++ > 0) ss << ", "; - if (count > 5) { // Limit display - ss << "... +" << (row.value()->cells.size() - 5) << " more"; - break; - } - - ss << i << ": "; - if (!row.value()->has_value(i)) { - ss << "NULL"; - } else { - ss << row.value()->cells[i].ToString(); - } - } - ss << " }"; - } - } - - ss << "\n"; - - // Print children count - ss << indent << " Children: " << children.size() << "\n"; - - // Recursively print children if requested - if (recursive && !children.empty()) { - ss << indent << " [\n"; - for (const auto& child : children) { - if (child) { - ss << child->toString(true, indent_level + 2); - } else { - ss << get_indent(indent_level + 2) << "(null child)\n"; - } - } - ss << indent << " ]\n"; - } - - ss << indent << "}\n"; - return ss.str(); - } - - friend std::ostream& operator<<(std::ostream& os, const RowNode& node) { - return os << node.toString(); - } - - void print(const bool recursive = true) const { - log_debug(toString(recursive)); - } -}; - -struct QueueItem { - int64_t node_id; - SchemaRef schema_ref; - int level; - std::shared_ptr row; - llvm::SmallDenseSet - path_visited_nodes; // packed (schema_id<<48 | node_id) for this path - std::vector path; - - QueueItem(int64_t id, const SchemaRef& schema, int l, std::shared_ptr r) - : node_id(id), schema_ref(schema), level(l), row(std::move(r)) { - path.emplace_back(schema.tag(), schema.value(), id); - } -}; - -// Log grouped connections for a node -void log_grouped_connections( - int64_t node_id, - const llvm::SmallDenseMap, 4>& - grouped_connections) { - IF_DEBUG_ENABLED { - if (grouped_connections.empty()) { - log_debug("Node {} has no grouped connections", node_id); - return; - } - - log_debug("Node {} has connections to {} target schemas:", node_id, - grouped_connections.size()); - - for (const auto& it : grouped_connections) { - auto target_schema = it.first; - const auto& connections = it.second; - log_debug(" To schema '{}': {} connections", target_schema.str(), - connections.size()); - - for (size_t i = 0; i < connections.size(); ++i) { - const auto& conn = connections[i]; - log_debug(" [{}] {} -[{}]-> {}.{} (target_id: {})", i, - conn.source.value(), conn.edge_type, conn.target.value(), - conn.target.schema(), conn.target_id); - } - } - } -} - arrow::Result>>> populate_rows_bfs(int64_t node_id, const SchemaRef& start_schema, const std::shared_ptr& output_schema, @@ -1392,47 +369,6 @@ arrow::Result>>> populate_rows( return rows; } -arrow::Result> create_empty_table( - const std::shared_ptr& schema) { - // Create empty arrays for each field in the schema - std::vector> empty_arrays; - - for (const auto& field : schema->fields()) { - std::shared_ptr empty_array; - - switch (field->type()->id()) { - case arrow::Type::INT64: { - arrow::Int64Builder builder; - ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); - break; - } - case arrow::Type::STRING: { - arrow::StringBuilder builder; - ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); - break; - } - case arrow::Type::DOUBLE: { - arrow::DoubleBuilder builder; - ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); - break; - } - case arrow::Type::BOOL: { - arrow::BooleanBuilder builder; - ARROW_RETURN_NOT_OK(builder.Finish(&empty_array)); - break; - } - default: - // For any other type, create a generic empty array - empty_array = std::make_shared(0); - } - - empty_arrays.push_back(empty_array); - } - - // Create table from schema and empty arrays - return arrow::Table::Make(schema, empty_arrays); -} - arrow::Result> create_table_from_rows( const std::shared_ptr>>& rows, const std::shared_ptr& output_schema) { @@ -1534,173 +470,6 @@ arrow::Result> create_table_from_rows( return arrow::Table::Make(output_schema, arrays); } -// Function to project a table based on Select clause -std::shared_ptr apply_select( - const std::shared_ptr& select, + const std::shared_ptr& table) { + if (!select || select->fields().empty()) { + return table; + } + + std::vector all_columns; + for (const auto& field : table->schema()->fields()) { + all_columns.push_back(field->name()); + } + + std::unordered_set columns_to_keep; + + for (const auto& field : select->fields()) { + bool is_prefix = true; + + if (field.find('.') != std::string::npos) { + for (int i = 0; i < static_cast(all_columns.size()); ++i) { + if (all_columns[i] == field) { + columns_to_keep.insert(i); + break; + } + } + is_prefix = false; + } + + if (is_prefix) { + std::string prefix = field + "."; + for (int i = 0; i < static_cast(all_columns.size()); ++i) { + if (all_columns[i].find(prefix) == 0) { + columns_to_keep.insert(i); + } + } + } + } + + std::vector column_indices(columns_to_keep.begin(), + columns_to_keep.end()); + std::ranges::sort(column_indices); + + arrow::Result> result = + table->SelectColumns(column_indices); + + if (!result.ok()) { + return table; + } + + return result.ValueOrDie(); +} + +std::vector> get_where_to_inline( + const std::string& target_var, size_t i, + const std::vector>& clauses) { + std::vector> inlined; + for (; i < clauses.size(); i++) { + if (clauses[i]->type() == Clause::Type::WHERE) { + auto where_expr = std::dynamic_pointer_cast(clauses[i]); + if (where_expr->can_inline(target_var)) { + IF_DEBUG_ENABLED { + log_debug("inline where: '{}'", where_expr->toString()); + } + inlined.push_back(where_expr); + } + } + } + return inlined; +} + +arrow::Result> inline_where( + const SchemaRef& ref, std::shared_ptr table, + QueryState& query_state, + const std::vector>& where_exprs) { + auto curr_table = std::move(table); + for (const auto& exp : where_exprs) { + IF_DEBUG_ENABLED { log_debug("inline where '{}'", exp->toString()); } + auto result = filter(curr_table, *exp, true); + if (!result.ok()) { + log_error( + "Where inline. Failed to filter table '{}', where: '{}', error: {}", + ref.toString(), exp->toString(), result.status().ToString()); + return result.status(); + } + ARROW_RETURN_NOT_OK(query_state.update_table(result.ValueOrDie(), ref)); + curr_table = result.ValueOrDie(); + exp->set_inlined(true); + } + return curr_table; +} + +arrow::Status prepare_query(Query& query, QueryState& query_state) { + // Phase 1: Process FROM clause to populate aliases + { + ARROW_ASSIGN_OR_RAISE(auto from_schema, + query_state.register_schema(query.from())); + } + + // Phase 2: Process TRAVERSE clauses to populate aliases and traversals + for (const auto& clause : query.clauses()) { + if (clause->type() == Clause::Type::TRAVERSE) { + auto traverse = std::dynamic_pointer_cast(clause); + + ARROW_ASSIGN_OR_RAISE(auto source_schema, + query_state.register_schema(traverse->source())); + ARROW_ASSIGN_OR_RAISE(auto target_schema, + query_state.register_schema(traverse->target())); + + if (!traverse->source().is_declaration()) { + traverse->mutable_source().set_schema(source_schema); + } + + if (!traverse->target().is_declaration()) { + traverse->mutable_target().set_schema(target_schema); + } + + traverse->mutable_source().set_tag(compute_tag(traverse->source())); + traverse->mutable_target().set_tag(compute_tag(traverse->target())); + + query_state.traversals.push_back(*traverse); + } + } + + // Phase 3: Resolve all ComparisonExpr field references + for (const auto& clause : query.clauses()) { + if (clause->type() == Clause::Type::WHERE) { + auto where_expr = std::dynamic_pointer_cast(clause); + auto res = where_expr->resolve_field_ref( + query_state.aliases(), query_state.schema_registry().get()); + if (!res.ok()) { + return res.status(); + } + } + } + + return arrow::Status::OK(); +} + } // namespace tundradb diff --git a/src/row.cpp b/src/row.cpp new file mode 100644 index 0000000..df98663 --- /dev/null +++ b/src/row.cpp @@ -0,0 +1,224 @@ +#include "row.hpp" + +#include "logger.hpp" + +namespace tundradb { + +std::vector> RowNode::merge_rows( + const llvm::SmallDenseMap& field_id_to_name) { + if (this->leaf()) { + return {this->row.value()}; + } + + // Collect all records from child nodes and group them by schema + llvm::SmallDenseMap>, 8> grouped; + for (const auto& c : children) { + auto child_rows = c->merge_rows(field_id_to_name); + IF_DEBUG_ENABLED { + log_debug("Child node {} returned {} rows", c->path_segment.toString(), + child_rows.size()); + for (size_t i = 0; i < child_rows.size(); ++i) { + log_debug(" Child row [{}]: {}", i, child_rows[i]->ToString()); + } + } + uint16_t tag = c->path_segment.schema_tag; + if (tag == 0) { + tag = static_cast( + std::hash{}(c->path_segment.schema) & 0xFFFFu); + } + grouped[tag].insert(grouped[tag].end(), child_rows.begin(), + child_rows.end()); + } + + std::vector>> groups_for_product; + + if (this->row.has_value()) { + std::shared_ptr node_self_row = + std::make_shared(*this->row.value()); + node_self_row->path = {this->path_segment}; + IF_DEBUG_ENABLED { + log_debug("Adding node self row: {}", node_self_row->ToString()); + } + groups_for_product.push_back({node_self_row}); + } + + for (const auto& pair : grouped) { + if (!pair.second.empty()) { + IF_DEBUG_ENABLED { + log_debug("Adding group for schema '{}' with {} rows", pair.first, + pair.second.size()); + for (size_t i = 0; i < pair.second.size(); ++i) { + log_debug(" Group row [{}]: {}", i, pair.second[i]->ToString()); + } + } + groups_for_product.push_back(pair.second); + } + } + + IF_DEBUG_ENABLED { + log_debug("Total groups for Cartesian product: {}", + groups_for_product.size()); + } + + if (groups_for_product.empty()) { + return {}; + } + + if (groups_for_product.size() == 1) { + return groups_for_product[0]; + } + + std::vector> final_merged_rows = + groups_for_product.back(); + IF_DEBUG_ENABLED { + log_debug("Starting Cartesian product with final group ({} rows)", + final_merged_rows.size()); + for (size_t i = 0; i < final_merged_rows.size(); ++i) { + log_debug(" Final group row [{}]: {}", i, + final_merged_rows[i]->ToString()); + } + } + + for (int i = static_cast(groups_for_product.size()) - 2; i >= 0; --i) { + IF_DEBUG_ENABLED { + log_debug("Processing group {} with {} rows", i, + groups_for_product[i].size()); + for (size_t j = 0; j < groups_for_product[i].size(); ++j) { + log_debug(" Current group row [{}]: {}", j, + groups_for_product[i][j]->ToString()); + } + } + + std::vector> temp_product_accumulator; + for (const auto& r1_from_current_group : groups_for_product[i]) { + for (const auto& r2_from_previous_product : final_merged_rows) { + bool can_merge = true; + + std::unordered_map schema_ids_r1 = + r1_from_current_group->extract_schema_ids(field_id_to_name); + std::unordered_map schema_ids_r2 = + r2_from_previous_product->extract_schema_ids(field_id_to_name); + + for (const auto& [schema, id1] : schema_ids_r1) { + if (schema_ids_r2.contains(schema) && schema_ids_r2[schema] != id1) { + IF_DEBUG_ENABLED { + log_debug( + "Conflict detected: Schema '{}' has different IDs: {} vs {}", + schema, id1, schema_ids_r2[schema]); + } + can_merge = false; + break; + } + } + + if (can_merge) { + for (size_t field_index = 0; + field_index < r1_from_current_group->cells.size(); + ++field_index) { + if (r1_from_current_group->has_value(static_cast(i)) && + r2_from_previous_product->has_value(static_cast(i))) { + if (!r1_from_current_group->cells[i].equals( + r2_from_previous_product->cells[i])) { + IF_DEBUG_ENABLED { + log_debug( + "Conflict detected: Field '{}' has different values", + field_id_to_name.at(i)); + } + can_merge = false; + break; + } + } + } + } + + if (can_merge) { + std::shared_ptr merged_r = + r1_from_current_group->merge(r2_from_previous_product); + merged_r->path = {this->path_segment}; + IF_DEBUG_ENABLED { + log_debug("Merged row: {}", merged_r->ToString()); + } + temp_product_accumulator.push_back(merged_r); + } else { + IF_DEBUG_ENABLED { log_debug("Cannot merge rows due to conflicts"); } + } + } + } + final_merged_rows = std::move(temp_product_accumulator); + if (final_merged_rows.empty()) { + IF_DEBUG_ENABLED { + log_debug("product_accumulator is empty. stop merge"); + } + break; + } + } + return final_merged_rows; +} + +std::string RowNode::toString(bool recursive, int indent_level) const { + auto get_indent = [](int level) { return std::string(level * 2, ' '); }; + + std::stringstream ss; + std::string indent = get_indent(indent_level); + + ss << indent << "RowNode [path=" << path_segment.toString() + << ", depth=" << depth << "] {\n"; + + if (row.has_value()) { + ss << indent << " Path: "; + if (row.value()->path.empty()) { + ss << "(empty)"; + } else { + for (size_t i = 0; i < row.value()->path.size(); ++i) { + if (i > 0) ss << " -> "; + ss << row.value()->path[i].schema << ":" + << row.value()->path[i].node_id; + } + } + ss << "\n"; + + ss << indent << " Cells: "; + if (row.value()->cells.empty()) { + ss << "(empty)"; + } else { + size_t count = 0; + ss << "{ "; + for (size_t i = 0; i < row.value()->cells.size(); i++) { + if (count++ > 0) ss << ", "; + if (count > 5) { + ss << "... +" << (row.value()->cells.size() - 5) << " more"; + break; + } + + ss << i << ": "; + if (!row.value()->has_value(static_cast(i))) { + ss << "NULL"; + } else { + ss << row.value()->cells[i].ToString(); + } + } + ss << " }"; + } + } + + ss << "\n"; + + ss << indent << " Children: " << children.size() << "\n"; + + if (recursive && !children.empty()) { + ss << indent << " [\n"; + for (const auto& child : children) { + if (child) { + ss << child->toString(true, indent_level + 2); + } else { + ss << get_indent(indent_level + 2) << "(null child)\n"; + } + } + ss << indent << " ]\n"; + } + + ss << indent << "}\n"; + return ss.str(); +} + +} // namespace tundradb diff --git a/src/types.cpp b/src/types.cpp index 0c30274..116dead 100644 --- a/src/types.cpp +++ b/src/types.cpp @@ -1,6 +1,6 @@ -#include "../include/types.hpp" +#include "types.hpp" -#include "../include/string_arena.hpp" +#include "string_arena.hpp" namespace tundradb {