Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ add_library(core
src/query.cpp
src/query_execution.cpp
src/join.cpp
src/row.cpp
src/storage.cpp
src/metadata.cpp
src/file_utils.cpp
Expand Down
131 changes: 120 additions & 11 deletions include/arrow_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,147 @@
#define ARROW_UTILS_HPP

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <llvm/ADT/DenseSet.h>
#include <llvm/ADT/SmallVector.h>

#include <memory>
#include <string>
#include <vector>

#include "types.hpp"

namespace tundradb {

// Forward declarations
class Schema;
class Node;
class WhereExpr;
class TemporalContext;

/**
* @brief Extracts the set of "id" column values from an Arrow table.
*
* @param table A table containing an Int64 "id" column.
* @return A DenseSet of all row IDs, or an error if the column is missing.
*/
arrow::Result<llvm::DenseSet<int64_t>> get_ids_from_table(
const std::shared_ptr<arrow::Table>& table);

// Initialize Arrow Compute module - should be called once at startup
/**
* @brief Initialises the Arrow Compute function registry.
*
* Must be called once before any Arrow compute operations are used.
*
* @return True on success.
*/
bool initialize_arrow_compute();

// Arrow utility functions
arrow::Result<std::shared_ptr<arrow::Array>> create_int64_array(
const int64_t value);
/**
* @brief Creates a single-element Int64 Arrow array.
*
* @param value The integer value.
* @return A one-element array.
*/
arrow::Result<std::shared_ptr<arrow::Array>> create_int64_array(int64_t value);

/**
* @brief Creates a single-element String Arrow array.
*
* @param value The string value.
* @return A one-element array.
*/
arrow::Result<std::shared_ptr<arrow::Array>> create_str_array(
const std::string& value);

/**
* @brief Creates a single-element null Arrow array of the given type.
*
* @param type The Arrow data type.
* @return A one-element array containing a null value.
*/
arrow::Result<std::shared_ptr<arrow::Array>> create_null_array(
const std::shared_ptr<arrow::DataType>& type);

/**
* @brief Prepends a field to an Arrow schema.
*
* @param field The field to insert at position 0.
* @param target_schema The existing schema.
* @return A new schema with @p field followed by @p target_schema fields.
*/
std::shared_ptr<arrow::Schema> prepend_field(
const std::shared_ptr<arrow::Field>& field,
const std::shared_ptr<arrow::Schema>& target_schema);

/**
* Creates a new schema by prepending an "id" field of type Int64 to the target
* schema.
* @deprecated for remove
* @param target_schema The original schema to modify
* @return A new schema with "id" field prepended
*/
/** @deprecated Scheduled for removal. Use prepend_field() directly. */
std::shared_ptr<arrow::Schema> prepend_id_field(
const std::shared_ptr<arrow::Schema>& target_schema);

/**
* @brief Converts a Value to an Arrow Scalar.
*
* @param value The database value to convert.
* @return The corresponding Arrow scalar, or an error for unsupported types.
*/
arrow::Result<std::shared_ptr<arrow::Scalar>> value_to_arrow_scalar(
const Value& value);

/**
* @brief Converts a raw pointer + type tag to an Arrow Scalar.
*
* @param ptr Pointer to the raw value bytes.
* @param type The value's type tag.
* @return The corresponding Arrow scalar, or an error for unsupported types.
*/
arrow::Result<std::shared_ptr<arrow::Scalar>> value_ptr_to_arrow_scalar(
const char* ptr, ValueType type);

/**
* @brief Translates a WhereExpr into an Arrow compute Expression.
*
* @param condition The WHERE expression tree.
* @param strip_var If true, variable prefixes (e.g. "u.") are removed from
* field names.
* @return An Arrow Expression suitable for compute::Filter.
*/
arrow::compute::Expression where_condition_to_expression(
const WhereExpr& condition, bool strip_var);

/**
* @brief Materialises a set of nodes into an Arrow table.
*
* Creates a table with a single "id" column followed by the schema's fields.
*
* @param schema The database schema for the nodes.
* @param nodes The nodes to serialise.
* @return The resulting table.
*/
arrow::Result<std::shared_ptr<arrow::Table>> create_table_from_nodes(
const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Node>>& nodes);

/**
* @brief Creates an empty Arrow table matching the given schema.
*
* @param schema The Arrow schema.
* @return A zero-row table with the correct columns and types.
*/
arrow::Result<std::shared_ptr<arrow::Table>> create_empty_table(
const std::shared_ptr<arrow::Schema>& schema);

/**
* @brief Filters an Arrow table using a WhereExpr predicate.
*
* @param table The table to filter.
* @param condition The WHERE expression.
* @param strip_var If true, variable prefixes are stripped before matching.
* @return The filtered table, or an error on compute failure.
*/
arrow::Result<std::shared_ptr<arrow::Table>> filter(
const std::shared_ptr<arrow::Table>& table, const WhereExpr& condition,
bool strip_var);

} // namespace tundradb

#endif // ARROW_UTILS_HPP
66 changes: 32 additions & 34 deletions include/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,30 @@ namespace tundradb {
* a JoinStrategy can decide which target (and source) IDs survive.
*/
struct JoinInput {
// All node IDs currently in the source schema within query state
const llvm::DenseSet<int64_t>& source_ids;

// All node IDs that exist in the target table (full scan of target schema)
const llvm::DenseSet<int64_t>& all_target_ids;

// Source nodes that had at least one matching edge
const llvm::DenseSet<int64_t>& matched_source_ids;

// Target nodes that were reached via matching edges
const llvm::DenseSet<int64_t>& matched_target_ids;

// Target IDs already accumulated from a previous traversal that shares
// the same target alias (e.g. multi-pattern queries). Empty on the first
// pass.
const llvm::DenseSet<int64_t>& existing_target_ids;

// Source nodes that had NO matching edge
const llvm::DenseSet<int64_t>& unmatched_source_ids;

// Whether source and target resolve to the same concrete schema
bool is_self_join;
const llvm::DenseSet<int64_t>&
source_ids; ///< All source IDs in query state.
const llvm::DenseSet<int64_t>&
all_target_ids; ///< All IDs in the target table.
const llvm::DenseSet<int64_t>&
matched_source_ids; ///< Sources with ≥ 1 matching edge.
const llvm::DenseSet<int64_t>&
matched_target_ids; ///< Targets reached via edges.
const llvm::DenseSet<int64_t>&
existing_target_ids; ///< Targets from a prior traversal pass.
const llvm::DenseSet<int64_t>&
unmatched_source_ids; ///< Sources with no matching edge.
bool is_self_join; ///< True when source == target schema.
};

/**
* @brief Output of join ID computation
*/
struct JoinOutput {
// Final set of target node IDs to store in query_state.ids[target]
llvm::DenseSet<int64_t> target_ids;

// Source IDs that should be removed from query_state (INNER join pruning)
llvm::DenseSet<int64_t> source_ids_to_remove;

// Whether the source table needs to be rebuilt after pruning
bool rebuild_source_table = false;
llvm::DenseSet<int64_t> target_ids; ///< Final target IDs for query_state.
llvm::DenseSet<int64_t>
source_ids_to_remove; ///< Source IDs to prune (INNER join).
bool rebuild_source_table =
false; ///< True if the source table must be rebuilt.
};

/**
Expand All @@ -68,12 +56,15 @@ class JoinStrategy {
virtual ~JoinStrategy() = default;

/**
* Compute which target/source IDs survive this join.
* @brief Computes which target and source IDs survive this join.
*
* @param input The accumulated traversal state.
* @return The join output describing surviving IDs and pruning actions.
*/
[[nodiscard]] virtual JoinOutput compute(const JoinInput& input) const = 0;

/**
* Human-readable name for logging / debugging.
* @brief Returns a human-readable name for logging / debugging.
*/
[[nodiscard]] virtual const char* name() const noexcept = 0;
};
Expand Down Expand Up @@ -158,11 +149,18 @@ class FullJoinStrategy final : public JoinStrategy {
};

/**
* @brief Creates the appropriate JoinStrategy for a given TraverseType
* and join context (self-join vs. cross-schema).
* @brief Factory that selects the correct JoinStrategy for a given traverse
* type.
*/
class JoinStrategyFactory {
public:
/**
* @brief Creates a strategy instance.
*
* @param type The join/traverse type (Inner, Left, Right, Full).
* @param is_self_join True when source and target resolve to the same schema.
* @return A unique_ptr to the selected strategy.
*/
static std::unique_ptr<JoinStrategy> create(TraverseType type,
bool is_self_join);
};
Expand Down
Loading