diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 2b2fe3cf1c..0d1ea32732 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -73,6 +73,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -541,6 +542,11 @@ public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) { throw getOnlyForCalciteException("mvcombine"); } + @Override + public LogicalPlan visitGraphLookup(GraphLookup node, AnalysisContext context) { + throw getOnlyForCalciteException("graphlookup"); + } + /** Build {@link ParseExpression} to context and skip to child nodes. */ @Override public LogicalPlan visitParse(Parse node, AnalysisContext context) { diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index b1082759a3..8abafbc286 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -61,6 +61,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -471,4 +472,8 @@ public T visitAddColTotals(AddColTotals node, C context) { public T visitMvCombine(MvCombine node, C context) { return visitChildren(node, context); } + + public T visitGraphLookup(GraphLookup node, C context) { + return visitChildren(node, context); + } } diff --git a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java index a6f6671084..a44f0bca41 100644 --- a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java @@ -40,6 +40,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; @@ -529,6 +530,12 @@ public Node visitLookup(Lookup node, FieldResolutionContext context) { throw new IllegalArgumentException("Lookup command cannot be used together with spath command"); } + @Override + public Node visitGraphLookup(GraphLookup node, FieldResolutionContext context) { + throw new IllegalArgumentException( + "GraphLookup command cannot be used together with spath command"); + } + @Override public Node visitValues(Values node, FieldResolutionContext context) { // do nothing diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java new file mode 100644 index 0000000000..7ab0e04b02 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java @@ -0,0 +1,95 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import javax.annotation.Nullable; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; + +/** + * AST node for graphLookup command. Performs BFS graph traversal on a lookup table. + * + *

Example: source=employees | graphLookup employees fromField=manager toField=name maxDepth=3 + * depthField=level direction=uni as hierarchy + */ +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +@AllArgsConstructor +@Builder(toBuilder = true) +public class GraphLookup extends UnresolvedPlan { + /** Direction mode for graph traversal. */ + public enum Direction { + /** Unidirectional - traverse edges in one direction only. */ + UNI, + /** Bidirectional - traverse edges in both directions. */ + BI + } + + /** Target table for graph traversal lookup. */ + private final UnresolvedPlan fromTable; + + /** Field in sourceTable to start with. */ + private final Field startField; + + /** Field in fromTable that represents the outgoing edge. */ + private final Field fromField; + + /** Field in input/fromTable to match against for traversal. */ + private final Field toField; + + /** Output field name for collected traversal results. */ + private final Field as; + + /** Maximum traversal depth. Zero means no limit. */ + private final Literal maxDepth; + + /** Optional field name to include recursion depth in output. */ + private @Nullable final Field depthField; + + /** Direction mode: UNI (default) or BIO for bidirectional. */ + private final Direction direction; + + /** Whether to support array-typed fields without early filter pushdown. */ + private final boolean supportArray; + + /** Whether to batch all source start values into a single unified BFS traversal. */ + private final boolean batchMode; + + private UnresolvedPlan child; + + public String getDepthFieldName() { + return depthField == null ? null : depthField.getField().toString(); + } + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return child == null ? ImmutableList.of() : ImmutableList.of(child); + } + + @Override + public T accept(AbstractNodeVisitor visitor, C context) { + return visitor.visitGraphLookup(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 5825011f65..cc9891ab20 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -118,6 +118,8 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; +import org.opensearch.sql.ast.tree.GraphLookup.Direction; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -151,6 +153,7 @@ import org.opensearch.sql.ast.tree.Window; import org.opensearch.sql.calcite.plan.AliasFieldsWrappable; import org.opensearch.sql.calcite.plan.OpenSearchConstants; +import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit; import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType; import org.opensearch.sql.calcite.utils.BinUtils; @@ -2573,6 +2576,57 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context) context, fieldsToAggregate, false, true, null, labelField, label); } + @Override + public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) { + // 1. Visit source (child) table + visitChildren(node, context); + RelBuilder builder = context.relBuilder; + // TODO: Limit the number of source rows to 100 for now, make it configurable. + builder.limit(0, 100); + if (node.isBatchMode()) { + tryToRemoveMetaFields(context, true); + } + RelNode sourceTable = builder.build(); + + // 2. Extract parameters + String startFieldName = node.getStartField().getField().toString(); + String fromFieldName = node.getFromField().getField().toString(); + String toFieldName = node.getToField().getField().toString(); + String outputFieldName = node.getAs().getField().toString(); + String depthFieldName = node.getDepthFieldName(); + boolean bidirectional = node.getDirection() == Direction.BI; + + RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context); + Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class); + maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue; + boolean supportArray = node.isSupportArray(); + boolean batchMode = node.isBatchMode(); + + // 3. Visit and materialize lookup table + analyze(node.getFromTable(), context); + tryToRemoveMetaFields(context, true); + RelNode lookupTable = builder.build(); + + // 4. Create LogicalGraphLookup RelNode + // The conversion rule will extract the OpenSearchIndex from the lookup table + RelNode graphLookup = + LogicalGraphLookup.create( + sourceTable, + lookupTable, + startFieldName, + fromFieldName, + toFieldName, + outputFieldName, + depthFieldName, + maxDepthValue, + bidirectional, + supportArray, + batchMode); + + builder.push(graphLookup); + return builder.peek(); + } + /** * Cast integer sum to long, real/float to double to avoid ClassCastException * diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java new file mode 100644 index 0000000000..e0ea048edb --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java @@ -0,0 +1,181 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan.rel; + +import java.util.List; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.BiRel; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Abstract RelNode for graphLookup command. + * + *

Has two inputs: + * + *

+ * + *

At execution time, performs BFS by dynamically querying OpenSearch with filter pushdown + * instead of loading all lookup data into memory. + * + *

This is a storage-agnostic logical node. Storage-specific implementations (e.g., for + * OpenSearch) should extract the necessary index information from the lookup RelNode during + * conversion to the physical plan. + */ +@Getter +public abstract class GraphLookup extends BiRel { + + // TODO: use RexInputRef instead of String for there fields + protected final String startField; // Field in source table (start entities) + protected final String fromField; // Field in lookup table (edge source) + protected final String toField; // Field in lookup table (edge target) + protected final String outputField; // Name of output array field + @Nullable protected final String depthField; // Name of output array field + + // TODO: add limitation on the maxDepth and input rows count + protected final int maxDepth; // -1 = unlimited + protected final boolean bidirectional; + protected final boolean supportArray; + protected final boolean batchMode; + + private RelDataType outputRowType; + + /** + * Creates a LogicalGraphLookup. + * + * @param cluster Cluster + * @param traitSet Trait set + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields (disables early visited filter + * pushdown) + * @param batchMode Whether to batch all source start values into a single unified BFS + */ + protected GraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode) { + super(cluster, traitSet, source, lookup); + this.startField = startField; + this.fromField = fromField; + this.toField = toField; + this.outputField = outputField; + this.depthField = depthField; + this.maxDepth = maxDepth; + this.bidirectional = bidirectional; + this.supportArray = supportArray; + this.batchMode = batchMode; + } + + /** Returns the source table RelNode. */ + public RelNode getSource() { + return left; + } + + /** Returns the lookup table RelNode. */ + public RelNode getLookup() { + return right; + } + + @Override + public abstract RelNode copy(RelTraitSet traitSet, List inputs); + + @Override + protected RelDataType deriveRowType() { + if (outputRowType == null) { + RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder(); + + if (batchMode) { + // Batch mode: Output = [Array, Array] + // First field: aggregated source rows as array + RelDataType sourceRowType = getSource().getRowType(); + RelDataType sourceArrayType = + getCluster().getTypeFactory().createArrayType(sourceRowType, -1); + builder.add(startField, sourceArrayType); + + // Second field: aggregated lookup rows as array + RelDataType lookupRowType = getLookup().getRowType(); + if (this.depthField != null) { + final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder(); + lookupBuilder.addAll(lookupRowType.getFieldList()); + RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER); + lookupBuilder.add(this.depthField, depthType); + lookupRowType = lookupBuilder.build(); + } + RelDataType lookupArrayType = + getCluster().getTypeFactory().createArrayType(lookupRowType, -1); + builder.add(outputField, lookupArrayType); + } else { + // Normal mode: Output = source fields + output array field + // Add all source fields + for (var field : getSource().getRowType().getFieldList()) { + builder.add(field); + } + + // Add output field (ARRAY type containing lookup row struct) + RelDataType lookupRowType = getLookup().getRowType(); + if (this.depthField != null) { + final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder(); + lookupBuilder.addAll(lookupRowType.getFieldList()); + RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER); + lookupBuilder.add(this.depthField, depthType); + lookupRowType = lookupBuilder.build(); + } + RelDataType arrayType = getCluster().getTypeFactory().createArrayType(lookupRowType, -1); + builder.add(outputField, arrayType); + } + + outputRowType = builder.build(); + } + return outputRowType; + } + + @Override + public double estimateRowCount(RelMetadataQuery mq) { + // Batch mode aggregates all source rows into a single output row + return batchMode ? 1 : getSource().estimateRowCount(mq); + } + + @Override + public RelWriter explainTerms(RelWriter pw) { + return super.explainTerms(pw) + .item("fromField", fromField) + .item("toField", toField) + .item("outputField", outputField) + .item("depthField", depthField) + .item("maxDepth", maxDepth) + .item("bidirectional", bidirectional) + .itemIf("supportArray", supportArray, supportArray) + .itemIf("batchMode", batchMode, batchMode); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java new file mode 100644 index 0000000000..745d0cb382 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java @@ -0,0 +1,133 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.plan.rel; + +import java.util.List; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; + +/** + * Logical RelNode for graphLookup command. TODO: need to support trim fields and several transpose + * rules for this new added RelNode + */ +@Getter +public class LogicalGraphLookup extends GraphLookup { + + /** + * Creates a LogicalGraphLookup. + * + * @param cluster Cluster + * @param traitSet Trait set + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + */ + protected LogicalGraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode) { + super( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode); + } + + /** + * Creates a LogicalGraphLookup with Convention.NONE. + * + * @param source Source table RelNode + * @param lookup Lookup table RelNode + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Named of the output depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + * @return A new LogicalGraphLookup instance + */ + public static LogicalGraphLookup create( + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + @Nullable String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode) { + RelOptCluster cluster = source.getCluster(); + RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE); + return new LogicalGraphLookup( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new LogicalGraphLookup( + getCluster(), + traitSet, + inputs.get(0), + inputs.get(1), + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java index 17d99fb4fb..8dfe963081 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java @@ -223,7 +223,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) { case BIGINT -> LONG; case FLOAT, REAL -> FLOAT; case DOUBLE, DECIMAL -> DOUBLE; // TODO the decimal is only used for literal - case CHAR, VARCHAR -> STRING; + case CHAR, VARCHAR, MULTISET -> STRING; // call toString() for MULTISET case BOOLEAN -> BOOLEAN; case DATE -> DATE; case TIME, TIME_TZ, TIME_WITH_LOCAL_TIME_ZONE -> TIME; diff --git a/docs/user/ppl/cmd/graphlookup.md b/docs/user/ppl/cmd/graphlookup.md new file mode 100644 index 0000000000..ef8720af8e --- /dev/null +++ b/docs/user/ppl/cmd/graphlookup.md @@ -0,0 +1,309 @@ + +# graphLookup + +The `graphLookup` command performs recursive graph traversal on a collection using a breadth-first search (BFS) algorithm. It searches for documents matching a start value and recursively traverses connections between documents based on specified fields. This is useful for hierarchical data like organizational charts, social networks, or routing graphs. + +## Syntax + +The `graphLookup` command has the following syntax: + +```syntax +graphLookup startField= fromField= toField= [maxDepth=] [depthField=] [direction=(uni | bi)] [supportArray=(true | false)] [batchMode=(true | false)] as +``` + +The following are examples of the `graphLookup` command syntax: + +```syntax +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name maxDepth=2 as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name depthField=level as reportingHierarchy +source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name direction=bi as connections +source = travelers | graphLookup airports startField=nearestAirport fromField=connects toField=airport supportArray=true as reachableAirports +source = airports | graphLookup airports startField=airport fromField=connects toField=airport supportArray=true as reachableAirports +``` + +## Parameters + +The `graphLookup` command supports the following parameters. + +| Parameter | Required/Optional | Description | +| --- | --- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `` | Required | The name of the index to perform the graph traversal on. Can be the same as the source index for self-referential graphs. | +| `startField=` | Required | The field in the source documents whose value is used to start the recursive search. The value of this field is matched against `toField` in the lookup index. We support both single value and array values as starting points. | +| `fromField=` | Required | The field in the lookup index documents that contains the value to recurse on. After matching a document, the value of this field is used to find the next set of documents. It supports both single value and array values. | +| `toField=` | Required | The field in the lookup index documents to match against. Documents where `toField` equals the current traversal value are included in the results. | +| `maxDepth=` | Optional | The maximum recursion depth of hops. Default is `0`. A value of `0` means only the direct connections to the statr values are returned. A value of `1` means 1 hop connections (initial match plus one recursive step), and so on. | +| `depthField=` | Optional | The name of the field to add to each traversed document indicating its recursion depth. If not specified, no depth field is added. Depth starts at `0` for the first level of matches. | +| `direction=(uni \| bi)` | Optional | The traversal direction. `uni` (default) performs unidirectional traversal following edges in the forward direction only. `bi` performs bidirectional traversal, following edges in both directions. | +| `supportArray=(true \| false)` | Optional | When `true`, disables early visited-node filter pushdown to OpenSearch. Default is `false`. Set to `true` when `fromField` or `toField` contains array values to ensure correct traversal behavior. See [Array Field Handling](#array-field-handling) for details. | +| `batchMode=(true \| false)` | Optional | When `true`, collects all start values from all source rows and performs a single unified BFS traversal. Default is `false`. The output changes to two arrays: `[Array, Array]`. See [Batch Mode](#batch-mode) for details. | +| `as ` | Required | The name of the output array field that will contain all documents found during the graph traversal. | + +## How It Works + +The `graphLookup` command performs a breadth-first search (BFS) traversal: + +1. For each source document, extract the value of `startField` +2. Query the lookup index to find documents where `toField` matches the start value +3. Add matched documents to the result array +4. Extract `fromField` values from matched documents to continue traversal +5. Repeat steps 2-4 until no new documents are found or `maxDepth` is reached + +For bidirectional traversal (`direction=bi`), the algorithm also follows edges in the reverse direction by additionally matching `fromField` values. + +## Example 1: Employee Hierarchy Traversal + +Given an `employees` index with the following documents: + +| id | name | reportsTo | +|----|------|-----------| +| 1 | Dev | Eliot | +| 2 | Eliot | Ron | +| 3 | Ron | Andrew | +| 4 | Andrew | null | +| 5 | Asya | Ron | +| 6 | Dan | Andrew | + +The following query finds the reporting chain for each employee: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+---------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+---------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3}] | +| Ron | Andrew | 3 | [{Andrew, null, 4}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3}] | +| Dan | Andrew | 6 | [{Andrew, null, 4}] | ++--------+----------+----+---------------------+ +``` + +For Dev, the traversal starts with `reportsTo="Eliot"`, finds the Eliot record, and returns it in the `reportingHierarchy` array. + +## Example 2: Employee Hierarchy with Depth Tracking + +The following query adds a depth field to track how many levels each manager is from the employee: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + depthField=level + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+------------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+------------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2, 0}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3, 0}] | +| Ron | Andrew | 3 | [{Andrew, null, 4, 0}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3, 0}] | +| Dan | Andrew | 6 | [{Andrew, null, 4, 0}] | ++--------+----------+----+------------------------+ +``` + +The depth field `level` is appended to each document in the result array. A value of `0` indicates the first level of matches. + +## Example 3: Limited Depth Traversal + +The following query limits traversal to 2 levels using `maxDepth=1`: + +```ppl ignore +source = employees + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + maxDepth=1 + as reportingHierarchy +``` + +The query returns the following results: + +```text ++--------+----------+----+--------------------------------------+ +| name | reportsTo| id | reportingHierarchy | ++--------+----------+----+--------------------------------------+ +| Dev | Eliot | 1 | [{Eliot, Ron, 2}, {Ron, Andrew, 3}] | +| Eliot | Ron | 2 | [{Ron, Andrew, 3}, {Andrew, null, 4}]| +| Ron | Andrew | 3 | [{Andrew, null, 4}] | +| Andrew | null | 4 | [] | +| Asya | Ron | 5 | [{Ron, Andrew, 3}, {Andrew, null, 4}]| +| Dan | Andrew | 6 | [{Andrew, null, 4}] | ++--------+----------+----+--------------------------------------+ +``` + +With `maxDepth=1`, the traversal goes two levels deep (depth 0 and depth 1). + +## Example 4: Airport Connections Graph + +Given an `airports` index with the following documents: + +| airport | connects | +|---------|----------| +| JFK | [BOS, ORD] | +| BOS | [JFK, PWM] | +| ORD | [JFK] | +| PWM | [BOS, LHR] | +| LHR | [PWM] | + +The following query finds reachable airports from each airport: + +```ppl ignore +source = airports + | graphLookup airports + startField=airport + fromField=connects + toField=airport + as reachableAirports +``` + +The query returns the following results: + +```text ++---------+------------+---------------------+ +| airport | connects | reachableAirports | ++---------+------------+---------------------+ +| JFK | [BOS, ORD] | [{JFK, [BOS, ORD]}] | +| BOS | [JFK, PWM] | [{BOS, [JFK, PWM]}] | +| ORD | [JFK] | [{ORD, [JFK]}] | +| PWM | [BOS, LHR] | [{PWM, [BOS, LHR]}] | +| LHR | [PWM] | [{LHR, [PWM]}] | ++---------+------------+---------------------+ +``` + +## Example 5: Cross-Index Graph Lookup + +The `graphLookup` command can use different source and lookup indexes. Given a `travelers` index: + +| name | nearestAirport | +|------|----------------| +| Dev | JFK | +| Eliot | JFK | +| Jeff | BOS | + +The following query finds reachable airports for each traveler: + +```ppl ignore +source = travelers + | graphLookup airports + startField=nearestAirport + fromField=connects + toField=airport + as reachableAirports +``` + +The query returns the following results: + +```text ++-------+----------------+---------------------+ +| name | nearestAirport | reachableAirports | ++-------+----------------+---------------------+ +| Dev | JFK | [{JFK, [BOS, ORD]}] | +| Eliot | JFK | [{JFK, [BOS, ORD]}] | +| Jeff | BOS | [{BOS, [JFK, PWM]}] | ++-------+----------------+---------------------+ +``` + +## Example 6: Bidirectional Traversal + +The following query performs bidirectional traversal to find both managers and colleagues who share the same manager: + +```ppl ignore +source = employees + | where name = 'Ron' + | graphLookup employees + startField=reportsTo + fromField=reportsTo + toField=name + direction=bi + as connections +``` + +The query returns the following results: + +```text ++------+----------+----+------------------------------------------------+ +| name | reportsTo| id | connections | ++------+----------+----+------------------------------------------------+ +| Ron | Andrew | 3 | [{Ron, Andrew, 3}, {Andrew, null, 4}, {Dan, Andrew, 6}] | ++------+----------+----+------------------------------------------------+ +``` + +With bidirectional traversal, Ron's connections include: +- His own record (Ron reports to Andrew) +- His manager (Andrew) +- His peer (Dan, who also reports to Andrew) + +## Batch Mode + +When `batchMode=true`, the `graphLookup` command collects all start values from all source rows and performs a single unified BFS traversal instead of separate traversals per row. + +### Output Format Change + +In batch mode, the output is a **single row** with two arrays: +- First array: All source rows collected +- Second array: All lookup results from the unified BFS traversal + +### When to Use Batch Mode + +Use `batchMode=true` when: +- You want to find all nodes reachable from **any** of the source start values +- You need a global view of the graph connectivity from multiple starting points +- You want to avoid duplicate traversals when multiple source rows share overlapping paths + +### Example + +```ppl ignore +source = travelers + | graphLookup airports + startField=nearestAirport + fromField=connects + toField=airport + batchMode=true + maxDepth=2 + as reachableAirports +``` + +**Normal mode** (default): Each traveler gets their own list of reachable airports +```text +| name | nearestAirport | reachableAirports | +|-------|----------------|-------------------| +| Dev | JFK | [JFK, BOS, ORD] | +| Jeff | BOS | [BOS, JFK, PWM] | +``` + +**Batch mode**: A single row with all travelers and all reachable airports combined +```text +| travelers | reachableAirports | +|----------------------------------------|-----------------------------| +| [{Dev, JFK}, {Jeff, BOS}] | [JFK, BOS, ORD, PWM, ...] | +``` + +## Array Field Handling + +When the `fromField` or `toField` contains array values, you should set `supportArray=true` to ensure correct traversal behavior. + +## Limitations + +- The source input, which provides the starting point for the traversal, has a limitation of 100 documents to avoid performance issues. +- To avoid PIT (Point in Time) search, each level of traversal search returns documents up to the "max result windows" of the lookup index. diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java new file mode 100644 index 0000000000..498b17dab9 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java @@ -0,0 +1,603 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_AIRPORTS; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_TRAVELERS; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +/** + * Integration tests for graphLookup command. Test cases are inspired by MongoDB's $graphLookup + * examples. + * + *

Test data: + * + *

+ * + * @see MongoDB + * $graphLookup + */ +public class CalcitePPLGraphLookupIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + + loadIndex(Index.GRAPH_EMPLOYEES); + loadIndex(Index.GRAPH_TRAVELERS); + loadIndex(Index.GRAPH_AIRPORTS); + } + + // ==================== Employee Hierarchy Tests ==================== + + /** Test 1: Basic employee hierarchy traversal. Find all managers in the reporting chain. */ + @Test + public void testEmployeeHierarchyBasicTraversal() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}")), + rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3}")), + rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4}")), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3}")), + rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4}"))); + } + + /** Test 2: Employee hierarchy traversal with depth field. */ + @Test + public void testEmployeeHierarchyWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=level" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2, 0}")), + rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3, 0}")), + rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4, 0}")), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3, 0}")), + rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4, 0}"))); + } + + /** Test 3: Employee hierarchy with maxDepth=1 (allows 2 levels of traversal). */ + @Test + public void testEmployeeHierarchyWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " maxDepth=1" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}", "{Ron, Andrew, 3}")), + rows("Eliot", "Ron", 2, List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}")), + rows("Ron", "Andrew", 3, List.of("{Andrew, null, 4}")), + rows("Andrew", null, 4, Collections.emptyList()), + rows("Asya", "Ron", 5, List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}")), + rows("Dan", "Andrew", 6, List.of("{Andrew, null, 4}"))); + } + + /** Test 4: Query Dev's complete reporting chain: Dev->Eliot->Ron->Andrew */ + @Test + public void testEmployeeHierarchyForSpecificEmployee() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result, rows("Dev", "Eliot", 1, List.of("{Eliot, Ron, 2}"))); + } + + // ==================== Airport Connections Tests ==================== + + /** Test 5: Find all reachable airports from each airport. */ + @Test + public void testAirportConnections() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=airport" + + " fromField=connects" + + " toField=airport" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("JFK", List.of("BOS", "ORD"), List.of("{JFK, [BOS, ORD]}")), + rows("BOS", List.of("JFK", "PWM"), List.of("{BOS, [JFK, PWM]}")), + rows("ORD", List.of("JFK"), List.of("{ORD, [JFK]}")), + rows("PWM", List.of("BOS", "LHR"), List.of("{PWM, [BOS, LHR]}")), + rows("LHR", List.of("PWM"), List.of("{LHR, [PWM]}"))); + } + + /** Test 6: Find airports reachable from JFK within maxDepth=1. */ + @Test + public void testAirportConnectionsWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'JFK'" + + " | graphLookup %s" + + " startField=airport" + + " fromField=connects" + + " toField=airport" + + " maxDepth=1" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("JFK", List.of("BOS", "ORD"), List.of("{JFK, [BOS, ORD]}", "{BOS, [JFK, PWM]}"))); + } + + /** Test 7: Find airports with default depth(=0) and start value of list */ + @Test + public void testAirportConnectionsWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'JFK'" + + " | graphLookup %s" + + " startField=connects" + + " fromField=connects" + + " toField=airport" + + " depthField=numConnections" + + " as reachableAirports", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("reachableAirports", "array")); + verifyDataRows(result, rows("JFK", List.of("BOS", "ORD"), List.of("{BOS, [JFK, PWM], 0}"))); + } + + /** + * Test 8: Find reachable airports for all travelers. Uses travelers as source and airports as + * lookup table, with nearestAirport as the starting point for graph traversal. + */ + @Test + public void testTravelersReachableAirports() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows("Dev", "JFK", List.of("{JFK, [BOS, ORD]}")), + rows("Eliot", "JFK", List.of("{JFK, [BOS, ORD]}")), + rows("Jeff", "BOS", List.of("{BOS, [JFK, PWM]}"))); + } + + /** + * Test 9: Find reachable airports for a specific traveler (Dev at JFK) with depth tracking. + * Traverses from JFK through connected airports. + */ + @Test + public void testTravelerReachableAirportsWithDepthField() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Dev'" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " depthField=hops" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows(result, rows("Dev", "JFK", List.of("{JFK, [BOS, ORD], 0}"))); + } + + /** + * Test 10: Find reachable airports for Jeff (at BOS) with maxDepth=1. Finds BOS record as the + * starting point and traverses one level to connected airports. + */ + @Test + public void testTravelerReachableAirportsWithMaxDepth() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Jeff'" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " maxDepth=1" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("name", "string"), + schema("nearestAirport", "string"), + schema("reachableAirports", "array")); + verifyDataRows( + result, + rows( + "Jeff", "BOS", List.of("{BOS, [JFK, PWM]}", "{JFK, [BOS, ORD]}", "{PWM, [BOS, LHR]}"))); + } + + // ==================== Bidirectional Traversal Tests ==================== + + /** Test 11: Bidirectional traversal for Ron (finds both managers and reports). */ + @Test + public void testBidirectionalEmployeeHierarchy() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Ron'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " direction=bi" + + " as connections", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("connections", "array")); + verifyDataRows( + result, + rows( + "Ron", + "Andrew", + 3, + List.of("{Ron, Andrew, 3}", "{Andrew, null, 4}", "{Dan, Andrew, 6}"))); + } + + /** + * Test 12: Bidirectional airport connections for ORD. Note: Currently returns empty + * allConnections array because the connects field is an array type. + */ + @Test + public void testBidirectionalAirportConnections() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where airport = 'ORD'" + + " | graphLookup %s" + + " startField=connects" + + " fromField=connects" + + " toField=airport" + + " direction=bi" + + " as allConnections", + TEST_INDEX_GRAPH_AIRPORTS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema( + result, + schema("airport", "string"), + schema("connects", "string"), + schema("allConnections", "array")); + verifyDataRows( + result, rows("ORD", List.of("JFK"), List.of("{JFK, [BOS, ORD]}", "{BOS, [JFK, PWM]}"))); + } + + // ==================== Edge Cases ==================== + + /** Test 13: Graph lookup on empty result set (non-existent employee). */ + @Test + public void testEmptySourceResult() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'NonExistent'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result); + } + + /** Test 14: CEO (Andrew) with no manager - hierarchy should be empty. */ + @Test + public void testEmployeeWithNoManager() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name = 'Andrew'" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema( + result, + schema("name", "string"), + schema("reportsTo", "string"), + schema("id", "int"), + schema("reportingHierarchy", "array")); + verifyDataRows(result, rows("Andrew", null, 4, Collections.emptyList())); + } + + /** Test 15: Combined with stats command. */ + @Test + public void testGraphLookupWithStats() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy" + + " | stats count() by name", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("count()", "bigint"), schema("name", "string")); + verifyDataRows( + result, + rows(1L, "Ron"), + rows(1L, "Dan"), + rows(1L, "Dev"), + rows(1L, "Andrew"), + rows(1L, "Asya"), + rows(1L, "Eliot")); + } + + /** Test 16: Graph lookup with fields projection (name and reportingHierarchy only). */ + @Test + public void testGraphLookupWithFieldsProjection() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " as reportingHierarchy" + + " | fields name, reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("name", "string"), schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows("Dev", List.of("{Eliot, Ron, 2}")), + rows("Eliot", List.of("{Ron, Andrew, 3}")), + rows("Ron", List.of("{Andrew, null, 4}")), + rows("Andrew", Collections.emptyList()), + rows("Asya", List.of("{Ron, Andrew, 3}")), + rows("Dan", List.of("{Andrew, null, 4}"))); + } + + // ==================== Batch Mode Tests ==================== + + /** + * Test 17: Batch mode - collects all start values and performs unified BFS. Output is a single + * row with [Array, Array]. + * + *

Source: Dev (reportsTo=Eliot), Asya (reportsTo=Ron) Start values: {Eliot, Ron} BFS finds: + * Eliot->Ron, Ron->Andrew, Andrew->null + */ + @Test + public void testBatchModeEmployeeHierarchy() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name in ('Dev', 'Asya')" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=depth" + + " maxDepth=3" + + " batchMode=true" + + " as reportingHierarchy", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("reportsTo", "array"), schema("reportingHierarchy", "array")); + verifyDataRows( + result, + rows( + List.of("{Dev, Eliot, 1}", "{Asya, Ron, 5}"), + List.of("{Ron, Andrew, 3, 0}", "{Andrew, null, 4, 1}"))); + } + + /** + * Test 18: Batch mode for travelers - find all airports reachable from any traveler. All + * travelers' nearest airports: JFK (Dev, Eliot), BOS (Jeff) Unified BFS from {JFK, BOS} with + * maxDepth=1 finds connected airports. + */ + @Test + public void testBatchModeTravelersAirports() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | graphLookup %s" + + " startField=nearestAirport" + + " fromField=connects" + + " toField=airport" + + " batchMode=true" + + " depthField=depth" + + " maxDepth=3" + + " supportArray=true" + + " as reachableAirports", + TEST_INDEX_GRAPH_TRAVELERS, TEST_INDEX_GRAPH_AIRPORTS)); + + verifySchema(result, schema("nearestAirport", "array"), schema("reachableAirports", "array")); + // Batch mode returns single row with: + // - sourceRows: [{Dev, JFK}, {Eliot, JFK}, {Jeff, BOS}] + // - lookupResults: airports reachable from JFK and BOS within maxDepth=1 + verifyDataRows( + result, + rows( + List.of("{Dev, JFK}", "{Eliot, JFK}", "{Jeff, BOS}"), + List.of("{JFK, [BOS, ORD], 0}", "{BOS, [JFK, PWM], 0}", "{PWM, [BOS, LHR], 1}"))); + } + + /** + * Test 19: Batch mode with bidirectional traversal. Dev (reportsTo=Eliot), Dan (reportsTo=Andrew) + * Bidirectional BFS from {Eliot, Andrew} finds connections in both directions. + */ + @Test + public void testBatchModeBidirectional() throws IOException { + JSONObject result = + executeQuery( + String.format( + "source=%s" + + " | where name in ('Dev', 'Dan')" + + " | graphLookup %s" + + " startField=reportsTo" + + " fromField=reportsTo" + + " toField=name" + + " depthField=depth" + + " maxDepth=3" + + " direction=bi" + + " batchMode=true" + + " as connections", + TEST_INDEX_GRAPH_EMPLOYEES, TEST_INDEX_GRAPH_EMPLOYEES)); + + verifySchema(result, schema("reportsTo", "array"), schema("connections", "array")); + // Batch mode returns single row with bidirectional traversal results + // Start from {Eliot, Andrew}, find connections in both directions + verifyDataRows( + result, + rows( + List.of("{Dev, Eliot, 1}", "{Dan, Andrew, 6}"), + List.of( + "{Dev, Eliot, 1, 0}", + "{Eliot, Ron, 2, 0}", + "{Andrew, null, 4, 0}", + "{Dan, Andrew, 6, 0}", + "{Asya, Ron, 5, 1}"))); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 4e143951bf..de4eca4fa1 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -851,6 +851,22 @@ public enum Index { "duplication_nullable", getDuplicationNullableIndexMapping(), "src/test/resources/duplication_nullable.json"), + // Graph lookup test indices (inspired by MongoDB $graphLookup examples) + GRAPH_EMPLOYEES( + TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES, + "graph_employees", + getGraphEmployeesIndexMapping(), + "src/test/resources/graph_employees.json"), + GRAPH_TRAVELERS( + TestsConstants.TEST_INDEX_GRAPH_TRAVELERS, + "graph_travelers", + getGraphTravelersIndexMapping(), + "src/test/resources/graph_travelers.json"), + GRAPH_AIRPORTS( + TestsConstants.TEST_INDEX_GRAPH_AIRPORTS, + "graph_airports", + getGraphAirportsIndexMapping(), + "src/test/resources/graph_airports.json"), TPCH_ORDERS( "orders", "tpch", diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java index 2ac1763836..4f82f03bfa 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestUtils.java @@ -320,6 +320,21 @@ public static String getWorkInformationIndexMapping() { return getMappingFile(mappingFile); } + public static String getGraphEmployeesIndexMapping() { + String mappingFile = "graph_employees_index_mapping.json"; + return getMappingFile(mappingFile); + } + + public static String getGraphTravelersIndexMapping() { + String mappingFile = "graph_travelers_index_mapping.json"; + return getMappingFile(mappingFile); + } + + public static String getGraphAirportsIndexMapping() { + String mappingFile = "graph_airports_index_mapping.json"; + return getMappingFile(mappingFile); + } + public static String getDuplicationNullableIndexMapping() { String mappingFile = "duplication_nullable_index_mapping.json"; return getMappingFile(mappingFile); diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java index ad8a232bab..b5e49bbe02 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/TestsConstants.java @@ -84,6 +84,9 @@ public class TestsConstants { public static final String TEST_INDEX_WORKER = TEST_INDEX + "_worker"; public static final String TEST_INDEX_WORK_INFORMATION = TEST_INDEX + "_work_information"; public static final String TEST_INDEX_DUPLICATION_NULLABLE = TEST_INDEX + "_duplication_nullable"; + public static final String TEST_INDEX_GRAPH_EMPLOYEES = TEST_INDEX + "_graph_employees"; + public static final String TEST_INDEX_GRAPH_TRAVELERS = TEST_INDEX + "_graph_travelers"; + public static final String TEST_INDEX_GRAPH_AIRPORTS = TEST_INDEX + "_graph_airports"; public static final String TEST_INDEX_MERGE_TEST_1 = TEST_INDEX + "_merge_test_1"; public static final String TEST_INDEX_MERGE_TEST_2 = TEST_INDEX + "_merge_test_2"; public static final String TEST_INDEX_MERGE_TEST_WILDCARD = TEST_INDEX + "_merge_test_*"; diff --git a/integ-test/src/test/resources/graph_airports.json b/integ-test/src/test/resources/graph_airports.json new file mode 100644 index 0000000000..c644a24dc0 --- /dev/null +++ b/integ-test/src/test/resources/graph_airports.json @@ -0,0 +1,10 @@ +{"index":{"_id":"1"}} +{"airport":"JFK","connects":["BOS","ORD"]} +{"index":{"_id":"2"}} +{"airport":"BOS","connects":["JFK","PWM"]} +{"index":{"_id":"3"}} +{"airport":"ORD","connects":["JFK"]} +{"index":{"_id":"4"}} +{"airport":"PWM","connects":["BOS","LHR"]} +{"index":{"_id":"5"}} +{"airport":"LHR","connects":["PWM"]} diff --git a/integ-test/src/test/resources/graph_employees.json b/integ-test/src/test/resources/graph_employees.json new file mode 100644 index 0000000000..a9a2630fc0 --- /dev/null +++ b/integ-test/src/test/resources/graph_employees.json @@ -0,0 +1,12 @@ +{"index":{"_id":"1"}} +{"id":1,"name":"Dev","reportsTo":"Eliot"} +{"index":{"_id":"2"}} +{"id":2,"name":"Eliot","reportsTo":"Ron"} +{"index":{"_id":"3"}} +{"id":3,"name":"Ron","reportsTo":"Andrew"} +{"index":{"_id":"4"}} +{"id":4,"name":"Andrew","reportsTo":null} +{"index":{"_id":"5"}} +{"id":5,"name":"Asya","reportsTo":"Ron"} +{"index":{"_id":"6"}} +{"id":6,"name":"Dan","reportsTo":"Andrew"} diff --git a/integ-test/src/test/resources/graph_travelers.json b/integ-test/src/test/resources/graph_travelers.json new file mode 100644 index 0000000000..eb11d2206c --- /dev/null +++ b/integ-test/src/test/resources/graph_travelers.json @@ -0,0 +1,6 @@ +{"index":{"_id":"1"}} +{"name":"Dev","nearestAirport":"JFK"} +{"index":{"_id":"2"}} +{"name":"Eliot","nearestAirport":"JFK"} +{"index":{"_id":"3"}} +{"name":"Jeff","nearestAirport":"BOS"} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json new file mode 100644 index 0000000000..e93812c8a1 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_airports_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "airport": { + "type": "keyword" + }, + "connects": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json new file mode 100644 index 0000000000..8c6674396e --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_employees_index_mapping.json @@ -0,0 +1,15 @@ +{ + "mappings": { + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "keyword" + }, + "reportsTo": { + "type": "keyword" + } + } + } +} diff --git a/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json b/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json new file mode 100644 index 0000000000..f4697dead1 --- /dev/null +++ b/integ-test/src/test/resources/indexDefinitions/graph_travelers_index_mapping.json @@ -0,0 +1,12 @@ +{ + "mappings": { + "properties": { + "name": { + "type": "keyword" + }, + "nearestAirport": { + "type": "keyword" + } + } + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index a7eb3ad57b..58d797f4bf 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import org.apache.calcite.avatica.util.StructImpl; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelRoot; @@ -245,6 +246,9 @@ private static Object processValue(Object value) { } return convertedMap; } + if (value instanceof StructImpl) { + return ((StructImpl) value).toString(); + } if (value instanceof List) { List list = (List) value; List convertedList = new ArrayList<>(); @@ -330,6 +334,9 @@ private void registerOpenSearchFunctions() { BuiltinFunctionName.DISTINCT_COUNT_APPROX, approxDistinctCountFunction); OperatorTable.addOperator( BuiltinFunctionName.DISTINCT_COUNT_APPROX.name(), approxDistinctCountFunction); + + // Note: GraphLookup is now implemented as a custom RelNode (LogicalGraphLookup) + // instead of a UDF, so no registration is needed here. } /** diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java new file mode 100644 index 0000000000..ed107f641f --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/EnumerableGraphLookupRule.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.planner.rules; + +import org.apache.calcite.adapter.enumerable.EnumerableConvention; +import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.convert.ConverterRule; +import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup; +import org.opensearch.sql.opensearch.storage.OpenSearchIndex; +import org.opensearch.sql.opensearch.storage.scan.AbstractCalciteIndexScan; +import org.opensearch.sql.opensearch.storage.scan.CalciteEnumerableGraphLookup; + +/** Rule to convert a {@link LogicalGraphLookup} to a {@link CalciteEnumerableGraphLookup}. */ +public class EnumerableGraphLookupRule extends ConverterRule { + + /** Default configuration. */ + public static final Config DEFAULT_CONFIG = + Config.INSTANCE + .as(Config.class) + .withConversion( + LogicalGraphLookup.class, + Convention.NONE, + EnumerableConvention.INSTANCE, + "EnumerableGraphLookupRule") + .withRuleFactory(EnumerableGraphLookupRule::new); + + /** Creates an EnumerableGraphLookupRule. */ + protected EnumerableGraphLookupRule(Config config) { + super(config); + } + + @Override + public boolean matches(RelOptRuleCall call) { + LogicalGraphLookup graphLookup = call.rel(0); + // Only match if we can extract the OpenSearchIndex from the lookup table + return extractOpenSearchIndex(graphLookup.getLookup()) != null; + } + + /** + * Recursively extracts OpenSearchIndex from a RelNode by traversing down to find the index scan. + * + * @param node The RelNode to extract from + * @return The OpenSearchIndex, or null if not found + */ + private static OpenSearchIndex extractOpenSearchIndex(RelNode node) { + if (node instanceof AbstractCalciteIndexScan scan) { + return scan.getOsIndex(); + } + if (node instanceof RelSubset subset) { + return extractOpenSearchIndex(subset.getOriginal()); + } + // Recursively check inputs + for (RelNode input : node.getInputs()) { + OpenSearchIndex index = extractOpenSearchIndex(input); + if (index != null) { + return index; + } + } + return null; + } + + @Override + public RelNode convert(RelNode rel) { + final LogicalGraphLookup graphLookup = (LogicalGraphLookup) rel; + + // Extract the OpenSearchIndex from the lookup table + OpenSearchIndex lookupIndex = extractOpenSearchIndex(graphLookup.getLookup()); + if (lookupIndex == null) { + throw new IllegalStateException("Cannot extract OpenSearchIndex from lookup table"); + } + + // Convert inputs to enumerable convention + RelTraitSet traitSet = graphLookup.getTraitSet().replace(EnumerableConvention.INSTANCE); + + RelNode convertedSource = + convert( + graphLookup.getSource(), + graphLookup.getSource().getTraitSet().replace(EnumerableConvention.INSTANCE)); + RelNode convertedLookup = + convert( + graphLookup.getLookup(), + graphLookup.getLookup().getTraitSet().replace(EnumerableConvention.INSTANCE)); + return new CalciteEnumerableGraphLookup( + graphLookup.getCluster(), + traitSet, + convertedSource, + convertedLookup, + graphLookup.getStartField(), + graphLookup.getFromField(), + graphLookup.getToField(), + graphLookup.getOutputField(), + graphLookup.getDepthField(), + graphLookup.getMaxDepth(), + graphLookup.isBidirectional(), + graphLookup.isSupportArray(), + graphLookup.isBatchMode()); + } +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java index db65bb51a8..0068f445ce 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/OpenSearchIndexRules.java @@ -16,6 +16,8 @@ public class OpenSearchIndexRules { EnumerableSystemIndexScanRule.DEFAULT_CONFIG.toRule(); private static final RelOptRule NESTED_AGGREGATE_RULE = EnumerableNestedAggregateRule.DEFAULT_CONFIG.toRule(); + private static final RelOptRule GRAPH_LOOKUP_RULE = + EnumerableGraphLookupRule.DEFAULT_CONFIG.toRule(); // Rule that always pushes down relevance functions regardless of pushdown settings private static final RelevanceFunctionPushdownRule RELEVANCE_FUNCTION_RULE = RelevanceFunctionPushdownRule.Config.DEFAULT.toRule(); @@ -23,7 +25,11 @@ public class OpenSearchIndexRules { /** The rules will apply whatever the pushdown setting is. */ public static final List OPEN_SEARCH_NON_PUSHDOWN_RULES = ImmutableList.of( - INDEX_SCAN_RULE, SYSTEM_INDEX_SCAN_RULE, NESTED_AGGREGATE_RULE, RELEVANCE_FUNCTION_RULE); + INDEX_SCAN_RULE, + SYSTEM_INDEX_SCAN_RULE, + NESTED_AGGREGATE_RULE, + GRAPH_LOOKUP_RULE, + RELEVANCE_FUNCTION_RULE); private static final ProjectIndexScanRule PROJECT_INDEX_SCAN = ProjectIndexScanRule.Config.DEFAULT.toRule(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java index 355262b2d6..50f782cd15 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/PredicateAnalyzer.java @@ -1057,7 +1057,7 @@ QueryExpression isTrue() { throw new PredicateAnalyzerException("isTrue cannot be applied to " + this.getClass()); } - QueryExpression in(LiteralExpression literal) { + public QueryExpression in(LiteralExpression literal) { throw new PredicateAnalyzerException("in cannot be applied to " + this.getClass()); } @@ -1065,7 +1065,7 @@ QueryExpression notIn(LiteralExpression literal) { throw new PredicateAnalyzerException("notIn cannot be applied to " + this.getClass()); } - static QueryExpression create(TerminalExpression expression) { + public static QueryExpression create(TerminalExpression expression) { if (expression instanceof CastExpression) { expression = CastExpression.unpack(expression); } @@ -1673,11 +1673,11 @@ public String getReferenceForTermQuery() { } /** Literal like {@code 'foo' or 42 or true} etc. */ - static final class LiteralExpression implements TerminalExpression { + public static final class LiteralExpression implements TerminalExpression { final RexLiteral literal; - LiteralExpression(RexLiteral literal) { + public LiteralExpression(RexLiteral literal) { this.literal = literal; } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index d7539312cd..3350c00fb0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -80,7 +80,7 @@ public class OpenSearchIndex extends AbstractOpenSearchTable { @Getter private final Settings settings; /** {@link OpenSearchRequest.IndexName}. */ - private final OpenSearchRequest.IndexName indexName; + @Getter private final OpenSearchRequest.IndexName indexName; /** The cached mapping of field and type in index. */ private Map cachedFieldOpenSearchTypes = null; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java new file mode 100644 index 0000000000..03a327e27f --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteEnumerableGraphLookup.java @@ -0,0 +1,511 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static org.opensearch.index.query.QueryBuilders.boolQuery; +import static org.opensearch.index.query.QueryBuilders.termsQuery; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import lombok.Getter; +import org.apache.calcite.adapter.enumerable.EnumerableRel; +import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor; +import org.apache.calcite.adapter.enumerable.PhysType; +import org.apache.calcite.adapter.enumerable.PhysTypeImpl; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.linq4j.tree.Blocks; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.sql.calcite.plan.Scannable; +import org.opensearch.sql.calcite.plan.rel.GraphLookup; +import org.opensearch.sql.opensearch.request.PredicateAnalyzer.NamedFieldExpression; +import org.opensearch.sql.opensearch.storage.scan.context.LimitDigest; +import org.opensearch.sql.opensearch.storage.scan.context.OSRequestBuilderAction; +import org.opensearch.sql.opensearch.storage.scan.context.PushDownType; +import org.opensearch.sql.opensearch.util.OpenSearchRelOptUtil; + +/** + * Enumerable implementation for graphLookup command. + * + *

