diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
index 2b2fe3cf1c..0d1ea32732 100644
--- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
+++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
@@ -73,6 +73,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
+import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
@@ -541,6 +542,11 @@ public LogicalPlan visitMvCombine(MvCombine node, AnalysisContext context) {
throw getOnlyForCalciteException("mvcombine");
}
+ @Override
+ public LogicalPlan visitGraphLookup(GraphLookup node, AnalysisContext context) {
+ throw getOnlyForCalciteException("graphlookup");
+ }
+
/** Build {@link ParseExpression} to context and skip to child nodes. */
@Override
public LogicalPlan visitParse(Parse node, AnalysisContext context) {
diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
index b1082759a3..8abafbc286 100644
--- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java
@@ -61,6 +61,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
+import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
@@ -471,4 +472,8 @@ public T visitAddColTotals(AddColTotals node, C context) {
public T visitMvCombine(MvCombine node, C context) {
return visitChildren(node, context);
}
+
+ public T visitGraphLookup(GraphLookup node, C context) {
+ return visitChildren(node, context);
+ }
}
diff --git a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java
index a6f6671084..a44f0bca41 100644
--- a/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/ast/analysis/FieldResolutionVisitor.java
@@ -40,6 +40,7 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
+import org.opensearch.sql.ast.tree.GraphLookup;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Lookup;
@@ -529,6 +530,12 @@ public Node visitLookup(Lookup node, FieldResolutionContext context) {
throw new IllegalArgumentException("Lookup command cannot be used together with spath command");
}
+ @Override
+ public Node visitGraphLookup(GraphLookup node, FieldResolutionContext context) {
+ throw new IllegalArgumentException(
+ "GraphLookup command cannot be used together with spath command");
+ }
+
@Override
public Node visitValues(Values node, FieldResolutionContext context) {
// do nothing
diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java
new file mode 100644
index 0000000000..7ab0e04b02
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/ast/tree/GraphLookup.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.ast.tree;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import lombok.ToString;
+import org.opensearch.sql.ast.AbstractNodeVisitor;
+import org.opensearch.sql.ast.expression.Field;
+import org.opensearch.sql.ast.expression.Literal;
+
+/**
+ * AST node for graphLookup command. Performs BFS graph traversal on a lookup table.
+ *
+ *
Example: source=employees | graphLookup employees fromField=manager toField=name maxDepth=3
+ * depthField=level direction=uni as hierarchy
+ */
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(callSuper = false)
+@RequiredArgsConstructor
+@AllArgsConstructor
+@Builder(toBuilder = true)
+public class GraphLookup extends UnresolvedPlan {
+ /** Direction mode for graph traversal. */
+ public enum Direction {
+ /** Unidirectional - traverse edges in one direction only. */
+ UNI,
+ /** Bidirectional - traverse edges in both directions. */
+ BI
+ }
+
+ /** Target table for graph traversal lookup. */
+ private final UnresolvedPlan fromTable;
+
+ /** Field in sourceTable to start with. */
+ private final Field startField;
+
+ /** Field in fromTable that represents the outgoing edge. */
+ private final Field fromField;
+
+ /** Field in input/fromTable to match against for traversal. */
+ private final Field toField;
+
+ /** Output field name for collected traversal results. */
+ private final Field as;
+
+ /** Maximum traversal depth. Zero means no limit. */
+ private final Literal maxDepth;
+
+ /** Optional field name to include recursion depth in output. */
+ private @Nullable final Field depthField;
+
+ /** Direction mode: UNI (default) or BIO for bidirectional. */
+ private final Direction direction;
+
+ /** Whether to support array-typed fields without early filter pushdown. */
+ private final boolean supportArray;
+
+ /** Whether to batch all source start values into a single unified BFS traversal. */
+ private final boolean batchMode;
+
+ private UnresolvedPlan child;
+
+ public String getDepthFieldName() {
+ return depthField == null ? null : depthField.getField().toString();
+ }
+
+ @Override
+ public UnresolvedPlan attach(UnresolvedPlan child) {
+ this.child = child;
+ return this;
+ }
+
+ @Override
+ public List getChild() {
+ return child == null ? ImmutableList.of() : ImmutableList.of(child);
+ }
+
+ @Override
+ public T accept(AbstractNodeVisitor visitor, C context) {
+ return visitor.visitGraphLookup(this, context);
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
index 5825011f65..cc9891ab20 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java
@@ -118,6 +118,8 @@
import org.opensearch.sql.ast.tree.FillNull;
import org.opensearch.sql.ast.tree.Filter;
import org.opensearch.sql.ast.tree.Flatten;
+import org.opensearch.sql.ast.tree.GraphLookup;
+import org.opensearch.sql.ast.tree.GraphLookup.Direction;
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Join;
import org.opensearch.sql.ast.tree.Kmeans;
@@ -151,6 +153,7 @@
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.plan.AliasFieldsWrappable;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
+import org.opensearch.sql.calcite.plan.rel.LogicalGraphLookup;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.rel.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.utils.BinUtils;
@@ -2573,6 +2576,57 @@ public RelNode visitAddColTotals(AddColTotals node, CalcitePlanContext context)
context, fieldsToAggregate, false, true, null, labelField, label);
}
+ @Override
+ public RelNode visitGraphLookup(GraphLookup node, CalcitePlanContext context) {
+ // 1. Visit source (child) table
+ visitChildren(node, context);
+ RelBuilder builder = context.relBuilder;
+ // TODO: Limit the number of source rows to 100 for now, make it configurable.
+ builder.limit(0, 100);
+ if (node.isBatchMode()) {
+ tryToRemoveMetaFields(context, true);
+ }
+ RelNode sourceTable = builder.build();
+
+ // 2. Extract parameters
+ String startFieldName = node.getStartField().getField().toString();
+ String fromFieldName = node.getFromField().getField().toString();
+ String toFieldName = node.getToField().getField().toString();
+ String outputFieldName = node.getAs().getField().toString();
+ String depthFieldName = node.getDepthFieldName();
+ boolean bidirectional = node.getDirection() == Direction.BI;
+
+ RexLiteral maxDepthNode = (RexLiteral) rexVisitor.analyze(node.getMaxDepth(), context);
+ Integer maxDepthValue = maxDepthNode.getValueAs(Integer.class);
+ maxDepthValue = maxDepthValue == null ? 0 : maxDepthValue;
+ boolean supportArray = node.isSupportArray();
+ boolean batchMode = node.isBatchMode();
+
+ // 3. Visit and materialize lookup table
+ analyze(node.getFromTable(), context);
+ tryToRemoveMetaFields(context, true);
+ RelNode lookupTable = builder.build();
+
+ // 4. Create LogicalGraphLookup RelNode
+ // The conversion rule will extract the OpenSearchIndex from the lookup table
+ RelNode graphLookup =
+ LogicalGraphLookup.create(
+ sourceTable,
+ lookupTable,
+ startFieldName,
+ fromFieldName,
+ toFieldName,
+ outputFieldName,
+ depthFieldName,
+ maxDepthValue,
+ bidirectional,
+ supportArray,
+ batchMode);
+
+ builder.push(graphLookup);
+ return builder.peek();
+ }
+
/**
* Cast integer sum to long, real/float to double to avoid ClassCastException
*
diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java
new file mode 100644
index 0000000000..e0ea048edb
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/GraphLookup.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.plan.rel;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.BiRel;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Abstract RelNode for graphLookup command.
+ *
+ *
Has two inputs:
+ *
+ *
+ *
source: source table (rows to start BFS from)
+ *
lookup: lookup table (graph edges to traverse)
+ *
+ *
+ *
At execution time, performs BFS by dynamically querying OpenSearch with filter pushdown
+ * instead of loading all lookup data into memory.
+ *
+ *
This is a storage-agnostic logical node. Storage-specific implementations (e.g., for
+ * OpenSearch) should extract the necessary index information from the lookup RelNode during
+ * conversion to the physical plan.
+ */
+@Getter
+public abstract class GraphLookup extends BiRel {
+
+ // TODO: use RexInputRef instead of String for there fields
+ protected final String startField; // Field in source table (start entities)
+ protected final String fromField; // Field in lookup table (edge source)
+ protected final String toField; // Field in lookup table (edge target)
+ protected final String outputField; // Name of output array field
+ @Nullable protected final String depthField; // Name of output array field
+
+ // TODO: add limitation on the maxDepth and input rows count
+ protected final int maxDepth; // -1 = unlimited
+ protected final boolean bidirectional;
+ protected final boolean supportArray;
+ protected final boolean batchMode;
+
+ private RelDataType outputRowType;
+
+ /**
+ * Creates a LogicalGraphLookup.
+ *
+ * @param cluster Cluster
+ * @param traitSet Trait set
+ * @param source Source table RelNode
+ * @param lookup Lookup table RelNode
+ * @param startField Field name for start entities
+ * @param fromField Field name for outgoing edges
+ * @param toField Field name for incoming edges
+ * @param outputField Name of the output array field
+ * @param depthField Name of the depth field
+ * @param maxDepth Maximum traversal depth (-1 for unlimited)
+ * @param bidirectional Whether to traverse edges in both directions
+ * @param supportArray Whether to support array-typed fields (disables early visited filter
+ * pushdown)
+ * @param batchMode Whether to batch all source start values into a single unified BFS
+ */
+ protected GraphLookup(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode source,
+ RelNode lookup,
+ String startField,
+ String fromField,
+ String toField,
+ String outputField,
+ @Nullable String depthField,
+ int maxDepth,
+ boolean bidirectional,
+ boolean supportArray,
+ boolean batchMode) {
+ super(cluster, traitSet, source, lookup);
+ this.startField = startField;
+ this.fromField = fromField;
+ this.toField = toField;
+ this.outputField = outputField;
+ this.depthField = depthField;
+ this.maxDepth = maxDepth;
+ this.bidirectional = bidirectional;
+ this.supportArray = supportArray;
+ this.batchMode = batchMode;
+ }
+
+ /** Returns the source table RelNode. */
+ public RelNode getSource() {
+ return left;
+ }
+
+ /** Returns the lookup table RelNode. */
+ public RelNode getLookup() {
+ return right;
+ }
+
+ @Override
+ public abstract RelNode copy(RelTraitSet traitSet, List inputs);
+
+ @Override
+ protected RelDataType deriveRowType() {
+ if (outputRowType == null) {
+ RelDataTypeFactory.Builder builder = getCluster().getTypeFactory().builder();
+
+ if (batchMode) {
+ // Batch mode: Output = [Array, Array]
+ // First field: aggregated source rows as array
+ RelDataType sourceRowType = getSource().getRowType();
+ RelDataType sourceArrayType =
+ getCluster().getTypeFactory().createArrayType(sourceRowType, -1);
+ builder.add(startField, sourceArrayType);
+
+ // Second field: aggregated lookup rows as array
+ RelDataType lookupRowType = getLookup().getRowType();
+ if (this.depthField != null) {
+ final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder();
+ lookupBuilder.addAll(lookupRowType.getFieldList());
+ RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER);
+ lookupBuilder.add(this.depthField, depthType);
+ lookupRowType = lookupBuilder.build();
+ }
+ RelDataType lookupArrayType =
+ getCluster().getTypeFactory().createArrayType(lookupRowType, -1);
+ builder.add(outputField, lookupArrayType);
+ } else {
+ // Normal mode: Output = source fields + output array field
+ // Add all source fields
+ for (var field : getSource().getRowType().getFieldList()) {
+ builder.add(field);
+ }
+
+ // Add output field (ARRAY type containing lookup row struct)
+ RelDataType lookupRowType = getLookup().getRowType();
+ if (this.depthField != null) {
+ final RelDataTypeFactory.Builder lookupBuilder = getCluster().getTypeFactory().builder();
+ lookupBuilder.addAll(lookupRowType.getFieldList());
+ RelDataType depthType = getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER);
+ lookupBuilder.add(this.depthField, depthType);
+ lookupRowType = lookupBuilder.build();
+ }
+ RelDataType arrayType = getCluster().getTypeFactory().createArrayType(lookupRowType, -1);
+ builder.add(outputField, arrayType);
+ }
+
+ outputRowType = builder.build();
+ }
+ return outputRowType;
+ }
+
+ @Override
+ public double estimateRowCount(RelMetadataQuery mq) {
+ // Batch mode aggregates all source rows into a single output row
+ return batchMode ? 1 : getSource().estimateRowCount(mq);
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw)
+ .item("fromField", fromField)
+ .item("toField", toField)
+ .item("outputField", outputField)
+ .item("depthField", depthField)
+ .item("maxDepth", maxDepth)
+ .item("bidirectional", bidirectional)
+ .itemIf("supportArray", supportArray, supportArray)
+ .itemIf("batchMode", batchMode, batchMode);
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java
new file mode 100644
index 0000000000..745d0cb382
--- /dev/null
+++ b/core/src/main/java/org/opensearch/sql/calcite/plan/rel/LogicalGraphLookup.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.plan.rel;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import lombok.Getter;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+
+/**
+ * Logical RelNode for graphLookup command. TODO: need to support trim fields and several transpose
+ * rules for this new added RelNode
+ */
+@Getter
+public class LogicalGraphLookup extends GraphLookup {
+
+ /**
+ * Creates a LogicalGraphLookup.
+ *
+ * @param cluster Cluster
+ * @param traitSet Trait set
+ * @param source Source table RelNode
+ * @param lookup Lookup table RelNode
+ * @param startField Field name for start entities
+ * @param fromField Field name for outgoing edges
+ * @param toField Field name for incoming edges
+ * @param outputField Name of the output array field
+ * @param depthField Name of the depth field
+ * @param maxDepth Maximum traversal depth (-1 for unlimited)
+ * @param bidirectional Whether to traverse edges in both directions
+ * @param supportArray Whether to support array-typed fields
+ * @param batchMode Whether to batch all source start values into a single unified BFS
+ */
+ protected LogicalGraphLookup(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode source,
+ RelNode lookup,
+ String startField,
+ String fromField,
+ String toField,
+ String outputField,
+ @Nullable String depthField,
+ int maxDepth,
+ boolean bidirectional,
+ boolean supportArray,
+ boolean batchMode) {
+ super(
+ cluster,
+ traitSet,
+ source,
+ lookup,
+ startField,
+ fromField,
+ toField,
+ outputField,
+ depthField,
+ maxDepth,
+ bidirectional,
+ supportArray,
+ batchMode);
+ }
+
+ /**
+ * Creates a LogicalGraphLookup with Convention.NONE.
+ *
+ * @param source Source table RelNode
+ * @param lookup Lookup table RelNode
+ * @param startField Field name for start entities
+ * @param fromField Field name for outgoing edges
+ * @param toField Field name for incoming edges
+ * @param outputField Name of the output array field
+ * @param depthField Named of the output depth field
+ * @param maxDepth Maximum traversal depth (-1 for unlimited)
+ * @param bidirectional Whether to traverse edges in both directions
+ * @param supportArray Whether to support array-typed fields
+ * @param batchMode Whether to batch all source start values into a single unified BFS
+ * @return A new LogicalGraphLookup instance
+ */
+ public static LogicalGraphLookup create(
+ RelNode source,
+ RelNode lookup,
+ String startField,
+ String fromField,
+ String toField,
+ String outputField,
+ @Nullable String depthField,
+ int maxDepth,
+ boolean bidirectional,
+ boolean supportArray,
+ boolean batchMode) {
+ RelOptCluster cluster = source.getCluster();
+ RelTraitSet traitSet = cluster.traitSetOf(Convention.NONE);
+ return new LogicalGraphLookup(
+ cluster,
+ traitSet,
+ source,
+ lookup,
+ startField,
+ fromField,
+ toField,
+ outputField,
+ depthField,
+ maxDepth,
+ bidirectional,
+ supportArray,
+ batchMode);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List inputs) {
+ return new LogicalGraphLookup(
+ getCluster(),
+ traitSet,
+ inputs.get(0),
+ inputs.get(1),
+ startField,
+ fromField,
+ toField,
+ outputField,
+ depthField,
+ maxDepth,
+ bidirectional,
+ supportArray,
+ batchMode);
+ }
+}
diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java
index 17d99fb4fb..8dfe963081 100644
--- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java
+++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java
@@ -223,7 +223,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) {
case BIGINT -> LONG;
case FLOAT, REAL -> FLOAT;
case DOUBLE, DECIMAL -> DOUBLE; // TODO the decimal is only used for literal
- case CHAR, VARCHAR -> STRING;
+ case CHAR, VARCHAR, MULTISET -> STRING; // call toString() for MULTISET
case BOOLEAN -> BOOLEAN;
case DATE -> DATE;
case TIME, TIME_TZ, TIME_WITH_LOCAL_TIME_ZONE -> TIME;
diff --git a/docs/user/ppl/cmd/graphlookup.md b/docs/user/ppl/cmd/graphlookup.md
new file mode 100644
index 0000000000..ef8720af8e
--- /dev/null
+++ b/docs/user/ppl/cmd/graphlookup.md
@@ -0,0 +1,309 @@
+
+# graphLookup
+
+The `graphLookup` command performs recursive graph traversal on a collection using a breadth-first search (BFS) algorithm. It searches for documents matching a start value and recursively traverses connections between documents based on specified fields. This is useful for hierarchical data like organizational charts, social networks, or routing graphs.
+
+## Syntax
+
+The `graphLookup` command has the following syntax:
+
+```syntax
+graphLookup startField= fromField= toField= [maxDepth=] [depthField=] [direction=(uni | bi)] [supportArray=(true | false)] [batchMode=(true | false)] as
+```
+
+The following are examples of the `graphLookup` command syntax:
+
+```syntax
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name as reportingHierarchy
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name maxDepth=2 as reportingHierarchy
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name depthField=level as reportingHierarchy
+source = employees | graphLookup employees startField=reportsTo fromField=reportsTo toField=name direction=bi as connections
+source = travelers | graphLookup airports startField=nearestAirport fromField=connects toField=airport supportArray=true as reachableAirports
+source = airports | graphLookup airports startField=airport fromField=connects toField=airport supportArray=true as reachableAirports
+```
+
+## Parameters
+
+The `graphLookup` command supports the following parameters.
+
+| Parameter | Required/Optional | Description |
+| --- | --- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `` | Required | The name of the index to perform the graph traversal on. Can be the same as the source index for self-referential graphs. |
+| `startField=` | Required | The field in the source documents whose value is used to start the recursive search. The value of this field is matched against `toField` in the lookup index. We support both single value and array values as starting points. |
+| `fromField=` | Required | The field in the lookup index documents that contains the value to recurse on. After matching a document, the value of this field is used to find the next set of documents. It supports both single value and array values. |
+| `toField=` | Required | The field in the lookup index documents to match against. Documents where `toField` equals the current traversal value are included in the results. |
+| `maxDepth=` | Optional | The maximum recursion depth of hops. Default is `0`. A value of `0` means only the direct connections to the statr values are returned. A value of `1` means 1 hop connections (initial match plus one recursive step), and so on. |
+| `depthField=` | Optional | The name of the field to add to each traversed document indicating its recursion depth. If not specified, no depth field is added. Depth starts at `0` for the first level of matches. |
+| `direction=(uni \| bi)` | Optional | The traversal direction. `uni` (default) performs unidirectional traversal following edges in the forward direction only. `bi` performs bidirectional traversal, following edges in both directions. |
+| `supportArray=(true \| false)` | Optional | When `true`, disables early visited-node filter pushdown to OpenSearch. Default is `false`. Set to `true` when `fromField` or `toField` contains array values to ensure correct traversal behavior. See [Array Field Handling](#array-field-handling) for details. |
+| `batchMode=(true \| false)` | Optional | When `true`, collects all start values from all source rows and performs a single unified BFS traversal. Default is `false`. The output changes to two arrays: `[Array, Array]`. See [Batch Mode](#batch-mode) for details. |
+| `as ` | Required | The name of the output array field that will contain all documents found during the graph traversal. |
+
+## How It Works
+
+The `graphLookup` command performs a breadth-first search (BFS) traversal:
+
+1. For each source document, extract the value of `startField`
+2. Query the lookup index to find documents where `toField` matches the start value
+3. Add matched documents to the result array
+4. Extract `fromField` values from matched documents to continue traversal
+5. Repeat steps 2-4 until no new documents are found or `maxDepth` is reached
+
+For bidirectional traversal (`direction=bi`), the algorithm also follows edges in the reverse direction by additionally matching `fromField` values.
+
+## Example 1: Employee Hierarchy Traversal
+
+Given an `employees` index with the following documents:
+
+| id | name | reportsTo |
+|----|------|-----------|
+| 1 | Dev | Eliot |
+| 2 | Eliot | Ron |
+| 3 | Ron | Andrew |
+| 4 | Andrew | null |
+| 5 | Asya | Ron |
+| 6 | Dan | Andrew |
+
+The following query finds the reporting chain for each employee:
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ as reportingHierarchy
+```
+
+The query returns the following results:
+
+```text
++--------+----------+----+---------------------+
+| name | reportsTo| id | reportingHierarchy |
++--------+----------+----+---------------------+
+| Dev | Eliot | 1 | [{Eliot, Ron, 2}] |
+| Eliot | Ron | 2 | [{Ron, Andrew, 3}] |
+| Ron | Andrew | 3 | [{Andrew, null, 4}] |
+| Andrew | null | 4 | [] |
+| Asya | Ron | 5 | [{Ron, Andrew, 3}] |
+| Dan | Andrew | 6 | [{Andrew, null, 4}] |
++--------+----------+----+---------------------+
+```
+
+For Dev, the traversal starts with `reportsTo="Eliot"`, finds the Eliot record, and returns it in the `reportingHierarchy` array.
+
+## Example 2: Employee Hierarchy with Depth Tracking
+
+The following query adds a depth field to track how many levels each manager is from the employee:
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ depthField=level
+ as reportingHierarchy
+```
+
+The query returns the following results:
+
+```text
++--------+----------+----+------------------------+
+| name | reportsTo| id | reportingHierarchy |
++--------+----------+----+------------------------+
+| Dev | Eliot | 1 | [{Eliot, Ron, 2, 0}] |
+| Eliot | Ron | 2 | [{Ron, Andrew, 3, 0}] |
+| Ron | Andrew | 3 | [{Andrew, null, 4, 0}] |
+| Andrew | null | 4 | [] |
+| Asya | Ron | 5 | [{Ron, Andrew, 3, 0}] |
+| Dan | Andrew | 6 | [{Andrew, null, 4, 0}] |
++--------+----------+----+------------------------+
+```
+
+The depth field `level` is appended to each document in the result array. A value of `0` indicates the first level of matches.
+
+## Example 3: Limited Depth Traversal
+
+The following query limits traversal to 2 levels using `maxDepth=1`:
+
+```ppl ignore
+source = employees
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ maxDepth=1
+ as reportingHierarchy
+```
+
+The query returns the following results:
+
+```text
++--------+----------+----+--------------------------------------+
+| name | reportsTo| id | reportingHierarchy |
++--------+----------+----+--------------------------------------+
+| Dev | Eliot | 1 | [{Eliot, Ron, 2}, {Ron, Andrew, 3}] |
+| Eliot | Ron | 2 | [{Ron, Andrew, 3}, {Andrew, null, 4}]|
+| Ron | Andrew | 3 | [{Andrew, null, 4}] |
+| Andrew | null | 4 | [] |
+| Asya | Ron | 5 | [{Ron, Andrew, 3}, {Andrew, null, 4}]|
+| Dan | Andrew | 6 | [{Andrew, null, 4}] |
++--------+----------+----+--------------------------------------+
+```
+
+With `maxDepth=1`, the traversal goes two levels deep (depth 0 and depth 1).
+
+## Example 4: Airport Connections Graph
+
+Given an `airports` index with the following documents:
+
+| airport | connects |
+|---------|----------|
+| JFK | [BOS, ORD] |
+| BOS | [JFK, PWM] |
+| ORD | [JFK] |
+| PWM | [BOS, LHR] |
+| LHR | [PWM] |
+
+The following query finds reachable airports from each airport:
+
+```ppl ignore
+source = airports
+ | graphLookup airports
+ startField=airport
+ fromField=connects
+ toField=airport
+ as reachableAirports
+```
+
+The query returns the following results:
+
+```text
++---------+------------+---------------------+
+| airport | connects | reachableAirports |
++---------+------------+---------------------+
+| JFK | [BOS, ORD] | [{JFK, [BOS, ORD]}] |
+| BOS | [JFK, PWM] | [{BOS, [JFK, PWM]}] |
+| ORD | [JFK] | [{ORD, [JFK]}] |
+| PWM | [BOS, LHR] | [{PWM, [BOS, LHR]}] |
+| LHR | [PWM] | [{LHR, [PWM]}] |
++---------+------------+---------------------+
+```
+
+## Example 5: Cross-Index Graph Lookup
+
+The `graphLookup` command can use different source and lookup indexes. Given a `travelers` index:
+
+| name | nearestAirport |
+|------|----------------|
+| Dev | JFK |
+| Eliot | JFK |
+| Jeff | BOS |
+
+The following query finds reachable airports for each traveler:
+
+```ppl ignore
+source = travelers
+ | graphLookup airports
+ startField=nearestAirport
+ fromField=connects
+ toField=airport
+ as reachableAirports
+```
+
+The query returns the following results:
+
+```text
++-------+----------------+---------------------+
+| name | nearestAirport | reachableAirports |
++-------+----------------+---------------------+
+| Dev | JFK | [{JFK, [BOS, ORD]}] |
+| Eliot | JFK | [{JFK, [BOS, ORD]}] |
+| Jeff | BOS | [{BOS, [JFK, PWM]}] |
++-------+----------------+---------------------+
+```
+
+## Example 6: Bidirectional Traversal
+
+The following query performs bidirectional traversal to find both managers and colleagues who share the same manager:
+
+```ppl ignore
+source = employees
+ | where name = 'Ron'
+ | graphLookup employees
+ startField=reportsTo
+ fromField=reportsTo
+ toField=name
+ direction=bi
+ as connections
+```
+
+The query returns the following results:
+
+```text
++------+----------+----+------------------------------------------------+
+| name | reportsTo| id | connections |
++------+----------+----+------------------------------------------------+
+| Ron | Andrew | 3 | [{Ron, Andrew, 3}, {Andrew, null, 4}, {Dan, Andrew, 6}] |
++------+----------+----+------------------------------------------------+
+```
+
+With bidirectional traversal, Ron's connections include:
+- His own record (Ron reports to Andrew)
+- His manager (Andrew)
+- His peer (Dan, who also reports to Andrew)
+
+## Batch Mode
+
+When `batchMode=true`, the `graphLookup` command collects all start values from all source rows and performs a single unified BFS traversal instead of separate traversals per row.
+
+### Output Format Change
+
+In batch mode, the output is a **single row** with two arrays:
+- First array: All source rows collected
+- Second array: All lookup results from the unified BFS traversal
+
+### When to Use Batch Mode
+
+Use `batchMode=true` when:
+- You want to find all nodes reachable from **any** of the source start values
+- You need a global view of the graph connectivity from multiple starting points
+- You want to avoid duplicate traversals when multiple source rows share overlapping paths
+
+### Example
+
+```ppl ignore
+source = travelers
+ | graphLookup airports
+ startField=nearestAirport
+ fromField=connects
+ toField=airport
+ batchMode=true
+ maxDepth=2
+ as reachableAirports
+```
+
+**Normal mode** (default): Each traveler gets their own list of reachable airports
+```text
+| name | nearestAirport | reachableAirports |
+|-------|----------------|-------------------|
+| Dev | JFK | [JFK, BOS, ORD] |
+| Jeff | BOS | [BOS, JFK, PWM] |
+```
+
+**Batch mode**: A single row with all travelers and all reachable airports combined
+```text
+| travelers | reachableAirports |
+|----------------------------------------|-----------------------------|
+| [{Dev, JFK}, {Jeff, BOS}] | [JFK, BOS, ORD, PWM, ...] |
+```
+
+## Array Field Handling
+
+When the `fromField` or `toField` contains array values, you should set `supportArray=true` to ensure correct traversal behavior.
+
+## Limitations
+
+- The source input, which provides the starting point for the traversal, has a limitation of 100 documents to avoid performance issues.
+- To avoid PIT (Point in Time) search, each level of traversal search returns documents up to the "max result windows" of the lookup index.
diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java
new file mode 100644
index 0000000000..498b17dab9
--- /dev/null
+++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLGraphLookupIT.java
@@ -0,0 +1,603 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.calcite.remote;
+
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_AIRPORTS;
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_EMPLOYEES;
+import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_GRAPH_TRAVELERS;
+import static org.opensearch.sql.util.MatcherUtils.rows;
+import static org.opensearch.sql.util.MatcherUtils.schema;
+import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
+import static org.opensearch.sql.util.MatcherUtils.verifySchema;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.json.JSONObject;
+import org.junit.Test;
+import org.opensearch.sql.ppl.PPLIntegTestCase;
+
+/**
+ * Integration tests for graphLookup command. Test cases are inspired by MongoDB's $graphLookup
+ * examples.
+ *
+ *