Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -541,6 +542,11 @@ public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) {
throw getOnlyForCalciteException("mvcombine");
}

@Override
public LogicalPlan visitGraphLookup(GraphLookup node, AnalysisContext context) {
throw getOnlyForCalciteException("graphlookup");
}

/** Build {@link ParseExpression} to context and skip to child nodes. */
@Override
public LogicalPlan visitParse(Parse node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -471,4 +472,8 @@ public T visitAddColTotals(AddColTotals node, C context) {
public T visitMvCombine(MvCombine node, C context) {
return visitChildren(node, context);
}

public T visitGraphLookup(GraphLookup node, C context) {
return visitChildren(node, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Lookup;
Expand Down Expand Up @@ -529,6 +530,12 @@ public Node visitLookup(Lookup node, FieldResolutionContext context) {
throw new IllegalArgumentException("Lookup command cannot be used together with spath command");
}

@Override
public Node visitGraphLookup(GraphLookup node, FieldResolutionContext context) {
throw new IllegalArgumentException(
"GraphLookup command cannot be used together with spath command");
}

@Override
public Node visitValues(Values node, FieldResolutionContext context) {
// do nothing
Expand Down
95 changes: 95 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Literal;

/**
* AST node for graphLookup command. Performs BFS graph traversal on a lookup table.
*
* <p>Example: source=employees | graphLookup employees fromField=manager toField=name maxDepth=3
* depthField=level direction=uni as hierarchy
*/
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class GraphLookup extends UnresolvedPlan {
/** Direction mode for graph traversal. */
public enum Direction {
/** Unidirectional - traverse edges in one direction only. */
UNI,
/** Bidirectional - traverse edges in both directions. */
BI
}

/** Target table for graph traversal lookup. */
private final UnresolvedPlan fromTable;

/** Field in sourceTable to start with. */
private final Field startField;

/** Field in fromTable that represents the outgoing edge. */
private final Field fromField;

/** Field in input/fromTable to match against for traversal. */
private final Field toField;

/** Output field name for collected traversal results. */
private final Field as;

/** Maximum traversal depth. Zero means no limit. */
private final Literal maxDepth;

/** Optional field name to include recursion depth in output. */
private @Nullable final Field depthField;

/** Direction mode: UNI (default) or BIO for bidirectional. */
private final Direction direction;

/** Whether to support array-typed fields without early filter pushdown. */
private final boolean supportArray;

/** Whether to batch all source start values into a single unified BFS traversal. */
private final boolean batchMode;

private UnresolvedPlan child;

public String getDepthFieldName() {
return depthField == null ? null : depthField.getField().toString();
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return child == null ? ImmutableList.of() : ImmutableList.of(child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> visitor, C context) {
return visitor.visitGraphLookup(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.GraphLookup.Direction;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
Expand Down Expand Up @@ -151,6 +153,7 @@
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.BinUtils;
Expand Down Expand Up @@ -2573,6 +2576,57 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context)
context, fieldsToAggregate, false, true, null, labelField, label);
}

@Override
public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) {
// 1. Visit source (child) table
visitChildren(node, context);
RelBuilder builder = context.relBuilder;
// TODO: Limit the number of source rows to 100 for now, make it configurable.
builder.limit(0, 100);
if (node.isBatchMode()) {
tryToRemoveMetaFields(context, true);
}
RelNode sourceTable = builder.build();

// 2. Extract parameters
String startFieldName = node.getStartField().getField().toString();
String fromFieldName = node.getFromField().getField().toString();
String toFieldName = node.getToField().getField().toString();
String outputFieldName = node.getAs().getField().toString();
String depthFieldName = node.getDepthFieldName();
boolean bidirectional = node.getDirection() == Direction.BI;

RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context);
Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class);
maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue;
boolean supportArray = node.isSupportArray();
boolean batchMode = node.isBatchMode();

// 3. Visit and materialize lookup table
analyze(node.getFromTable(), context);
tryToRemoveMetaFields(context, true);
RelNode lookupTable = builder.build();

// 4. Create LogicalGraphLookup RelNode
// The conversion rule will extract the OpenSearchIndex from the lookup table
RelNode graphLookup =
LogicalGraphLookup.create(
sourceTable,
lookupTable,
startFieldName,
fromFieldName,
toFieldName,
outputFieldName,
depthFieldName,
maxDepthValue,
bidirectional,
supportArray,
batchMode);

builder.push(graphLookup);
return builder.peek();
}

/**
* Cast integer sum to long, real/float to double to avoid ClassCastException
*
Expand Down
Loading
Loading