Performs BFS graph traversal by dynamically querying OpenSearch with filter pushdown instead + * of loading all lookup data into memory. For each source row, it executes BFS queries to find all + * connected nodes in the graph. + */ +@Getter +public class CalciteEnumerableGraphLookup extends GraphLookup implements EnumerableRel, Scannable { + private static final Logger LOG = LogManager.getLogger(); + + /** + * Creates a CalciteEnumerableGraphLookup. + * + * @param cluster Cluster + * @param traitSet Trait set (must include EnumerableConvention) + * @param source Source table RelNode + * @param lookup Lookup table RelNode // * @param lookupIndex OpenSearchIndex for the lookup table + * (extracted from lookup RelNode) + * @param startField Field name for start entities + * @param fromField Field name for outgoing edges + * @param toField Field name for incoming edges + * @param outputField Name of the output array field + * @param depthField Name of the depth field + * @param maxDepth Maximum traversal depth (-1 for unlimited) + * @param bidirectional Whether to traverse edges in both directions + * @param supportArray Whether to support array-typed fields + * @param batchMode Whether to batch all source start values into a single unified BFS + */ + public CalciteEnumerableGraphLookup( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode source, + RelNode lookup, + String startField, + String fromField, + String toField, + String outputField, + String depthField, + int maxDepth, + boolean bidirectional, + boolean supportArray, + boolean batchMode) { + super( + cluster, + traitSet, + source, + lookup, + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List inputs) { + return new CalciteEnumerableGraphLookup( + getCluster(), + traitSet, + inputs.get(0), + inputs.get(1), + startField, + fromField, + toField, + outputField, + depthField, + maxDepth, + bidirectional, + supportArray, + batchMode); + } + + @Override + public @Nullable RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + // TODO: make it more accurate + return super.computeSelfCost(planner, mq); + } + + // TODO: support non-scannable inputs + @Override + public Result implement(EnumerableRelImplementor implementor, Prefer pref) { + PhysType physType = + PhysTypeImpl.of( + implementor.getTypeFactory(), + OpenSearchRelOptUtil.replaceDot(getCluster().getTypeFactory(), getRowType()), + pref.preferArray()); + + var scanOperator = implementor.stash(this, CalciteEnumerableGraphLookup.class); + return implementor.result(physType, Blocks.toBlock(Expressions.call(scanOperator, "scan"))); + } + + @Override + public Enumerable<@Nullable Object> scan() { + return new GraphLookupEnumerable(this); + } + + /** Enumerable implementation that performs BFS traversal for each source row. */ + private static class GraphLookupEnumerable extends AbstractEnumerable<@Nullable Object> { + + private final CalciteEnumerableGraphLookup graphLookup; + + GraphLookupEnumerable(CalciteEnumerableGraphLookup graphLookup) { + this.graphLookup = graphLookup; + } + + @Override + public Enumerator<@Nullable Object> enumerator() { + return new GraphLookupEnumerator(graphLookup); + } + } + + /** Enumerator that performs BFS for each source row. */ + private static class GraphLookupEnumerator implements Enumerator<@Nullable Object> { + + private final CalciteEnumerableGraphLookup graphLookup; + private final CalciteEnumerableIndexScan lookupScan; + private final Enumerator<@Nullable Object> sourceEnumerator; + private final List lookupFields; + private final int startFieldIndex; + private final int fromFieldIdx; + private final int toFieldIdx; + + private Object[] current = null; + private boolean batchModeCompleted = false; + + @SuppressWarnings("unchecked") + GraphLookupEnumerator(CalciteEnumerableGraphLookup graphLookup) { + this.graphLookup = graphLookup; + this.lookupScan = (CalciteEnumerableIndexScan) graphLookup.getLookup(); + // For performance consideration, limit the size of the lookup table MaxResultWindow to avoid + // PIT search + final int maxResultWindow = this.lookupScan.getOsIndex().getMaxResultWindow(); + this.lookupScan.pushDownContext.add( + PushDownType.LIMIT, + new LimitDigest(maxResultWindow, 0), + (OSRequestBuilderAction) + requestBuilder -> requestBuilder.pushDownLimit(maxResultWindow, 0)); + + // Get the source enumerator + if (graphLookup.getSource() instanceof Scannable scannable) { + Enumerable sourceEnum = scannable.scan(); + this.sourceEnumerator = (Enumerator<@Nullable Object>) sourceEnum.enumerator(); + } else { + throw new IllegalStateException( + "Source must be Scannable, got: " + graphLookup.getSource().getClass()); + } + + List sourceFields = graphLookup.getSource().getRowType().getFieldNames(); + this.lookupFields = graphLookup.getLookup().getRowType().getFieldNames(); + this.startFieldIndex = sourceFields.indexOf(graphLookup.getStartField()); + this.fromFieldIdx = lookupFields.indexOf(graphLookup.fromField); + this.toFieldIdx = lookupFields.indexOf(graphLookup.toField); + } + + @Override + public Object current() { + // source fields + output array (normal mode) or [source array, lookup array] (batch mode) + return current; + } + + @Override + public boolean moveNext() { + if (graphLookup.batchMode) { + return moveNextBatchMode(); + } else { + return moveNextNormalMode(); + } + } + + /** + * Batch mode: collect all source start values, perform unified BFS, return single aggregated + * row. + */ + private boolean moveNextBatchMode() { + // Batch mode only returns one row + if (batchModeCompleted) { + return false; + } + batchModeCompleted = true; + + // Collect all source rows and start values + List allSourceRows = new ArrayList<>(); + Set allStartValues = new HashSet<>(); + + while (sourceEnumerator.moveNext()) { + Object sourceRow = sourceEnumerator.current(); + Object[] sourceValues; + + if (sourceRow instanceof Object[] arr) { + sourceValues = arr; + } else { + sourceValues = new Object[] {sourceRow}; + } + + // Store the source row + allSourceRows.add(sourceValues); + + // Collect start value(s) + Object startValue = + (startFieldIndex >= 0 && startFieldIndex < sourceValues.length) + ? sourceValues[startFieldIndex] + : null; + + if (startValue != null) { + if (startValue instanceof List list) { + allStartValues.addAll(list); + } else { + allStartValues.add(startValue); + } + } + } + + // Perform unified BFS with all start values + List bfsResults = performBfs(allStartValues); + + // Build output row: [Array, Array] + current = new Object[] {allSourceRows, bfsResults}; + + return true; + } + + /** Normal mode: perform BFS for each source row individually. */ + private boolean moveNextNormalMode() { + if (!sourceEnumerator.moveNext()) { + return false; + } + + // Get current source row + Object sourceRow = sourceEnumerator.current(); + Object[] sourceValues; + + if (sourceRow instanceof Object[] arr) { + sourceValues = arr; + } else { + // Single column case + sourceValues = new Object[] {sourceRow}; + } + + // Get the start value for BFS + Object startValue = + (startFieldIndex >= 0 && startFieldIndex < sourceValues.length) + ? sourceValues[startFieldIndex] + : null; + + // Perform BFS traversal + List bfsResults = performBfs(startValue); + + // Build output row: source fields + array of BFS results + current = new Object[sourceValues.length + 1]; + System.arraycopy(sourceValues, 0, current, 0, sourceValues.length); + current[sourceValues.length] = bfsResults; + + return true; + } + + /** + * Performs BFS traversal starting from the given value by dynamically querying OpenSearch. + * + * @param startValue The starting value for BFS + * @return List of rows found during traversal + */ + private List performBfs(Object startValue) { + if (startValue == null) { + return List.of(); + } + + // TODO: support spillable for these collections + List results = new ArrayList<>(); + // TODO: If we want to include loop edges, we also need to track the visited edges + Set visitedNodes = new HashSet<>(); + Queue queue = new ArrayDeque<>(); + + // Initialize BFS with start value + if (startValue instanceof Collection collection) { + collection.forEach(value -> { + if (!visitedNodes.contains(value)) { + visitedNodes.add(value); + queue.offer(value); + } + }); + } else { + visitedNodes.add(startValue); + queue.offer(startValue); + } + + int currentLevelDepth = 0; + while (!queue.isEmpty()) { + // Collect all values at current level for batch query + List currentLevelValues = new ArrayList<>(); + + while (!queue.isEmpty()) { + Object value = queue.poll(); + currentLevelValues.add(value); + } + + if (currentLevelValues.isEmpty()) { + break; + } + + // Query OpenSearch for all current level values + // Forward direction: fromField = currentLevelValues + List forwardResults = queryLookupTable(currentLevelValues, visitedNodes); + + if (forwardResults.size() >= this.lookupScan.getOsIndex().getMaxResultWindow()) { + LOG.warn("BFS result size exceeds max result window, returning partial result."); + } + for (Object row : forwardResults) { + Object[] rowArray = (Object[]) (row); + Object fromValue = rowArray[fromFieldIdx]; + // Collect next values to traverse (may be single value or list) + // For forward traversal: extract fromField values for next level + // For bidirectional: also extract toField values. + // Skip visited values while keep null value + List nextValues = new ArrayList<>(); + collectValues(fromValue, nextValues, visitedNodes); + if (graphLookup.bidirectional) { + Object toValue = rowArray[toFieldIdx]; + collectValues(toValue, nextValues, visitedNodes); + } + + // Add row to results if the nextValues is not empty + if (!nextValues.isEmpty()) { + if (graphLookup.depthField != null) { + Object[] rowWithDepth = new Object[rowArray.length + 1]; + System.arraycopy(rowArray, 0, rowWithDepth, 0, rowArray.length); + rowWithDepth[rowArray.length] = currentLevelDepth; + results.add(rowWithDepth); + } else { + results.add(rowArray); + } + + // Add unvisited non-null values to queue for next level traversal + for (Object val : nextValues) { + if (val != null) { + visitedNodes.add(val); + queue.offer(val); + } + } + } + } + + if (++currentLevelDepth > graphLookup.maxDepth) break; + } + + return results; + } + + /** + * Queries the lookup table with a terms filter. + * + * @param values Values to match + * @param visitedValues Values to not match (ignored when supportArray is true) + * @return List of matching rows + */ + private List queryLookupTable( + Collection values, Collection visitedValues) { + if (values.isEmpty()) { + return List.of(); + } + + // Forward direction query + QueryBuilder query; + if (graphLookup.supportArray) { + // When supportArray is true, don't push down visited filter + // because array fields may contain multiple values that need to be checked individually + query = getQueryBuilder(toFieldIdx, values); + } else { + query = + boolQuery() + .must(getQueryBuilder(toFieldIdx, values)) + .mustNot(getQueryBuilder(fromFieldIdx, visitedValues)); + } + + if (graphLookup.bidirectional) { + // Also query fromField for bidirectional traversal + QueryBuilder backQuery; + if (graphLookup.supportArray) { + backQuery = getQueryBuilder(fromFieldIdx, values); + } else { + backQuery = + boolQuery() + .must(getQueryBuilder(fromFieldIdx, values)) + .mustNot(getQueryBuilder(toFieldIdx, visitedValues)); + } + query = QueryBuilders.boolQuery().should(query).should(backQuery); + } + CalciteEnumerableIndexScan newScan = (CalciteEnumerableIndexScan) this.lookupScan.copy(); + QueryBuilder finalQuery = query; + newScan.pushDownContext.add( + PushDownType.FILTER, + null, + (OSRequestBuilderAction) + requestBuilder -> requestBuilder.pushDownFilterForCalcite(finalQuery)); + Iterator<@Nullable Object> res = newScan.scan().iterator(); + List results = new ArrayList<>(); + while (res.hasNext()) { + results.add(res.next()); + } + closeIterator(res); + return results; + } + + private static void closeIterator(@Nullable Iterator iterator) { + if (iterator instanceof AutoCloseable) { + try { + ((AutoCloseable) iterator).close(); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * Provides a query builder to search edges with the field matching values + * + * @param fieldIdx field index + * @param values values to match + * @return query builder + */ + private QueryBuilder getQueryBuilder(int fieldIdx, Collection values) { + String fieldName = + new NamedFieldExpression(fieldIdx, lookupFields, lookupScan.getOsIndex().getFieldTypes()) + .getReferenceForTermQuery(); + return termsQuery(fieldName, values); + } + + /** + * Collects values from a field that may be a single value or a list. + * + * @param value The field value (may be single value or List) + * @param collector The list to collect values into + * @param visited Previously visited values to avoid duplicates + */ + private void collectValues(Object value, List collector, Set visited) { + if (value instanceof List list) { + for (Object item : list) { + if (!visited.contains(item)) { + collector.add(item); + } + } + } else if (!visited.contains(value)) { + collector.add(value); + } + } + + @Override + public void reset() { + sourceEnumerator.reset(); + current = null; + } + + @Override + public void close() { + sourceEnumerator.close(); + } + } +} diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 1939124eed..5239ef9154 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -52,6 +52,17 @@ TIMECHART: 'TIMECHART'; APPENDCOL: 'APPENDCOL'; ADDTOTALS: 'ADDTOTALS'; ADDCOLTOTALS: 'ADDCOLTOTALS'; +GRAPHLOOKUP: 'GRAPHLOOKUP'; +START_FIELD: 'STARTFIELD'; +FROM_FIELD: 'FROMFIELD'; +TO_FIELD: 'TOFIELD'; +MAX_DEPTH: 'MAXDEPTH'; +DEPTH_FIELD: 'DEPTHFIELD'; +DIRECTION: 'DIRECTION'; +UNI: 'UNI'; +BI: 'BI'; +SUPPORT_ARRAY: 'SUPPORTARRAY'; +BATCH_MODE: 'BATCHMODE'; ROW: 'ROW'; COL: 'COL'; EXPAND: 'EXPAND'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 2131eeae93..65dc24810d 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -90,6 +90,7 @@ commands | appendPipeCommand | replaceCommand | mvcombineCommand + | graphLookupCommand ; commandName @@ -135,6 +136,7 @@ commandName | REPLACE | MVCOMBINE | TRANSPOSE + | GRAPHLOOKUP ; searchCommand @@ -625,6 +627,21 @@ addcoltotalsOption | (LABELFIELD EQUAL stringLiteral) ; +graphLookupCommand + : GRAPHLOOKUP lookupTable = tableSourceClause graphLookupOption* AS outputField = fieldExpression + ; + +graphLookupOption + : (START_FIELD EQUAL fieldExpression) + | (FROM_FIELD EQUAL fieldExpression) + | (TO_FIELD EQUAL fieldExpression) + | (MAX_DEPTH EQUAL integerLiteral) + | (DEPTH_FIELD EQUAL fieldExpression) + | (DIRECTION EQUAL (UNI | BI)) + | (SUPPORT_ARRAY EQUAL booleanLiteral) + | (BATCH_MODE EQUAL booleanLiteral) + ; + // clauses fromClause : SOURCE EQUAL tableOrSubqueryClause @@ -1676,5 +1693,11 @@ searchableKeyWord | ROW | COL | COLUMN_NAME + | FROM_FIELD + | TO_FIELD + | MAX_DEPTH + | DEPTH_FIELD + | DIRECTION + | UNI + | BI ; - diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index d7c725f957..709c0021a9 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -84,6 +84,8 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; +import org.opensearch.sql.ast.tree.GraphLookup.Direction; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Kmeans; @@ -1481,4 +1483,68 @@ public UnresolvedPlan visitAddcoltotalsCommand( java.util.Map options = cmdOptionsBuilder.build(); return new AddColTotals(fieldList, options); } + + @Override + public UnresolvedPlan visitGraphLookupCommand(OpenSearchPPLParser.GraphLookupCommandContext ctx) { + // Parse lookup table + UnresolvedPlan fromTable = visitTableSourceClause(ctx.lookupTable); + + // Parse options with defaults + Field fromField = null; + Field toField = null; + Literal maxDepth = Literal.ZERO; + Field startField = null; + Field depthField = null; + Direction direction = Direction.UNI; + boolean supportArray = false; + boolean batchMode = false; + + for (OpenSearchPPLParser.GraphLookupOptionContext option : ctx.graphLookupOption()) { + if (option.FROM_FIELD() != null) { + fromField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.TO_FIELD() != null) { + toField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.MAX_DEPTH() != null) { + maxDepth = (Literal) internalVisitExpression(option.integerLiteral()); + } + if (option.START_FIELD() != null) { + startField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.DEPTH_FIELD() != null) { + depthField = (Field) internalVisitExpression(option.fieldExpression()); + } + if (option.DIRECTION() != null) { + direction = option.BI() != null ? Direction.BI : Direction.UNI; + } + if (option.SUPPORT_ARRAY() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + supportArray = Boolean.TRUE.equals(literal.getValue()); + } + if (option.BATCH_MODE() != null) { + Literal literal = (Literal) internalVisitExpression(option.booleanLiteral()); + batchMode = Boolean.TRUE.equals(literal.getValue()); + } + } + + Field as = (Field) internalVisitExpression(ctx.outputField); + + if (fromField == null || toField == null) { + throw new SemanticCheckException("fromField and toField must be specified for graphLookup"); + } + + return GraphLookup.builder() + .fromTable(fromTable) + .fromField(fromField) + .toField(toField) + .as(as) + .maxDepth(maxDepth) + .startField(startField) + .depthField(depthField) + .direction(direction) + .supportArray(supportArray) + .batchMode(batchMode) + .build(); + } } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 4376b5659d..b0a0c1d9ed 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -77,6 +77,7 @@ import org.opensearch.sql.ast.tree.FillNull; import org.opensearch.sql.ast.tree.Filter; import org.opensearch.sql.ast.tree.Flatten; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Join; import org.opensearch.sql.ast.tree.Lookup; @@ -224,6 +225,33 @@ public String visitLookup(Lookup node, String context) { "%s | lookup %s %s%s%s", child, MASK_TABLE, mappingFields, strategy, outputFields); } + @Override + public String visitGraphLookup(GraphLookup node, String context) { + String child = node.getChild().get(0).accept(this, context); + StringBuilder command = new StringBuilder(); + command.append(child).append(" | graphlookup ").append(MASK_TABLE); + if (node.getStartField() != null) { + command.append(" startField=").append(MASK_COLUMN); + } + command.append(" fromField=").append(MASK_COLUMN); + command.append(" toField=").append(MASK_COLUMN); + if (node.getMaxDepth() != null && !Integer.valueOf(0).equals(node.getMaxDepth().getValue())) { + command.append(" maxDepth=").append(MASK_LITERAL); + } + if (node.getDepthField() != null) { + command.append(" depthField=").append(MASK_COLUMN); + } + command.append(" direction=").append(node.getDirection().name().toLowerCase()); + if (node.isSupportArray()) { + command.append(" supportArray=true"); + } + if (node.isBatchMode()) { + command.append(" batchMode=true"); + } + command.append(" as ").append(MASK_COLUMN); + return command.toString(); + } + private String formatFieldAlias(java.util.Map fieldMap) { return fieldMap.entrySet().stream() .map( diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java new file mode 100644 index 0000000000..da16970492 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLGraphLookupTest.java @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +public class CalcitePPLGraphLookupTest extends CalcitePPLAbstractTest { + + public CalcitePPLGraphLookupTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testGraphLookupBasic() { + // Test basic graphLookup with same source and lookup table + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithDepthField() { + // Test graphLookup with depthField parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name depthField=level as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[level]," + + " maxDepth=[0], bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupWithMaxDepth() { + // Test graphLookup with maxDepth parameter + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name maxDepth=3 as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[3]," + + " bidirectional=[false])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testGraphLookupBidirectional() { + // Test graphLookup with bidirectional traversal + String ppl = + "source=employee | graphLookup employee startField=reportsTo fromField=reportsTo" + + " toField=name direction=bi as reportingHierarchy"; + + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalGraphLookup(fromField=[reportsTo], toField=[name]," + + " outputField=[reportingHierarchy], depthField=[null], maxDepth=[0]," + + " bidirectional=[true])\n" + + " LogicalSort(fetch=[100])\n" + + " LogicalTableScan(table=[[scott, employee]])\n" + + " LogicalTableScan(table=[[scott, employee]])\n"; + verifyLogical(root, expectedLogical); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + // Add employee table for graphLookup tests + ImmutableList rows = + ImmutableList.of( + new Object[] {1, "Dev", null}, + new Object[] {2, "Eliot", "Dev"}, + new Object[] {3, "Ron", "Eliot"}, + new Object[] {4, "Andrew", "Eliot"}, + new Object[] {5, "Asya", "Ron"}, + new Object[] {6, "Dan", "Andrew"}); + schema.add("employee", new EmployeeTable(rows)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @RequiredArgsConstructor + public static class EmployeeTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("id", SqlTypeName.INTEGER) + .nullable(false) + .add("name", SqlTypeName.VARCHAR) + .nullable(false) + .add("reportsTo", SqlTypeName.VARCHAR) + .nullable(true) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 9e1cfe05a4..f7cadaaf57 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -77,6 +77,7 @@ import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.tree.AD; import org.opensearch.sql.ast.tree.Chart; +import org.opensearch.sql.ast.tree.GraphLookup; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.ML; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -1643,4 +1644,76 @@ public void testMvmapWithNonFieldFirstArgThrowsException() { () -> plan("source=t | eval result = mvmap(123, 123 * 10)")) .getMessage()); } + + @Test + public void testGraphLookupCommand() { + // Basic graphLookup with required parameters + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name maxDepth=3" + + " as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(3)) + .startField(null) + .depthField(null) + .direction(GraphLookup.Direction.UNI) + .build()); + + // graphLookup with startField filter + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name" + + " startField=id as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(0)) + .startField(field("id")) + .depthField(null) + .direction(GraphLookup.Direction.UNI) + .build()); + + // graphLookup with depthField and bidirectional + assertEqual( + "source=t | graphLookup employees fromField=manager toField=name" + + " depthField=level direction=bi as reportingHierarchy", + GraphLookup.builder() + .child(relation("t")) + .fromTable(relation("employees")) + .fromField(field("manager")) + .toField(field("name")) + .as(field("reportingHierarchy")) + .maxDepth(intLiteral(0)) + .startField(null) + .depthField(field("level")) + .direction(GraphLookup.Direction.BI) + .build()); + + // Error: missing fromField - SemanticCheckException thrown by AstBuilder + assertThrows( + SemanticCheckException.class, + () -> + plan( + "source=t | graphLookup employees toField=name startField=id as" + + " reportingHierarchy")); + + // Error: missing lookup table - SyntaxCheckException from grammar + assertThrows( + SyntaxCheckException.class, + () -> + plan( + "source=t | graphLookup fromField=manager toField=name as" + + " reportingHierarchy")); + + // Error: missing toField - SemanticCheckException thrown by AstBuilder + assertThrows( + SemanticCheckException.class, + () -> plan("source=t | graphLookup employees fromField=manager as reportingHierarchy")); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 1e200eb092..943db6c2ba 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -643,6 +643,60 @@ public void testLookup() { + " COUNTRY2")); } + @Test + public void testGraphLookup() { + // Basic graphLookup with required parameters + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " as reportingHierarchy")); + // graphLookup with maxDepth + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " maxDepth=*** direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " maxDepth=3 as reportingHierarchy")); + // graphLookup with depthField + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " depthField=identifier direction=uni as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " depthField=level as reportingHierarchy")); + // graphLookup with bidirectional mode + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=bi as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " direction=bi as reportingHierarchy")); + // graphLookup with all optional parameters + assertEquals( + "source=table | graphlookup table startField=identifier fromField=identifier" + + " toField=identifier maxDepth=*** depthField=identifier direction=bi" + + " as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " startField=id maxDepth=5 depthField=level direction=bi as reportingHierarchy")); + // graphLookup with supportArray + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni supportArray=true as identifier", + anonymize( + "source=t | graphLookup airports fromField=connects toField=airport" + + " supportArray=true as reachableAirports")); + // graphLookup with batchMode + assertEquals( + "source=table | graphlookup table fromField=identifier toField=identifier" + + " direction=uni batchMode=true as identifier", + anonymize( + "source=t | graphLookup employees fromField=manager toField=name" + + " batchMode=true as reportingHierarchy")); + } + @Test public void testInSubquery() { assertEquals(